A rethink of the existing BAM block extraction code: rather than working in
chunk space directly, stream data in block space, converting to chunk space
on demand.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@2484 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2009-12-30 18:19:51 +00:00
parent 80658fd99e
commit 497ae700c4
7 changed files with 208 additions and 71 deletions

View File

@ -1,9 +1,18 @@
package net.sf.samtools; package net.sf.samtools;
import net.sf.samtools.util.BinaryCodec;
import net.sf.samtools.util.BlockCompressedInputStream;
import net.sf.samtools.util.StringLineReader;
import net.sf.samtools.util.CloseableIterator;
import net.sf.samtools.util.SeekableStream;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Arrays;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.io.IOException; import java.io.IOException;
import java.io.FileInputStream;
import java.io.File;
/** /**
* Walks over a BAM file, discovering and returning the starting location of each block * Walks over a BAM file, discovering and returning the starting location of each block
@ -12,7 +21,7 @@ import java.io.IOException;
* @author mhanna * @author mhanna
* @version 0.1 * @version 0.1
*/ */
public class BAMChunkIterator implements Iterator<Chunk> { public class BAMBlockIterator implements CloseableIterator<Block> {
/** /**
* File channel from which to read chunks. * File channel from which to read chunks.
*/ */
@ -25,10 +34,14 @@ public class BAMChunkIterator implements Iterator<Chunk> {
/** /**
* Iterate through the BAM chunks in a file. * Iterate through the BAM chunks in a file.
* @param channel File channel to use when accessing the BAM. * @param file stream File to use when accessing the BAM.
*/ */
public BAMChunkIterator(FileChannel channel) { public BAMBlockIterator(File file) throws IOException {
this.blockReader = new BlockReader(channel); FileInputStream inputStream = new FileInputStream(file);
this.blockReader = new BlockReader(inputStream);
}
public void close() {
} }
/** /**
@ -48,19 +61,19 @@ public class BAMChunkIterator implements Iterator<Chunk> {
* @return The next chunk. * @return The next chunk.
* @throw NoSuchElementException if no next chunk is available. * @throw NoSuchElementException if no next chunk is available.
*/ */
public Chunk next() { public Block next() {
if(!hasNext()) if(!hasNext())
throw new NoSuchElementException("No next chunk is available."); throw new NoSuchElementException("No next chunk is available.");
Chunk chunk = null; Block block = null;
try { try {
chunk = blockReader.getChunkAt(position); block = blockReader.getBlockAt(position);
position = (chunk.getChunkEnd() >> 16) + 1; position = block.position + block.compressedBlockSize;
} }
catch(IOException ex) { catch(IOException ex) {
throw new SAMException("Unable to completely read chunk at end of file.", ex); throw new SAMException("Unable to completely read chunk at end of file.", ex);
} }
return chunk; return block;
} }
/** /**

View File

@ -0,0 +1,77 @@
package net.sf.samtools;
import net.sf.samtools.util.BlockCompressedInputStream;
import net.sf.samtools.util.BinaryCodec;
import net.sf.samtools.util.StringLineReader;
import java.io.File;
import java.io.IOException;
import java.io.DataInputStream;
import java.util.Arrays;
/**
* Loads a BAM file header from an file, optionally providing its position
* within the file.
*
* @author mhanna
* @version 0.1
*/
public class BAMFileHeaderLoader {
/**
* The contents of the BAM file header.
*/
private final SAMFileHeader header;
/**
* Location of the header within the BAM file.
*/
private final Chunk location;
/**
* Load the header from the given file.
* @param header the parsed haeder for the BAM file.
* @param location the location of the header (start and stop) within the BAM.
*/
private BAMFileHeaderLoader(SAMFileHeader header, Chunk location) {
this.header = header;
this.location = location;
}
/**
* Gets the header for the given BAM file.
* @return The header for this BAM file.
*/
public SAMFileHeader getHeader() {
return header;
}
/**
* Gets the location of the header within the given BAM file, in chunk format.
* @return the location of the header, in chunk coordinates.
*/
public Chunk getLocation() {
return location;
}
public static BAMFileHeaderLoader load(File file) throws IOException {
BlockCompressedInputStream inputStream = new BlockCompressedInputStream(file);
BinaryCodec binaryCodec = new BinaryCodec(new DataInputStream(inputStream));
final byte[] buffer = new byte[4];
binaryCodec.readBytes(buffer);
if (!Arrays.equals(buffer, BAMFileConstants.BAM_MAGIC)) {
throw new IOException("Invalid BAM file header");
}
final int headerTextLength = binaryCodec.readInt();
final String textHeader = binaryCodec.readString(headerTextLength);
final SAMTextHeaderCodec headerCodec = new SAMTextHeaderCodec();
headerCodec.setValidationStringency(SAMFileReader.ValidationStringency.SILENT);
SAMFileHeader header = headerCodec.decode(new StringLineReader(textHeader),file.getAbsolutePath());
inputStream.close();
return new BAMFileHeaderLoader(header,new Chunk(buffer.length,inputStream.getFilePointer()));
}
}

View File

@ -0,0 +1,34 @@
package net.sf.samtools;
/**
 * Describes where a single BGZF block lives on disk, along with its
 * compressed and uncompressed sizes.
 *
 * @author mhanna
 * @version 0.1
 */
public class Block {
    /** Offset of the first byte of this block within the file. */
    public final long position;

    /** Number of bytes this block occupies on disk. */
    public final int compressedBlockSize;

    /** Number of bytes the block's data expands to once inflated. */
    public final long uncompressedBlockSize;

    /**
     * Create a block descriptor; no block data is read or held in memory.
     * @param position Offset of this block within the file.
     * @param compressedBlockSize On-disk size of the block; if compressed data is present, should match its length.
     * @param uncompressedBlockSize Size of the data in the block after decompression.
     */
    public Block(final long position, final int compressedBlockSize, final long uncompressedBlockSize) {
        this.position = position;
        this.compressedBlockSize = compressedBlockSize;
        this.uncompressedBlockSize = uncompressedBlockSize;
    }

    /**
     * Render the block's position and sizes as human-readable text.
     * @return A string indicating position and size.
     */
    @Override
    public String toString() {
        return "Block: pos = " + position +
               ", compressed size = " + compressedBlockSize +
               ", uncompressed size = " + uncompressedBlockSize;
    }
}

View File

@ -3,12 +3,13 @@ package net.sf.samtools;
import net.sf.samtools.util.BlockCompressedStreamConstants; import net.sf.samtools.util.BlockCompressedStreamConstants;
import java.io.IOException; import java.io.IOException;
import java.io.FileInputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
/** /**
* Read an individual block or chunk from the BGZF file. * Read an individual block from the BGZF file.
* *
* @author mhanna * @author mhanna
* @version 0.1 * @version 0.1
@ -26,14 +27,22 @@ public class BlockReader {
/** /**
* Create a new block reader. Block readers can operate independently on the same input file. * Create a new block reader. Block readers can operate independently on the same input file.
* @param channel File channel from which to read. * @param inputStream InputStream from which to read.
*/ */
public BlockReader(final FileChannel channel) { public BlockReader(final FileInputStream inputStream) {
this.channel = channel; this.channel = inputStream.getChannel();
this.buffer = ByteBuffer.allocateDirect(BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE); this.buffer = ByteBuffer.allocateDirect(BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
buffer.order(ByteOrder.LITTLE_ENDIAN); buffer.order(ByteOrder.LITTLE_ENDIAN);
} }
/**
* Closes the block reader's channel.
* @throws IOException On failure to close channel.
*/
public void close() throws IOException {
this.channel.close();
}
/** /**
* Determine whether the given offset is at the end of the file. * Determine whether the given offset is at the end of the file.
* @param position Position at which to start reading. Must be at the beginning of a block. * @param position Position at which to start reading. Must be at the beginning of a block.
@ -51,7 +60,7 @@ public class BlockReader {
* @return Chunk of the BGZF file, starting at position. * @return Chunk of the BGZF file, starting at position.
* @throws IOException if the chunk could not be read. * @throws IOException if the chunk could not be read.
*/ */
public Chunk getChunkAt(long position) throws IOException { public Block getBlockAt(long position) throws IOException {
buffer.rewind(); buffer.rewind();
buffer.limit(BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH); buffer.limit(BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH);
int count = channel.read(buffer,position); int count = channel.read(buffer,position);
@ -78,14 +87,18 @@ public class BlockReader {
// Skip blocksize subfield intro // Skip blocksize subfield intro
buffer.position(buffer.position() + 4); buffer.position(buffer.position() + 4);
// Read ushort // Read ushort
final int totalBlockSize = (buffer.getShort() & 0xffff) + 1; final int compressedBlockSize = (buffer.getShort() & 0xffff) + 1;
if (totalBlockSize < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH || totalBlockSize > buffer.capacity()) { if (compressedBlockSize < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH || compressedBlockSize > buffer.capacity()) {
throw new IOException("Unexpected compressed block length: " + totalBlockSize); throw new IOException("Unexpected compressed block length: " + compressedBlockSize);
} }
final long chunkStart = position << 16; // Read the uncompressed block size
final long chunkEnd = (position+totalBlockSize-1) << 16; buffer.rewind();
buffer.limit(4);
channel.read(buffer,position+compressedBlockSize-4);
buffer.flip();
final int uncompressedBlockSize = buffer.getInt();
return new Chunk(chunkStart,chunkEnd); return new Block(position,compressedBlockSize,uncompressedBlockSize);
} }
} }

View File

@ -0,0 +1,46 @@
package net.sf.samtools;
import java.io.File;
import java.io.IOException;
/**
 * Test harness for playing with sharding by BGZF block.
 *
 * @author mhanna
 * @version 0.1
 */
public class BlockTestHarness {
    /**
     * Print command-line usage and exit with a failure status.
     */
    private static void usage() {
        // Report the correct harness name; the original text said "ChunkTestHarness",
        // a copy-paste leftover from the class this harness replaced.
        System.out.println("Usage: BlockTestHarness <filename>.bam");
        System.exit(1);
    }

    /**
     * Walk every BGZF block in the given BAM file, reporting the header
     * location, the block count, and the elapsed wall-clock time.
     * @param args Command line; args[0] must name an existing .bam file.
     * @throws IOException if the BAM file cannot be read.
     */
    public static void main(String args[]) throws IOException {
        if(args.length == 0)
            usage();

        String bamFileName = args[0];
        if(!bamFileName.endsWith(".bam"))
            usage();

        File bamFile = new File(bamFileName);
        if(!bamFile.exists())
            usage();

        Chunk headerLocation = BAMFileHeaderLoader.load(bamFile).getLocation();
        System.out.printf("Header location = %s%n", headerLocation);

        BAMBlockIterator blockIterator = new BAMBlockIterator(bamFile);
        long blockCount = 0;
        long startTime = System.currentTimeMillis();
        while(blockIterator.hasNext()) {
            Block block = blockIterator.next();
            blockCount++;
            //System.out.println(block);
        }
        long endTime = System.currentTimeMillis();
        // Release the iterator's underlying resources once iteration is done.
        blockIterator.close();
        // "blocks", not "chunks": this harness counts BGZF blocks.
        System.out.printf("Number of blocks: %d; elapsed time: %dms%n", blockCount, endTime-startTime);
    }
}

View File

@ -60,4 +60,9 @@ class Chunk implements Comparable<Chunk> {
result = 31 * result + (int) (mChunkEnd ^ (mChunkEnd >>> 32)); result = 31 * result + (int) (mChunkEnd ^ (mChunkEnd >>> 32));
return result; return result;
} }
@Override
public String toString() {
return String.format("%d:%d-%d:%d",mChunkStart >> 16,mChunkStart & 0xFFFF,mChunkEnd >> 16,mChunkEnd & 0xFFFF);
}
} }

View File

@ -1,51 +0,0 @@
package net.sf.samtools;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
/**
 * Test harness for playing with sharding by BGZF block.
 *
 * @author mhanna
 * @version 0.1
 */
public class ChunkTestHarness {
    /**
     * Print command-line usage and exit with a failure status.
     */
    private static void usage() {
        System.out.println("Usage: ChunkTestHarness <filename>.bam");
        System.exit(1);
    }

    /**
     * Iterate over every chunk of the given BAM file, printing each chunk's
     * coordinates and byte offsets, then report the count and elapsed time.
     * @param args Command line; args[0] must name an existing .bam file.
     * @throws IOException if the BAM file cannot be read.
     */
    public static void main(String args[]) throws IOException {
        if(args.length == 0)
            usage();

        String bamFileName = args[0];
        if(!bamFileName.endsWith(".bam"))
            usage();

        File bamFile = new File(bamFileName);
        if(!bamFile.exists())
            usage();

        FileInputStream bamInputStream = new FileInputStream(bamFile);
        try {
            FileChannel bamInputChannel = bamInputStream.getChannel();
            BAMChunkIterator chunkIterator = new BAMChunkIterator(bamInputChannel);

            long chunkCount = 0;
            long startTime = System.currentTimeMillis();
            while(chunkIterator.hasNext()) {
                Chunk chunk = chunkIterator.next();
                chunkCount++;
                // Chunk coordinates pack (block offset << 16) | offset-into-block;
                // shifting right by 16 recovers the on-disk byte offset of the block.
                System.out.printf("Chunk: [%d,%d)\tByte offsets: [%d,%d)%n",chunk.getChunkStart(),
                                  chunk.getChunkEnd(),
                                  chunk.getChunkStart()>>16,
                                  chunk.getChunkEnd()>>16);
            }
            long endTime = System.currentTimeMillis();
            // Fixed user-facing typo: "elasped" -> "elapsed".
            System.out.printf("Number of chunks: %d; elapsed time: %dms%n", chunkCount, endTime-startTime);
        }
        finally {
            // Close the stream (and its channel) even if iteration fails;
            // the original leaked the FileInputStream.
            bamInputStream.close();
        }
    }
}