Merged bug fix from Stable into Unstable

This commit is contained in:
Matt Hanna 2012-01-08 22:04:44 -05:00
commit fda1795791
2 changed files with 124 additions and 61 deletions

View File

@ -31,13 +31,13 @@ import net.sf.samtools.util.BlockCompressedFilePointerUtil;
import net.sf.samtools.util.BlockCompressedInputStream;
import net.sf.samtools.util.RuntimeEOFException;
import net.sf.samtools.util.SeekableStream;
import org.broad.tribble.util.BlockCompressedStreamConstants;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
/**
@ -120,6 +120,12 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
// TODO: Kill the region when all we want to do is start at the beginning of the stream and run to the end of the stream.
this.position = new SAMReaderPosition(reader,this,new GATKBAMFileSpan(new GATKChunk(0,Long.MAX_VALUE)));
// The block offsets / block positions guarantee that the ending offset/position in the data structure maps to
// the point in the file just following the last read. These two arrays should never be empty; initializing
// to 0 to match the position above.
this.blockOffsets.add(0);
this.blockPositions.add(0L);
try {
if(validate) {
System.out.printf("BlockInputStream %s: BGZF block validation mode activated%n",this);
@ -143,34 +149,52 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
public long getFilePointer() {
long filePointer;
synchronized(lock) {
if(buffer.remaining() > 0) {
// If there's data in the buffer, figure out from whence it came.
final long blockAddress = blockPositions.size() > 0 ? blockPositions.get(0) : 0;
final int blockOffset = buffer.position();
filePointer = blockAddress << 16 | blockOffset;
}
else {
// Otherwise, find the next position to load.
filePointer = position.getBlockAddress() << 16;
}
// Find the current block within the input stream.
int blockIndex;
for(blockIndex = 0; blockIndex+1 < blockOffsets.size() && buffer.position() >= blockOffsets.get(blockIndex + 1); blockIndex++)
;
filePointer = blockPositions.get(blockIndex) + (buffer.position()-blockOffsets.get(blockIndex));
}
if(validatingInputStream != null && filePointer != validatingInputStream.getFilePointer())
throw new ReviewedStingException(String.format("Position of input stream is invalid; expected (block address, block offset) = (%d,%d), got (%d,%d)",
BlockCompressedFilePointerUtil.getBlockAddress(filePointer),BlockCompressedFilePointerUtil.getBlockOffset(filePointer),
BlockCompressedFilePointerUtil.getBlockAddress(validatingInputStream.getFilePointer()),BlockCompressedFilePointerUtil.getBlockOffset(validatingInputStream.getFilePointer())));
// if(validatingInputStream != null && filePointer != validatingInputStream.getFilePointer())
// throw new ReviewedStingException(String.format("Position of input stream is invalid; expected (block address, block offset) = (%d,%d), got (%d,%d)",
// BlockCompressedFilePointerUtil.getBlockAddress(validatingInputStream.getFilePointer()),BlockCompressedFilePointerUtil.getBlockOffset(validatingInputStream.getFilePointer()),
// BlockCompressedFilePointerUtil.getBlockAddress(filePointer),BlockCompressedFilePointerUtil.getBlockOffset(filePointer)));
return filePointer;
}
public void seek(long target) {
// TODO: Validate the seek point.
//System.out.printf("Thread %s, BlockInputStream %s: seeking to block %d, offset %d%n",Thread.currentThread().getId(),this,BlockCompressedFilePointerUtil.getBlockAddress(target),BlockCompressedFilePointerUtil.getBlockOffset(target));
synchronized(lock) {
clearBuffers();
position.advancePosition(BlockCompressedFilePointerUtil.getBlockAddress(target));
waitForBufferFill();
buffer.position(BlockCompressedFilePointerUtil.getBlockOffset(target));
// Ensure that the position filled in by submitAccessPlan() is in sync with the seek target just specified.
position.advancePosition(target);
// If the position advances past the end of the target, that must mean that we seeked to a point at the end
// of one of the chunk list's subregions. Make a note of our current position and punt on loading any data.
if(target < position.getBlockAddress() << 16) {
blockOffsets.clear();
blockOffsets.add(0);
blockPositions.clear();
blockPositions.add(target);
}
else {
waitForBufferFill();
// A buffer fill will load the relevant data from the shard, but the buffer position still needs to be
// advanced as appropriate.
Iterator<Integer> blockOffsetIterator = blockOffsets.descendingIterator();
Iterator<Long> blockPositionIterator = blockPositions.descendingIterator();
while(blockOffsetIterator.hasNext() && blockPositionIterator.hasNext()) {
final int blockOffset = blockOffsetIterator.next();
final long blockPosition = blockPositionIterator.next();
if((blockPosition >> 16) == (target >> 16) && (blockPosition&0xFFFF) < (target&0xFFFF)) {
buffer.position(blockOffset + (int)(target&0xFFFF)-(int)(blockPosition&0xFFFF));
break;
}
}
}
if(validatingInputStream != null) {
try {
@ -191,14 +215,17 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
buffer.clear();
buffer.limit(0);
// Clear everything except the last block offset / position
blockOffsets.clear();
blockPositions.clear();
blockOffsets.add(0);
while(blockPositions.size() > 1)
blockPositions.removeFirst();
}
public boolean eof() {
synchronized(lock) {
// TODO: Handle multiple empty BGZF blocks at end of the file.
return position != null && position.getBlockAddress() >= length;
return position != null && (position.getBlockAddress() < 0 || position.getBlockAddress() >= length);
}
}
@ -216,7 +243,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
// Assume that the access plan is going to tell us to start where we are and move forward.
// If this isn't the case, we'll soon receive a seek request and the buffer will be forced to reset.
if(this.position != null && position.getBlockAddress() < this.position.getBlockAddress())
position.advancePosition(this.position.getBlockAddress());
position.advancePosition(this.position.getBlockAddress() << 16);
}
this.position = position;
}
@ -225,14 +252,15 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
// Compact buffer to maximize storage space.
int bytesToRemove = 0;
// Look ahead to see if we can compact away the first block in the series.
while(blockOffsets.size() > 1 && buffer.position() < blockOffsets.get(1)) {
bytesToRemove += blockOffsets.remove();
// Look ahead to see if we can compact away the first blocks in the series.
while(blockOffsets.size() > 1 && buffer.position() >= blockOffsets.get(1)) {
blockOffsets.remove();
blockPositions.remove();
bytesToRemove = blockOffsets.peek();
}
// If we end up with an empty block at the end of the series, compact this as well.
if(buffer.remaining() == 0 && !blockOffsets.isEmpty() && buffer.position() >= blockOffsets.peek()) {
if(buffer.remaining() == 0 && blockOffsets.size() > 1 && buffer.position() >= blockOffsets.peek()) {
bytesToRemove += buffer.position();
blockOffsets.remove();
blockPositions.remove();
@ -241,11 +269,17 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
int finalBufferStart = buffer.position() - bytesToRemove;
int finalBufferSize = buffer.remaining();
// Position the buffer to remove the unneeded data, and compact it away.
buffer.position(bytesToRemove);
buffer.compact();
// Reset the limits for reading.
buffer.position(finalBufferStart);
buffer.limit(finalBufferStart+finalBufferSize);
// Shift everything in the offset buffer down to accommodate the bytes removed from the buffer.
for(int i = 0; i < blockOffsets.size(); i++)
blockOffsets.set(i,blockOffsets.get(i)-bytesToRemove);
}
/**
@ -253,6 +287,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
* MUST be called from a thread that is NOT the reader thread.
* @param incomingBuffer The data being pushed into this input stream.
* @param position target position for the data.
* @param filePosition the current position of the file pointer
*/
public void copyIntoBuffer(final ByteBuffer incomingBuffer, final SAMReaderPosition position, final long filePosition) {
synchronized(lock) {
@ -262,7 +297,12 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
buffer.limit(buffer.capacity());
// Advance the position to take the most recent read into account.
long lastReadPosition = position.getBlockAddress();
final long lastBlockAddress = position.getBlockAddress();
final int blockOffsetStart = position.getFirstOffsetInBlock();
final int blockOffsetEnd = position.getLastOffsetInBlock();
// Where did this read end? It either ended in the middle of a block (for a bounding chunk) or it ended at the start of the next block.
final long endOfRead = (blockOffsetEnd < incomingBuffer.remaining()) ? (lastBlockAddress << 16) | blockOffsetEnd : filePosition << 16;
byte[] validBytes = null;
if(validatingInputStream != null) {
@ -277,7 +317,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
incomingBuffer.position(pos);
long currentFilePointer = validatingInputStream.getFilePointer();
validatingInputStream.seek(lastReadPosition << 16);
validatingInputStream.seek(lastBlockAddress << 16);
validatingInputStream.read(validBytes);
validatingInputStream.seek(currentFilePointer);
@ -286,7 +326,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
}
this.position = position;
position.advancePosition(filePosition);
position.advancePosition(filePosition << 16);
if(buffer.remaining() < incomingBuffer.remaining()) {
//System.out.printf("Thread %s: waiting for available space in buffer; buffer remaining = %d, incoming buffer remaining = %d%n",Thread.currentThread().getId(),buffer.remaining(),incomingBuffer.remaining());
@ -294,12 +334,21 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
//System.out.printf("Thread %s: waited for available space in buffer; buffer remaining = %d, incoming buffer remaining = %d%n", Thread.currentThread().getId(), buffer.remaining(), incomingBuffer.remaining());
}
// Queue list of block offsets / block positions.
// Remove the last position in the list and add in the last read position, in case the two are different.
blockOffsets.removeLast();
blockOffsets.add(buffer.position());
blockPositions.add(lastReadPosition);
blockPositions.removeLast();
blockPositions.add(lastBlockAddress << 16 | blockOffsetStart);
// Stream the buffer into the data stream.
incomingBuffer.position(blockOffsetStart);
incomingBuffer.limit(Math.min(incomingBuffer.limit(),blockOffsetEnd));
buffer.put(incomingBuffer);
// Then, add the last position read to the very end of the list, just past the end of the last buffer.
blockOffsets.add(buffer.position());
blockPositions.add(endOfRead);
// Set up the buffer for reading.
buffer.flip();
bufferFilled = true;
@ -380,23 +429,21 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
lock.notify();
}
if(validatingInputStream != null) {
byte[] validBytes = new byte[length];
try {
validatingInputStream.read(validBytes,offset,length);
for(int i = offset; i < offset+length; i++) {
if(bytes[i] != validBytes[i]) {
System.out.printf("Thread %s: preparing to throw an exception because contents don't match%n",Thread.currentThread().getId());
throw new ReviewedStingException(String.format("Thread %s: blockInputStream %s attempting to return wrong set of bytes; mismatch at offset %d",Thread.currentThread().getId(),this,i));
}
}
}
catch(IOException ex) {
throw new ReviewedStingException("Unable to validate against Picard input stream",ex);
}
}
// if(validatingInputStream != null) {
// byte[] validBytes = new byte[length];
// try {
// validatingInputStream.read(validBytes,offset,length);
// for(int i = offset; i < offset+length; i++) {
// if(bytes[i] != validBytes[i])
// throw new ReviewedStingException(String.format("Thread %s: blockInputStream %s attempting to return wrong set of bytes; mismatch at offset %d",Thread.currentThread().getId(),this,i));
// }
// }
// catch(IOException ex) {
// throw new ReviewedStingException("Unable to validate against Picard input stream",ex);
// }
// }
return length - remaining;
return eof() ? -1 : length-remaining;
}
public void close() {
@ -424,7 +471,6 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
lock.wait();
}
catch(InterruptedException ex) {
// TODO: handle me.
throw new ReviewedStingException("Interrupt occurred waiting for buffer to fill",ex);
}

View File

@ -75,6 +75,22 @@ class SAMReaderPosition {
return nextBlockAddress;
}
/**
* Retrieves the first offset of interest in the block returned by getBlockAddress().
* @return First block of interest in this segment.
*/
public int getFirstOffsetInBlock() {
return (nextBlockAddress == positionIterator.peek().getBlockStart()) ? positionIterator.peek().getBlockOffsetStart() : 0;
}
/**
* Retrieves the last offset of interest in the block returned by getBlockAddress().
* @return First block of interest in this segment.
*/
public int getLastOffsetInBlock() {
return (nextBlockAddress == positionIterator.peek().getBlockEnd()) ? positionIterator.peek().getBlockOffsetEnd() : 65536;
}
public void reset() {
initialize();
}
@ -95,26 +111,27 @@ class SAMReaderPosition {
* @param filePosition The current position within the file.
*/
void advancePosition(final long filePosition) {
nextBlockAddress = filePosition;
nextBlockAddress = filePosition >> 16;
// Check the current file position against the iterator; if the iterator is before the current file position,
// draw the iterator forward. Remember when performing the check that coordinates are half-open!
try {
while(positionIterator.hasNext() && isFilePositionPastEndOfChunk(filePosition,positionIterator.peek())) {
positionIterator.next();
// Check to see if the iterator has more data available.
if(positionIterator.hasNext() && filePosition < positionIterator.peek().getBlockStart()) {
nextBlockAddress = positionIterator.peek().getBlockStart();
break;
}
while(positionIterator.hasNext() && isFilePositionPastEndOfChunk(filePosition,positionIterator.peek())) {
positionIterator.next();
// If the block iterator has shot past the file pointer, bring the file pointer flush with the start of the current block.
if(positionIterator.hasNext() && filePosition < positionIterator.peek().getChunkStart()) {
nextBlockAddress = positionIterator.peek().getBlockStart();
//System.out.printf("SAMReaderPosition: next block address advanced to %d%n",nextBlockAddress);
break;
}
}
catch(Exception ex) {
throw new ReviewedStingException("");
}
// If we've shot off the end of the block pointer, notify consumers that iteration is complete.
if(!positionIterator.hasNext())
nextBlockAddress = -1;
}
private boolean isFilePositionPastEndOfChunk(final long filePosition, final GATKChunk chunk) {
return (filePosition > chunk.getBlockEnd() || (filePosition == chunk.getBlockEnd() && chunk.getBlockOffsetEnd() == 0));
return filePosition >= chunk.getChunkEnd();
}
}