ExperimentalReadShardBalancer: create one monolithic FilePointer per contig

Merge all FilePointers for each contig into a single, merged, optimized FilePointer
representing all regions to visit in all BAM files for a given contig.

This helps us in several ways:

-It allows us to create a single, persistent set of iterators for each contig,
 finally and definitively eliminating all Shard/FilePointer boundary issues for
 the new experimental ReadWalker downsampling

-We no longer need to track low-level file positions in the sharding system (which
 was no longer possible anyway given the new experimental downsampling system)

-We no longer revisit BAM file chunks that we've visited in the past -- all BAM
 file access is purely sequential

-We no longer need to constantly recreate our full chain of read iterators

There are also potential dangers:

-We hold more BAM index data in memory at once. Given that we merge and optimize
 the index data during the merge, and only hold one contig's worth of data at a
 time, this does not appear to be a major issue. TODO: confirm this!

-With a huge number of samples and intervals, the FilePointer merge operation
 might become expensive. With the latest implementation, this does not
 appear to be an issue even with a huge number of intervals (for one sample, at least),
 but if it turns out to be a problem for > 1 sample there are things we can do.

Still TODO: unit tests for the new FilePointer.union() method
This commit is contained in:
David Roazen 2012-09-25 11:37:17 -04:00
parent abbe757907
commit e82946e5c9
2 changed files with 179 additions and 75 deletions

View File

@ -25,15 +25,40 @@
package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.picard.util.PeekableIterator;
import net.sf.samtools.SAMFileSpan;
import net.sf.samtools.SAMRecord;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import java.util.*;
/**
* Convert from an unbalanced iterator over FilePointers to a balanced iterator over Shards
* Convert from an unbalanced iterator over FilePointers to a balanced iterator over Shards.
*
* When processing FilePointers, our strategy is to aggregate all FilePointers for each contig
* together into one monolithic FilePointer, create one persistent set of read iterators over
* that monolithic FilePointer, and repeatedly use that persistent set of read iterators to
* fill read shards with reads.
*
* This strategy has several important advantages:
*
* 1. We avoid issues with file span overlap. FilePointers that are more granular than a whole
* contig will have regions that overlap with other FilePointers on the same contig, due
* to the limited granularity of BAM index data. By creating only one FilePointer per contig,
* we avoid having to track how much of each file region we've visited (as we did in the
* former implementation), we avoid expensive non-sequential access patterns in the files,
* and we avoid having to repeatedly re-create our iterator chain for every small region
* of interest.
*
* 2. We avoid boundary issues with the engine-level downsampling. Since we create a single
* persistent set of read iterators (which include the downsampling iterator(s)) per contig,
* the downsampling process is never interrupted by FilePointer or Shard boundaries, and never
* loses crucial state information while downsampling within a contig.
*
* TODO: There is also at least one important disadvantage:
*
* 1. We load more BAM index data into memory at once, and this work is done upfront before processing
* the next contig, creating a delay before traversal of each contig. This delay may be
* compensated for by the gains listed in #1 above, and we may be no worse off overall in
* terms of total runtime, but we need to verify this empirically.
*
* @author David Roazen
*/
@ -55,17 +80,16 @@ public class ExperimentalReadShardBalancer extends ShardBalancer {
/**
* The file pointer currently being processed.
*/
private FilePointer currentFilePointer = null;
private FilePointer currentContigFilePointer = null;
/**
* Iterator over the reads from the current file pointer. The same iterator will be
* Iterator over the reads from the current contig's file pointer. The same iterator will be
* used to fill all shards associated with a given file pointer
*/
private PeekableIterator<SAMRecord> currentFilePointerReadsIterator = null;
private PeekableIterator<SAMRecord> currentContigReadsIterator = null;
{
if ( filePointers.hasNext() )
currentFilePointer = filePointers.next();
createNextContigFilePointer();
advance();
}
@ -85,93 +109,87 @@ public class ExperimentalReadShardBalancer extends ShardBalancer {
nextShard = null;
// May need multiple iterations to fill the next shard if all reads in current file spans get filtered/downsampled away
while ( nextShard == null && currentFilePointer != null ) {
while ( nextShard == null && currentContigFilePointer != null ) {
// If we've exhausted the current file pointer of reads, move to the next file pointer (if there is one):
if ( currentFilePointerReadsIterator != null && ! currentFilePointerReadsIterator.hasNext() ) {
if ( currentContigReadsIterator != null && ! currentContigReadsIterator.hasNext() ) {
// Close the old, exhausted chain of iterators to release resources
currentFilePointerReadsIterator.close();
currentContigReadsIterator.close();
do {
advanceFilePointer();
} while ( currentFilePointer != null && isEmpty(currentFilePointer.fileSpans) ); // skip empty file pointers
// Advance to the FilePointer for the next contig
createNextContigFilePointer();
// We'll need to create a fresh iterator for this file pointer when we create the first
// shard for it below.
currentFilePointerReadsIterator = null;
currentContigReadsIterator = null;
}
// At this point if currentFilePointer is non-null we know it is also non-empty. Our
// currentFilePointerReadsIterator may be null or non-null depending on whether or not
// At this point our currentContigReadsIterator may be null or non-null depending on whether or not
// this is our first shard for this file pointer.
if ( currentFilePointer != null ) {
Shard shard = new ReadShard(parser,readsDataSource,currentFilePointer.fileSpans,currentFilePointer.locations,currentFilePointer.isRegionUnmapped);
if ( currentContigFilePointer != null ) {
Shard shard = new ReadShard(parser,readsDataSource, currentContigFilePointer.fileSpans, currentContigFilePointer.locations, currentContigFilePointer.isRegionUnmapped);
// Create a new reads iterator only when we've just advanced to a new file pointer. It's
// essential that the iterators persist across all shards that share the same file pointer
// Create a new reads iterator only when we've just advanced to the file pointer for the next
// contig. It's essential that the iterators persist across all shards that share the same contig
// to allow the downsampling to work properly.
if ( currentFilePointerReadsIterator == null ) {
currentFilePointerReadsIterator = new PeekableIterator<SAMRecord>(readsDataSource.getIterator(shard));
if ( currentContigReadsIterator == null ) {
currentContigReadsIterator = new PeekableIterator<SAMRecord>(readsDataSource.getIterator(shard));
}
if ( currentFilePointerReadsIterator.hasNext() ) {
shard.fill(currentFilePointerReadsIterator);
if ( currentContigReadsIterator.hasNext() ) {
shard.fill(currentContigReadsIterator);
nextShard = shard;
}
}
}
}
private void advanceFilePointer() {
FilePointer previousFilePointer = currentFilePointer;
currentFilePointer = filePointers.hasNext() ? filePointers.next() : null;
// TODO: This is a purely defensive measure to guard against the possibility of overlap
// TODO: between FilePointers. When overlap is detected, remove the overlapping regions from
// TODO: the newly-current FilePointer.
// TODO: If we later discover that overlap is theoretically impossible, this step becomes
// TODO: unnecessary and should be removed.
if ( currentFilePointer != null && previousFilePointer != null &&
previousFilePointer.hasFileSpansOverlappingWith(currentFilePointer) ) {
logger.debug(String.format("%s: found consecutive overlapping FilePointers [%s] and [%s]", getClass().getSimpleName(), previousFilePointer, currentFilePointer));
Map<SAMReaderID, SAMFileSpan> previousFileSpans = previousFilePointer.getFileSpans();
Map<SAMReaderID, SAMFileSpan> trimmedFileSpans = new HashMap<SAMReaderID, SAMFileSpan>(currentFilePointer.getFileSpans().size());
for ( Map.Entry<SAMReaderID, SAMFileSpan> fileSpanEntry : currentFilePointer.getFileSpans().entrySet() ) {
// find the corresponding file span from the previous FilePointer
SAMFileSpan previousFileSpan = previousFileSpans.get(fileSpanEntry.getKey());
if ( previousFileSpan == null ) {
// no match, so no trimming required
trimmedFileSpans.put(fileSpanEntry.getKey(), fileSpanEntry.getValue());
}
else {
// match, so remove any overlapping regions (regions before the start of the
// region immediately following the previous file span)
SAMFileSpan trimmedSpan = fileSpanEntry.getValue().removeContentsBefore(previousFileSpan.getContentsFollowing());
trimmedFileSpans.put(fileSpanEntry.getKey(), trimmedSpan);
}
}
// Replace the current file pointer with its trimmed equivalent
currentFilePointer = new FilePointer(trimmedFileSpans, currentFilePointer.locations);
}
}
/**
* Detects whether the list of file spans contain any read data.
* @param selectedSpans Mapping of readers to file spans.
* @return True if file spans are completely empty; false otherwise.
* Aggregate all FilePointers for the next contig together into one monolithic FilePointer
* to avoid boundary issues with visiting the same file regions more than once (since more
* granular FilePointers will have regions that overlap with other nearby FilePointers due
* to the nature of BAM indices).
*
* By creating one persistent set of iterators per contig we also avoid boundary artifacts
* in the engine-level downsampling.
*
* TODO: This FilePointer aggregation should ideally be done at the BAMSchedule level for
* TODO: read traversals, as there's little point in the BAMSchedule emitting extremely
* TODO: granular FilePointers if we're just going to union them. The BAMSchedule should
* TODO: emit one FilePointer per contig for read traversals (but, crucially, NOT for
* TODO: locus traversals).
*/
private boolean isEmpty(Map<SAMReaderID,SAMFileSpan> selectedSpans) {
for(SAMFileSpan fileSpan: selectedSpans.values()) {
if(!fileSpan.isEmpty())
return false;
private void createNextContigFilePointer() {
currentContigFilePointer = null;
List<FilePointer> nextContigFilePointers = new ArrayList<FilePointer>();
logger.info("Loading BAM index data for next contig");
while ( filePointers.hasNext() ) {
// If this is the first FP we've seen, or we're dealing with mapped regions and the next FP is on the
// same contig as previous FPs, or all our FPs are unmapped, add the next FP to the list of FPs to merge
if ( nextContigFilePointers.isEmpty() ||
(! nextContigFilePointers.get(0).isRegionUnmapped && ! filePointers.peek().isRegionUnmapped &&
nextContigFilePointers.get(0).getContigIndex() == filePointers.peek().getContigIndex()) ||
(nextContigFilePointers.get(0).isRegionUnmapped && filePointers.peek().isRegionUnmapped) ) {
nextContigFilePointers.add(filePointers.next());
}
else {
break; // next FilePointer is on a different contig or has different mapped/unmapped status,
// save it for next time
}
}
if ( ! nextContigFilePointers.isEmpty() ) {
currentContigFilePointer = FilePointer.union(nextContigFilePointers, parser);
}
if ( currentContigFilePointer != null ) {
logger.info("Done loading BAM index data for next contig");
logger.debug(String.format("Next contig FilePointer: %s", currentContigFilePointer));
}
return true;
}
public void remove() {

View File

@ -26,7 +26,9 @@ package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.picard.util.PeekableIterator;
import net.sf.samtools.GATKBAMFileSpan;
import net.sf.samtools.GATKChunk;
import net.sf.samtools.SAMFileSpan;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.Utils;
@ -48,15 +50,19 @@ public class FilePointer {
*/
protected final boolean isRegionUnmapped;
public FilePointer(final GenomeLoc... locations) {
this.locations.addAll(Arrays.asList(locations));
public FilePointer( List<GenomeLoc> locations ) {
this.locations.addAll(locations);
this.isRegionUnmapped = checkUnmappedStatus();
validateLocations();
}
public FilePointer( final GenomeLoc... locations ) {
this(Arrays.asList(locations));
}
public FilePointer( Map<SAMReaderID,SAMFileSpan> fileSpans, List<GenomeLoc> locations ) {
this(locations);
this.fileSpans.putAll(fileSpans);
this.locations.addAll(locations);
this.isRegionUnmapped = checkUnmappedStatus();
}
private boolean checkUnmappedStatus() {
@ -74,6 +80,22 @@ public class FilePointer {
return foundUnmapped;
}
private void validateLocations() {
if ( isRegionUnmapped ) {
return;
}
Integer previousContigIndex = null;
for ( GenomeLoc location : locations ) {
if ( previousContigIndex != null && previousContigIndex != location.getContigIndex() ) {
throw new ReviewedStingException("File pointers must contain intervals from at most one contig");
}
previousContigIndex = location.getContigIndex();
}
}
/**
* Returns an immutable view of this FilePointer's file spans
*
@ -91,6 +113,16 @@ public class FilePointer {
return Collections.unmodifiableList(locations);
}
/**
* Returns the index of the contig into which this FilePointer points (a FilePointer can represent
* regions in at most one contig).
*
* @return the index of the contig into which this FilePointer points
*/
public int getContigIndex() {
return locations.size() > 0 ? locations.get(0).getContigIndex() : SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX;
}
@Override
public boolean equals(final Object other) {
if(!(other instanceof FilePointer))
@ -121,11 +153,13 @@ public class FilePointer {
public void addLocation(final GenomeLoc location) {
this.locations.add(location);
checkUnmappedStatus();
validateLocations();
}
public void addLocations( final List<GenomeLoc> locations ) {
this.locations.addAll(locations);
checkUnmappedStatus();
validateLocations();
}
public void addFileSpans(final SAMReaderID id, final SAMFileSpan fileSpan) {
@ -243,6 +277,58 @@ public class FilePointer {
combined.addFileSpans(initialElement.getKey(),fileSpan);
}
/**
* Efficiently generate the union of the n FilePointers passed in. Much more efficient than
* combining two FilePointers at a time using the combine() method above.
*
* IMPORTANT: the FilePointers to be unioned must either all represent regions on the
* same contig, or all be unmapped, since we cannot create FilePointers with a mix of
* contigs or with mixed mapped/unmapped regions.
*
* @param filePointers the FilePointers to union
* @param parser our GenomeLocParser
* @return the union of the FilePointers passed in
*/
public static FilePointer union( List<FilePointer> filePointers, GenomeLocParser parser ) {
if ( filePointers == null || filePointers.isEmpty() ) {
return new FilePointer();
}
Map<SAMReaderID, List<GATKChunk>> fileChunks = new HashMap<SAMReaderID, List<GATKChunk>>();
List<GenomeLoc> locations = new ArrayList<GenomeLoc>();
// First extract all intervals and file chunks from the FilePointers into unsorted, unmerged collections
for ( FilePointer filePointer : filePointers ) {
locations.addAll(filePointer.getLocations());
for ( Map.Entry<SAMReaderID, SAMFileSpan> fileSpanEntry : filePointer.getFileSpans().entrySet() ) {
GATKBAMFileSpan fileSpan = (GATKBAMFileSpan)fileSpanEntry.getValue();
if ( fileChunks.containsKey(fileSpanEntry.getKey()) ) {
fileChunks.get(fileSpanEntry.getKey()).addAll(fileSpan.getGATKChunks());
}
else {
fileChunks.put(fileSpanEntry.getKey(), fileSpan.getGATKChunks());
}
}
}
// Now sort and merge the intervals
List<GenomeLoc> sortedMergedLocations = new ArrayList<GenomeLoc>();
sortedMergedLocations.addAll(IntervalUtils.sortAndMergeIntervals(parser, locations, IntervalMergingRule.ALL));
// For each BAM file, convert from an unsorted, unmerged list of chunks to a GATKBAMFileSpan containing
// the sorted, merged union of the chunks for that file
Map<SAMReaderID, SAMFileSpan> mergedFileSpans = new HashMap<SAMReaderID, SAMFileSpan>(fileChunks.size());
for ( Map.Entry<SAMReaderID, List<GATKChunk>> fileChunksEntry : fileChunks.entrySet() ) {
List<GATKChunk> unmergedChunks = fileChunksEntry.getValue();
mergedFileSpans.put(fileChunksEntry.getKey(),
(new GATKBAMFileSpan(unmergedChunks.toArray(new GATKChunk[unmergedChunks.size()]))).union(new GATKBAMFileSpan()));
}
return new FilePointer(mergedFileSpans, sortedMergedLocations);
}
/**
* Returns true if any of the file spans in this FilePointer overlap their counterparts in
* the other FilePointer. "Overlap" is defined as having an overlapping extent (the region