diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/GATKBAMIndex.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/GATKBAMIndex.java index aec41e340..57b409dcd 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/GATKBAMIndex.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/GATKBAMIndex.java @@ -25,17 +25,18 @@ package org.broadinstitute.sting.gatk.datasources.reads; +import net.sf.samtools.seekablestream.SeekableBufferedStream; +import net.sf.samtools.seekablestream.SeekableFileStream; + import net.sf.samtools.*; -import org.broadinstitute.sting.gatk.CommandLineGATK; + import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.StingException; import org.broadinstitute.sting.utils.exceptions.UserException; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; +import java.io.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -70,6 +71,9 @@ public class GATKBAMIndex { private final File mFile; + //TODO: figure out a good value for this buffer size + private final int BUFFERED_STREAM_BUFFER_SIZE = 8192; + /** * Number of sequences stored in this index. */ @@ -80,8 +84,8 @@ public class GATKBAMIndex { */ private final long[] sequenceStartCache; - private FileInputStream fileStream; - private FileChannel fileChannel; + private SeekableFileStream fileStream; + private SeekableBufferedStream bufferedStream; public GATKBAMIndex(final File file) { mFile = file; @@ -264,7 +268,7 @@ public class GATKBAMIndex { */ protected int getMaxAddressibleGenomicLocation() { return BIN_GENOMIC_SPAN; - } + } protected void skipToSequence(final int referenceSequence) { // Find the offset in the file of the last sequence whose position has been determined. Start here @@ -279,7 +283,6 @@ public class GATKBAMIndex { for (int i = sequenceIndex; i < referenceSequence; i++) { sequenceStartCache[i] = position(); - // System.out.println("# Sequence TID: " + i); final int nBins = readInteger(); // System.out.println("# nBins: " + nBins); @@ -292,15 +295,18 @@ public class GATKBAMIndex { final int nLinearBins = readInteger(); // System.out.println("# nLinearBins: " + nLinearBins); skipBytes(8 * nLinearBins); + } sequenceStartCache[referenceSequence] = position(); } + + private void openIndexFile() { try { - fileStream = new FileInputStream(mFile); - fileChannel = fileStream.getChannel(); + fileStream = new SeekableFileStream(mFile); + bufferedStream = new SeekableBufferedStream(fileStream,BUFFERED_STREAM_BUFFER_SIZE); } catch (IOException exc) { throw new ReviewedStingException("Unable to open index file (" + exc.getMessage() +")" + mFile, exc); @@ -309,7 +315,7 @@ public class GATKBAMIndex { private void closeIndexFile() { try { - fileChannel.close(); + bufferedStream.close(); fileStream.close(); } catch (IOException exc) { @@ -352,19 +358,45 @@ public class GATKBAMIndex { } private void read(final ByteBuffer buffer) { + final int bytesRequested = buffer.limit(); + try { - int bytesExpected = buffer.limit(); - int bytesRead = fileChannel.read(buffer); + + //BufferedInputStream cannot read directly into a byte buffer, so we read into an array + //and put the result into the bytebuffer after the if statement. // We have a rigid expectation here to read in exactly the number of bytes we've limited - // our buffer to -- if we read in fewer bytes than this, or encounter EOF (-1), the index + // our buffer to -- if there isn't enough data in the file, the index // must be truncated or otherwise corrupt: - if ( bytesRead < bytesExpected ) { + if(bytesRequested > bufferedStream.length() - bufferedStream.position()){ + throw new UserException.MalformedFile(mFile, String.format("Premature end-of-file while reading BAM index file %s. " + + "It's likely that this file is truncated or corrupt -- " + + "Please try re-indexing the corresponding BAM file.", + mFile)); + } + + int totalBytesRead = 0; + // This while loop must terminate since we demand that we read at least one byte from the file at each iteration + while (totalBytesRead < bytesRequested) { + // bufferedStream.read may return less than the requested amount of byte despite + // not reaching the end of the file, hence the loop. + int bytesRead = bufferedStream.read(byteArray, totalBytesRead, bytesRequested-totalBytesRead); + + // We have a rigid expectation here to read in exactly the number of bytes we've limited + // our buffer to -- if we encounter EOF (-1), the index + // must be truncated or otherwise corrupt: + if (bytesRead <= 0) { throw new UserException.MalformedFile(mFile, String.format("Premature end-of-file while reading BAM index file %s. " + "It's likely that this file is truncated or corrupt -- " + "Please try re-indexing the corresponding BAM file.", mFile)); + } + totalBytesRead += bytesRead; } + if(totalBytesRead != bytesRequested) + throw new RuntimeException("Read amount different from requested amount. This should not happen."); + + buffer.put(byteArray, 0, bytesRequested); } catch(IOException ex) { throw new ReviewedStingException("Index: unable to read bytes from index file " + mFile); @@ -378,10 +410,13 @@ public class GATKBAMIndex { */ private ByteBuffer buffer = null; + //BufferedStream don't read into ByteBuffers, so we need this temporary array + private byte[] byteArray=null; private ByteBuffer getBuffer(final int size) { if(buffer == null || buffer.capacity() < size) { // Allocate a new byte buffer. For now, make it indirect to make sure it winds up on the heap for easier debugging. buffer = ByteBuffer.allocate(size); + byteArray = new byte[size]; buffer.order(ByteOrder.LITTLE_ENDIAN); } buffer.clear(); @@ -391,7 +426,13 @@ public class GATKBAMIndex { private void skipBytes(final int count) { try { - fileChannel.position(fileChannel.position() + count); + + //try to skip forward the requested amount. + long skipped = bufferedStream.skip(count); + + if( skipped != count ) { //if not managed to skip the requested amount + throw new ReviewedStingException("Index: unable to reposition file channel of index file " + mFile); + } } catch(IOException ex) { throw new ReviewedStingException("Index: unable to reposition file channel of index file " + mFile); @@ -400,7 +441,8 @@ public class GATKBAMIndex { private void seek(final long position) { try { - fileChannel.position(position); + //to seek a new position, move the fileChannel, and reposition the bufferedStream + bufferedStream.seek(position); } catch(IOException ex) { throw new ReviewedStingException("Index: unable to reposition of file channel of index file " + mFile); @@ -413,10 +455,10 @@ public class GATKBAMIndex { */ private long position() { try { - return fileChannel.position(); + return bufferedStream.position(); } catch (IOException exc) { throw new ReviewedStingException("Unable to read position from index file " + mFile, exc); } - } + } } diff --git a/public/java/test/org/broadinstitute/sting/gatk/datasources/reads/SeekableBufferedStreamUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/datasources/reads/SeekableBufferedStreamUnitTest.java new file mode 100644 index 000000000..4cb19d154 --- /dev/null +++ b/public/java/test/org/broadinstitute/sting/gatk/datasources/reads/SeekableBufferedStreamUnitTest.java @@ -0,0 +1,101 @@ +/* +* Copyright (c) 2012 The Broad Institute +* +* Permission is hereby granted, free of charge, to any person +* obtaining a copy of this software and associated documentation +* files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, +* copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following +* conditions: +* +* The above copyright notice and this permission notice shall be +* included in all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package org.broadinstitute.sting.gatk.datasources.reads; + +import net.sf.samtools.seekablestream.SeekableBufferedStream; +import net.sf.samtools.seekablestream.SeekableFileStream; +import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.exceptions.StingException; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; + +/** + * Test basic functionality in SeekableBufferedStream. + */ +public class SeekableBufferedStreamUnitTest extends BaseTest { + private static File InputFile = new File(validationDataLocation + "megabyteZeros.dat"); + + final private int BUFFERED_STREAM_BUFFER_SIZE = 100; + private byte buffer[] = new byte[BUFFERED_STREAM_BUFFER_SIZE * 10]; + + + @DataProvider(name = "BasicArgumentsDivisible") + public Integer[][] DivisableReads() { + return new Integer[][]{{1}, {4}, {5}, {10}, {20}, {50}, {100}}; + } + + @DataProvider(name = "BasicArgumentsIndivisibleAndSmall") + public Integer[][] InDivisableReadsSmall() { + return new Integer[][]{{3}, {11}, {31}, {51}, {77}, {99}}; + } + + @DataProvider(name = "BasicArgumentsIndivisibleYetLarge") + public Integer[][] InDivisableReadsLarge() { + return new Integer[][]{{101}, {151}, {205}, {251}, {301}}; + } + + + private void testReadsLength(int length) throws IOException { + final int READ_SIZE=100000; //file is 10^6, so make this smaller to be safe. + + SeekableFileStream fileStream = new SeekableFileStream(InputFile); + SeekableBufferedStream bufferedStream = new SeekableBufferedStream(fileStream, BUFFERED_STREAM_BUFFER_SIZE); + + for (int i = 0; i < READ_SIZE / length; ++i) { + Assert.assertEquals(bufferedStream.read(buffer, 0, length), length); + } + + } + + // These tests fail because SeekableBuffered stream may return _less_ than the amount you are asking for. + // make sure that you wrap reads with while-loops. If these test start failing (meaning that the reads work properly, + // the layer of protection built into GATKBamIndex can be removed. + + @Test(dataProvider = "BasicArgumentsIndivisibleAndSmall", enabled = true, expectedExceptions = java.lang.AssertionError.class) + public void testIndivisableSmallReadsFAIL(Integer readLength) throws IOException { + testReadsLength(readLength); + } + + //Evidently, if you ask for a read length that's larger than the inernal buffer, + //SeekableBufferedStreamdoes something else and gives you what you asked for + + @Test(dataProvider = "BasicArgumentsIndivisibleYetLarge", enabled = true) + public void testIndivisableLargeReadsPASS(Integer readLength) throws IOException { + testReadsLength(readLength); + } + + // if the readlength divides the buffer, there are no failures + @Test(dataProvider = "BasicArgumentsDivisible", enabled = true) + public void testDivisableReadsPASS(Integer readLength) throws IOException { + testReadsLength(readLength); + } + + +} diff --git a/public/java/test/org/broadinstitute/sting/gatk/walkers/qc/PileupWalkerIntegrationTest.java b/public/java/test/org/broadinstitute/sting/gatk/walkers/qc/PileupWalkerIntegrationTest.java index a6191802b..76654fb74 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/walkers/qc/PileupWalkerIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/walkers/qc/PileupWalkerIntegrationTest.java @@ -31,6 +31,9 @@ import org.testng.annotations.Test; import java.util.Arrays; public class PileupWalkerIntegrationTest extends WalkerTest { + String gatkSpeedupArgs="-T Pileup -I " + validationDataLocation + "NA12878.HiSeq.WGS.bwa.cleaned.recal.hg19.20.bam " + + "-R " + hg19Reference + " -o %s "; + @Test public void testGnarleyFHSPileup() { String gatk_args = "-T Pileup -I " + validationDataLocation + "FHS_Pileup_Test.bam " @@ -64,4 +67,31 @@ public class PileupWalkerIntegrationTest extends WalkerTest { WalkerTestSpec spec = new WalkerTestSpec(gatk_args, 1, Arrays.asList(SingleReadAligningOffChromosome1MD5)); executeTest("Testing single read spanning off chromosome 1 unindexed", spec); } + + /************************/ + + //testing speedup to GATKBAMIndex + + + @Test + public void testPileupOnLargeBamChr20(){ + WalkerTestSpec spec = new WalkerTestSpec(gatkSpeedupArgs + "-L 20:1-76,050", 1, Arrays.asList("8702701350de11a6d28204acefdc4775")); + executeTest("Testing single on big BAM at start of chromosome 20", spec); + } + @Test + public void testPileupOnLargeBamMid20(){ + WalkerTestSpec spec = new WalkerTestSpec(gatkSpeedupArgs + "-L 20:10,000,000-10,001,100", 1, Arrays.asList("818cf5a8229efe6f89fc1cd8145ccbe3")); + executeTest("Testing single on big BAM somewhere in chromosome 20", spec); + } + @Test + public void testPileupOnLargeBamEnd20(){ + WalkerTestSpec spec = new WalkerTestSpec(gatkSpeedupArgs + "-L 20:62,954,114-63,025,520", 1, Arrays.asList("22471ea4a12e5139aef62bf8ff2a5b63")); + executeTest("Testing single at end of chromosome 20", spec); + } + @Test + public void testPileupOnLargeBam20Many(){ + WalkerTestSpec spec = new WalkerTestSpec(gatkSpeedupArgs + "-L 20:1-76,050 -L 20:20,000,000-20,000,100 -L 20:40,000,000-40,000,100 -L 20:30,000,000-30,000,100 -L 20:50,000,000-50,000,100 -L 20:62,954,114-63,025,520 ", + 1, Arrays.asList("08d899ed7c5a76ef3947bf67338acda1")); + executeTest("Testing single on big BAM many places", spec); + } }