Cleanup and simplification of read interval sharding.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@2944 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
ee913eca07
commit
1ef1091f7c
|
|
@ -45,6 +45,7 @@ class BAMFileReader2
|
|||
private BinaryCodec mStream = null;
|
||||
// Underlying compressed data stream.
|
||||
private final BlockCompressedInputStream mCompressedInputStream;
|
||||
private SAMFileReader mFileReader = null;
|
||||
private SAMFileHeader mFileHeader = null;
|
||||
// Populated if the file is seekable and an index exists
|
||||
private BAMFileIndex2 mFileIndex = null;
|
||||
|
|
@ -100,6 +101,14 @@ class BAMFileReader2
|
|||
mFirstRecordPointer = mCompressedInputStream.getFilePointer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the reader reading this file.
|
||||
* @param reader The source reader.
|
||||
*/
|
||||
void setReader(SAMFileReader reader) {
|
||||
mFileReader = reader;
|
||||
}
|
||||
|
||||
void close() {
|
||||
if (mStream != null) {
|
||||
mStream.close();
|
||||
|
|
@ -415,11 +424,16 @@ class BAMFileReader2
|
|||
|
||||
void advance() {
|
||||
try {
|
||||
long startCoordinate = mCompressedInputStream.getFilePointer();
|
||||
mNextRecord = getNextRecord();
|
||||
long stopCoordinate = mCompressedInputStream.getFilePointer();
|
||||
|
||||
if (mNextRecord != null) {
|
||||
++this.samRecordIndex;
|
||||
// Because some decoding is done lazily, the record needs to remember the validation stringency.
|
||||
mNextRecord.setReader(mFileReader);
|
||||
mNextRecord.setValidationStringency(mValidationStringency);
|
||||
mNextRecord.setCoordinates(new Chunk(startCoordinate,stopCoordinate));
|
||||
|
||||
if (mValidationStringency != ValidationStringency.SILENT) {
|
||||
final List<SAMValidationError> validationErrors = mNextRecord.isValid();
|
||||
|
|
|
|||
|
|
@ -1,34 +0,0 @@
|
|||
package net.sf.samtools;
|
||||
|
||||
/**
|
||||
* Represents the position of a block on disk.
|
||||
*
|
||||
* @author mhanna
|
||||
* @version 0.1
|
||||
*/
|
||||
public class Block {
|
||||
public final long position;
|
||||
public final int compressedBlockSize;
|
||||
public final long uncompressedBlockSize;
|
||||
|
||||
/**
|
||||
* Create a block, loading no data into memory.
|
||||
* @param position Position of this block on disk.s
|
||||
* @param compressedBlockSize Size of the block on disk; if compressedData is present, should match compressedData.length.
|
||||
* @param uncompressedBlockSize Size of the data in the block.
|
||||
*/
|
||||
public Block(final long position, final int compressedBlockSize, final long uncompressedBlockSize) {
|
||||
this.position = position;
|
||||
this.compressedBlockSize = compressedBlockSize;
|
||||
this.uncompressedBlockSize = uncompressedBlockSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a string representation of the block.
|
||||
* @return A string indicating position and size.
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("Block: pos = %d, compressed size = %d, uncompressed size = %d",position,compressedBlockSize,uncompressedBlockSize);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,104 +0,0 @@
|
|||
package net.sf.samtools;
|
||||
|
||||
import net.sf.samtools.util.BlockCompressedStreamConstants;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.FileInputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.channels.FileChannel;
|
||||
|
||||
/**
|
||||
* Read an individual block from the BGZF file.
|
||||
*
|
||||
* @author mhanna
|
||||
* @version 0.1
|
||||
*/
|
||||
public class BlockReader {
|
||||
/**
|
||||
* File channel from which to read.
|
||||
*/
|
||||
private final FileChannel channel;
|
||||
|
||||
/**
|
||||
* Store buffered information from the file temporarily.
|
||||
*/
|
||||
private final ByteBuffer buffer;
|
||||
|
||||
/**
|
||||
* Create a new block reader. Block readers can operate independently on the same input file.
|
||||
* @param inputStream InputStream from which to read.
|
||||
*/
|
||||
public BlockReader(final FileInputStream inputStream) {
|
||||
this.channel = inputStream.getChannel();
|
||||
this.buffer = ByteBuffer.allocateDirect(BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the block reader's channel.
|
||||
* @throws IOException On failure to close channel.
|
||||
*/
|
||||
public void close() throws IOException {
|
||||
this.channel.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the given offset is at the end of the file.
|
||||
* @param position Position at which to start reading. Must be at the beginning of a block.
|
||||
* @return True if we're at the end of the file (or at the end of the data stream), false otherwise.
|
||||
* @throws IOException If whether the position is past the end of the file can't be determined.
|
||||
*/
|
||||
public boolean eof(long position) throws IOException {
|
||||
return (channel.size() - position) <= 0 ||
|
||||
(channel.size() - position) == BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the chunk information at a given position.
|
||||
* @param position Position at which the chunk begins.
|
||||
* @return Chunk of the BGZF file, starting at position.
|
||||
* @throws IOException if the chunk could not be read.
|
||||
*/
|
||||
public Block getBlockAt(long position) throws IOException {
|
||||
buffer.rewind();
|
||||
buffer.limit(BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH);
|
||||
int count = channel.read(buffer,position);
|
||||
if (count == 0) {
|
||||
throw new IOException("Tried to read chunk past end of file");
|
||||
}
|
||||
if (count != BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH) {
|
||||
throw new IOException("Premature end of file");
|
||||
}
|
||||
buffer.flip();
|
||||
|
||||
// Validate GZIP header
|
||||
if (buffer.get() != BlockCompressedStreamConstants.GZIP_ID1 ||
|
||||
buffer.get() != (byte)BlockCompressedStreamConstants.GZIP_ID2 ||
|
||||
buffer.get() != BlockCompressedStreamConstants.GZIP_CM_DEFLATE ||
|
||||
buffer.get() != BlockCompressedStreamConstants.GZIP_FLG) {
|
||||
throw new SAMFormatException("Invalid GZIP header");
|
||||
}
|
||||
// Skip MTIME, XFL, OS fields
|
||||
buffer.position(buffer.position() + 6);
|
||||
if (buffer.getShort() != BlockCompressedStreamConstants.GZIP_XLEN) {
|
||||
throw new SAMFormatException("Invalid GZIP header");
|
||||
}
|
||||
// Skip blocksize subfield intro
|
||||
buffer.position(buffer.position() + 4);
|
||||
// Read ushort
|
||||
final int compressedBlockSize = (buffer.getShort() & 0xffff) + 1;
|
||||
if (compressedBlockSize < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH || compressedBlockSize > buffer.capacity()) {
|
||||
throw new IOException("Unexpected compressed block length: " + compressedBlockSize);
|
||||
}
|
||||
|
||||
// Read the uncompressed block size
|
||||
buffer.rewind();
|
||||
buffer.limit(4);
|
||||
channel.read(buffer,position+compressedBlockSize-4);
|
||||
buffer.flip();
|
||||
final int uncompressedBlockSize = buffer.getInt();
|
||||
|
||||
return new Block(position,compressedBlockSize,uncompressedBlockSize);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,112 +0,0 @@
|
|||
package net.sf.samtools;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Collections;
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
* Represents a segment of a given block. A block segment can easily be
|
||||
* converted or compared to either a block or a chunk.
|
||||
*/
|
||||
class BlockSegment {
|
||||
/**
|
||||
* Offset to the start of this block within the file.
|
||||
*/
|
||||
private final long position;
|
||||
|
||||
/**
|
||||
* Starting coordinate within this segment.
|
||||
*/
|
||||
private final long blockStart;
|
||||
|
||||
/**
|
||||
* Last coordinate within this segment.
|
||||
*/
|
||||
private final long blockStop;
|
||||
|
||||
/**
|
||||
* Create a new block segment from the given block.
|
||||
* @param block the block to use as the starting point for this segment.
|
||||
*/
|
||||
public BlockSegment(Block block) {
|
||||
this(block.position,0,block.uncompressedBlockSize-1);
|
||||
}
|
||||
|
||||
BlockSegment(final long position,final long blockStart,final long blockStop) {
|
||||
this.position = position;
|
||||
this.blockStart = blockStart;
|
||||
this.blockStop = blockStop;
|
||||
}
|
||||
|
||||
/**
|
||||
* The size of the block segment.
|
||||
* @return Number of bytes represented by this block segment.
|
||||
*/
|
||||
public long size() {
|
||||
return blockStop - blockStart + 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the given segment to a chunk.
|
||||
* @return the chunk equivalent of this block.
|
||||
*/
|
||||
public Chunk toChunk() {
|
||||
return new Chunk(position << 16 | blockStart,position << 16 | blockStop);
|
||||
}
|
||||
|
||||
/**
|
||||
* Does this block segment overlap the given chunk?
|
||||
* @param chunk The chunk to test.
|
||||
* @return true if the chunk is w/i the block; false otherwise.
|
||||
*/
|
||||
public boolean overlaps(Chunk chunk) {
|
||||
Chunk segmentAsChunk = toChunk();
|
||||
|
||||
// Chunk is completely encompasses the given block.
|
||||
if(chunk.getChunkStart() < segmentAsChunk.getChunkStart() &&
|
||||
chunk.getChunkEnd() > segmentAsChunk.getChunkEnd())
|
||||
return true;
|
||||
|
||||
// Chunk starts w/i block.
|
||||
if(chunk.getChunkStart() >= segmentAsChunk.getChunkStart() &&
|
||||
chunk.getChunkStart() <= segmentAsChunk.getChunkEnd())
|
||||
return true;
|
||||
|
||||
// Chunk ends w/i block.
|
||||
if(chunk.getChunkEnd() >= segmentAsChunk.getChunkStart() &&
|
||||
chunk.getChunkEnd() <= segmentAsChunk.getChunkEnd())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subtracts the given chunk from the block segment.
|
||||
* @param chunk The chunk to subtract.
|
||||
* @return The block segment(s) derived from subtracting the chunk.
|
||||
*/
|
||||
public List<BlockSegment> minus(Chunk chunk) {
|
||||
// If no overlap exists, just return a new instance of the current block segment.
|
||||
if(!overlaps(chunk))
|
||||
return Collections.singletonList(this);
|
||||
|
||||
List<BlockSegment> differenceSegments = new ArrayList<BlockSegment>();
|
||||
Chunk segmentAsChunk = toChunk();
|
||||
|
||||
// If the segment begins before the chunk, build out a segment from the start of the block to just before
|
||||
// the start of the chunk or the end of the block segment, whichever comes first.
|
||||
if(segmentAsChunk.getChunkStart() < chunk.getChunkStart()) {
|
||||
long segmentStop = (position == chunk.getChunkStart()>>16) ? (chunk.getChunkStart()&0xFFFF)-1 : blockStop;
|
||||
differenceSegments.add(new BlockSegment(position,blockStart,segmentStop));
|
||||
}
|
||||
|
||||
// If the segment ends after the chunk, build out a segment from either after the end of the chunk or the
|
||||
// start of the block segment, whichever comes last.
|
||||
if(segmentAsChunk.getChunkEnd() > chunk.getChunkEnd()) {
|
||||
long segmentStart = (position == chunk.getChunkEnd()>>16) ? (chunk.getChunkEnd()&0xFFFF)+1 : blockStart;
|
||||
differenceSegments.add(new BlockSegment(position,segmentStart,blockStop));
|
||||
}
|
||||
|
||||
return differenceSegments;
|
||||
}
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@ public class Chunk implements Cloneable,Comparable<Chunk> {
|
|||
private long mChunkStart;
|
||||
private long mChunkEnd;
|
||||
|
||||
Chunk(final long start, final long end) {
|
||||
public Chunk(final long start, final long end) {
|
||||
mChunkStart = start;
|
||||
mChunkEnd = end;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -73,6 +73,7 @@ public class SAMFileReader2 extends SAMFileReader {
|
|||
|
||||
try {
|
||||
BAMFileReader2 reader = new BAMFileReader2(file,eagerDecode,getDefaultValidationStringency());
|
||||
reader.setReader(this);
|
||||
BAMFileIndex2 index = new BAMFileIndex2(indexFile != null ? indexFile : findIndexFileFromParent(file));
|
||||
reader.setFileIndex(index);
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -9,6 +9,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
|
||||
|
||||
/**
|
||||
* A common interface for shards that natively understand the BAM format.
|
||||
|
|
@ -21,7 +22,7 @@ public interface BAMFormatAwareShard extends Shard {
|
|||
* Get the list of chunks delimiting this shard.
|
||||
* @return a list of chunks that contain data for this shard.
|
||||
*/
|
||||
public Map<SAMFileReader2,List<Chunk>> getChunks();
|
||||
public Map<SAMReaderID,List<Chunk>> getChunks();
|
||||
|
||||
/**
|
||||
* Returns true if this shard is meant to buffer reads, rather
|
||||
|
|
@ -30,6 +31,12 @@ public interface BAMFormatAwareShard extends Shard {
|
|||
*/
|
||||
public boolean buffersReads();
|
||||
|
||||
/**
|
||||
* Checks to see whether the buffer is empty.
|
||||
* @return True if the buffer is empty.
|
||||
*/
|
||||
public boolean isBufferEmpty();
|
||||
|
||||
/**
|
||||
* Returns true if the read buffer is currently full.
|
||||
* @return True if this shard's buffer is full (and the shard can buffer reads).
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import org.broadinstitute.sting.utils.StingException;
|
|||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter;
|
||||
import org.broadinstitute.sting.gatk.Reads;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
|
||||
|
||||
/**
|
||||
* Expresses a shard of read data in block format.
|
||||
|
|
@ -27,7 +28,7 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware
|
|||
/**
|
||||
* The data backing the next chunks to deliver to the traversal engine.
|
||||
*/
|
||||
private final Map<SAMFileReader2,List<Chunk>> chunks;
|
||||
private final Map<SAMReaderID,List<Chunk>> chunks;
|
||||
|
||||
/**
|
||||
* The reads making up this shard.
|
||||
|
|
@ -45,7 +46,7 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware
|
|||
*/
|
||||
private final Shard.ShardType shardType;
|
||||
|
||||
public BlockDelimitedReadShard(Reads sourceInfo, Map<SAMFileReader2,List<Chunk>> chunks, SamRecordFilter filter, Shard.ShardType shardType) {
|
||||
public BlockDelimitedReadShard(Reads sourceInfo, Map<SAMReaderID,List<Chunk>> chunks, SamRecordFilter filter, Shard.ShardType shardType) {
|
||||
this.sourceInfo = sourceInfo;
|
||||
this.chunks = chunks;
|
||||
this.filter = filter;
|
||||
|
|
@ -61,6 +62,14 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the read buffer is currently full.
|
||||
* @return True if this shard's buffer is full (and the shard can buffer reads).
|
||||
*/
|
||||
public boolean isBufferEmpty() {
|
||||
return reads.size() == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the read buffer is currently full.
|
||||
* @return True if this shard's buffer is full (and the shard can buffer reads).
|
||||
|
|
@ -95,7 +104,7 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware
|
|||
* Get the list of chunks delimiting this shard.
|
||||
* @return a list of chunks that contain data for this shard.
|
||||
*/
|
||||
public Map<SAMFileReader2,List<Chunk>> getChunks() {
|
||||
public Map<SAMReaderID,List<Chunk>> getChunks() {
|
||||
return Collections.unmodifiableMap(chunks);
|
||||
}
|
||||
|
||||
|
|
@ -114,7 +123,7 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware
|
|||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(Map.Entry<SAMFileReader2,List<Chunk>> entry: chunks.entrySet()) {
|
||||
for(Map.Entry<SAMReaderID,List<Chunk>> entry: chunks.entrySet()) {
|
||||
sb.append(entry.getKey());
|
||||
sb.append(": ");
|
||||
for(Chunk chunk : entry.getValue()) {
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import org.broadinstitute.sting.utils.StingException;
|
|||
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
|
||||
|
||||
/**
|
||||
* A read shard strategy that delimits based on the number of
|
||||
|
|
@ -28,18 +29,15 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
|||
*/
|
||||
protected final BlockDrivenSAMDataSource dataSource;
|
||||
|
||||
private Shard nextShard = null;
|
||||
|
||||
/** our storage of the genomic locations they'd like to shard over */
|
||||
private final List<FilePointer> filePointers = new ArrayList<FilePointer>();
|
||||
|
||||
/**
|
||||
* Position of the last shard in the file.
|
||||
*/
|
||||
private Map<SAMFileReader2,Chunk> position;
|
||||
|
||||
/**
|
||||
* True if the set of SAM readers is at the end of the stream. False otherwise.
|
||||
*/
|
||||
private boolean atEndOfStream = false;
|
||||
private Map<SAMReaderID,Chunk> position;
|
||||
|
||||
/**
|
||||
* Create a new read shard strategy, loading read shards from the given BAM file.
|
||||
|
|
@ -53,6 +51,7 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
|||
this.position = this.dataSource.getCurrentPosition();
|
||||
if(locations != null)
|
||||
filePointers.addAll(IntervalSharder.shardIntervals(this.dataSource,locations.toList(),this.dataSource.getNumIndexLevels()-1));
|
||||
advance();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -60,7 +59,7 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
|||
* @return True if any more data is available. False otherwise.
|
||||
*/
|
||||
public boolean hasNext() {
|
||||
return !atEndOfStream;
|
||||
return nextShard != null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -70,48 +69,61 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
|||
*/
|
||||
public Shard next() {
|
||||
if(!hasNext())
|
||||
throw new NoSuchElementException("No such element available: SAM reader has arrived at last shard.");
|
||||
throw new NoSuchElementException("No next read shard available");
|
||||
Shard currentShard = nextShard;
|
||||
advance();
|
||||
return currentShard;
|
||||
}
|
||||
|
||||
Map<SAMFileReader2,List<Chunk>> shardPosition = null;
|
||||
public void advance() {
|
||||
Map<SAMReaderID,List<Chunk>> shardPosition = null;
|
||||
nextShard = null;
|
||||
SamRecordFilter filter = null;
|
||||
|
||||
if(!filePointers.isEmpty()) {
|
||||
boolean foundData = false;
|
||||
for(FilePointer filePointer: filePointers) {
|
||||
shardPosition = dataSource.getFilePointersBounding(filePointer.bin);
|
||||
for(SAMFileReader2 reader: shardPosition.keySet()) {
|
||||
List<Chunk> chunks = shardPosition.get(reader);
|
||||
Chunk filePosition = position.get(reader);
|
||||
for(Chunk chunk: chunks) {
|
||||
if(filePosition.getChunkStart() > chunk.getChunkEnd())
|
||||
chunks.remove(chunk);
|
||||
else {
|
||||
if(filePosition.getChunkStart() > chunk.getChunkStart())
|
||||
chunk.setChunkStart(filePosition.getChunkStart());
|
||||
foundData = true;
|
||||
}
|
||||
|
||||
Map<SAMReaderID,List<Chunk>> selectedReaders = new HashMap<SAMReaderID,List<Chunk>>();
|
||||
for(SAMReaderID id: shardPosition.keySet()) {
|
||||
List<Chunk> chunks = shardPosition.get(id);
|
||||
List<Chunk> selectedChunks = new ArrayList<Chunk>();
|
||||
Chunk filePosition = position.get(id);
|
||||
for(Chunk chunk: chunks)
|
||||
if(filePosition.getChunkStart() <= chunk.getChunkStart())
|
||||
selectedChunks.add(chunk);
|
||||
else if(filePosition.getChunkStart() > chunk.getChunkStart() && filePosition.getChunkStart() < chunk.getChunkEnd()) {
|
||||
selectedChunks.add(new Chunk(filePosition.getChunkStart(),chunk.getChunkEnd()));
|
||||
}
|
||||
if(selectedChunks.size() > 0)
|
||||
selectedReaders.put(id,selectedChunks);
|
||||
}
|
||||
if(foundData) {
|
||||
filter = new ReadOverlapFilter(filePointer.locations);
|
||||
break;
|
||||
if(selectedReaders.size() > 0) {
|
||||
filter = new ReadOverlapFilter(filePointer.locations);
|
||||
|
||||
BAMFormatAwareShard shard = new BlockDelimitedReadShard(dataSource.getReadsInfo(),selectedReaders,filter,Shard.ShardType.READ);
|
||||
dataSource.fillShard(shard);
|
||||
|
||||
if(!shard.isBufferEmpty()) {
|
||||
nextShard = shard;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
// TODO: This level of processing should not be necessary.
|
||||
shardPosition = new HashMap<SAMFileReader2,List<Chunk>>();
|
||||
for(Map.Entry<SAMFileReader2,Chunk> entry: position.entrySet())
|
||||
shardPosition = new HashMap<SAMReaderID,List<Chunk>>();
|
||||
for(Map.Entry<SAMReaderID,Chunk> entry: position.entrySet())
|
||||
shardPosition.put(entry.getKey(),Collections.singletonList(entry.getValue()));
|
||||
filter = null;
|
||||
|
||||
BAMFormatAwareShard shard = new BlockDelimitedReadShard(dataSource.getReadsInfo(),shardPosition,filter,Shard.ShardType.READ);
|
||||
dataSource.fillShard(shard);
|
||||
nextShard = !shard.isBufferEmpty() ? shard : null;
|
||||
}
|
||||
|
||||
BAMFormatAwareShard shard = new BlockDelimitedReadShard(dataSource.getReadsInfo(),shardPosition,filter,Shard.ShardType.READ);
|
||||
atEndOfStream = dataSource.fillShard(shard);
|
||||
|
||||
this.position = dataSource.getCurrentPosition();
|
||||
|
||||
return shard;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -128,4 +140,5 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
|||
public Iterator<Shard> iterator() {
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package org.broadinstitute.sting.gatk.datasources.shards;
|
|||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.StingException;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
|
||||
import net.sf.samtools.Chunk;
|
||||
import net.sf.samtools.SAMFileReader2;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
|
|
@ -44,7 +45,7 @@ public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwa
|
|||
/**
|
||||
* A list of the chunks associated with this shard.
|
||||
*/
|
||||
private final Map<SAMFileReader2,List<Chunk>> chunks;
|
||||
private final Map<SAMReaderID,List<Chunk>> chunks;
|
||||
|
||||
/**
|
||||
* An IndexDelimitedLocusShard can be used either for LOCUS or LOCUS_INTERVAL shard types.
|
||||
|
|
@ -58,7 +59,7 @@ public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwa
|
|||
* @param chunks Chunks associated with that interval.
|
||||
* @param shardType Type of the shard; must be either LOCUS or LOCUS_INTERVAL.
|
||||
*/
|
||||
IndexDelimitedLocusShard(List<GenomeLoc> intervals, Map<SAMFileReader2,List<Chunk>> chunks, ShardType shardType) {
|
||||
IndexDelimitedLocusShard(List<GenomeLoc> intervals, Map<SAMReaderID,List<Chunk>> chunks, ShardType shardType) {
|
||||
super(intervals);
|
||||
this.chunks = chunks;
|
||||
if(shardType != ShardType.LOCUS && shardType != ShardType.LOCUS_INTERVAL)
|
||||
|
|
@ -70,7 +71,7 @@ public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwa
|
|||
* Gets the chunks associated with this locus shard.
|
||||
* @return A list of the chunks to use when retrieving locus data.
|
||||
*/
|
||||
public Map<SAMFileReader2,List<Chunk>> getChunks() {
|
||||
public Map<SAMReaderID,List<Chunk>> getChunks() {
|
||||
return chunks;
|
||||
}
|
||||
|
||||
|
|
@ -81,6 +82,12 @@ public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwa
|
|||
*/
|
||||
public boolean buffersReads() { return false; }
|
||||
|
||||
/**
|
||||
* Returns true if the read buffer is currently full.
|
||||
* @return True if this shard's buffer is full (and the shard can buffer reads).
|
||||
*/
|
||||
public boolean isBufferEmpty() { throw new UnsupportedOperationException("This shard does not buffer reads."); }
|
||||
|
||||
/**
|
||||
* Returns true if the read buffer is currently full.
|
||||
* @return True if this shard's buffer is full (and the shard can buffer reads).
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import org.broadinstitute.sting.utils.GenomeLoc;
|
|||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
|
|
@ -110,7 +111,7 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
|
|||
*/
|
||||
public IndexDelimitedLocusShard next() {
|
||||
FilePointer nextFilePointer = filePointerIterator.next();
|
||||
Map<SAMFileReader2,List<Chunk>> chunksBounding = dataSource!=null ? dataSource.getFilePointersBounding(nextFilePointer.bin) : null;
|
||||
Map<SAMReaderID,List<Chunk>> chunksBounding = dataSource!=null ? dataSource.getFilePointersBounding(nextFilePointer.bin) : null;
|
||||
return new IndexDelimitedLocusShard(nextFilePointer.locations,chunksBounding,Shard.ShardType.LOCUS_INTERVAL);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -43,6 +43,11 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
*/
|
||||
private final Map<SAMReaderID,ReadGroupMapping> mergedReadGroupMappings = new HashMap<SAMReaderID,ReadGroupMapping>();
|
||||
|
||||
/**
|
||||
* How far along is each reader?
|
||||
*/
|
||||
private final Map<SAMReaderID,Chunk> readerPositions = new HashMap<SAMReaderID,Chunk>();
|
||||
|
||||
/**
|
||||
* Create a new block-aware SAM data source given the supplied read metadata.
|
||||
* @param reads The read metadata.
|
||||
|
|
@ -55,10 +60,13 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
|
||||
SAMReaders readers = resourcePool.getAvailableReaders();
|
||||
|
||||
initializeReaderPositions(readers);
|
||||
|
||||
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers.values(),SAMFileHeader.SortOrder.coordinate,true);
|
||||
mergedHeader = headerMerger.getMergedHeader();
|
||||
hasReadGroupCollisions = headerMerger.hasReadGroupCollisions();
|
||||
|
||||
// cache the read group id (original) -> read group id (merged) mapping.
|
||||
for(SAMReaderID id: readerIDs) {
|
||||
SAMFileReader reader = readers.getReader(id);
|
||||
ReadGroupMapping mapping = new ReadGroupMapping();
|
||||
|
|
@ -109,15 +117,15 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
* @param bin The bin for which to load data.
|
||||
* @return A map of the file pointers bounding the bin.
|
||||
*/
|
||||
public Map<SAMFileReader2,List<Chunk>> getFilePointersBounding(Bin bin) {
|
||||
public Map<SAMReaderID,List<Chunk>> getFilePointersBounding(Bin bin) {
|
||||
SAMReaders readers = resourcePool.getReadersWithoutLocking();
|
||||
Map<SAMFileReader2,List<Chunk>> filePointers = new HashMap<SAMFileReader2,List<Chunk>>();
|
||||
for(SAMFileReader reader: readers) {
|
||||
SAMFileReader2 reader2 = (SAMFileReader2)reader;
|
||||
Map<SAMReaderID,List<Chunk>> filePointers = new HashMap<SAMReaderID,List<Chunk>>();
|
||||
for(SAMReaderID id: getReaderIDs()) {
|
||||
SAMFileReader2 reader2 = (SAMFileReader2)readers.getReader(id);
|
||||
if(bin != null)
|
||||
filePointers.put(reader2,reader2.getFilePointersBounding(bin));
|
||||
filePointers.put(id,reader2.getFilePointersBounding(bin));
|
||||
else
|
||||
filePointers.put(reader2,Collections.<Chunk>emptyList());
|
||||
filePointers.put(id,Collections.<Chunk>emptyList());
|
||||
}
|
||||
return filePointers;
|
||||
}
|
||||
|
|
@ -126,14 +134,8 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
* Retrieves the current position within the BAM file.
|
||||
* @return A mapping of reader to current position.
|
||||
*/
|
||||
public Map<SAMFileReader2,Chunk> getCurrentPosition() {
|
||||
SAMReaders readers = resourcePool.getReadersWithoutLocking();
|
||||
Map<SAMFileReader2,Chunk> currentPositions = new HashMap<SAMFileReader2,Chunk>();
|
||||
for(SAMFileReader reader: readers) {
|
||||
SAMFileReader2 reader2 = (SAMFileReader2)reader;
|
||||
currentPositions.put(reader2,reader2.getCurrentPosition());
|
||||
}
|
||||
return currentPositions;
|
||||
public Map<SAMReaderID,Chunk> getCurrentPosition() {
|
||||
return readerPositions;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -200,7 +202,7 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
* @param shard Shard to fill.
|
||||
* @return true if at the end of the stream. False otherwise.
|
||||
*/
|
||||
public boolean fillShard(BAMFormatAwareShard shard) {
|
||||
public void fillShard(BAMFormatAwareShard shard) {
|
||||
if(!shard.buffersReads())
|
||||
throw new StingException("Attempting to fill a non-buffering shard.");
|
||||
|
||||
|
|
@ -208,17 +210,40 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
// know why this is. Please add a comment here if you do.
|
||||
boolean enableVerification = shard instanceof ReadShard;
|
||||
|
||||
CloseableIterator<SAMRecord> iterator = getIterator(shard,enableVerification);
|
||||
while(!shard.isBufferFull() && iterator.hasNext())
|
||||
shard.addRead(iterator.next());
|
||||
SAMReaders readers = resourcePool.getAvailableReaders();
|
||||
|
||||
boolean atEndOfStream = !iterator.hasNext();
|
||||
CloseableIterator<SAMRecord> iterator = getIterator(readers,shard,enableVerification);
|
||||
while(!shard.isBufferFull() && iterator.hasNext()) {
|
||||
SAMRecord read = iterator.next();
|
||||
Chunk endChunk = new Chunk(read.getCoordinates().getChunkEnd(),Long.MAX_VALUE);
|
||||
shard.addRead(read);
|
||||
readerPositions.put(getReaderID(readers,read),endChunk);
|
||||
}
|
||||
|
||||
iterator.close();
|
||||
|
||||
return atEndOfStream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the reader associated with the given read.
|
||||
* @param readers Available readers.
|
||||
* @param read
|
||||
* @return
|
||||
*/
|
||||
private SAMReaderID getReaderID(SAMReaders readers, SAMRecord read) {
|
||||
for(SAMReaderID id: getReaderIDs()) {
|
||||
if(readers.getReader(id) == read.getReader())
|
||||
return id;
|
||||
}
|
||||
throw new StingException("Unable to find id for reader associated with read " + read.getReadName());
|
||||
}
|
||||
|
||||
private void initializeReaderPositions(SAMReaders readers) {
|
||||
for(SAMReaderID id: getReaderIDs()) {
|
||||
SAMFileReader2 reader2 = (SAMFileReader2)readers.getReader(id);
|
||||
readerPositions.put(id,reader2.getCurrentPosition());
|
||||
}
|
||||
}
|
||||
|
||||
public StingSAMIterator seek(Shard shard) {
|
||||
if(!(shard instanceof BAMFormatAwareShard))
|
||||
throw new StingException("BlockDrivenSAMDataSource cannot operate on shards of type: " + shard.getClass());
|
||||
|
|
@ -228,20 +253,19 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
return bamAwareShard.iterator();
|
||||
}
|
||||
else {
|
||||
// Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't
|
||||
// know why this is. Please add a comment here if you do.
|
||||
boolean enableVerification = shard instanceof ReadShard;
|
||||
return getIterator(bamAwareShard,enableVerification);
|
||||
SAMReaders readers = resourcePool.getAvailableReaders();
|
||||
|
||||
// Since the beginning of time for the GATK, enableVerification has been true only for ReadShards, because
|
||||
//
|
||||
return getIterator(readers,bamAwareShard,shard instanceof ReadShard);
|
||||
}
|
||||
}
|
||||
|
||||
private StingSAMIterator getIterator(BAMFormatAwareShard shard, boolean enableVerification) {
|
||||
SAMReaders readers = resourcePool.getAvailableReaders();
|
||||
|
||||
private StingSAMIterator getIterator(SAMReaders readers, BAMFormatAwareShard shard, boolean enableVerification) {
|
||||
Map<SAMFileReader,CloseableIterator<SAMRecord>> readerToIteratorMap = new HashMap<SAMFileReader,CloseableIterator<SAMRecord>>();
|
||||
for(SAMFileReader reader: readers) {
|
||||
SAMFileReader2 reader2 = (SAMFileReader2)reader;
|
||||
List<Chunk> chunks = shard.getChunks().get(reader2);
|
||||
for(SAMReaderID id: getReaderIDs()) {
|
||||
SAMFileReader2 reader2 = (SAMFileReader2)readers.getReader(id);
|
||||
List<Chunk> chunks = shard.getChunks().get(id);
|
||||
readerToIteratorMap.put(reader2,reader2.iterator(chunks));
|
||||
}
|
||||
|
||||
|
|
@ -372,6 +396,7 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
/**
|
||||
* Retrieve the reader from the data structure.
|
||||
* @param id The ID of the reader to retrieve.
|
||||
* @return the reader associated with the given id.
|
||||
*/
|
||||
public SAMFileReader getReader(SAMReaderID id) {
|
||||
if(!readers.containsKey(id))
|
||||
|
|
@ -379,17 +404,6 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
|
|||
return readers.get(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method to get the header associated with an individual ID.
|
||||
* @param id ID for which to retrieve the header.
|
||||
* @return Header for this SAM file.
|
||||
*/
|
||||
public SAMFileHeader getHeader(SAMReaderID id) {
|
||||
if(!readers.containsKey(id))
|
||||
throw new NoSuchElementException("No reader is associated with id " + id);
|
||||
return readers.get(id).getFileHeader();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an iterator over all readers in this structure.
|
||||
* @return An iterator over readers.
|
||||
|
|
|
|||
|
|
@ -4,12 +4,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import net.sf.samtools.AlignmentBlock;
|
||||
import net.sf.samtools.Cigar;
|
||||
import net.sf.samtools.SAMFileHeader;
|
||||
import net.sf.samtools.SAMReadGroupRecord;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
import net.sf.samtools.SAMValidationError;
|
||||
import net.sf.samtools.*;
|
||||
|
||||
/**
|
||||
* @author ebanks
|
||||
|
|
@ -332,6 +327,8 @@ public class GATKSAMRecord extends SAMRecord {
|
|||
|
||||
public void setValidationStringency(net.sf.samtools.SAMFileReader.ValidationStringency validationStringency) { mRecord.setValidationStringency(validationStringency); }
|
||||
|
||||
public SAMFileReader getReader() { return mRecord.getReader(); }
|
||||
|
||||
public SAMFileHeader getHeader() { return mRecord.getHeader(); }
|
||||
|
||||
public void setHeader(SAMFileHeader samFileHeader) { mRecord.setHeader(samFileHeader); }
|
||||
|
|
@ -355,4 +352,6 @@ public class GATKSAMRecord extends SAMRecord {
|
|||
public Object clone() throws CloneNotSupportedException { return mRecord.clone(); }
|
||||
|
||||
public String toString() { return mRecord.toString(); }
|
||||
|
||||
public Chunk getCoordinates() { return mRecord.getCoordinates(); }
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue