diff --git a/java/src/net/sf/samtools/SAMFileReader2.java b/java/src/net/sf/samtools/SAMFileReader2.java index 766b63271..a9377e17d 100644 --- a/java/src/net/sf/samtools/SAMFileReader2.java +++ b/java/src/net/sf/samtools/SAMFileReader2.java @@ -38,6 +38,8 @@ import org.broadinstitute.sting.utils.StingException; * Class for reading and querying SAM/BAM files. Delegates to appropriate concrete implementation. */ public class SAMFileReader2 extends SAMFileReader { + private final File sourceFile; + /** * Prepare to read a SAM or BAM file. If the given file is a BAM, and has a companion BAI index file */ @@ -66,6 +68,7 @@ public class SAMFileReader2 extends SAMFileReader { */ public SAMFileReader2(final File file, final File indexFile, final boolean eagerDecode){ super(file,indexFile,eagerDecode); + this.sourceFile = file; close(); try { @@ -181,6 +184,17 @@ public class SAMFileReader2 extends SAMFileReader { catch(NoSuchMethodException ex) { throw new StingException("Unable to run method findIndexFile",ex); } + } + @Override + public boolean equals(Object other) { + if(other == null) return false; + if(!(other instanceof SAMFileReader2)) return false; + return this.sourceFile.equals(((SAMFileReader2)other).sourceFile); + } + + @Override + public int hashCode() { + return sourceFile.hashCode(); } } diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/providers/LocusReferenceView.java b/java/src/org/broadinstitute/sting/gatk/datasources/providers/LocusReferenceView.java index 64b38bf6b..a46cafb78 100755 --- a/java/src/org/broadinstitute/sting/gatk/datasources/providers/LocusReferenceView.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/providers/LocusReferenceView.java @@ -133,8 +133,19 @@ public class LocusReferenceView extends ReferenceView { initializeReferenceSequence(GenomeLocParser.createGenomeLoc(bounds.getContig(), expandedStart, expandedStop)); } + /** + * Initialize the bounds of this shard, trimming the bounds so that they match the reference. + * @param provider Provider covering the appropriate locus. + */ private void initializeBounds(ShardDataProvider provider) { - bounds = provider.getLocus(); + if(provider.getLocus() != null) { + long sequenceLength = reference.getSequenceDictionary().getSequence(provider.getLocus().getContig()).getSequenceLength(); + bounds = GenomeLocParser.createGenomeLoc(provider.getLocus().getContig(), + Math.max(provider.getLocus().getStart(),1), + Math.min(provider.getLocus().getStop(),sequenceLength)); + } + else + bounds = null; } /** diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java index 0b369c9da..ac6e5d2fa 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java @@ -24,7 +24,15 @@ import java.io.File; */ public class BlockDrivenSAMDataSource extends SAMDataSource { - private final SamFileHeaderMerger headerMerger; + /** + * A collection of readers driving the merging process. + */ + private final SAMResourcePool resourcePool; + + /** + * The merged header. + */ + private final SAMFileHeader header; /** * Create a new block-aware SAM data source given the supplied read metadata. @@ -35,22 +43,33 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { logger.warn("Experimental sharding is enabled. Many use cases are not supported. Please use with care."); - Collection readers = new ArrayList(); - for(File readsFile: reads.getReadsFiles()) { - SAMFileReader2 reader = new SAMFileReader2(readsFile); - reader.setValidationStringency(reads.getValidationStringency()); - readers.add(reader); - } - - this.headerMerger = new SamFileHeaderMerger(readers,SAMFileHeader.SortOrder.coordinate,true); + resourcePool = new SAMResourcePool(Integer.MAX_VALUE); + Collection readers = resourcePool.getAvailableReaders(); + header = new SamFileHeaderMerger(readers,SAMFileHeader.SortOrder.coordinate,true).getMergedHeader(); + resourcePool.releaseReaders(readers); } public boolean hasIndex() { - for(SAMFileReader reader: headerMerger.getReaders()) { + Collection readers = resourcePool.getAvailableReaders(); + try { + return hasIndex(readers); + } + finally { + resourcePool.releaseReaders(readers); + } + } + + /** + * Report whether a given collection of SAM file readers is indexed. + * @param readers The collection of readers. + * @return True if the given collection of readers is indexed. + */ + private boolean hasIndex(Collection readers) { + for(SAMFileReader reader: readers) { if(!reader.hasIndex()) return false; } - return true; + return true; } /** @@ -59,12 +78,19 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { * @return A map of reader back to bin. */ public List getOverlappingBins(final GenomeLoc location) { - if(headerMerger.getReaders().size() == 0) - return Collections.emptyList(); + Collection readers = resourcePool.getAvailableReaders(); - // All readers will have the same bin structure, so just use the first bin as an example. - SAMFileReader2 reader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); - return reader.getOverlappingBins(location.getContig(),(int)location.getStart(),(int)location.getStop()); + try { + if(readers.size() == 0) + return Collections.emptyList(); + + // All readers will have the same bin structure, so just use the first bin as an example. + SAMFileReader2 reader = (SAMFileReader2)readers.iterator().next(); + return reader.getOverlappingBins(location.getContig(),(int)location.getStart(),(int)location.getStop()); + } + finally { + resourcePool.releaseReaders(readers); + } } /** @@ -73,12 +99,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { * @return A map of the file pointers bounding the bin. */ public Map> getFilePointersBounding(Bin bin) { - Map> filePointers = new HashMap>(); - for(SAMFileReader reader: headerMerger.getReaders()) { - SAMFileReader2 reader2 = (SAMFileReader2)reader; - filePointers.put(reader2,reader2.getFilePointersBounding(bin)); + Collection readers = resourcePool.getAvailableReaders(); + try { + Map> filePointers = new HashMap>(); + for(SAMFileReader reader: readers) { + SAMFileReader2 reader2 = (SAMFileReader2)reader; + filePointers.put(reader2,reader2.getFilePointersBounding(bin)); + } + return filePointers; + } + finally { + resourcePool.releaseReaders(readers); } - return filePointers; } /** @@ -86,12 +118,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { * @return A mapping of reader to current position. */ public Map getCurrentPosition() { - Map currentPositions = new HashMap(); - for(SAMFileReader reader: headerMerger.getReaders()) { - SAMFileReader2 reader2 = (SAMFileReader2)reader; - currentPositions.put(reader2,reader2.getCurrentPosition()); + Collection readers = resourcePool.getAvailableReaders(); + try { + Map currentPositions = new HashMap(); + for(SAMFileReader reader: readers) { + SAMFileReader2 reader2 = (SAMFileReader2)reader; + currentPositions.put(reader2,reader2.getCurrentPosition()); + } + return currentPositions; + } + finally { + resourcePool.releaseReaders(readers); } - return currentPositions; } /** @@ -99,12 +137,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { * @return Number of levels in this index. */ public int getNumIndexLevels() { - if(headerMerger.getReaders().size() == 0) - throw new StingException("Unable to determine number of index levels; no BAMs are present."); - if(!hasIndex()) - throw new SAMException("Unable to determine number of index levels; BAM file index is not present."); - SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); - return firstReader.getNumIndexLevels(); + Collection readers = resourcePool.getAvailableReaders(); + try { + if(readers.size() == 0) + throw new StingException("Unable to determine number of index levels; no BAMs are present."); + if(!hasIndex(readers)) + throw new SAMException("Unable to determine number of index levels; BAM file index is not present."); + SAMFileReader2 firstReader = (SAMFileReader2)readers.iterator().next(); + return firstReader.getNumIndexLevels(); + } + finally { + resourcePool.releaseReaders(readers); + } } /** @@ -113,12 +157,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { * @return the level associated with the given bin number. */ public int getLevelForBin(final Bin bin) { - if(headerMerger.getReaders().size() == 0) - throw new StingException("Unable to determine number of level for bin; no BAMs are present."); - if(!hasIndex()) - throw new SAMException("Unable to determine number of level for bin; BAM file index is not present."); - SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); - return firstReader.getLevelForBin(bin); + Collection readers = resourcePool.getAvailableReaders(); + try { + if(readers.size() == 0) + throw new StingException("Unable to determine number of level for bin; no BAMs are present."); + if(!hasIndex(readers)) + throw new SAMException("Unable to determine number of level for bin; BAM file index is not present."); + SAMFileReader2 firstReader = (SAMFileReader2)readers.iterator().next(); + return firstReader.getLevelForBin(bin); + } + finally { + resourcePool.releaseReaders(readers); + } } /** @@ -127,12 +177,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { * @return The last position that the given bin can represent. */ public int getFirstLocusInBin(final Bin bin) { - if(headerMerger.getReaders().size() == 0) - throw new StingException("Unable to determine number of level for bin; no BAMs are present."); - if(!hasIndex()) - throw new SAMException("Unable to determine number of level for bin; BAM file index is not present."); - SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); - return firstReader.getFirstLocusInBin(bin); + Collection readers = resourcePool.getAvailableReaders(); + try { + if(readers.size() == 0) + throw new StingException("Unable to determine number of level for bin; no BAMs are present."); + if(!hasIndex(readers)) + throw new SAMException("Unable to determine number of level for bin; BAM file index is not present."); + SAMFileReader2 firstReader = (SAMFileReader2)readers.iterator().next(); + return firstReader.getFirstLocusInBin(bin); + } + finally { + resourcePool.releaseReaders(readers); + } } /** @@ -141,12 +197,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { * @return The last position that the given bin can represent. */ public int getLastLocusInBin(final Bin bin) { - if(headerMerger.getReaders().size() == 0) - throw new StingException("Unable to determine number of level for bin; no BAMs are present."); - if(!hasIndex()) - throw new SAMException("Unable to determine number of level for bin; BAM file index is not present."); - SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); - return firstReader.getLastLocusInBin(bin); + Collection readers = resourcePool.getAvailableReaders(); + try { + if(readers.size() == 0) + throw new StingException("Unable to determine number of level for bin; no BAMs are present."); + if(!hasIndex(readers)) + throw new SAMException("Unable to determine number of level for bin; BAM file index is not present."); + SAMFileReader2 firstReader = (SAMFileReader2)readers.iterator().next(); + return firstReader.getLastLocusInBin(bin); + } + finally { + resourcePool.releaseReaders(readers); + } } /** @@ -162,7 +224,7 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { // know why this is. Please add a comment here if you do. boolean enableVerification = shard instanceof ReadShard; - CloseableIterator iterator = getIterator(shard,false,enableVerification); + CloseableIterator iterator = getIterator(shard,enableVerification); while(!shard.isBufferFull() && iterator.hasNext()) shard.addRead(iterator.next()); @@ -185,24 +247,29 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { // Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't // know why this is. Please add a comment here if you do. boolean enableVerification = shard instanceof ReadShard; - return getIterator(bamAwareShard,true,enableVerification); + return getIterator(bamAwareShard,enableVerification); } } - private StingSAMIterator getIterator(BAMFormatAwareShard shard, boolean addIntervalFilter, boolean enableVerification) { + private StingSAMIterator getIterator(BAMFormatAwareShard shard, boolean enableVerification) { + Collection readers = resourcePool.getAvailableReaders(); + Map> readerToIteratorMap = new HashMap>(); - for(Map.Entry> chunksByReader: shard.getChunks().entrySet()) { - SAMFileReader2 reader = chunksByReader.getKey(); - readerToIteratorMap.put(reader,reader.iterator(shard.getChunks().get(reader))); + for(SAMFileReader reader: readers) { + SAMFileReader2 reader2 = (SAMFileReader2)reader; + List chunks = shard.getChunks().get(reader2); + readerToIteratorMap.put(reader2,reader2.iterator(chunks)); } + SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers,SAMFileHeader.SortOrder.coordinate,true); + // Set up merging and filtering to dynamically merge together multiple BAMs and filter out records not in the shard set. CloseableIterator iterator = new MergingSamRecordIterator(headerMerger,readerToIteratorMap,true); if(shard.getFilter() != null) iterator = new FilteringIterator(iterator,shard.getFilter()); return applyDecoratingIterators(enableVerification, - StingSAMIteratorAdapter.adapt(reads,iterator), + new ReleasingIterator(readers,StingSAMIteratorAdapter.adapt(reads,iterator)), reads.getDownsamplingFraction(), reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION), reads.getSupplementalFilters()); @@ -213,7 +280,7 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { * @return The merged header. */ public SAMFileHeader getHeader() { - return headerMerger.getMergedHeader(); + return header; } /** @@ -239,4 +306,110 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { public String getReadGroupId(final SAMFileReader reader, final String originalReadGroupId) { throw new UnsupportedOperationException("Getting read group ID from this experimental SAM reader is not currently supported."); } + + private class SAMResourcePool { + /** + * How many entries can be cached in this resource pool? + */ + private final int maxEntries; + + /** + * All iterators of this reference-ordered data. + */ + private List allResources = new ArrayList(); + + /** + * All iterators that are not currently in service. + */ + private List availableResources = new ArrayList(); + + public SAMResourcePool(final int maxEntries) { + this.maxEntries = maxEntries; + } + + /** + * Choose a set of readers from the pool to use for this query. When complete, + * @return + */ + public synchronized Collection getAvailableReaders() { + if(availableResources.size() == 0) + createNewResource(); + SAMFileReaders readers = availableResources.get(0); + availableResources.remove(readers); + return readers; + } + + public synchronized void releaseReaders(Collection readers) { + if(!allResources.contains(readers)) + throw new StingException("Tried to return readers from the pool that didn't originate in the pool."); + availableResources.add((SAMFileReaders)readers); + } + + private synchronized void createNewResource() { + if(allResources.size() > maxEntries) + throw new StingException("Cannot create a new resource pool. All resources are in use."); + SAMFileReaders readers = new SAMFileReaders(reads); + allResources.add(readers); + availableResources.add(readers); + } + + /** + * A collection of readers derived from a reads metadata structure. + */ + private class SAMFileReaders extends ArrayList { + /** + * Derive a new set of readers from the Reads metadata. + * @param sourceInfo Metadata for the reads to load. + */ + public SAMFileReaders(Reads sourceInfo) { + for(File readsFile: sourceInfo.getReadsFiles()) { + SAMFileReader2 reader = new SAMFileReader2(readsFile); + reader.setValidationStringency(sourceInfo.getValidationStringency()); + add(reader); + } + } + } + } + + private class ReleasingIterator implements StingSAMIterator { + /** + * The resource acting as the source of the data. + */ + private final Collection resource; + + /** + * The iterator to wrap. + */ + private final StingSAMIterator wrappedIterator; + + public Reads getSourceInfo() { + return wrappedIterator.getSourceInfo(); + } + + public ReleasingIterator( Collection resource, StingSAMIterator wrapped ) { + this.resource = resource; + this.wrappedIterator = wrapped; + } + + public ReleasingIterator iterator() { + return this; + } + + public void remove() { + throw new UnsupportedOperationException("Can't remove from a StingSAMIterator"); + } + + public void close() { + wrappedIterator.close(); + resourcePool.releaseReaders(resource); + } + + public boolean hasNext() { + return wrappedIterator.hasNext(); + } + + public SAMRecord next() { + return wrappedIterator.next(); + } + } } diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMResourcePool.java b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMResourcePool.java index 070a01077..5d6e73775 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMResourcePool.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMResourcePool.java @@ -45,6 +45,7 @@ import java.io.File; * @author hanna * @version 0.1 */ +@Deprecated class SAMResourcePool extends ResourcePool { /** our log, which we want to capture anything from this class */ protected static Logger logger = Logger.getLogger(SAMResourcePool.class);