Initial refactoring of read traversal to make it easier to drop in intervalled reads traversal.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@2894 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
9a6b384adb
commit
1017a38f38
|
|
@ -200,6 +200,10 @@ class BAMFileReader2
|
|||
return (filePointers != null) ? Chunk.toChunkList(filePointers) : Collections.<Chunk>emptyList();
|
||||
}
|
||||
|
||||
public Long getFilePointer() {
|
||||
return mCompressedInputStream.getFilePointer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare to iterate through the SAMRecords that match the given interval.
|
||||
* Only a single iterator on a BAMFile can be extant at a time. The previous one must be closed
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@
|
|||
package net.sf.samtools;
|
||||
|
||||
import net.sf.samtools.util.CloseableIterator;
|
||||
import net.sf.picard.PicardException;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.List;
|
||||
|
|
@ -152,6 +151,12 @@ public class SAMFileReader2 extends SAMFileReader {
|
|||
return reader.getFilePointersBounding(bin);
|
||||
}
|
||||
|
||||
public Chunk getCurrentPosition() {
|
||||
// TODO: Add sanity checks so that we're not doing this against an unsupported BAM file.
|
||||
BAMFileReader2 reader = (BAMFileReader2)JVMUtils.getFieldValue(getField("mReader"),this);
|
||||
return new Chunk(reader.getFilePointer(),Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
private Field getField(String fieldName) {
|
||||
try {
|
||||
return getClass().getSuperclass().getDeclaredField(fieldName);
|
||||
|
|
|
|||
|
|
@ -2,11 +2,13 @@ package org.broadinstitute.sting.gatk.datasources.shards;
|
|||
|
||||
import net.sf.samtools.Chunk;
|
||||
import net.sf.samtools.SAMFileReader2;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
|
||||
/**
|
||||
* A common interface for shards that natively understand the BAM format.
|
||||
|
|
@ -21,6 +23,32 @@ public interface BAMFormatAwareShard extends Shard {
|
|||
*/
|
||||
public Map<SAMFileReader2,List<Chunk>> getChunks();
|
||||
|
||||
/**
|
||||
* Returns true if this shard is meant to buffer reads, rather
|
||||
* than just holding pointers to their locations.
|
||||
* @return True if this shard can buffer reads. False otherwise.
|
||||
*/
|
||||
public boolean buffersReads();
|
||||
|
||||
/**
|
||||
* Returns true if the read buffer is currently full.
|
||||
* @return True if this shard's buffer is full (and the shard can buffer reads).
|
||||
*/
|
||||
public boolean isBufferFull();
|
||||
|
||||
/**
|
||||
* Adds a read to the read buffer.
|
||||
* @param read Add a read to the internal shard buffer.
|
||||
*/
|
||||
public void addRead(SAMRecord read);
|
||||
|
||||
/**
|
||||
* Assuming this shard buffers reads, returns an iterator over the reads
|
||||
* stored in the shard.
|
||||
* @return An iterator over the reads stored in the shard.
|
||||
*/
|
||||
public StingSAMIterator iterator();
|
||||
|
||||
/**
|
||||
* Get the bounds of the current shard. Current bounds
|
||||
* will be the unfiltered extents of the current shard, from
|
||||
|
|
|
|||
|
|
@ -2,12 +2,15 @@ package org.broadinstitute.sting.gatk.datasources.shards;
|
|||
|
||||
import net.sf.samtools.Chunk;
|
||||
import net.sf.samtools.SAMFileReader2;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Collections;
|
||||
import java.util.*;
|
||||
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.StingException;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter;
|
||||
import org.broadinstitute.sting.gatk.Reads;
|
||||
|
||||
/**
|
||||
* Expresses a shard of read data in block format.
|
||||
|
|
@ -17,26 +20,62 @@ import org.broadinstitute.sting.utils.GenomeLoc;
|
|||
*/
|
||||
public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAwareShard {
|
||||
/**
|
||||
* The reader associated with this shard.
|
||||
* Information about the origins of reads.
|
||||
*/
|
||||
private final SAMFileReader2 reader;
|
||||
|
||||
/**
|
||||
* The list of chunks to retrieve when loading this shard.
|
||||
*/
|
||||
private final List<Chunk> chunks;
|
||||
private final Reads sourceInfo;
|
||||
|
||||
public BlockDelimitedReadShard(SAMFileReader2 reader, List<Chunk> chunks) {
|
||||
this.reader = reader;
|
||||
/**
|
||||
* The data backing the next chunks to deliver to the traversal engine.
|
||||
*/
|
||||
private final Map<SAMFileReader2,List<Chunk>> chunks;
|
||||
|
||||
/**
|
||||
* The reads making up this shard.
|
||||
*/
|
||||
private final Collection<SAMRecord> reads = new ArrayList<SAMRecord>(BlockDelimitedReadShardStrategy.MAX_READS);
|
||||
|
||||
public BlockDelimitedReadShard(Reads sourceInfo, Map<SAMFileReader2,List<Chunk>> chunks) {
|
||||
this.sourceInfo = sourceInfo;
|
||||
this.chunks = chunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this shard is meant to buffer reads, rather
|
||||
* than just holding pointers to their locations.
|
||||
* @return True if this shard can buffer reads. False otherwise.
|
||||
*/
|
||||
public boolean buffersReads() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the read buffer is currently full.
|
||||
* @return True if this shard's buffer is full (and the shard can buffer reads).
|
||||
*/
|
||||
public boolean isBufferFull() {
|
||||
return reads.size() > BlockDelimitedReadShardStrategy.MAX_READS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a read to the read buffer.
|
||||
* @param read Add a read to the internal shard buffer.
|
||||
*/
|
||||
public void addRead(SAMRecord read) {
|
||||
if(isBufferFull())
|
||||
throw new StingException("Cannot store more reads in buffer: out of space");
|
||||
reads.add(read);
|
||||
}
|
||||
|
||||
public StingSAMIterator iterator() {
|
||||
return StingSAMIteratorAdapter.adapt(sourceInfo,reads.iterator());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of chunks delimiting this shard.
|
||||
* @return a list of chunks that contain data for this shard.
|
||||
*/
|
||||
public Map<SAMFileReader2,List<Chunk>> getChunks() {
|
||||
return Collections.singletonMap(reader,chunks);
|
||||
return Collections.unmodifiableMap(chunks);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -56,9 +95,14 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware
|
|||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(Chunk chunk : chunks) {
|
||||
sb.append(chunk);
|
||||
sb.append(' ');
|
||||
for(Map.Entry<SAMFileReader2,List<Chunk>> entry: chunks.entrySet()) {
|
||||
sb.append(entry.getKey());
|
||||
sb.append(": ");
|
||||
for(Chunk chunk : entry.getValue()) {
|
||||
sb.append(chunk);
|
||||
sb.append(' ');
|
||||
}
|
||||
sb.append(';');
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,11 +3,10 @@ package org.broadinstitute.sting.gatk.datasources.shards;
|
|||
import net.sf.samtools.*;
|
||||
|
||||
import java.util.*;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.broadinstitute.sting.utils.StingException;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
|
||||
|
||||
/**
|
||||
* A read shard strategy that delimits based on the number of
|
||||
|
|
@ -18,45 +17,35 @@ import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource
|
|||
*/
|
||||
public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
||||
/**
|
||||
* Number of blocks in a given shard.
|
||||
* What is the maximum number of reads which should go into a read shard.
|
||||
*/
|
||||
protected int blockCount = 100;
|
||||
protected static final int MAX_READS = 10000;
|
||||
|
||||
/**
|
||||
* The data source used to shard.
|
||||
*/
|
||||
protected final SAMDataSource dataSource;
|
||||
protected final BlockDrivenSAMDataSource dataSource;
|
||||
|
||||
/**
|
||||
* The actual chunks streaming into the file.
|
||||
* Position of the last shard in the file.
|
||||
*/
|
||||
private final BAMChunkIterator chunkIterator;
|
||||
private Map<SAMFileReader2,Chunk> position;
|
||||
|
||||
/**
|
||||
* The data backing the next chunks to deliver to the traversal engine.
|
||||
* True if the set of SAM readers is at the end of the stream. False otherwise.
|
||||
*/
|
||||
private final List<Chunk> nextChunks;
|
||||
private boolean atEndOfStream = false;
|
||||
|
||||
/**
|
||||
* Create a new read shard strategy, loading read shards from the given BAM file.
|
||||
* @param dataSource Data source from which to load shards.
|
||||
*/
|
||||
public BlockDelimitedReadShardStrategy(SAMDataSource dataSource) {
|
||||
this.dataSource = dataSource;
|
||||
if(!(dataSource instanceof BlockDrivenSAMDataSource))
|
||||
throw new StingException("Block-delimited read shard strategy cannot work without a block-driven data source.");
|
||||
|
||||
if(dataSource.getReadsInfo().getReadsFiles().size() > 1)
|
||||
throw new UnsupportedOperationException("Experimental sharding only works with a single BAM at the moment.");
|
||||
File bamFile = dataSource.getReadsInfo().getReadsFiles().get(0);
|
||||
try {
|
||||
Chunk headerLocation = BAMFileHeaderLoader.load(bamFile).getLocation();
|
||||
chunkIterator = new BAMChunkIterator(new BAMBlockIterator(bamFile),Arrays.asList(BAMFileHeaderLoader.preambleLocation,headerLocation));
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new StingException("Unable to open BAM file for sharding.");
|
||||
}
|
||||
nextChunks = new ArrayList<Chunk>();
|
||||
|
||||
advance();
|
||||
this.dataSource = (BlockDrivenSAMDataSource)dataSource;
|
||||
this.position = this.dataSource.getCurrentPosition();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -64,7 +53,7 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
|||
* @return True if any more data is available. False otherwise.
|
||||
*/
|
||||
public boolean hasNext() {
|
||||
return nextChunks.size() > 0;
|
||||
return !atEndOfStream;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -76,11 +65,16 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
|||
if(!hasNext())
|
||||
throw new NoSuchElementException("No such element available: SAM reader has arrived at last shard.");
|
||||
|
||||
if(dataSource.getReaders().size() == 0)
|
||||
throw new StingException("Unable to shard with no readers present.");
|
||||
// TODO: This level of processing should not be necessary.
|
||||
Map<SAMFileReader2,List<Chunk>> boundedPosition = new HashMap<SAMFileReader2,List<Chunk>>();
|
||||
for(Map.Entry<SAMFileReader2,Chunk> entry: position.entrySet())
|
||||
boundedPosition.put(entry.getKey(),Collections.singletonList(entry.getValue()));
|
||||
|
||||
BAMFormatAwareShard shard = new BlockDelimitedReadShard(dataSource.getReadsInfo(),boundedPosition);
|
||||
atEndOfStream = dataSource.fillShard(shard);
|
||||
|
||||
this.position = dataSource.getCurrentPosition();
|
||||
|
||||
Shard shard = new BlockDelimitedReadShard((SAMFileReader2)dataSource.getReaders().iterator().next(),Collections.unmodifiableList(new ArrayList<Chunk>(nextChunks)));
|
||||
advance();
|
||||
return shard;
|
||||
}
|
||||
|
||||
|
|
@ -98,12 +92,4 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
|||
public Iterator<Shard> iterator() {
|
||||
return this;
|
||||
}
|
||||
|
||||
private void advance() {
|
||||
nextChunks.clear();
|
||||
int chunksCopied = 0;
|
||||
|
||||
while(chunksCopied++ < blockCount && chunkIterator.hasNext())
|
||||
nextChunks.add(chunkIterator.next());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,8 +3,10 @@ package org.broadinstitute.sting.gatk.datasources.shards;
|
|||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.StingException;
|
||||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import net.sf.samtools.Chunk;
|
||||
import net.sf.samtools.SAMFileReader2;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
|
@ -64,6 +66,27 @@ public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwa
|
|||
this.shardType = shardType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this shard is meant to buffer reads, rather
|
||||
* than just holding pointers to their locations.
|
||||
* @return True if this shard can buffer reads. False otherwise.
|
||||
*/
|
||||
public boolean buffersReads() {
    // Locus shards do not buffer reads; the buffer operations on this class all throw.
    return false;
}
|
||||
|
||||
/**
|
||||
* Returns true if the read buffer is currently full.
|
||||
* @return True if this shard's buffer is full (and the shard can buffer reads).
|
||||
*/
|
||||
public boolean isBufferFull() {
    // There is no read buffer on this shard type, so the question is not answerable.
    throw new UnsupportedOperationException("This shard does not buffer reads.");
}
|
||||
|
||||
/**
|
||||
* Adds a read to the read buffer.
|
||||
* @param read Add a read to the internal shard buffer.
|
||||
*/
|
||||
public void addRead(SAMRecord read) {
    // There is no read buffer on this shard type, so nothing can be added.
    throw new UnsupportedOperationException("This shard does not buffer reads.");
}
|
||||
|
||||
/**
 * Unsupported for this shard type, which holds no buffered reads to iterate over.
 * @return Never returns normally; always throws UnsupportedOperationException.
 */
public StingSAMIterator iterator() {
    throw new UnsupportedOperationException("This shard does not buffer reads.");
}
|
||||
|
||||
/**
|
||||
* Gets the chunks associated with this locus shard.
|
||||
* @return A list of the chunks to use when retrieving locus data.
|
||||
|
|
|
|||
|
|
@ -81,6 +81,18 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
return filePointers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the current position within the BAM file.
|
||||
* @return A mapping of reader to current position.
|
||||
*/
|
||||
public Map<SAMFileReader2,Chunk> getCurrentPosition() {
|
||||
Map<SAMFileReader2,Chunk> currentPositions = new HashMap<SAMFileReader2,Chunk>();
|
||||
for(SAMFileReader reader: headerMerger.getReaders()) {
|
||||
SAMFileReader2 reader2 = (SAMFileReader2)reader;
|
||||
currentPositions.put(reader2,reader2.getCurrentPosition());
|
||||
}
|
||||
return currentPositions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of levels employed by this index.
|
||||
|
|
@ -137,36 +149,65 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
return firstReader.getLastLocusInBin(bin);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fill the given buffering shard with reads.
|
||||
* @param shard Shard to fill.
|
||||
* @return true if at the end of the stream. False otherwise.
|
||||
*/
|
||||
public boolean fillShard(BAMFormatAwareShard shard) {
|
||||
if(!shard.buffersReads())
|
||||
throw new StingException("Attempting to fill a non-buffering shard.");
|
||||
|
||||
// Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't
|
||||
// know why this is. Please add a comment here if you do.
|
||||
boolean enableVerification = shard instanceof ReadShard;
|
||||
|
||||
CloseableIterator<SAMRecord> iterator = getIterator(shard,false,enableVerification);
|
||||
while(!shard.isBufferFull() && iterator.hasNext())
|
||||
shard.addRead(iterator.next());
|
||||
|
||||
boolean atEndOfStream = !iterator.hasNext();
|
||||
|
||||
iterator.close();
|
||||
|
||||
return atEndOfStream;
|
||||
}
|
||||
|
||||
public StingSAMIterator seek(Shard shard) {
|
||||
if(!(shard instanceof BAMFormatAwareShard))
|
||||
throw new StingException("BlockDrivenSAMDataSource cannot operate on shards of type: " + shard.getClass());
|
||||
BAMFormatAwareShard bamAwareShard = (BAMFormatAwareShard)shard;
|
||||
|
||||
// Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't
|
||||
// know why this is. Please add a comment here if you do.
|
||||
boolean enableVerification = shard instanceof ReadShard;
|
||||
|
||||
if(shard instanceof ReadShard && reads.getReadsFiles().size() > 1)
|
||||
throw new StingException("Experimental read sharding cannot handle multiple BAM files at this point.");
|
||||
if(bamAwareShard.buffersReads()) {
|
||||
return bamAwareShard.iterator();
|
||||
}
|
||||
else {
|
||||
// Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't
|
||||
// know why this is. Please add a comment here if you do.
|
||||
boolean enableVerification = shard instanceof ReadShard;
|
||||
return getIterator(bamAwareShard,true,enableVerification);
|
||||
}
|
||||
}
|
||||
|
||||
private StingSAMIterator getIterator(BAMFormatAwareShard shard, boolean addIntervalFilter, boolean enableVerification) {
|
||||
Map<SAMFileReader,CloseableIterator<SAMRecord>> readerToIteratorMap = new HashMap<SAMFileReader,CloseableIterator<SAMRecord>>();
|
||||
for(Map.Entry<SAMFileReader2,List<Chunk>> chunksByReader: bamAwareShard.getChunks().entrySet()) {
|
||||
for(Map.Entry<SAMFileReader2,List<Chunk>> chunksByReader: shard.getChunks().entrySet()) {
|
||||
SAMFileReader2 reader = chunksByReader.getKey();
|
||||
readerToIteratorMap.put(reader,reader.iterator(bamAwareShard.getChunks().get(reader)));
|
||||
readerToIteratorMap.put(reader,reader.iterator(shard.getChunks().get(reader)));
|
||||
}
|
||||
|
||||
// Set up merging and filtering to dynamically merge together multiple BAMs and filter out records not in the shard set.
|
||||
MergingSamRecordIterator mergingIterator = new MergingSamRecordIterator(headerMerger,readerToIteratorMap,true);
|
||||
FilteringIterator filteringIterator = new FilteringIterator(mergingIterator,new IntervalOverlappingFilter(shard.getGenomeLocs()));
|
||||
CloseableIterator<SAMRecord> iterator = new MergingSamRecordIterator(headerMerger,readerToIteratorMap,true);
|
||||
if(addIntervalFilter)
|
||||
iterator = new FilteringIterator(iterator,new IntervalOverlappingFilter(shard.getGenomeLocs()));
|
||||
|
||||
return applyDecoratingIterators(enableVerification,
|
||||
StingSAMIteratorAdapter.adapt(reads,filteringIterator),
|
||||
StingSAMIteratorAdapter.adapt(reads,iterator),
|
||||
reads.getDownsamplingFraction(),
|
||||
reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
|
||||
reads.getSupplementalFilters());
|
||||
reads.getSupplementalFilters());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets the merged header from the SAM file.
|
||||
* @return The merged header.
|
||||
|
|
|
|||
Loading…
Reference in New Issue