diff --git a/java/src/net/sf/samtools/BAMFileReader2.java b/java/src/net/sf/samtools/BAMFileReader2.java index 5f21f6e13..f0412892f 100644 --- a/java/src/net/sf/samtools/BAMFileReader2.java +++ b/java/src/net/sf/samtools/BAMFileReader2.java @@ -200,6 +200,10 @@ class BAMFileReader2 return (filePointers != null) ? Chunk.toChunkList(filePointers) : Collections.emptyList(); } + public Long getFilePointer() { + return mCompressedInputStream.getFilePointer(); + } + /** * Prepare to iterate through the SAMRecords that match the given interval. * Only a single iterator on a BAMFile can be extant at a time. The previous one must be closed diff --git a/java/src/net/sf/samtools/SAMFileReader2.java b/java/src/net/sf/samtools/SAMFileReader2.java index 2f5ae4ece..766b63271 100644 --- a/java/src/net/sf/samtools/SAMFileReader2.java +++ b/java/src/net/sf/samtools/SAMFileReader2.java @@ -24,7 +24,6 @@ package net.sf.samtools; import net.sf.samtools.util.CloseableIterator; -import net.sf.picard.PicardException; import java.io.*; import java.util.List; @@ -152,6 +151,12 @@ public class SAMFileReader2 extends SAMFileReader { return reader.getFilePointersBounding(bin); } + public Chunk getCurrentPosition() { + // TODO: Add sanity checks so that we're not doing this against an unsupported BAM file. 
+ BAMFileReader2 reader = (BAMFileReader2)JVMUtils.getFieldValue(getField("mReader"),this); + return new Chunk(reader.getFilePointer(),Long.MAX_VALUE); + } + private Field getField(String fieldName) { try { return getClass().getSuperclass().getDeclaredField(fieldName); diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BAMFormatAwareShard.java b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BAMFormatAwareShard.java index 08e1c3dff..5508d6db0 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BAMFormatAwareShard.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BAMFormatAwareShard.java @@ -2,11 +2,13 @@ package org.broadinstitute.sting.gatk.datasources.shards; import net.sf.samtools.Chunk; import net.sf.samtools.SAMFileReader2; +import net.sf.samtools.SAMRecord; import java.util.List; import java.util.Map; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; /** * A common interface for shards that natively understand the BAM format. @@ -21,6 +23,32 @@ public interface BAMFormatAwareShard extends Shard { */ public Map> getChunks(); + /** + * Returns true if this shard is meant to buffer reads, rather + * than just holding pointers to their locations. + * @return True if this shard can buffer reads. False otherwise. + */ + public boolean buffersReads(); + + /** + * Returns true if the read buffer is currently full. + * @return True if this shard's buffer is full (and the shard can buffer reads). + */ + public boolean isBufferFull(); + + /** + * Adds a read to the read buffer. + * @param read The read to add to the internal shard buffer. + */ + public void addRead(SAMRecord read); + + /** + * Assuming this shard buffers reads, returns an iterator over the reads + * stored in the shard. + * @return An iterator over the reads stored in the shard. + */ + public StingSAMIterator iterator(); + /** * Get the bounds of the current shard. 
Current bounds * will be the unfiltered extents of the current shard, from diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShard.java b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShard.java index 8f82929f7..00426bb15 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShard.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShard.java @@ -2,12 +2,15 @@ package org.broadinstitute.sting.gatk.datasources.shards; import net.sf.samtools.Chunk; import net.sf.samtools.SAMFileReader2; +import net.sf.samtools.SAMRecord; -import java.util.List; -import java.util.Map; -import java.util.Collections; +import java.util.*; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.StingException; +import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; +import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter; +import org.broadinstitute.sting.gatk.Reads; /** * Expresses a shard of read data in block format. @@ -17,26 +20,62 @@ import org.broadinstitute.sting.utils.GenomeLoc; */ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAwareShard { /** - * The reader associated with this shard. + * Information about the origins of reads. */ - private final SAMFileReader2 reader; - - /** - * The list of chunks to retrieve when loading this shard. - */ - private final List chunks; + private final Reads sourceInfo; - public BlockDelimitedReadShard(SAMFileReader2 reader, List chunks) { - this.reader = reader; + /** + * The data backing the next chunks to deliver to the traversal engine. + */ + private final Map> chunks; + + /** + * The reads making up this shard. 
+ */ + private final Collection reads = new ArrayList(BlockDelimitedReadShardStrategy.MAX_READS); + + public BlockDelimitedReadShard(Reads sourceInfo, Map> chunks) { + this.sourceInfo = sourceInfo; this.chunks = chunks; } + /** + * Returns true if this shard is meant to buffer reads, rather + * than just holding pointers to their locations. + * @return True if this shard can buffer reads. False otherwise. + */ + public boolean buffersReads() { + return true; + } + + /** + * Returns true if the read buffer is currently full, i.e. it already holds MAX_READS reads. + * @return True if this shard's buffer is full (and the shard can buffer reads). + */ + public boolean isBufferFull() { + return reads.size() >= BlockDelimitedReadShardStrategy.MAX_READS; + } + + /** + * Adds a read to the read buffer; throws a StingException if the buffer is already full. + * @param read The read to add to the internal shard buffer. + */ + public void addRead(SAMRecord read) { + if(isBufferFull()) + throw new StingException("Cannot store more reads in buffer: out of space"); + reads.add(read); + } + + public StingSAMIterator iterator() { + return StingSAMIteratorAdapter.adapt(sourceInfo,reads.iterator()); + } + /** * Get the list of chunks delimiting this shard. * @return a list of chunks that contain data for this shard. 
*/ public Map> getChunks() { - return Collections.singletonMap(reader,chunks); + return Collections.unmodifiableMap(chunks); } /** @@ -56,9 +95,14 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware @Override public String toString() { StringBuilder sb = new StringBuilder(); - for(Chunk chunk : chunks) { - sb.append(chunk); - sb.append(' '); + for(Map.Entry> entry: chunks.entrySet()) { + sb.append(entry.getKey()); + sb.append(": "); + for(Chunk chunk : entry.getValue()) { + sb.append(chunk); + sb.append(' '); + } + sb.append(';'); } return sb.toString(); } diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShardStrategy.java b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShardStrategy.java index a37bcf1e8..3a8808dcf 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShardStrategy.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShardStrategy.java @@ -3,11 +3,10 @@ package org.broadinstitute.sting.gatk.datasources.shards; import net.sf.samtools.*; import java.util.*; -import java.io.File; -import java.io.IOException; import org.broadinstitute.sting.utils.StingException; import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource; +import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource; /** * A read shard strategy that delimits based on the number of @@ -18,45 +17,35 @@ import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource */ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy { /** - * Number of blocks in a given shard. + * What is the maximum number of reads which should go into a read shard. */ - protected int blockCount = 100; + protected static final int MAX_READS = 10000; /** * The data source used to shard. 
*/ - protected final SAMDataSource dataSource; + protected final BlockDrivenSAMDataSource dataSource; /** - * The actual chunks streaming into the file. + * Position of the last shard in the file. */ - private final BAMChunkIterator chunkIterator; + private Map position; /** - * The data backing the next chunks to deliver to the traversal engine. + * True if the set of SAM readers is at the end of the stream. False otherwise. */ - private final List nextChunks; + private boolean atEndOfStream = false; /** * Create a new read shard strategy, loading read shards from the given BAM file. * @param dataSource Data source from which to load shards. */ public BlockDelimitedReadShardStrategy(SAMDataSource dataSource) { - this.dataSource = dataSource; + if(!(dataSource instanceof BlockDrivenSAMDataSource)) + throw new StingException("Block-delimited read shard strategy cannot work without a block-driven data source."); - if(dataSource.getReadsInfo().getReadsFiles().size() > 1) - throw new UnsupportedOperationException("Experimental sharding only works with a single BAM at the moment."); - File bamFile = dataSource.getReadsInfo().getReadsFiles().get(0); - try { - Chunk headerLocation = BAMFileHeaderLoader.load(bamFile).getLocation(); - chunkIterator = new BAMChunkIterator(new BAMBlockIterator(bamFile),Arrays.asList(BAMFileHeaderLoader.preambleLocation,headerLocation)); - } - catch(IOException ex) { - throw new StingException("Unable to open BAM file for sharding."); - } - nextChunks = new ArrayList(); - - advance(); + this.dataSource = (BlockDrivenSAMDataSource)dataSource; + this.position = this.dataSource.getCurrentPosition(); } /** @@ -64,7 +53,7 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy { * @return True if any more data is available. False otherwise. 
*/ public boolean hasNext() { - return nextChunks.size() > 0; + return !atEndOfStream; } /** @@ -76,11 +65,16 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy { if(!hasNext()) throw new NoSuchElementException("No such element available: SAM reader has arrived at last shard."); - if(dataSource.getReaders().size() == 0) - throw new StingException("Unable to shard with no readers present."); + // TODO: This level of processing should not be necessary. + Map> boundedPosition = new HashMap>(); + for(Map.Entry entry: position.entrySet()) + boundedPosition.put(entry.getKey(),Collections.singletonList(entry.getValue())); + + BAMFormatAwareShard shard = new BlockDelimitedReadShard(dataSource.getReadsInfo(),boundedPosition); + atEndOfStream = dataSource.fillShard(shard); + + this.position = dataSource.getCurrentPosition(); - Shard shard = new BlockDelimitedReadShard((SAMFileReader2)dataSource.getReaders().iterator().next(),Collections.unmodifiableList(new ArrayList(nextChunks))); - advance(); return shard; } @@ -98,12 +92,4 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy { public Iterator iterator() { return this; } - - private void advance() { - nextChunks.clear(); - int chunksCopied = 0; - - while(chunksCopied++ < blockCount && chunkIterator.hasNext()) - nextChunks.add(chunkIterator.next()); - } } diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/shards/IndexDelimitedLocusShard.java b/java/src/org/broadinstitute/sting/gatk/datasources/shards/IndexDelimitedLocusShard.java index e751aab11..7b89b7b98 100755 --- a/java/src/org/broadinstitute/sting/gatk/datasources/shards/IndexDelimitedLocusShard.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/shards/IndexDelimitedLocusShard.java @@ -3,8 +3,10 @@ package org.broadinstitute.sting.gatk.datasources.shards; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.StingException; import 
org.broadinstitute.sting.utils.GenomeLocParser; +import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import net.sf.samtools.Chunk; import net.sf.samtools.SAMFileReader2; +import net.sf.samtools.SAMRecord; import java.util.List; import java.util.Map; @@ -64,6 +66,27 @@ public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwa this.shardType = shardType; } + /** + * Returns true if this shard is meant to buffer reads, rather + * than just holding pointers to their locations. + * @return True if this shard can buffer reads. False otherwise. + */ + public boolean buffersReads() { return false; } + + /** + * Returns true if the read buffer is currently full. + * @return True if this shard's buffer is full (and the shard can buffer reads). + */ + public boolean isBufferFull() { throw new UnsupportedOperationException("This shard does not buffer reads."); } + + /** + * Adds a read to the read buffer. + * @param read Add a read to the internal shard buffer. + */ + public void addRead(SAMRecord read) { throw new UnsupportedOperationException("This shard does not buffer reads."); } + + public StingSAMIterator iterator() { throw new UnsupportedOperationException("This shard does not buffer reads."); } + /** * Gets the chunks associated with this locus shard. * @return A list of the chunks to use when retrieving locus data. diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java index 59ea2a2ab..8c89634d8 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java @@ -81,6 +81,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { return filePointers; } + /** + * Retrieves the current position within the BAM file. 
+ * @return A mapping of reader to current position. + */ + public Map getCurrentPosition() { + Map currentPositions = new HashMap(); + for(SAMFileReader reader: headerMerger.getReaders()) { + SAMFileReader2 reader2 = (SAMFileReader2)reader; + currentPositions.put(reader2,reader2.getCurrentPosition()); + } + return currentPositions; + } /** * Get the number of levels employed by this index. @@ -137,36 +149,65 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { return firstReader.getLastLocusInBin(bin); } + /** + * Fill the given buffering shard with reads, always closing the underlying iterator before returning. + * @param shard Shard to fill. + * @return true if at the end of the stream. False otherwise. + */ + public boolean fillShard(BAMFormatAwareShard shard) { + if(!shard.buffersReads()) + throw new StingException("Attempting to fill a non-buffering shard."); + + // Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't + // know why this is. Please add a comment here if you do. + boolean enableVerification = shard instanceof ReadShard; + + CloseableIterator iterator = getIterator(shard,false,enableVerification); + try { + while(!shard.isBufferFull() && iterator.hasNext()) + shard.addRead(iterator.next()); + return !iterator.hasNext(); + } + finally { + iterator.close(); + } + } public StingSAMIterator seek(Shard shard) { if(!(shard instanceof BAMFormatAwareShard)) throw new StingException("BlockDrivenSAMDataSource cannot operate on shards of type: " + shard.getClass()); BAMFormatAwareShard bamAwareShard = (BAMFormatAwareShard)shard; - // Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't - // know why this is. Please add a comment here if you do. 
- boolean enableVerification = shard instanceof ReadShard; - - if(shard instanceof ReadShard && reads.getReadsFiles().size() > 1) - throw new StingException("Experimental read sharding cannot handle multiple BAM files at this point."); + if(bamAwareShard.buffersReads()) { + return bamAwareShard.iterator(); + } + else { + // Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't + // know why this is. Please add a comment here if you do. + boolean enableVerification = shard instanceof ReadShard; + return getIterator(bamAwareShard,true,enableVerification); + } + } + private StingSAMIterator getIterator(BAMFormatAwareShard shard, boolean addIntervalFilter, boolean enableVerification) { Map> readerToIteratorMap = new HashMap>(); - for(Map.Entry> chunksByReader: bamAwareShard.getChunks().entrySet()) { + for(Map.Entry> chunksByReader: shard.getChunks().entrySet()) { SAMFileReader2 reader = chunksByReader.getKey(); - readerToIteratorMap.put(reader,reader.iterator(bamAwareShard.getChunks().get(reader))); + readerToIteratorMap.put(reader,reader.iterator(shard.getChunks().get(reader))); } // Set up merging and filtering to dynamically merge together multiple BAMs and filter out records not in the shard set. 
- MergingSamRecordIterator mergingIterator = new MergingSamRecordIterator(headerMerger,readerToIteratorMap,true); - FilteringIterator filteringIterator = new FilteringIterator(mergingIterator,new IntervalOverlappingFilter(shard.getGenomeLocs())); + CloseableIterator iterator = new MergingSamRecordIterator(headerMerger,readerToIteratorMap,true); + if(addIntervalFilter) + iterator = new FilteringIterator(iterator,new IntervalOverlappingFilter(shard.getGenomeLocs())); return applyDecoratingIterators(enableVerification, - StingSAMIteratorAdapter.adapt(reads,filteringIterator), + StingSAMIteratorAdapter.adapt(reads,iterator), reads.getDownsamplingFraction(), reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION), - reads.getSupplementalFilters()); + reads.getSupplementalFilters()); } - + /** * Gets the merged header from the SAM file. * @return The merged header.