Mediocre implementation of reader pooling within the SAM data source. Will fix this week.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@2915 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-03-02 22:35:02 +00:00
parent 74a5223b11
commit 104f4f7383
4 changed files with 258 additions and 59 deletions

View File

@ -38,6 +38,8 @@ import org.broadinstitute.sting.utils.StingException;
* Class for reading and querying SAM/BAM files. Delegates to appropriate concrete implementation. * Class for reading and querying SAM/BAM files. Delegates to appropriate concrete implementation.
*/ */
public class SAMFileReader2 extends SAMFileReader { public class SAMFileReader2 extends SAMFileReader {
private final File sourceFile;
/** /**
* Prepare to read a SAM or BAM file. If the given file is a BAM, and has a companion BAI index file * Prepare to read a SAM or BAM file. If the given file is a BAM, and has a companion BAI index file
*/ */
@ -66,6 +68,7 @@ public class SAMFileReader2 extends SAMFileReader {
*/ */
public SAMFileReader2(final File file, final File indexFile, final boolean eagerDecode){ public SAMFileReader2(final File file, final File indexFile, final boolean eagerDecode){
super(file,indexFile,eagerDecode); super(file,indexFile,eagerDecode);
this.sourceFile = file;
close(); close();
try { try {
@ -181,6 +184,17 @@ public class SAMFileReader2 extends SAMFileReader {
catch(NoSuchMethodException ex) { catch(NoSuchMethodException ex) {
throw new StingException("Unable to run method findIndexFile",ex); throw new StingException("Unable to run method findIndexFile",ex);
} }
}
@Override
public boolean equals(Object other) {
if(other == null) return false;
if(!(other instanceof SAMFileReader2)) return false;
return this.sourceFile.equals(((SAMFileReader2)other).sourceFile);
}
@Override
public int hashCode() {
return sourceFile.hashCode();
} }
} }

View File

@ -133,8 +133,19 @@ public class LocusReferenceView extends ReferenceView {
initializeReferenceSequence(GenomeLocParser.createGenomeLoc(bounds.getContig(), expandedStart, expandedStop)); initializeReferenceSequence(GenomeLocParser.createGenomeLoc(bounds.getContig(), expandedStart, expandedStop));
} }
/**
* Initialize the bounds of this shard, trimming the bounds so that they match the reference.
* @param provider Provider covering the appropriate locus.
*/
private void initializeBounds(ShardDataProvider provider) { private void initializeBounds(ShardDataProvider provider) {
bounds = provider.getLocus(); if(provider.getLocus() != null) {
long sequenceLength = reference.getSequenceDictionary().getSequence(provider.getLocus().getContig()).getSequenceLength();
bounds = GenomeLocParser.createGenomeLoc(provider.getLocus().getContig(),
Math.max(provider.getLocus().getStart(),1),
Math.min(provider.getLocus().getStop(),sequenceLength));
}
else
bounds = null;
} }
/** /**

View File

@ -24,7 +24,15 @@ import java.io.File;
*/ */
public class BlockDrivenSAMDataSource extends SAMDataSource { public class BlockDrivenSAMDataSource extends SAMDataSource {
private final SamFileHeaderMerger headerMerger; /**
* A collection of readers driving the merging process.
*/
private final SAMResourcePool resourcePool;
/**
* The merged header.
*/
private final SAMFileHeader header;
/** /**
* Create a new block-aware SAM data source given the supplied read metadata. * Create a new block-aware SAM data source given the supplied read metadata.
@ -35,22 +43,33 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
logger.warn("Experimental sharding is enabled. Many use cases are not supported. Please use with care."); logger.warn("Experimental sharding is enabled. Many use cases are not supported. Please use with care.");
Collection<SAMFileReader> readers = new ArrayList<SAMFileReader>(); resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
for(File readsFile: reads.getReadsFiles()) { Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
SAMFileReader2 reader = new SAMFileReader2(readsFile); header = new SamFileHeaderMerger(readers,SAMFileHeader.SortOrder.coordinate,true).getMergedHeader();
reader.setValidationStringency(reads.getValidationStringency()); resourcePool.releaseReaders(readers);
readers.add(reader);
}
this.headerMerger = new SamFileHeaderMerger(readers,SAMFileHeader.SortOrder.coordinate,true);
} }
public boolean hasIndex() { public boolean hasIndex() {
for(SAMFileReader reader: headerMerger.getReaders()) { Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
try {
return hasIndex(readers);
}
finally {
resourcePool.releaseReaders(readers);
}
}
/**
* Report whether a given collection of SAM file readers is indexed.
* @param readers The collection of readers.
* @return True if the given collection of readers is indexed.
*/
private boolean hasIndex(Collection<SAMFileReader> readers) {
for(SAMFileReader reader: readers) {
if(!reader.hasIndex()) if(!reader.hasIndex())
return false; return false;
} }
return true; return true;
} }
/** /**
@ -59,12 +78,19 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return A map of reader back to bin. * @return A map of reader back to bin.
*/ */
public List<Bin> getOverlappingBins(final GenomeLoc location) { public List<Bin> getOverlappingBins(final GenomeLoc location) {
if(headerMerger.getReaders().size() == 0) Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
return Collections.emptyList();
// All readers will have the same bin structure, so just use the first bin as an example. try {
SAMFileReader2 reader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); if(readers.size() == 0)
return reader.getOverlappingBins(location.getContig(),(int)location.getStart(),(int)location.getStop()); return Collections.emptyList();
// All readers will have the same bin structure, so just use the first bin as an example.
SAMFileReader2 reader = (SAMFileReader2)readers.iterator().next();
return reader.getOverlappingBins(location.getContig(),(int)location.getStart(),(int)location.getStop());
}
finally {
resourcePool.releaseReaders(readers);
}
} }
/** /**
@ -73,12 +99,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return A map of the file pointers bounding the bin. * @return A map of the file pointers bounding the bin.
*/ */
public Map<SAMFileReader2,List<Chunk>> getFilePointersBounding(Bin bin) { public Map<SAMFileReader2,List<Chunk>> getFilePointersBounding(Bin bin) {
Map<SAMFileReader2,List<Chunk>> filePointers = new HashMap<SAMFileReader2,List<Chunk>>(); Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
for(SAMFileReader reader: headerMerger.getReaders()) { try {
SAMFileReader2 reader2 = (SAMFileReader2)reader; Map<SAMFileReader2,List<Chunk>> filePointers = new HashMap<SAMFileReader2,List<Chunk>>();
filePointers.put(reader2,reader2.getFilePointersBounding(bin)); for(SAMFileReader reader: readers) {
SAMFileReader2 reader2 = (SAMFileReader2)reader;
filePointers.put(reader2,reader2.getFilePointersBounding(bin));
}
return filePointers;
}
finally {
resourcePool.releaseReaders(readers);
} }
return filePointers;
} }
/** /**
@ -86,12 +118,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return A mapping of reader to current position. * @return A mapping of reader to current position.
*/ */
public Map<SAMFileReader2,Chunk> getCurrentPosition() { public Map<SAMFileReader2,Chunk> getCurrentPosition() {
Map<SAMFileReader2,Chunk> currentPositions = new HashMap<SAMFileReader2,Chunk>(); Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
for(SAMFileReader reader: headerMerger.getReaders()) { try {
SAMFileReader2 reader2 = (SAMFileReader2)reader; Map<SAMFileReader2,Chunk> currentPositions = new HashMap<SAMFileReader2,Chunk>();
currentPositions.put(reader2,reader2.getCurrentPosition()); for(SAMFileReader reader: readers) {
SAMFileReader2 reader2 = (SAMFileReader2)reader;
currentPositions.put(reader2,reader2.getCurrentPosition());
}
return currentPositions;
}
finally {
resourcePool.releaseReaders(readers);
} }
return currentPositions;
} }
/** /**
@ -99,12 +137,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return Number of levels in this index. * @return Number of levels in this index.
*/ */
public int getNumIndexLevels() { public int getNumIndexLevels() {
if(headerMerger.getReaders().size() == 0) Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
throw new StingException("Unable to determine number of index levels; no BAMs are present."); try {
if(!hasIndex()) if(readers.size() == 0)
throw new SAMException("Unable to determine number of index levels; BAM file index is not present."); throw new StingException("Unable to determine number of index levels; no BAMs are present.");
SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); if(!hasIndex(readers))
return firstReader.getNumIndexLevels(); throw new SAMException("Unable to determine number of index levels; BAM file index is not present.");
SAMFileReader2 firstReader = (SAMFileReader2)readers.iterator().next();
return firstReader.getNumIndexLevels();
}
finally {
resourcePool.releaseReaders(readers);
}
} }
/** /**
@ -113,12 +157,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return the level associated with the given bin number. * @return the level associated with the given bin number.
*/ */
public int getLevelForBin(final Bin bin) { public int getLevelForBin(final Bin bin) {
if(headerMerger.getReaders().size() == 0) Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
throw new StingException("Unable to determine number of level for bin; no BAMs are present."); try {
if(!hasIndex()) if(readers.size() == 0)
throw new SAMException("Unable to determine number of level for bin; BAM file index is not present."); throw new StingException("Unable to determine number of level for bin; no BAMs are present.");
SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); if(!hasIndex(readers))
return firstReader.getLevelForBin(bin); throw new SAMException("Unable to determine number of level for bin; BAM file index is not present.");
SAMFileReader2 firstReader = (SAMFileReader2)readers.iterator().next();
return firstReader.getLevelForBin(bin);
}
finally {
resourcePool.releaseReaders(readers);
}
} }
/** /**
@ -127,12 +177,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return The last position that the given bin can represent. * @return The last position that the given bin can represent.
*/ */
public int getFirstLocusInBin(final Bin bin) { public int getFirstLocusInBin(final Bin bin) {
if(headerMerger.getReaders().size() == 0) Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
throw new StingException("Unable to determine number of level for bin; no BAMs are present."); try {
if(!hasIndex()) if(readers.size() == 0)
throw new SAMException("Unable to determine number of level for bin; BAM file index is not present."); throw new StingException("Unable to determine number of level for bin; no BAMs are present.");
SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); if(!hasIndex(readers))
return firstReader.getFirstLocusInBin(bin); throw new SAMException("Unable to determine number of level for bin; BAM file index is not present.");
SAMFileReader2 firstReader = (SAMFileReader2)readers.iterator().next();
return firstReader.getFirstLocusInBin(bin);
}
finally {
resourcePool.releaseReaders(readers);
}
} }
/** /**
@ -141,12 +197,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return The last position that the given bin can represent. * @return The last position that the given bin can represent.
*/ */
public int getLastLocusInBin(final Bin bin) { public int getLastLocusInBin(final Bin bin) {
if(headerMerger.getReaders().size() == 0) Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
throw new StingException("Unable to determine number of level for bin; no BAMs are present."); try {
if(!hasIndex()) if(readers.size() == 0)
throw new SAMException("Unable to determine number of level for bin; BAM file index is not present."); throw new StingException("Unable to determine number of level for bin; no BAMs are present.");
SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next(); if(!hasIndex(readers))
return firstReader.getLastLocusInBin(bin); throw new SAMException("Unable to determine number of level for bin; BAM file index is not present.");
SAMFileReader2 firstReader = (SAMFileReader2)readers.iterator().next();
return firstReader.getLastLocusInBin(bin);
}
finally {
resourcePool.releaseReaders(readers);
}
} }
/** /**
@ -162,7 +224,7 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
// know why this is. Please add a comment here if you do. // know why this is. Please add a comment here if you do.
boolean enableVerification = shard instanceof ReadShard; boolean enableVerification = shard instanceof ReadShard;
CloseableIterator<SAMRecord> iterator = getIterator(shard,false,enableVerification); CloseableIterator<SAMRecord> iterator = getIterator(shard,enableVerification);
while(!shard.isBufferFull() && iterator.hasNext()) while(!shard.isBufferFull() && iterator.hasNext())
shard.addRead(iterator.next()); shard.addRead(iterator.next());
@ -185,24 +247,29 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
// Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't // Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't
// know why this is. Please add a comment here if you do. // know why this is. Please add a comment here if you do.
boolean enableVerification = shard instanceof ReadShard; boolean enableVerification = shard instanceof ReadShard;
return getIterator(bamAwareShard,true,enableVerification); return getIterator(bamAwareShard,enableVerification);
} }
} }
private StingSAMIterator getIterator(BAMFormatAwareShard shard, boolean addIntervalFilter, boolean enableVerification) { private StingSAMIterator getIterator(BAMFormatAwareShard shard, boolean enableVerification) {
Collection<SAMFileReader> readers = resourcePool.getAvailableReaders();
Map<SAMFileReader,CloseableIterator<SAMRecord>> readerToIteratorMap = new HashMap<SAMFileReader,CloseableIterator<SAMRecord>>(); Map<SAMFileReader,CloseableIterator<SAMRecord>> readerToIteratorMap = new HashMap<SAMFileReader,CloseableIterator<SAMRecord>>();
for(Map.Entry<SAMFileReader2,List<Chunk>> chunksByReader: shard.getChunks().entrySet()) { for(SAMFileReader reader: readers) {
SAMFileReader2 reader = chunksByReader.getKey(); SAMFileReader2 reader2 = (SAMFileReader2)reader;
readerToIteratorMap.put(reader,reader.iterator(shard.getChunks().get(reader))); List<Chunk> chunks = shard.getChunks().get(reader2);
readerToIteratorMap.put(reader2,reader2.iterator(chunks));
} }
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers,SAMFileHeader.SortOrder.coordinate,true);
// Set up merging and filtering to dynamically merge together multiple BAMs and filter out records not in the shard set. // Set up merging and filtering to dynamically merge together multiple BAMs and filter out records not in the shard set.
CloseableIterator<SAMRecord> iterator = new MergingSamRecordIterator(headerMerger,readerToIteratorMap,true); CloseableIterator<SAMRecord> iterator = new MergingSamRecordIterator(headerMerger,readerToIteratorMap,true);
if(shard.getFilter() != null) if(shard.getFilter() != null)
iterator = new FilteringIterator(iterator,shard.getFilter()); iterator = new FilteringIterator(iterator,shard.getFilter());
return applyDecoratingIterators(enableVerification, return applyDecoratingIterators(enableVerification,
StingSAMIteratorAdapter.adapt(reads,iterator), new ReleasingIterator(readers,StingSAMIteratorAdapter.adapt(reads,iterator)),
reads.getDownsamplingFraction(), reads.getDownsamplingFraction(),
reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION), reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
reads.getSupplementalFilters()); reads.getSupplementalFilters());
@ -213,7 +280,7 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return The merged header. * @return The merged header.
*/ */
public SAMFileHeader getHeader() { public SAMFileHeader getHeader() {
return headerMerger.getMergedHeader(); return header;
} }
/** /**
@ -239,4 +306,110 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
public String getReadGroupId(final SAMFileReader reader, final String originalReadGroupId) { public String getReadGroupId(final SAMFileReader reader, final String originalReadGroupId) {
throw new UnsupportedOperationException("Getting read group ID from this experimental SAM reader is not currently supported."); throw new UnsupportedOperationException("Getting read group ID from this experimental SAM reader is not currently supported.");
} }
private class SAMResourcePool {
/**
* How many entries can be cached in this resource pool?
*/
private final int maxEntries;
/**
* All iterators of this reference-ordered data.
*/
private List<SAMFileReaders> allResources = new ArrayList<SAMFileReaders>();
/**
* All iterators that are not currently in service.
*/
private List<SAMFileReaders> availableResources = new ArrayList<SAMFileReaders>();
public SAMResourcePool(final int maxEntries) {
this.maxEntries = maxEntries;
}
/**
* Choose a set of readers from the pool to use for this query. When complete,
* @return
*/
public synchronized Collection<SAMFileReader> getAvailableReaders() {
if(availableResources.size() == 0)
createNewResource();
SAMFileReaders readers = availableResources.get(0);
availableResources.remove(readers);
return readers;
}
public synchronized void releaseReaders(Collection<SAMFileReader> readers) {
if(!allResources.contains(readers))
throw new StingException("Tried to return readers from the pool that didn't originate in the pool.");
availableResources.add((SAMFileReaders)readers);
}
private synchronized void createNewResource() {
if(allResources.size() > maxEntries)
throw new StingException("Cannot create a new resource pool. All resources are in use.");
SAMFileReaders readers = new SAMFileReaders(reads);
allResources.add(readers);
availableResources.add(readers);
}
/**
* A collection of readers derived from a reads metadata structure.
*/
private class SAMFileReaders extends ArrayList<SAMFileReader> {
/**
* Derive a new set of readers from the Reads metadata.
* @param sourceInfo Metadata for the reads to load.
*/
public SAMFileReaders(Reads sourceInfo) {
for(File readsFile: sourceInfo.getReadsFiles()) {
SAMFileReader2 reader = new SAMFileReader2(readsFile);
reader.setValidationStringency(sourceInfo.getValidationStringency());
add(reader);
}
}
}
}
private class ReleasingIterator implements StingSAMIterator {
/**
* The resource acting as the source of the data.
*/
private final Collection<SAMFileReader> resource;
/**
* The iterator to wrap.
*/
private final StingSAMIterator wrappedIterator;
public Reads getSourceInfo() {
return wrappedIterator.getSourceInfo();
}
public ReleasingIterator( Collection<SAMFileReader> resource, StingSAMIterator wrapped ) {
this.resource = resource;
this.wrappedIterator = wrapped;
}
public ReleasingIterator iterator() {
return this;
}
public void remove() {
throw new UnsupportedOperationException("Can't remove from a StingSAMIterator");
}
public void close() {
wrappedIterator.close();
resourcePool.releaseReaders(resource);
}
public boolean hasNext() {
return wrappedIterator.hasNext();
}
public SAMRecord next() {
return wrappedIterator.next();
}
}
} }

View File

@ -45,6 +45,7 @@ import java.io.File;
* @author hanna * @author hanna
* @version 0.1 * @version 0.1
*/ */
@Deprecated
class SAMResourcePool extends ResourcePool<ReadStreamResource, StingSAMIterator> { class SAMResourcePool extends ResourcePool<ReadStreamResource, StingSAMIterator> {
/** our log, which we want to capture anything from this class */ /** our log, which we want to capture anything from this class */
protected static Logger logger = Logger.getLogger(SAMResourcePool.class); protected static Logger logger = Logger.getLogger(SAMResourcePool.class);