diff --git a/java/src/net/sf/samtools/BAMChunkIterator.java b/java/src/net/sf/samtools/BAMChunkIterator.java new file mode 100644 index 000000000..8a2571cd5 --- /dev/null +++ b/java/src/net/sf/samtools/BAMChunkIterator.java @@ -0,0 +1,73 @@ +package net.sf.samtools; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.nio.channels.FileChannel; +import java.io.IOException; + +/** + * Walks over a BAM file, discovering and returning the starting location of each block + * in chunk format. + * + * @author mhanna + * @version 0.1 + */ +public class BAMChunkIterator implements Iterator { + /** + * File channel from which to read chunks. + */ + private BlockReader blockReader; + + /** + * What is the current position of this block within the BAM file? + */ + private long position = 0; + + /** + * Iterate through the BAM chunks in a file. + * @param channel File channel to use when accessing the BAM. + */ + public BAMChunkIterator(FileChannel channel) { + this.blockReader = new BlockReader(channel); + } + + /** + * Are there other chunks to retrieve in this file? + * @return True if other chunks are available, false otherwise. + */ + public boolean hasNext() { + try { + return !blockReader.eof(position); + } catch(IOException ex) { + throw new SAMException("Unable to check file for a next BAM record", ex); + } + } + + /** + * Get the next chunk from the iterator. + * @return The next chunk. + * @throw NoSuchElementException if no next chunk is available. + */ + public Chunk next() { + if(!hasNext()) + throw new NoSuchElementException("No next chunk is available."); + + Chunk chunk = null; + try { + chunk = blockReader.getChunkAt(position); + position = (chunk.getChunkEnd() >> 16) + 1; + } + catch(IOException ex) { + throw new SAMException("Unable to completely read chunk at end of file.", ex); + } + return chunk; + } + + /** + * Remove a chunk from the iterator. + * @throws UnsupportedOperationException always. + */ + public void remove() { + throw new UnsupportedOperationException("BAMChunkIterator: Cannot remove a BAM chunk."); + } +} diff --git a/java/src/net/sf/samtools/BlockReader.java b/java/src/net/sf/samtools/BlockReader.java new file mode 100644 index 000000000..a2f91080e --- /dev/null +++ b/java/src/net/sf/samtools/BlockReader.java @@ -0,0 +1,91 @@ +package net.sf.samtools; + +import net.sf.samtools.util.BlockCompressedStreamConstants; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; + +/** + * Read an individual block or chunk from the BGZF file. + * + * @author mhanna + * @version 0.1 + */ +public class BlockReader { + /** + * File channel from which to read. + */ + private final FileChannel channel; + + /** + * Store buffered information from the file temporarily. + */ + private final ByteBuffer buffer; + + /** + * Create a new block reader. Block readers can operate independently on the same input file. + * @param channel File channel from which to read. + */ + public BlockReader(final FileChannel channel) { + this.channel = channel; + this.buffer = ByteBuffer.allocateDirect(BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE); + buffer.order(ByteOrder.LITTLE_ENDIAN); + } + + /** + * Determine whether the given offset is at the end of the file. + * @param position Position at which to start reading. Must be at the beginning of a block. + * @return True if we're at the end of the file (or at the end of the data stream), false otherwise. + * @throws IOException If whether the position is past the end of the file can't be determined. + */ + public boolean eof(long position) throws IOException { + return (channel.size() - position) <= 0 || + (channel.size() - position) == BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length; + } + + /** + * Load the chunk information at a given position. + * @param position Position at which the chunk begins. + * @return Chunk of the BGZF file, starting at position. + * @throws IOException if the chunk could not be read. + */ + public Chunk getChunkAt(long position) throws IOException { + buffer.rewind(); + buffer.limit(BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH); + int count = channel.read(buffer,position); + if (count == 0) { + throw new IOException("Tried to read chunk past end of file"); + } + if (count != BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH) { + throw new IOException("Premature end of file"); + } + buffer.flip(); + + // Validate GZIP header + if (buffer.get() != BlockCompressedStreamConstants.GZIP_ID1 || + buffer.get() != (byte)BlockCompressedStreamConstants.GZIP_ID2 || + buffer.get() != BlockCompressedStreamConstants.GZIP_CM_DEFLATE || + buffer.get() != BlockCompressedStreamConstants.GZIP_FLG) { + throw new SAMFormatException("Invalid GZIP header"); + } + // Skip MTIME, XFL, OS fields + buffer.position(buffer.position() + 6); + if (buffer.getShort() != BlockCompressedStreamConstants.GZIP_XLEN) { + throw new SAMFormatException("Invalid GZIP header"); + } + // Skip blocksize subfield intro + buffer.position(buffer.position() + 4); + // Read ushort + final int totalBlockSize = (buffer.getShort() & 0xffff) + 1; + if (totalBlockSize < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH || totalBlockSize > buffer.capacity()) { + throw new IOException("Unexpected compressed block length: " + totalBlockSize); + } + + final long chunkStart = position << 16; + final long chunkEnd = (position+totalBlockSize-1) << 16; + + return new Chunk(chunkStart,chunkEnd); + } +} diff --git a/java/src/net/sf/samtools/Chunk.java b/java/src/net/sf/samtools/Chunk.java new file mode 100644 index 000000000..a7bb1d565 --- /dev/null +++ b/java/src/net/sf/samtools/Chunk.java @@ -0,0 +1,63 @@ +package net.sf.samtools; + +/** + * Represents a chunk stolen from the BAM file. Originally a private static inner class within + * BAMFileIndex; now breaking it out so that the sharding system can use it. + * + * @author mhanna + * @version 0.1 + */ +class Chunk implements Comparable { + + private long mChunkStart; + private long mChunkEnd; + + Chunk(final long start, final long end) { + mChunkStart = start; + mChunkEnd = end; + } + + long getChunkStart() { + return mChunkStart; + } + + void setChunkStart(final long value) { + mChunkStart = value; + } + + long getChunkEnd() { + return mChunkEnd; + } + + void setChunkEnd(final long value) { + mChunkEnd = value; + } + + public int compareTo(final Chunk chunk) { + int result = Long.signum(mChunkStart - chunk.mChunkStart); + if (result == 0) { + result = Long.signum(mChunkEnd - chunk.mChunkEnd); + } + return result; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final Chunk chunk = (Chunk) o; + + if (mChunkEnd != chunk.mChunkEnd) return false; + if (mChunkStart != chunk.mChunkStart) return false; + + return true; + } + + @Override + public int hashCode() { + int result = (int) (mChunkStart ^ (mChunkStart >>> 32)); + result = 31 * result + (int) (mChunkEnd ^ (mChunkEnd >>> 32)); + return result; + } +} diff --git a/java/src/net/sf/samtools/ChunkTestHarness.java b/java/src/net/sf/samtools/ChunkTestHarness.java new file mode 100644 index 000000000..dd37b347e --- /dev/null +++ b/java/src/net/sf/samtools/ChunkTestHarness.java @@ -0,0 +1,51 @@ +package net.sf.samtools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; + +/** + * Test harness for playing with sharding by BGZF block. + * + * @author mhanna + * @version 0.1 + */ +public class ChunkTestHarness { + private static void usage() { + System.out.println("Usage: ChunkTestHarness .bam"); + System.exit(1); + } + + public static void main(String args[]) throws IOException { + if(args.length == 0) + usage(); + + String bamFileName = args[0]; + if(!bamFileName.endsWith(".bam")) + usage(); + + File bamFile = new File(bamFileName); + if(!bamFile.exists()) + usage(); + + FileInputStream bamInputStream = new FileInputStream(bamFile); + FileChannel bamInputChannel = bamInputStream.getChannel(); + + BAMChunkIterator chunkIterator = new BAMChunkIterator(bamInputChannel); + long chunkCount = 0; + + long startTime = System.currentTimeMillis(); + while(chunkIterator.hasNext()) { + Chunk chunk = chunkIterator.next(); + chunkCount++; + System.out.printf("Chunk: [%d,%d)\tByte offsets: [%d,%d)%n",chunk.getChunkStart(), + chunk.getChunkEnd(), + chunk.getChunkStart()>>16, + chunk.getChunkEnd()>>16); + } + long endTime = System.currentTimeMillis(); + + System.out.printf("Number of chunks: %d; elasped time: %dms%n", chunkCount, endTime-startTime); + } +}