First step in introducing a patch to Picard: create our ideal interface into the BAM file for sharding.

The code in this commit iterates over the BAM file, extracting information about the blocks in the file without actually
loading or decompressing the reads.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@2434 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2009-12-23 21:35:08 +00:00
parent 770093a40e
commit 87ff2b15d4
4 changed files with 278 additions and 0 deletions

View File

@ -0,0 +1,73 @@
package net.sf.samtools;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.nio.channels.FileChannel;
import java.io.IOException;
/**
 * Walks over a BAM file, discovering and returning the starting location of each block
 * in chunk format.
 *
 * @author mhanna
 * @version 0.1
 */
public class BAMChunkIterator implements Iterator<Chunk> {
    /** Reader used to pull block boundary information out of the underlying file. */
    private final BlockReader blockReader;

    /** Byte offset within the BAM file of the next block to visit. */
    private long position = 0;

    /**
     * Iterate through the BAM chunks in a file.
     * @param channel File channel to use when accessing the BAM.
     */
    public BAMChunkIterator(FileChannel channel) {
        blockReader = new BlockReader(channel);
    }

    /**
     * Are there other chunks to retrieve in this file?
     * @return True if other chunks are available, false otherwise.
     */
    public boolean hasNext() {
        final boolean atEndOfFile;
        try {
            atEndOfFile = blockReader.eof(position);
        } catch(IOException ex) {
            throw new SAMException("Unable to check file for a next BAM record", ex);
        }
        return !atEndOfFile;
    }

    /**
     * Get the next chunk from the iterator.
     * @return The next chunk.
     * @throws NoSuchElementException if no next chunk is available.
     */
    public Chunk next() {
        if(!hasNext())
            throw new NoSuchElementException("No next chunk is available.");
        final Chunk chunk;
        try {
            chunk = blockReader.getChunkAt(position);
        } catch(IOException ex) {
            throw new SAMException("Unable to completely read chunk at end of file.", ex);
        }
        // The chunk end's upper 48 bits hold the offset of the block's final byte;
        // advance to the first byte of the following block.
        position = (chunk.getChunkEnd() >> 16) + 1;
        return chunk;
    }

    /**
     * Remove a chunk from the iterator.
     * @throws UnsupportedOperationException always.
     */
    public void remove() {
        throw new UnsupportedOperationException("BAMChunkIterator: Cannot remove a BAM chunk.");
    }
}

View File

@ -0,0 +1,91 @@
package net.sf.samtools;
import net.sf.samtools.util.BlockCompressedStreamConstants;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
/**
 * Read an individual block or chunk from the BGZF file.
 *
 * @author mhanna
 * @version 0.1
 */
public class BlockReader {
    /**
     * File channel from which to read.
     */
    private final FileChannel channel;

    /**
     * Store buffered information from the file temporarily.
     */
    private final ByteBuffer buffer;

    /**
     * Create a new block reader. Block readers can operate independently on the same input file.
     * @param channel File channel from which to read.
     */
    public BlockReader(final FileChannel channel) {
        this.channel = channel;
        this.buffer = ByteBuffer.allocateDirect(BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
        // BGZF/GZIP multi-byte fields are stored little-endian.
        buffer.order(ByteOrder.LITTLE_ENDIAN);
    }

    /**
     * Determine whether the given offset is at the end of the file.
     * @param position Position at which to start reading. Must be at the beginning of a block.
     * @return True if we're at the end of the file (or at the end of the data stream), false otherwise.
     * @throws IOException If whether the position is past the end of the file can't be determined.
     */
    public boolean eof(long position) throws IOException {
        return (channel.size() - position) <= 0 ||
               (channel.size() - position) == BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length;
    }

    /**
     * Load the chunk information at a given position.
     * @param position Position at which the chunk begins.
     * @return Chunk of the BGZF file, starting at position.
     * @throws IOException if the chunk could not be read, the read starts past end of file,
     *         or the file ends before a complete block header.
     */
    public Chunk getChunkAt(long position) throws IOException {
        buffer.rewind();
        buffer.limit(BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH);
        // FileChannel.read(ByteBuffer,long) may transfer fewer bytes than requested in a
        // single call, and signals end-of-file with -1 (never 0 for a position past EOF).
        // Loop until the complete header is buffered, distinguishing "nothing to read"
        // from "file truncated mid-header".
        int totalRead = 0;
        while (totalRead < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH) {
            final int count = channel.read(buffer, position + totalRead);
            if (count < 0) {
                if (totalRead == 0)
                    throw new IOException("Tried to read chunk past end of file");
                throw new IOException("Premature end of file");
            }
            totalRead += count;
        }
        buffer.flip();
        // Validate GZIP header
        if (buffer.get() != BlockCompressedStreamConstants.GZIP_ID1 ||
            buffer.get() != (byte)BlockCompressedStreamConstants.GZIP_ID2 ||
            buffer.get() != BlockCompressedStreamConstants.GZIP_CM_DEFLATE ||
            buffer.get() != BlockCompressedStreamConstants.GZIP_FLG) {
            throw new SAMFormatException("Invalid GZIP header");
        }
        // Skip MTIME, XFL, OS fields
        buffer.position(buffer.position() + 6);
        if (buffer.getShort() != BlockCompressedStreamConstants.GZIP_XLEN) {
            throw new SAMFormatException("Invalid GZIP header");
        }
        // Skip blocksize subfield intro
        buffer.position(buffer.position() + 4);
        // Read ushort: BSIZE is the total block size minus one.
        final int totalBlockSize = (buffer.getShort() & 0xffff) + 1;
        if (totalBlockSize < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH || totalBlockSize > buffer.capacity()) {
            throw new IOException("Unexpected compressed block length: " + totalBlockSize);
        }
        // Chunk coordinates are virtual file offsets: compressed block offset in the
        // upper 48 bits, uncompressed offset within the block in the lower 16.
        final long chunkStart = position << 16;
        final long chunkEnd = (position+totalBlockSize-1) << 16;
        return new Chunk(chunkStart,chunkEnd);
    }
}

View File

@ -0,0 +1,63 @@
package net.sf.samtools;
/**
 * Represents a chunk stolen from the BAM file. Originally a private static inner class within
 * BAMFileIndex; now breaking it out so that the sharding system can use it.
 *
 * @author mhanna
 * @version 0.1
 */
class Chunk implements Comparable<Chunk> {
    // Virtual file offset where the chunk starts (inclusive).
    private long mChunkStart;
    // Virtual file offset where the chunk ends.
    private long mChunkEnd;

    Chunk(final long start, final long end) {
        mChunkStart = start;
        mChunkEnd = end;
    }

    long getChunkStart() {
        return mChunkStart;
    }

    void setChunkStart(final long value) {
        mChunkStart = value;
    }

    long getChunkEnd() {
        return mChunkEnd;
    }

    void setChunkEnd(final long value) {
        mChunkEnd = value;
    }

    /**
     * Order chunks by start offset, breaking ties by end offset.
     * Consistent with equals.
     */
    public int compareTo(final Chunk chunk) {
        // Compare directly rather than via Long.signum(a - b): the subtraction
        // overflows (yielding the wrong sign) when the operands differ by more
        // than Long.MAX_VALUE.
        int result = Long.valueOf(mChunkStart).compareTo(chunk.mChunkStart);
        if (result == 0) {
            result = Long.valueOf(mChunkEnd).compareTo(chunk.mChunkEnd);
        }
        return result;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        final Chunk chunk = (Chunk) o;

        if (mChunkEnd != chunk.mChunkEnd) return false;
        if (mChunkStart != chunk.mChunkStart) return false;

        return true;
    }

    @Override
    public int hashCode() {
        int result = (int) (mChunkStart ^ (mChunkStart >>> 32));
        result = 31 * result + (int) (mChunkEnd ^ (mChunkEnd >>> 32));
        return result;
    }
}

View File

@ -0,0 +1,51 @@
package net.sf.samtools;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
/**
 * Test harness for playing with sharding by BGZF block.
 *
 * @author mhanna
 * @version 0.1
 */
public class ChunkTestHarness {
    /** Print usage information and terminate the process with a failure exit code. */
    private static void usage() {
        System.out.println("Usage: ChunkTestHarness <filename>.bam");
        System.exit(1);
    }

    /**
     * Walk every BGZF block of the given BAM file, printing each chunk's coordinates
     * and reporting how long the traversal took.
     * @param args Command-line arguments; args[0] must name an existing .bam file.
     * @throws IOException if the BAM file cannot be read.
     */
    public static void main(String args[]) throws IOException {
        if(args.length == 0)
            usage();
        String bamFileName = args[0];
        if(!bamFileName.endsWith(".bam"))
            usage();
        File bamFile = new File(bamFileName);
        if(!bamFile.exists())
            usage();
        FileInputStream bamInputStream = new FileInputStream(bamFile);
        try {
            FileChannel bamInputChannel = bamInputStream.getChannel();
            BAMChunkIterator chunkIterator = new BAMChunkIterator(bamInputChannel);
            long chunkCount = 0;
            long startTime = System.currentTimeMillis();
            while(chunkIterator.hasNext()) {
                Chunk chunk = chunkIterator.next();
                chunkCount++;
                System.out.printf("Chunk: [%d,%d)\tByte offsets: [%d,%d)%n",chunk.getChunkStart(),
                        chunk.getChunkEnd(),
                        chunk.getChunkStart()>>16,
                        chunk.getChunkEnd()>>16);
            }
            long endTime = System.currentTimeMillis();
            // Fixed typo in the report ("elasped" -> "elapsed").
            System.out.printf("Number of chunks: %d; elapsed time: %dms%n", chunkCount, endTime-startTime);
        }
        finally {
            // Fix: the stream (and its channel) was previously never closed.
            bamInputStream.close();
        }
    }
}