diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockInputStream.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockInputStream.java index e377f865d..a95eb7b8e 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockInputStream.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockInputStream.java @@ -31,13 +31,13 @@ import net.sf.samtools.util.BlockCompressedFilePointerUtil; import net.sf.samtools.util.BlockCompressedInputStream; import net.sf.samtools.util.RuntimeEOFException; import net.sf.samtools.util.SeekableStream; -import org.broad.tribble.util.BlockCompressedStreamConstants; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; +import java.util.Iterator; import java.util.LinkedList; /** @@ -120,6 +120,12 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { // TODO: Kill the region when all we want to do is start at the beginning of the stream and run to the end of the stream. this.position = new SAMReaderPosition(reader,this,new GATKBAMFileSpan(new GATKChunk(0,Long.MAX_VALUE))); + // The block offsets / block positions guarantee that the ending offset/position in the data structure maps to + // the point in the file just following the last read. These two arrays should never be empty; initializing + // to 0 to match the position above. + this.blockOffsets.add(0); + this.blockPositions.add(0L); + try { if(validate) { System.out.printf("BlockInputStream %s: BGZF block validation mode activated%n",this); @@ -143,34 +149,52 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { public long getFilePointer() { long filePointer; synchronized(lock) { - if(buffer.remaining() > 0) { - // If there's data in the buffer, figure out from whence it came. 
- final long blockAddress = blockPositions.size() > 0 ? blockPositions.get(0) : 0; - final int blockOffset = buffer.position(); - filePointer = blockAddress << 16 | blockOffset; - } - else { - // Otherwise, find the next position to load. - filePointer = position.getBlockAddress() << 16; - } + // Find the current block within the input stream. + int blockIndex; + for(blockIndex = 0; blockIndex+1 < blockOffsets.size() && buffer.position() >= blockOffsets.get(blockIndex + 1); blockIndex++) + ; + filePointer = blockPositions.get(blockIndex) + (buffer.position()-blockOffsets.get(blockIndex)); } - if(validatingInputStream != null && filePointer != validatingInputStream.getFilePointer()) - throw new ReviewedStingException(String.format("Position of input stream is invalid; expected (block address, block offset) = (%d,%d), got (%d,%d)", - BlockCompressedFilePointerUtil.getBlockAddress(filePointer),BlockCompressedFilePointerUtil.getBlockOffset(filePointer), - BlockCompressedFilePointerUtil.getBlockAddress(validatingInputStream.getFilePointer()),BlockCompressedFilePointerUtil.getBlockOffset(validatingInputStream.getFilePointer()))); +// if(validatingInputStream != null && filePointer != validatingInputStream.getFilePointer()) +// throw new ReviewedStingException(String.format("Position of input stream is invalid; expected (block address, block offset) = (%d,%d), got (%d,%d)", +// BlockCompressedFilePointerUtil.getBlockAddress(validatingInputStream.getFilePointer()),BlockCompressedFilePointerUtil.getBlockOffset(validatingInputStream.getFilePointer()), +// BlockCompressedFilePointerUtil.getBlockAddress(filePointer),BlockCompressedFilePointerUtil.getBlockOffset(filePointer))); return filePointer; } public void seek(long target) { - // TODO: Validate the seek point. 
//System.out.printf("Thread %s, BlockInputStream %s: seeking to block %d, offset %d%n",Thread.currentThread().getId(),this,BlockCompressedFilePointerUtil.getBlockAddress(target),BlockCompressedFilePointerUtil.getBlockOffset(target)); synchronized(lock) { clearBuffers(); - position.advancePosition(BlockCompressedFilePointerUtil.getBlockAddress(target)); - waitForBufferFill(); - buffer.position(BlockCompressedFilePointerUtil.getBlockOffset(target)); + + // Ensure that the position filled in by submitAccessPlan() is in sync with the seek target just specified. + position.advancePosition(target); + + // If the position advances past the end of the target, that must mean that we seeked to a point at the end + // of one of the chunk list's subregions. Make a note of our current position and punt on loading any data. + if(target < position.getBlockAddress() << 16) { + blockOffsets.clear(); + blockOffsets.add(0); + blockPositions.clear(); + blockPositions.add(target); + } + else { + waitForBufferFill(); + // A buffer fill will load the relevant data from the shard, but the buffer position still needs to be + // advanced as appropriate. 
+ Iterator<Integer> blockOffsetIterator = blockOffsets.descendingIterator(); + Iterator<Long> blockPositionIterator = blockPositions.descendingIterator(); + while(blockOffsetIterator.hasNext() && blockPositionIterator.hasNext()) { + final int blockOffset = blockOffsetIterator.next(); + final long blockPosition = blockPositionIterator.next(); + if((blockPosition >> 16) == (target >> 16) && (blockPosition&0xFFFF) < (target&0xFFFF)) { + buffer.position(blockOffset + (int)(target&0xFFFF)-(int)(blockPosition&0xFFFF)); + break; + } + } + } if(validatingInputStream != null) { try { @@ -191,14 +215,17 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { buffer.clear(); buffer.limit(0); + // Clear everything except the last block offset / position blockOffsets.clear(); - blockPositions.clear(); + blockOffsets.add(0); + while(blockPositions.size() > 1) + blockPositions.removeFirst(); } public boolean eof() { synchronized(lock) { // TODO: Handle multiple empty BGZF blocks at end of the file. - return position != null && position.getBlockAddress() >= length; + return position != null && (position.getBlockAddress() < 0 || position.getBlockAddress() >= length); } } @@ -216,7 +243,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { // Assume that the access plan is going to tell us to start where we are and move forward. // If this isn't the case, we'll soon receive a seek request and the buffer will be forced to reset. if(this.position != null && position.getBlockAddress() < this.position.getBlockAddress()) - position.advancePosition(this.position.getBlockAddress()); + position.advancePosition(this.position.getBlockAddress() << 16); } this.position = position; } @@ -225,14 +252,15 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { // Compact buffer to maximize storage space. int bytesToRemove = 0; - // Look ahead to see if we can compact away the first block in the series. 
- while(blockOffsets.size() > 1 && buffer.position() < blockOffsets.get(1)) { - bytesToRemove += blockOffsets.remove(); + // Look ahead to see if we can compact away the first blocks in the series. + while(blockOffsets.size() > 1 && buffer.position() >= blockOffsets.get(1)) { + blockOffsets.remove(); blockPositions.remove(); + bytesToRemove = blockOffsets.peek(); } // If we end up with an empty block at the end of the series, compact this as well. - if(buffer.remaining() == 0 && !blockOffsets.isEmpty() && buffer.position() >= blockOffsets.peek()) { + if(buffer.remaining() == 0 && blockOffsets.size() > 1 && buffer.position() >= blockOffsets.peek()) { bytesToRemove += buffer.position(); blockOffsets.remove(); blockPositions.remove(); @@ -241,11 +269,17 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { int finalBufferStart = buffer.position() - bytesToRemove; int finalBufferSize = buffer.remaining(); + // Position the buffer to remove the unneeded data, and compact it away. buffer.position(bytesToRemove); buffer.compact(); + // Reset the limits for reading. buffer.position(finalBufferStart); buffer.limit(finalBufferStart+finalBufferSize); + + // Shift everything in the offset buffer down to accommodate the bytes removed from the buffer. + for(int i = 0; i < blockOffsets.size(); i++) + blockOffsets.set(i,blockOffsets.get(i)-bytesToRemove); } /** @@ -253,6 +287,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { * MUST be called from a thread that is NOT the reader thread. * @param incomingBuffer The data being pushed into this input stream. * @param position target position for the data. 
+ * @param filePosition the current position of the file pointer */ public void copyIntoBuffer(final ByteBuffer incomingBuffer, final SAMReaderPosition position, final long filePosition) { synchronized(lock) { @@ -262,7 +297,12 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { buffer.limit(buffer.capacity()); // Advance the position to take the most recent read into account. - long lastReadPosition = position.getBlockAddress(); + final long lastBlockAddress = position.getBlockAddress(); + final int blockOffsetStart = position.getFirstOffsetInBlock(); + final int blockOffsetEnd = position.getLastOffsetInBlock(); + + // Where did this read end? It either ended in the middle of a block (for a bounding chunk) or it ended at the start of the next block. + final long endOfRead = (blockOffsetEnd < incomingBuffer.remaining()) ? (lastBlockAddress << 16) | blockOffsetEnd : filePosition << 16; byte[] validBytes = null; if(validatingInputStream != null) { @@ -277,7 +317,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { incomingBuffer.position(pos); long currentFilePointer = validatingInputStream.getFilePointer(); - validatingInputStream.seek(lastReadPosition << 16); + validatingInputStream.seek(lastBlockAddress << 16); validatingInputStream.read(validBytes); validatingInputStream.seek(currentFilePointer); @@ -286,7 +326,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { } this.position = position; - position.advancePosition(filePosition); + position.advancePosition(filePosition << 16); if(buffer.remaining() < incomingBuffer.remaining()) { //System.out.printf("Thread %s: waiting for available space in buffer; buffer remaining = %d, incoming buffer remaining = %d%n",Thread.currentThread().getId(),buffer.remaining(),incomingBuffer.remaining()); @@ -294,12 +334,21 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { //System.out.printf("Thread %s: 
waited for available space in buffer; buffer remaining = %d, incoming buffer remaining = %d%n", Thread.currentThread().getId(), buffer.remaining(), incomingBuffer.remaining()); } - // Queue list of block offsets / block positions. + // Remove the last position in the list and add in the last read position, in case the two are different. + blockOffsets.removeLast(); blockOffsets.add(buffer.position()); - blockPositions.add(lastReadPosition); + blockPositions.removeLast(); + blockPositions.add(lastBlockAddress << 16 | blockOffsetStart); + // Stream the buffer into the data stream. + incomingBuffer.position(blockOffsetStart); + incomingBuffer.limit(Math.min(incomingBuffer.limit(),blockOffsetEnd)); buffer.put(incomingBuffer); + // Then, add the last position read to the very end of the list, just past the end of the last buffer. + blockOffsets.add(buffer.position()); + blockPositions.add(endOfRead); + // Set up the buffer for reading. buffer.flip(); bufferFilled = true; @@ -380,23 +429,21 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { lock.notify(); } - if(validatingInputStream != null) { - byte[] validBytes = new byte[length]; - try { - validatingInputStream.read(validBytes,offset,length); - for(int i = offset; i < offset+length; i++) { - if(bytes[i] != validBytes[i]) { - System.out.printf("Thread %s: preparing to throw an exception because contents don't match%n",Thread.currentThread().getId()); - throw new ReviewedStingException(String.format("Thread %s: blockInputStream %s attempting to return wrong set of bytes; mismatch at offset %d",Thread.currentThread().getId(),this,i)); - } - } - } - catch(IOException ex) { - throw new ReviewedStingException("Unable to validate against Picard input stream",ex); - } - } +// if(validatingInputStream != null) { +// byte[] validBytes = new byte[length]; +// try { +// validatingInputStream.read(validBytes,offset,length); +// for(int i = offset; i < offset+length; i++) { +// if(bytes[i] != 
validBytes[i]) +// throw new ReviewedStingException(String.format("Thread %s: blockInputStream %s attempting to return wrong set of bytes; mismatch at offset %d",Thread.currentThread().getId(),this,i)); +// } +// } +// catch(IOException ex) { +// throw new ReviewedStingException("Unable to validate against Picard input stream",ex); +// } +// } - return length - remaining; + return eof() ? -1 : length-remaining; } public void close() { @@ -424,7 +471,6 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { lock.wait(); } catch(InterruptedException ex) { - // TODO: handle me. throw new ReviewedStingException("Interrupt occurred waiting for buffer to fill",ex); } diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMReaderPosition.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMReaderPosition.java index f9f6539a7..0a6173c1e 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMReaderPosition.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMReaderPosition.java @@ -75,6 +75,22 @@ class SAMReaderPosition { return nextBlockAddress; } + /** + * Retrieves the first offset of interest in the block returned by getBlockAddress(). + * @return First offset of interest in the block returned by getBlockAddress(). + */ + public int getFirstOffsetInBlock() { + return (nextBlockAddress == positionIterator.peek().getBlockStart()) ? positionIterator.peek().getBlockOffsetStart() : 0; + } + + /** + * Retrieves the last offset of interest in the block returned by getBlockAddress(). + * @return Last offset of interest in the block returned by getBlockAddress(). + */ + public int getLastOffsetInBlock() { + return (nextBlockAddress == positionIterator.peek().getBlockEnd()) ? positionIterator.peek().getBlockOffsetEnd() : 65536; + } + public void reset() { initialize(); } @@ -95,26 +111,27 @@ class SAMReaderPosition { * @param filePosition The current position within the file. 
*/ void advancePosition(final long filePosition) { - nextBlockAddress = filePosition; + nextBlockAddress = filePosition >> 16; // Check the current file position against the iterator; if the iterator is before the current file position, // draw the iterator forward. Remember when performing the check that coordinates are half-open! - try { - while(positionIterator.hasNext() && isFilePositionPastEndOfChunk(filePosition,positionIterator.peek())) { - positionIterator.next(); - // Check to see if the iterator has more data available. - if(positionIterator.hasNext() && filePosition < positionIterator.peek().getBlockStart()) { - nextBlockAddress = positionIterator.peek().getBlockStart(); - break; - } + while(positionIterator.hasNext() && isFilePositionPastEndOfChunk(filePosition,positionIterator.peek())) { + positionIterator.next(); + + // If the block iterator has shot past the file pointer, bring the file pointer flush with the start of the current block. + if(positionIterator.hasNext() && filePosition < positionIterator.peek().getChunkStart()) { + nextBlockAddress = positionIterator.peek().getBlockStart(); + //System.out.printf("SAMReaderPosition: next block address advanced to %d%n",nextBlockAddress); + break; } } - catch(Exception ex) { - throw new ReviewedStingException(""); - } + + // If we've shot off the end of the block pointer, notify consumers that iteration is complete. + if(!positionIterator.hasNext()) + nextBlockAddress = -1; } private boolean isFilePositionPastEndOfChunk(final long filePosition, final GATKChunk chunk) { - return (filePosition > chunk.getBlockEnd() || (filePosition == chunk.getBlockEnd() && chunk.getBlockOffsetEnd() == 0)); + return filePosition >= chunk.getChunkEnd(); } }