Re-enabled fastBAMindexing by replacing the FileChannel with a SeekableBufferedStream

This helps a lot since FileChannel is very low-level and traversing the BAMIndex involves lots of short reads.

- Fixed a deterioration in BAMIndex due to rev'ed picard (see below)
- Added unit tests for SeekableBufferedStream
- Added integrationTests for GATKBAMIndex (in PileupWalkerIntegrationTest)
- Added a runtime-test to verify that the amount read equals the amount requested.
- Added failing tests with expectedExceptions
- Used a DataProvider to make code nicer
This commit is contained in:
Yossi Farjoun 2013-02-01 15:46:14 -05:00
parent 5cc5aedcd1
commit 3a7c8c13e2
3 changed files with 193 additions and 20 deletions

View File

@ -25,17 +25,18 @@
package org.broadinstitute.sting.gatk.datasources.reads; package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.samtools.seekablestream.SeekableBufferedStream;
import net.sf.samtools.seekablestream.SeekableFileStream;
import net.sf.samtools.*; import net.sf.samtools.*;
import org.broadinstitute.sting.gatk.CommandLineGATK;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.StingException;
import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File; import java.io.*;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -70,6 +71,9 @@ public class GATKBAMIndex {
private final File mFile; private final File mFile;
//TODO: figure out a good value for this buffer size
private final int BUFFERED_STREAM_BUFFER_SIZE = 8192;
/** /**
* Number of sequences stored in this index. * Number of sequences stored in this index.
*/ */
@ -80,8 +84,8 @@ public class GATKBAMIndex {
*/ */
private final long[] sequenceStartCache; private final long[] sequenceStartCache;
private FileInputStream fileStream; private SeekableFileStream fileStream;
private FileChannel fileChannel; private SeekableBufferedStream bufferedStream;
public GATKBAMIndex(final File file) { public GATKBAMIndex(final File file) {
mFile = file; mFile = file;
@ -279,7 +283,6 @@ public class GATKBAMIndex {
for (int i = sequenceIndex; i < referenceSequence; i++) { for (int i = sequenceIndex; i < referenceSequence; i++) {
sequenceStartCache[i] = position(); sequenceStartCache[i] = position();
// System.out.println("# Sequence TID: " + i); // System.out.println("# Sequence TID: " + i);
final int nBins = readInteger(); final int nBins = readInteger();
// System.out.println("# nBins: " + nBins); // System.out.println("# nBins: " + nBins);
@ -292,15 +295,18 @@ public class GATKBAMIndex {
final int nLinearBins = readInteger(); final int nLinearBins = readInteger();
// System.out.println("# nLinearBins: " + nLinearBins); // System.out.println("# nLinearBins: " + nLinearBins);
skipBytes(8 * nLinearBins); skipBytes(8 * nLinearBins);
} }
sequenceStartCache[referenceSequence] = position(); sequenceStartCache[referenceSequence] = position();
} }
private void openIndexFile() { private void openIndexFile() {
try { try {
fileStream = new FileInputStream(mFile); fileStream = new SeekableFileStream(mFile);
fileChannel = fileStream.getChannel(); bufferedStream = new SeekableBufferedStream(fileStream,BUFFERED_STREAM_BUFFER_SIZE);
} }
catch (IOException exc) { catch (IOException exc) {
throw new ReviewedStingException("Unable to open index file (" + exc.getMessage() +")" + mFile, exc); throw new ReviewedStingException("Unable to open index file (" + exc.getMessage() +")" + mFile, exc);
@ -309,7 +315,7 @@ public class GATKBAMIndex {
private void closeIndexFile() { private void closeIndexFile() {
try { try {
fileChannel.close(); bufferedStream.close();
fileStream.close(); fileStream.close();
} }
catch (IOException exc) { catch (IOException exc) {
@ -352,19 +358,45 @@ public class GATKBAMIndex {
} }
private void read(final ByteBuffer buffer) { private void read(final ByteBuffer buffer) {
final int bytesRequested = buffer.limit();
try { try {
int bytesExpected = buffer.limit();
int bytesRead = fileChannel.read(buffer); //BufferedInputStream cannot read directly into a byte buffer, so we read into an array
//and put the result into the bytebuffer after the if statement.
// We have a rigid expectation here to read in exactly the number of bytes we've limited // We have a rigid expectation here to read in exactly the number of bytes we've limited
// our buffer to -- if we read in fewer bytes than this, or encounter EOF (-1), the index // our buffer to -- if there isn't enough data in the file, the index
// must be truncated or otherwise corrupt: // must be truncated or otherwise corrupt:
if ( bytesRead < bytesExpected ) { if(bytesRequested > bufferedStream.length() - bufferedStream.position()){
throw new UserException.MalformedFile(mFile, String.format("Premature end-of-file while reading BAM index file %s. " + throw new UserException.MalformedFile(mFile, String.format("Premature end-of-file while reading BAM index file %s. " +
"It's likely that this file is truncated or corrupt -- " + "It's likely that this file is truncated or corrupt -- " +
"Please try re-indexing the corresponding BAM file.", "Please try re-indexing the corresponding BAM file.",
mFile)); mFile));
} }
int totalBytesRead = 0;
// This while loop must terminate since we demand that we read at least one byte from the file at each iteration
while (totalBytesRead < bytesRequested) {
// bufferedStream.read may return less than the requested amount of byte despite
// not reaching the end of the file, hence the loop.
int bytesRead = bufferedStream.read(byteArray, totalBytesRead, bytesRequested-totalBytesRead);
// We have a rigid expectation here to read in exactly the number of bytes we've limited
// our buffer to -- if we encounter EOF (-1), the index
// must be truncated or otherwise corrupt:
if (bytesRead <= 0) {
throw new UserException.MalformedFile(mFile, String.format("Premature end-of-file while reading BAM index file %s. " +
"It's likely that this file is truncated or corrupt -- " +
"Please try re-indexing the corresponding BAM file.",
mFile));
}
totalBytesRead += bytesRead;
}
if(totalBytesRead != bytesRequested)
throw new RuntimeException("Read amount different from requested amount. This should not happen.");
buffer.put(byteArray, 0, bytesRequested);
} }
catch(IOException ex) { catch(IOException ex) {
throw new ReviewedStingException("Index: unable to read bytes from index file " + mFile); throw new ReviewedStingException("Index: unable to read bytes from index file " + mFile);
@ -378,10 +410,13 @@ public class GATKBAMIndex {
*/ */
private ByteBuffer buffer = null; private ByteBuffer buffer = null;
//BufferedStream don't read into ByteBuffers, so we need this temporary array
private byte[] byteArray=null;
private ByteBuffer getBuffer(final int size) { private ByteBuffer getBuffer(final int size) {
if(buffer == null || buffer.capacity() < size) { if(buffer == null || buffer.capacity() < size) {
// Allocate a new byte buffer. For now, make it indirect to make sure it winds up on the heap for easier debugging. // Allocate a new byte buffer. For now, make it indirect to make sure it winds up on the heap for easier debugging.
buffer = ByteBuffer.allocate(size); buffer = ByteBuffer.allocate(size);
byteArray = new byte[size];
buffer.order(ByteOrder.LITTLE_ENDIAN); buffer.order(ByteOrder.LITTLE_ENDIAN);
} }
buffer.clear(); buffer.clear();
@ -391,7 +426,13 @@ public class GATKBAMIndex {
private void skipBytes(final int count) { private void skipBytes(final int count) {
try { try {
fileChannel.position(fileChannel.position() + count);
//try to skip forward the requested amount.
long skipped = bufferedStream.skip(count);
if( skipped != count ) { //if not managed to skip the requested amount
throw new ReviewedStingException("Index: unable to reposition file channel of index file " + mFile);
}
} }
catch(IOException ex) { catch(IOException ex) {
throw new ReviewedStingException("Index: unable to reposition file channel of index file " + mFile); throw new ReviewedStingException("Index: unable to reposition file channel of index file " + mFile);
@ -400,7 +441,8 @@ public class GATKBAMIndex {
private void seek(final long position) { private void seek(final long position) {
try { try {
fileChannel.position(position); //to seek a new position, move the fileChannel, and reposition the bufferedStream
bufferedStream.seek(position);
} }
catch(IOException ex) { catch(IOException ex) {
throw new ReviewedStingException("Index: unable to reposition of file channel of index file " + mFile); throw new ReviewedStingException("Index: unable to reposition of file channel of index file " + mFile);
@ -413,7 +455,7 @@ public class GATKBAMIndex {
*/ */
private long position() { private long position() {
try { try {
return fileChannel.position(); return bufferedStream.position();
} }
catch (IOException exc) { catch (IOException exc) {
throw new ReviewedStingException("Unable to read position from index file " + mFile, exc); throw new ReviewedStingException("Unable to read position from index file " + mFile, exc);

View File

@ -0,0 +1,101 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.samtools.seekablestream.SeekableBufferedStream;
import net.sf.samtools.seekablestream.SeekableFileStream;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.exceptions.StingException;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
/**
* Test basic functionality in SeekableBufferedStream.
*/
public class SeekableBufferedStreamUnitTest extends BaseTest {
private static File InputFile = new File(validationDataLocation + "megabyteZeros.dat");
final private int BUFFERED_STREAM_BUFFER_SIZE = 100;
private byte buffer[] = new byte[BUFFERED_STREAM_BUFFER_SIZE * 10];
@DataProvider(name = "BasicArgumentsDivisible")
public Integer[][] DivisableReads() {
return new Integer[][]{{1}, {4}, {5}, {10}, {20}, {50}, {100}};
}
@DataProvider(name = "BasicArgumentsIndivisibleAndSmall")
public Integer[][] InDivisableReadsSmall() {
return new Integer[][]{{3}, {11}, {31}, {51}, {77}, {99}};
}
@DataProvider(name = "BasicArgumentsIndivisibleYetLarge")
public Integer[][] InDivisableReadsLarge() {
return new Integer[][]{{101}, {151}, {205}, {251}, {301}};
}
private void testReadsLength(int length) throws IOException {
final int READ_SIZE=100000; //file is 10^6, so make this smaller to be safe.
SeekableFileStream fileStream = new SeekableFileStream(InputFile);
SeekableBufferedStream bufferedStream = new SeekableBufferedStream(fileStream, BUFFERED_STREAM_BUFFER_SIZE);
for (int i = 0; i < READ_SIZE / length; ++i) {
Assert.assertEquals(bufferedStream.read(buffer, 0, length), length);
}
}
// These tests fail because SeekableBuffered stream may return _less_ than the amount you are asking for.
// make sure that you wrap reads with while-loops. If these test start failing (meaning that the reads work properly,
// the layer of protection built into GATKBamIndex can be removed.
@Test(dataProvider = "BasicArgumentsIndivisibleAndSmall", enabled = true, expectedExceptions = java.lang.AssertionError.class)
public void testIndivisableSmallReadsFAIL(Integer readLength) throws IOException {
testReadsLength(readLength);
}
//Evidently, if you ask for a read length that's larger than the inernal buffer,
//SeekableBufferedStreamdoes something else and gives you what you asked for
@Test(dataProvider = "BasicArgumentsIndivisibleYetLarge", enabled = true)
public void testIndivisableLargeReadsPASS(Integer readLength) throws IOException {
testReadsLength(readLength);
}
// if the readlength divides the buffer, there are no failures
@Test(dataProvider = "BasicArgumentsDivisible", enabled = true)
public void testDivisableReadsPASS(Integer readLength) throws IOException {
testReadsLength(readLength);
}
}

View File

@ -31,6 +31,9 @@ import org.testng.annotations.Test;
import java.util.Arrays; import java.util.Arrays;
public class PileupWalkerIntegrationTest extends WalkerTest { public class PileupWalkerIntegrationTest extends WalkerTest {
String gatkSpeedupArgs="-T Pileup -I " + validationDataLocation + "NA12878.HiSeq.WGS.bwa.cleaned.recal.hg19.20.bam "
+ "-R " + hg19Reference + " -o %s ";
@Test @Test
public void testGnarleyFHSPileup() { public void testGnarleyFHSPileup() {
String gatk_args = "-T Pileup -I " + validationDataLocation + "FHS_Pileup_Test.bam " String gatk_args = "-T Pileup -I " + validationDataLocation + "FHS_Pileup_Test.bam "
@ -64,4 +67,31 @@ public class PileupWalkerIntegrationTest extends WalkerTest {
WalkerTestSpec spec = new WalkerTestSpec(gatk_args, 1, Arrays.asList(SingleReadAligningOffChromosome1MD5)); WalkerTestSpec spec = new WalkerTestSpec(gatk_args, 1, Arrays.asList(SingleReadAligningOffChromosome1MD5));
executeTest("Testing single read spanning off chromosome 1 unindexed", spec); executeTest("Testing single read spanning off chromosome 1 unindexed", spec);
} }
/************************/
//testing speedup to GATKBAMIndex
@Test
public void testPileupOnLargeBamChr20(){
WalkerTestSpec spec = new WalkerTestSpec(gatkSpeedupArgs + "-L 20:1-76,050", 1, Arrays.asList("8702701350de11a6d28204acefdc4775"));
executeTest("Testing single on big BAM at start of chromosome 20", spec);
}
@Test
public void testPileupOnLargeBamMid20(){
WalkerTestSpec spec = new WalkerTestSpec(gatkSpeedupArgs + "-L 20:10,000,000-10,001,100", 1, Arrays.asList("818cf5a8229efe6f89fc1cd8145ccbe3"));
executeTest("Testing single on big BAM somewhere in chromosome 20", spec);
}
@Test
public void testPileupOnLargeBamEnd20(){
WalkerTestSpec spec = new WalkerTestSpec(gatkSpeedupArgs + "-L 20:62,954,114-63,025,520", 1, Arrays.asList("22471ea4a12e5139aef62bf8ff2a5b63"));
executeTest("Testing single at end of chromosome 20", spec);
}
@Test
public void testPileupOnLargeBam20Many(){
WalkerTestSpec spec = new WalkerTestSpec(gatkSpeedupArgs + "-L 20:1-76,050 -L 20:20,000,000-20,000,100 -L 20:40,000,000-40,000,100 -L 20:30,000,000-30,000,100 -L 20:50,000,000-50,000,100 -L 20:62,954,114-63,025,520 ",
1, Arrays.asList("08d899ed7c5a76ef3947bf67338acda1"));
executeTest("Testing single on big BAM many places", spec);
}
} }