From e82946e5c95712e1e358168106e81aff903da02f Mon Sep 17 00:00:00 2001 From: David Roazen Date: Tue, 25 Sep 2012 11:37:17 -0400 Subject: [PATCH] ExperimentalReadShardBalancer: create one monolithic FilePointer per contig Merge all FilePointers for each contig into a single, merged, optimized FilePointer representing all regions to visit in all BAM files for a given contig. This helps us in several ways: -It allows us to create a single, persistent set of iterators for each contig, finally and definitively eliminating all Shard/FilePointer boundary issues for the new experimental ReadWalker downsampling -We no longer need to track low-level file positions in the sharding system (which was no longer possible anyway given the new experimental downsampling system) -We no longer revisit BAM file chunks that we've visited in the past -- all BAM file access is purely sequential -We no longer need to constantly recreate our full chain of read iterators There are also potential dangers: -We hold more BAM index data in memory at once. Given that we optimize the index data during the merge, and only hold one contig's worth of data at a time, this does not appear to be a major issue. TODO: confirm this! -With a huge number of samples and intervals, the FilePointer merge operation might become expensive. With the latest implementation, this does not appear to be an issue even with a huge number of intervals (for one sample, at least), but if it turns out to be a problem for > 1 sample there are things we can do. 
Still TODO: unit tests for the new FilePointer.union() method --- .../reads/ExperimentalReadShardBalancer.java | 160 ++++++++++-------- .../gatk/datasources/reads/FilePointer.java | 94 +++++++++- 2 files changed, 179 insertions(+), 75 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ExperimentalReadShardBalancer.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ExperimentalReadShardBalancer.java index 4d1d2a533..6c064cf86 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ExperimentalReadShardBalancer.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ExperimentalReadShardBalancer.java @@ -25,15 +25,40 @@ package org.broadinstitute.sting.gatk.datasources.reads; import net.sf.picard.util.PeekableIterator; -import net.sf.samtools.SAMFileSpan; import net.sf.samtools.SAMRecord; import org.apache.log4j.Logger; -import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import java.util.*; /** - * Convert from an unbalanced iterator over FilePointers to a balanced iterator over Shards + * Convert from an unbalanced iterator over FilePointers to a balanced iterator over Shards. + * + * When processing FilePointers, our strategy is to aggregate all FilePointers for each contig + * together into one monolithic FilePointer, create one persistent set of read iterators over + * that monolithic FilePointer, and repeatedly use that persistent set of read iterators to + * fill read shards with reads. + * + * This strategy has several important advantages: + * + * 1. We avoid issues with file span overlap. FilePointers that are more granular than a whole + * contig will have regions that overlap with other FilePointers on the same contig, due + * to the limited granularity of BAM index data. 
By creating only one FilePointer per contig, + * we avoid having to track how much of each file region we've visited (as we did in the + * former implementation), we avoid expensive non-sequential access patterns in the files, + * and we avoid having to repeatedly re-create our iterator chain for every small region + * of interest. + * + * 2. We avoid boundary issues with the engine-level downsampling. Since we create a single + * persistent set of read iterators (which include the downsampling iterator(s)) per contig, + * the downsampling process is never interrupted by FilePointer or Shard boundaries, and never + * loses crucial state information while downsampling within a contig. + * + * TODO: There is also at least one important disadvantage: + * + * 1. We load more BAM index data into memory at once, and this work is done upfront before processing + * the next contig, creating a delay before traversal of each contig. This delay may be + * compensated for by the gains listed in #1 above, and we may be no worse off overall in + * terms of total runtime, but we need to verify this empirically. * * @author David Roazen */ @@ -55,17 +80,16 @@ public class ExperimentalReadShardBalancer extends ShardBalancer { /** * The file pointer currently being processed. */ - private FilePointer currentFilePointer = null; + private FilePointer currentContigFilePointer = null; /** - * Iterator over the reads from the current file pointer. The same iterator will be + * Iterator over the reads from the current contig's file pointer. 
The same iterator will be * used to fill all shards associated with a given file pointer */ - private PeekableIterator currentFilePointerReadsIterator = null; + private PeekableIterator currentContigReadsIterator = null; { - if ( filePointers.hasNext() ) - currentFilePointer = filePointers.next(); + createNextContigFilePointer(); advance(); } @@ -85,93 +109,87 @@ public class ExperimentalReadShardBalancer extends ShardBalancer { nextShard = null; // May need multiple iterations to fill the next shard if all reads in current file spans get filtered/downsampled away - while ( nextShard == null && currentFilePointer != null ) { + while ( nextShard == null && currentContigFilePointer != null ) { // If we've exhausted the current file pointer of reads, move to the next file pointer (if there is one): - if ( currentFilePointerReadsIterator != null && ! currentFilePointerReadsIterator.hasNext() ) { + if ( currentContigReadsIterator != null && ! currentContigReadsIterator.hasNext() ) { // Close the old, exhausted chain of iterators to release resources - currentFilePointerReadsIterator.close(); + currentContigReadsIterator.close(); - do { - advanceFilePointer(); - } while ( currentFilePointer != null && isEmpty(currentFilePointer.fileSpans) ); // skip empty file pointers + // Advance to the FilePointer for the next contig + createNextContigFilePointer(); // We'll need to create a fresh iterator for this file pointer when we create the first // shard for it below. - currentFilePointerReadsIterator = null; + currentContigReadsIterator = null; } - // At this point if currentFilePointer is non-null we know it is also non-empty. Our - // currentFilePointerReadsIterator may be null or non-null depending on whether or not + // At this point our currentContigReadsIterator may be null or non-null depending on whether or not // this is our first shard for this file pointer. 
- if ( currentFilePointer != null ) { - Shard shard = new ReadShard(parser,readsDataSource,currentFilePointer.fileSpans,currentFilePointer.locations,currentFilePointer.isRegionUnmapped); + if ( currentContigFilePointer != null ) { + Shard shard = new ReadShard(parser,readsDataSource, currentContigFilePointer.fileSpans, currentContigFilePointer.locations, currentContigFilePointer.isRegionUnmapped); - // Create a new reads iterator only when we've just advanced to a new file pointer. It's - // essential that the iterators persist across all shards that share the same file pointer + // Create a new reads iterator only when we've just advanced to the file pointer for the next + // contig. It's essential that the iterators persist across all shards that share the same contig // to allow the downsampling to work properly. - if ( currentFilePointerReadsIterator == null ) { - currentFilePointerReadsIterator = new PeekableIterator(readsDataSource.getIterator(shard)); + if ( currentContigReadsIterator == null ) { + currentContigReadsIterator = new PeekableIterator(readsDataSource.getIterator(shard)); } - if ( currentFilePointerReadsIterator.hasNext() ) { - shard.fill(currentFilePointerReadsIterator); + if ( currentContigReadsIterator.hasNext() ) { + shard.fill(currentContigReadsIterator); nextShard = shard; } } } } - private void advanceFilePointer() { - FilePointer previousFilePointer = currentFilePointer; - currentFilePointer = filePointers.hasNext() ? filePointers.next() : null; - - // TODO: This is a purely defensive measure to guard against the possibility of overlap - // TODO: between FilePointers. When overlap is detected, remove the overlapping regions from - // TODO: the newly-current FilePointer. - // TODO: If we later discover that overlap is theoretically impossible, this step becomes - // TODO: unnecessary and should be removed. 
- if ( currentFilePointer != null && previousFilePointer != null && - previousFilePointer.hasFileSpansOverlappingWith(currentFilePointer) ) { - - logger.debug(String.format("%s: found consecutive overlapping FilePointers [%s] and [%s]", getClass().getSimpleName(), previousFilePointer, currentFilePointer)); - - Map previousFileSpans = previousFilePointer.getFileSpans(); - Map trimmedFileSpans = new HashMap(currentFilePointer.getFileSpans().size()); - - for ( Map.Entry fileSpanEntry : currentFilePointer.getFileSpans().entrySet() ) { - // find the corresponding file span from the previous FilePointer - SAMFileSpan previousFileSpan = previousFileSpans.get(fileSpanEntry.getKey()); - - if ( previousFileSpan == null ) { - // no match, so no trimming required - trimmedFileSpans.put(fileSpanEntry.getKey(), fileSpanEntry.getValue()); - } - else { - // match, so remove any overlapping regions (regions before the start of the - // region immediately following the previous file span) - SAMFileSpan trimmedSpan = fileSpanEntry.getValue().removeContentsBefore(previousFileSpan.getContentsFollowing()); - trimmedFileSpans.put(fileSpanEntry.getKey(), trimmedSpan); - } - } - - // Replace the current file pointer with its trimmed equivalent - currentFilePointer = new FilePointer(trimmedFileSpans, currentFilePointer.locations); - } - } - /** - * Detects whether the list of file spans contain any read data. - * @param selectedSpans Mapping of readers to file spans. - * @return True if file spans are completely empty; false otherwise. + * Aggregate all FilePointers for the next contig together into one monolithic FilePointer + * to avoid boundary issues with visiting the same file regions more than once (since more + * granular FilePointers will have regions that overlap with other nearby FilePointers due + * to the nature of BAM indices). + * + * By creating one persistent set of iterators per contig we also avoid boundary artifacts + * in the engine-level downsampling. 
+ * + * TODO: This FilePointer aggregation should ideally be done at the BAMSchedule level for + * TODO: read traversals, as there's little point in the BAMSchedule emitting extremely + * TODO: granular FilePointers if we're just going to union them. The BAMSchedule should + * TODO: emit one FilePointer per contig for read traversals (but, crucially, NOT for + * TODO: locus traversals). */ - private boolean isEmpty(Map selectedSpans) { - for(SAMFileSpan fileSpan: selectedSpans.values()) { - if(!fileSpan.isEmpty()) - return false; + private void createNextContigFilePointer() { + currentContigFilePointer = null; + List nextContigFilePointers = new ArrayList(); + + logger.info("Loading BAM index data for next contig"); + + while ( filePointers.hasNext() ) { + // If this is the first FP we've seen, or we're dealing with mapped regions and the next FP is on the + // same contig as previous FPs, or all our FPs are unmapped, add the next FP to the list of FPs to merge + if ( nextContigFilePointers.isEmpty() || + (! nextContigFilePointers.get(0).isRegionUnmapped && ! filePointers.peek().isRegionUnmapped && + nextContigFilePointers.get(0).getContigIndex() == filePointers.peek().getContigIndex()) || + (nextContigFilePointers.get(0).isRegionUnmapped && filePointers.peek().isRegionUnmapped) ) { + + nextContigFilePointers.add(filePointers.next()); + } + else { + break; // next FilePointer is on a different contig or has different mapped/unmapped status, + // save it for next time + } + } + + if ( ! 
nextContigFilePointers.isEmpty() ) { + currentContigFilePointer = FilePointer.union(nextContigFilePointers, parser); + } + + if ( currentContigFilePointer != null ) { + logger.info("Done loading BAM index data for next contig"); + logger.debug(String.format("Next contig FilePointer: %s", currentContigFilePointer)); } - return true; } public void remove() { diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/FilePointer.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/FilePointer.java index b0fbc05bf..50f4e0273 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/FilePointer.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/FilePointer.java @@ -26,7 +26,9 @@ package org.broadinstitute.sting.gatk.datasources.reads; import net.sf.picard.util.PeekableIterator; import net.sf.samtools.GATKBAMFileSpan; +import net.sf.samtools.GATKChunk; import net.sf.samtools.SAMFileSpan; +import net.sf.samtools.SAMRecord; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.Utils; @@ -48,15 +50,19 @@ public class FilePointer { */ protected final boolean isRegionUnmapped; - public FilePointer(final GenomeLoc... locations) { - this.locations.addAll(Arrays.asList(locations)); + public FilePointer( List locations ) { + this.locations.addAll(locations); this.isRegionUnmapped = checkUnmappedStatus(); + validateLocations(); + } + + public FilePointer( final GenomeLoc... 
locations ) { + this(Arrays.asList(locations)); } public FilePointer( Map fileSpans, List locations ) { + this(locations); this.fileSpans.putAll(fileSpans); - this.locations.addAll(locations); - this.isRegionUnmapped = checkUnmappedStatus(); } private boolean checkUnmappedStatus() { @@ -74,6 +80,22 @@ public class FilePointer { return foundUnmapped; } + private void validateLocations() { + if ( isRegionUnmapped ) { + return; + } + + Integer previousContigIndex = null; + + for ( GenomeLoc location : locations ) { + if ( previousContigIndex != null && previousContigIndex != location.getContigIndex() ) { + throw new ReviewedStingException("File pointers must contain intervals from at most one contig"); + } + + previousContigIndex = location.getContigIndex(); + } + } + /** * Returns an immutable view of this FilePointer's file spans * @@ -91,6 +113,16 @@ public class FilePointer { return Collections.unmodifiableList(locations); } + /** + * Returns the index of the contig into which this FilePointer points (a FilePointer can represent + * regions in at most one contig). + * + * @return the index of the contig into which this FilePointer points + */ + public int getContigIndex() { + return locations.size() > 0 ? locations.get(0).getContigIndex() : SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX; + } + @Override public boolean equals(final Object other) { if(!(other instanceof FilePointer)) @@ -121,11 +153,13 @@ public class FilePointer { public void addLocation(final GenomeLoc location) { this.locations.add(location); checkUnmappedStatus(); + validateLocations(); } public void addLocations( final List locations ) { this.locations.addAll(locations); checkUnmappedStatus(); + validateLocations(); } public void addFileSpans(final SAMReaderID id, final SAMFileSpan fileSpan) { @@ -243,6 +277,58 @@ public class FilePointer { combined.addFileSpans(initialElement.getKey(),fileSpan); } + /** + * Efficiently generate the union of the n FilePointers passed in. 
Much more efficient than + * combining two FilePointers at a time using the combine() method above. + * + * IMPORTANT: the FilePointers to be unioned must either all represent regions on the + * same contig, or all be unmapped, since we cannot create FilePointers with a mix of + * contigs or with mixed mapped/unmapped regions. + * + * @param filePointers the FilePointers to union + * @param parser our GenomeLocParser + * @return the union of the FilePointers passed in + */ + public static FilePointer union( List filePointers, GenomeLocParser parser ) { + if ( filePointers == null || filePointers.isEmpty() ) { + return new FilePointer(); + } + + Map> fileChunks = new HashMap>(); + List locations = new ArrayList(); + + // First extract all intervals and file chunks from the FilePointers into unsorted, unmerged collections + for ( FilePointer filePointer : filePointers ) { + locations.addAll(filePointer.getLocations()); + + for ( Map.Entry fileSpanEntry : filePointer.getFileSpans().entrySet() ) { + GATKBAMFileSpan fileSpan = (GATKBAMFileSpan)fileSpanEntry.getValue(); + + if ( fileChunks.containsKey(fileSpanEntry.getKey()) ) { + fileChunks.get(fileSpanEntry.getKey()).addAll(fileSpan.getGATKChunks()); + } + else { + fileChunks.put(fileSpanEntry.getKey(), fileSpan.getGATKChunks()); + } + } + } + + // Now sort and merge the intervals + List sortedMergedLocations = new ArrayList(); + sortedMergedLocations.addAll(IntervalUtils.sortAndMergeIntervals(parser, locations, IntervalMergingRule.ALL)); + + // For each BAM file, convert from an unsorted, unmerged list of chunks to a GATKBAMFileSpan containing + // the sorted, merged union of the chunks for that file + Map mergedFileSpans = new HashMap(fileChunks.size()); + for ( Map.Entry> fileChunksEntry : fileChunks.entrySet() ) { + List unmergedChunks = fileChunksEntry.getValue(); + mergedFileSpans.put(fileChunksEntry.getKey(), + (new GATKBAMFileSpan(unmergedChunks.toArray(new GATKChunk[unmergedChunks.size()]))).union(new 
GATKBAMFileSpan())); + } + + return new FilePointer(mergedFileSpans, sortedMergedLocations); + } + /** * Returns true if any of the file spans in this FilePointer overlap their counterparts in * the other FilePointer. "Overlap" is defined as having an overlapping extent (the region