diff --git a/public/java/src/net/sf/samtools/BAMFileReader.java b/public/java/src/net/sf/samtools/BAMFileReader.java deleted file mode 100644 index 5005b6265..000000000 --- a/public/java/src/net/sf/samtools/BAMFileReader.java +++ /dev/null @@ -1,762 +0,0 @@ -/* - * Copyright (c) 2011, The Broad Institute - * - * Permission is hereby granted, free of charge, to any person - * obtaining a copy of this software and associated documentation - * files (the "Software"), to deal in the Software without - * restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following - * conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES - * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. - */ -package net.sf.samtools; - - -import net.sf.samtools.util.*; -import net.sf.samtools.SAMFileReader.ValidationStringency; - -import java.io.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.NoSuchElementException; - -/** - * Internal class for reading and querying BAM files. - */ -class BAMFileReader extends SAMFileReader.ReaderImplementation { - // True if reading from a File rather than an InputStream - private boolean mIsSeekable = false; - - // For converting bytes into other primitive types - private BinaryCodec mStream = null; - - // Underlying compressed data stream. - private final BAMInputStream mInputStream; - private SAMFileHeader mFileHeader = null; - - // Populated if the file is seekable and an index exists - private File mIndexFile; - private BAMIndex mIndex = null; - private long mFirstRecordPointer = 0; - private CloseableIterator mCurrentIterator = null; - - // If true, all SAMRecords are fully decoded as they are read. - private final boolean eagerDecode; - - // For error-checking. - private ValidationStringency mValidationStringency; - - // For creating BAMRecords - private SAMRecordFactory samRecordFactory; - - /** - * Use the caching index reader implementation rather than the disk-hit-per-file model. - */ - private boolean mEnableIndexCaching = false; - - /** - * Use the traditional memory-mapped implementation for BAM file indexes rather than regular I/O. - */ - private boolean mEnableIndexMemoryMapping = true; - - /** - * Add information about the origin (reader and position) to SAM records. - */ - private SAMFileReader mFileReader = null; - - /** - * Prepare to read BAM from a stream (not seekable) - * @param stream source of bytes. - * @param eagerDecode if true, decode all BAM fields as reading rather than lazily. - * @param validationStringency Controls how to handle invalidate reads or header lines. - */ - BAMFileReader(final InputStream stream, - final File indexFile, - final boolean eagerDecode, - final ValidationStringency validationStringency, - final SAMRecordFactory factory) - throws IOException { - mIndexFile = indexFile; - mIsSeekable = false; - mInputStream = stream instanceof BAMInputStream ? (BAMInputStream)stream : new BlockCompressedInputStream(stream); - mStream = new BinaryCodec(new DataInputStream((InputStream)mInputStream)); - this.eagerDecode = eagerDecode; - this.mValidationStringency = validationStringency; - this.samRecordFactory = factory; - readHeader(null); - } - - /** - * Prepare to read BAM from a file (seekable) - * @param file source of bytes. - * @param eagerDecode if true, decode all BAM fields as reading rather than lazily. - * @param validationStringency Controls how to handle invalidate reads or header lines. - */ - BAMFileReader(final File file, - final File indexFile, - final boolean eagerDecode, - final ValidationStringency validationStringency, - final SAMRecordFactory factory) - throws IOException { - this(new BlockCompressedInputStream(file), indexFile!=null ? indexFile : findIndexFile(file), eagerDecode, file.getAbsolutePath(), validationStringency, factory); - if (mIndexFile != null && mIndexFile.lastModified() < file.lastModified()) { - System.err.println("WARNING: BAM index file " + mIndexFile.getAbsolutePath() + - " is older than BAM " + file.getAbsolutePath()); - } - } - - BAMFileReader(final SeekableStream strm, - final File indexFile, - final boolean eagerDecode, - final ValidationStringency validationStringency, - final SAMRecordFactory factory) - throws IOException { - this(strm instanceof BAMInputStream ? (BAMInputStream)strm : new BlockCompressedInputStream(strm), - indexFile, - eagerDecode, - strm.getSource(), - validationStringency, - factory); - } - - private BAMFileReader(final BAMInputStream inputStream, - final File indexFile, - final boolean eagerDecode, - final String source, - final ValidationStringency validationStringency, - final SAMRecordFactory factory) - throws IOException { - mIndexFile = indexFile; - mIsSeekable = true; - mInputStream = inputStream; - mStream = new BinaryCodec(new DataInputStream((InputStream)inputStream)); - this.eagerDecode = eagerDecode; - this.mValidationStringency = validationStringency; - this.samRecordFactory = factory; - readHeader(source); - mFirstRecordPointer = inputStream.getFilePointer(); - } - - /** - * If true, writes the source of every read into the source SAMRecords. - * @param enabled true to write source information into each SAMRecord. - */ - void enableFileSource(final SAMFileReader reader, final boolean enabled) { - this.mFileReader = enabled ? reader : null; - } - - /** - * If true, uses the caching version of the index reader. - * @param enabled true to write source information into each SAMRecord. - */ - public void enableIndexCaching(final boolean enabled) { - if(mIndex != null) - throw new SAMException("Unable to turn on index caching; index file has already been loaded."); - this.mEnableIndexCaching = enabled; - } - - /** - * If false, disable the use of memory mapping for accessing index files (default behavior is to use memory mapping). - * This is slower but more scalable when accessing large numbers of BAM files sequentially. - * @param enabled True to use memory mapping, false to use regular I/O. - */ - public void enableIndexMemoryMapping(final boolean enabled) { - if (mIndex != null) { - throw new SAMException("Unable to change index memory mapping; index file has already been loaded."); - } - this.mEnableIndexMemoryMapping = enabled; - } - - @Override void enableCrcChecking(final boolean enabled) { - this.mInputStream.setCheckCrcs(enabled); - } - - @Override void setSAMRecordFactory(final SAMRecordFactory factory) { this.samRecordFactory = factory; } - - /** - * @return true if ths is a BAM file, and has an index - */ - public boolean hasIndex() { - return (mIndexFile != null); - } - - /** - * Retrieves the index for the given file type. Ensure that the index is of the specified type. - * @return An index of the given type. - */ - public BAMIndex getIndex() { - if(mIndexFile == null) - throw new SAMException("No index is available for this BAM file."); - if(mIndex == null) - mIndex = mEnableIndexCaching ? new CachingBAMFileIndex(mIndexFile, getFileHeader().getSequenceDictionary(), mEnableIndexMemoryMapping) - : new DiskBasedBAMFileIndex(mIndexFile, getFileHeader().getSequenceDictionary(), mEnableIndexMemoryMapping); - return mIndex; - } - - void close() { - if (mStream != null) { - mStream.close(); - } - if (mIndex != null) { - mIndex.close(); - } - mStream = null; - mFileHeader = null; - mIndex = null; - } - - SAMFileHeader getFileHeader() { - return mFileHeader; - } - - /** - * Set error-checking level for subsequent SAMRecord reads. - */ - void setValidationStringency(final SAMFileReader.ValidationStringency validationStringency) { - this.mValidationStringency = validationStringency; - } - - SAMFileReader.ValidationStringency getValidationStringency() { - return this.mValidationStringency; - } - - /** - * Prepare to iterate through the SAMRecords in file order. - * Only a single iterator on a BAM file can be extant at a time. If getIterator() or a query method has been called once, - * that iterator must be closed before getIterator() can be called again. - * A somewhat peculiar aspect of this method is that if the file is not seekable, a second call to - * getIterator() begins its iteration where the last one left off. That is the best that can be - * done in that situation. - */ - CloseableIterator getIterator() { - if (mStream == null) { - throw new IllegalStateException("File reader is closed"); - } - if (mCurrentIterator != null) { - throw new IllegalStateException("Iteration in progress"); - } - if (mIsSeekable) { - try { - mInputStream.seek(mFirstRecordPointer); - } catch (IOException exc) { - throw new RuntimeException(exc.getMessage(), exc); - } - } - mCurrentIterator = new BAMFileIterator(); - return mCurrentIterator; - } - - @Override - CloseableIterator getIterator(final SAMFileSpan chunks) { - if (mStream == null) { - throw new IllegalStateException("File reader is closed"); - } - if (mCurrentIterator != null) { - throw new IllegalStateException("Iteration in progress"); - } - if (!(chunks instanceof BAMFileSpan)) { - throw new IllegalStateException("BAMFileReader cannot handle this type of file span."); - } - - // Create an iterator over the given chunk boundaries. - mCurrentIterator = new BAMFileIndexIterator(((BAMFileSpan)chunks).toCoordinateArray()); - return mCurrentIterator; - } - - /** - * Gets an unbounded pointer to the first record in the BAM file. Because the reader doesn't necessarily know - * when the file ends, the rightmost bound of the file pointer will not end exactly where the file ends. However, - * the rightmost bound is guaranteed to be after the last read in the file. - * @return An unbounded pointer to the first record in the BAM file. - */ - @Override - SAMFileSpan getFilePointerSpanningReads() { - return new BAMFileSpan(new Chunk(mFirstRecordPointer,Long.MAX_VALUE)); - } - - /** - * Prepare to iterate through the SAMRecords that match the given interval. - * Only a single iterator on a BAMFile can be extant at a time. The previous one must be closed - * before calling any of the methods that return an iterator. - * - * Note that an unmapped SAMRecord may still have a reference name and an alignment start for sorting - * purposes (typically this is the coordinate of its mate), and will be found by this method if the coordinate - * matches the specified interval. - * - * Note that this method is not necessarily efficient in terms of disk I/O. The index does not have perfect - * resolution, so some SAMRecords may be read and then discarded because they do not match the specified interval. - * - * @param sequence Reference sequence sought. - * @param start Desired SAMRecords must overlap or be contained in the interval specified by start and end. - * A value of zero implies the start of the reference sequence. - * @param end A value of zero implies the end of the reference sequence. - * @param contained If true, the alignments for the SAMRecords must be completely contained in the interval - * specified by start and end. If false, the SAMRecords need only overlap the interval. - * @return Iterator for the matching SAMRecords - */ - CloseableIterator query(final String sequence, final int start, final int end, final boolean contained) { - if (mStream == null) { - throw new IllegalStateException("File reader is closed"); - } - if (mCurrentIterator != null) { - throw new IllegalStateException("Iteration in progress"); - } - if (!mIsSeekable) { - throw new UnsupportedOperationException("Cannot query stream-based BAM file"); - } - mCurrentIterator = createIndexIterator(sequence, start, end, contained? QueryType.CONTAINED: QueryType.OVERLAPPING); - return mCurrentIterator; - } - - /** - * Prepare to iterate through the SAMRecords with the given alignment start. - * Only a single iterator on a BAMFile can be extant at a time. The previous one must be closed - * before calling any of the methods that return an iterator. - * - * Note that an unmapped SAMRecord may still have a reference name and an alignment start for sorting - * purposes (typically this is the coordinate of its mate), and will be found by this method if the coordinate - * matches the specified interval. - * - * Note that this method is not necessarily efficient in terms of disk I/O. The index does not have perfect - * resolution, so some SAMRecords may be read and then discarded because they do not match the specified interval. - * - * @param sequence Reference sequence sought. - * @param start Alignment start sought. - * @return Iterator for the matching SAMRecords. - */ - CloseableIterator queryAlignmentStart(final String sequence, final int start) { - if (mStream == null) { - throw new IllegalStateException("File reader is closed"); - } - if (mCurrentIterator != null) { - throw new IllegalStateException("Iteration in progress"); - } - if (!mIsSeekable) { - throw new UnsupportedOperationException("Cannot query stream-based BAM file"); - } - mCurrentIterator = createIndexIterator(sequence, start, -1, QueryType.STARTING_AT); - return mCurrentIterator; - } - - public CloseableIterator queryUnmapped() { - if (mStream == null) { - throw new IllegalStateException("File reader is closed"); - } - if (mCurrentIterator != null) { - throw new IllegalStateException("Iteration in progress"); - } - if (!mIsSeekable) { - throw new UnsupportedOperationException("Cannot query stream-based BAM file"); - } - try { - final long startOfLastLinearBin = getIndex().getStartOfLastLinearBin(); - if (startOfLastLinearBin != -1) { - mInputStream.seek(startOfLastLinearBin); - } else { - // No mapped reads in file, just start at the first read in file. - mInputStream.seek(mFirstRecordPointer); - } - mCurrentIterator = new BAMFileIndexUnmappedIterator(); - return mCurrentIterator; - } catch (IOException e) { - throw new RuntimeException("IOException seeking to unmapped reads", e); - } - } - - /** - * Reads the header from the file or stream - * @param source Note that this is used only for reporting errors. - */ - private void readHeader(final String source) - throws IOException { - - final byte[] buffer = new byte[4]; - mStream.readBytes(buffer); - if (!Arrays.equals(buffer, BAMFileConstants.BAM_MAGIC)) { - throw new IOException("Invalid BAM file header"); - } - - final int headerTextLength = mStream.readInt(); - final String textHeader = mStream.readString(headerTextLength); - final SAMTextHeaderCodec headerCodec = new SAMTextHeaderCodec(); - headerCodec.setValidationStringency(mValidationStringency); - mFileHeader = headerCodec.decode(new StringLineReader(textHeader), - source); - - final int sequenceCount = mStream.readInt(); - if (mFileHeader.getSequenceDictionary().size() > 0) { - // It is allowed to have binary sequences but no text sequences, so only validate if both are present - if (sequenceCount != mFileHeader.getSequenceDictionary().size()) { - throw new SAMFormatException("Number of sequences in text header (" + - mFileHeader.getSequenceDictionary().size() + - ") != number of sequences in binary header (" + sequenceCount + ") for file " + source); - } - for (int i = 0; i < sequenceCount; i++) { - final SAMSequenceRecord binarySequenceRecord = readSequenceRecord(source); - final SAMSequenceRecord sequenceRecord = mFileHeader.getSequence(i); - if (!sequenceRecord.getSequenceName().equals(binarySequenceRecord.getSequenceName())) { - throw new SAMFormatException("For sequence " + i + ", text and binary have different names in file " + - source); - } - if (sequenceRecord.getSequenceLength() != binarySequenceRecord.getSequenceLength()) { - throw new SAMFormatException("For sequence " + i + ", text and binary have different lengths in file " + - source); - } - } - } else { - // If only binary sequences are present, copy them into mFileHeader - final List sequences = new ArrayList(sequenceCount); - for (int i = 0; i < sequenceCount; i++) { - sequences.add(readSequenceRecord(source)); - } - mFileHeader.setSequenceDictionary(new SAMSequenceDictionary(sequences)); - } - } - - /** - * Reads a single binary sequence record from the file or stream - * @param source Note that this is used only for reporting errors. - */ - private SAMSequenceRecord readSequenceRecord(final String source) { - final int nameLength = mStream.readInt(); - if (nameLength <= 1) { - throw new SAMFormatException("Invalid BAM file header: missing sequence name in file " + source); - } - final String sequenceName = mStream.readString(nameLength - 1); - // Skip the null terminator - mStream.readByte(); - final int sequenceLength = mStream.readInt(); - return new SAMSequenceRecord(SAMSequenceRecord.truncateSequenceName(sequenceName), sequenceLength); - } - - /** - * Iterator for non-indexed sequential iteration through all SAMRecords in file. - * Starting point of iteration is wherever current file position is when the iterator is constructed. - */ - private class BAMFileIterator implements CloseableIterator { - private SAMRecord mNextRecord = null; - private final BAMRecordCodec bamRecordCodec; - private long samRecordIndex = 0; // Records at what position (counted in records) we are at in the file - - BAMFileIterator() { - this(true); - } - - /** - * @param advance Trick to enable subclass to do more setup before advancing - */ - BAMFileIterator(final boolean advance) { - this.bamRecordCodec = new BAMRecordCodec(getFileHeader(), samRecordFactory); - this.bamRecordCodec.setInputStream(BAMFileReader.this.mStream.getInputStream()); - - if (advance) { - advance(); - } - } - - public void close() { - if (mCurrentIterator != null && this != mCurrentIterator) { - throw new IllegalStateException("Attempt to close non-current iterator"); - } - mCurrentIterator = null; - } - - public boolean hasNext() { - return (mNextRecord != null); - } - - public SAMRecord next() { - final SAMRecord result = mNextRecord; - advance(); - return result; - } - - public void remove() { - throw new UnsupportedOperationException("Not supported: remove"); - } - - void advance() { - try { - mNextRecord = getNextRecord(); - - if (mNextRecord != null) { - ++this.samRecordIndex; - // Because some decoding is done lazily, the record needs to remember the validation stringency. - mNextRecord.setValidationStringency(mValidationStringency); - - if (mValidationStringency != ValidationStringency.SILENT) { - final List validationErrors = mNextRecord.isValid(); - SAMUtils.processValidationErrors(validationErrors, - this.samRecordIndex, BAMFileReader.this.getValidationStringency()); - } - } - if (eagerDecode && mNextRecord != null) { - mNextRecord.eagerDecode(); - } - } catch (IOException exc) { - throw new RuntimeException(exc.getMessage(), exc); - } - } - - /** - * Read the next record from the input stream. - */ - SAMRecord getNextRecord() throws IOException { - final long startCoordinate = mInputStream.getFilePointer(); - final SAMRecord next = bamRecordCodec.decode(); - final long stopCoordinate = mInputStream.getFilePointer(); - - if(mFileReader != null && next != null) - next.setFileSource(new SAMFileSource(mFileReader,new BAMFileSpan(new Chunk(startCoordinate,stopCoordinate)))); - - return next; - } - - /** - * @return The record that will be return by the next call to next() - */ - protected SAMRecord peek() { - return mNextRecord; - } - } - - /** - * Prepare to iterate through SAMRecords matching the target interval. - * @param sequence Desired reference sequence. - * @param start 1-based start of target interval, inclusive. - * @param end 1-based end of target interval, inclusive. - * @param queryType contained, overlapping, or starting-at query. - */ - private CloseableIterator createIndexIterator(final String sequence, - final int start, - final int end, - final QueryType queryType) { - long[] filePointers = null; - - // Hit the index to determine the chunk boundaries for the required data. - final SAMFileHeader fileHeader = getFileHeader(); - final int referenceIndex = fileHeader.getSequenceIndex(sequence); - if (referenceIndex != -1) { - final BAMIndex fileIndex = getIndex(); - final BAMFileSpan fileSpan = fileIndex.getSpanOverlapping(referenceIndex, start, end); - filePointers = fileSpan != null ? fileSpan.toCoordinateArray() : null; - } - - // Create an iterator over the above chunk boundaries. - final BAMFileIndexIterator iterator = new BAMFileIndexIterator(filePointers); - - // Add some preprocessing filters for edge-case reads that don't fit into this - // query type. - return new BAMQueryFilteringIterator(iterator,sequence,start,end,queryType); - } - - enum QueryType {CONTAINED, OVERLAPPING, STARTING_AT} - - /** - * Look for BAM index file according to standard naming convention. - * - * @param dataFile BAM file name. - * @return Index file name, or null if not found. - */ - private static File findIndexFile(final File dataFile) { - // If input is foo.bam, look for foo.bai - final String bamExtension = ".bam"; - File indexFile; - final String fileName = dataFile.getName(); - if (fileName.endsWith(bamExtension)) { - final String bai = fileName.substring(0, fileName.length() - bamExtension.length()) + BAMIndex.BAMIndexSuffix; - indexFile = new File(dataFile.getParent(), bai); - if (indexFile.exists()) { - return indexFile; - } - } - - // If foo.bai doesn't exist look for foo.bam.bai - indexFile = new File(dataFile.getParent(), dataFile.getName() + ".bai"); - if (indexFile.exists()) { - return indexFile; - } else { - return null; - } - } - - private class BAMFileIndexIterator extends BAMFileIterator { - - private long[] mFilePointers = null; - private int mFilePointerIndex = 0; - private long mFilePointerLimit = -1; - - /** - * Prepare to iterate through SAMRecords stored in the specified compressed blocks at the given offset. - * @param filePointers the block / offset combination, stored in chunk format. - */ - BAMFileIndexIterator(final long[] filePointers) { - super(false); // delay advance() until after construction - mFilePointers = filePointers; - advance(); - } - - SAMRecord getNextRecord() - throws IOException { - // Advance to next file block if necessary - while (mInputStream.getFilePointer() >= mFilePointerLimit) { - if (mFilePointers == null || - mFilePointerIndex >= mFilePointers.length) { - return null; - } - final long startOffset = mFilePointers[mFilePointerIndex++]; - final long endOffset = mFilePointers[mFilePointerIndex++]; - mInputStream.seek(startOffset); - mFilePointerLimit = endOffset; - } - // Pull next record from stream - return super.getNextRecord(); - } - } - - /** - * A decorating iterator that filters out records that are outside the bounds of the - * given query parameters. - */ - private class BAMQueryFilteringIterator implements CloseableIterator { - /** - * The wrapped iterator. - */ - private final CloseableIterator wrappedIterator; - - /** - * The next record to be returned. Will be null if no such record exists. - */ - private SAMRecord mNextRecord; - - private final int mReferenceIndex; - private final int mRegionStart; - private final int mRegionEnd; - private final QueryType mQueryType; - - public BAMQueryFilteringIterator(final CloseableIterator iterator,final String sequence, final int start, final int end, final QueryType queryType) { - this.wrappedIterator = iterator; - final SAMFileHeader fileHeader = getFileHeader(); - mReferenceIndex = fileHeader.getSequenceIndex(sequence); - mRegionStart = start; - if (queryType == QueryType.STARTING_AT) { - mRegionEnd = mRegionStart; - } else { - mRegionEnd = (end <= 0) ? Integer.MAX_VALUE : end; - } - mQueryType = queryType; - mNextRecord = advance(); - } - - /** - * Returns true if a next element exists; false otherwise. - */ - public boolean hasNext() { - return mNextRecord != null; - } - - /** - * Gets the next record from the given iterator. - * @return The next SAM record in the iterator. - */ - public SAMRecord next() { - if(!hasNext()) - throw new NoSuchElementException("BAMQueryFilteringIterator: no next element available"); - final SAMRecord currentRead = mNextRecord; - mNextRecord = advance(); - return currentRead; - } - - /** - * Closes down the existing iterator. - */ - public void close() { - if (this != mCurrentIterator) { - throw new IllegalStateException("Attempt to close non-current iterator"); - } - mCurrentIterator = null; - } - - /** - * @throws UnsupportedOperationException always. - */ - public void remove() { - throw new UnsupportedOperationException("Not supported: remove"); - } - - SAMRecord advance() { - while (true) { - // Pull next record from stream - if(!wrappedIterator.hasNext()) - return null; - - final SAMRecord record = wrappedIterator.next(); - // If beyond the end of this reference sequence, end iteration - final int referenceIndex = record.getReferenceIndex(); - if (referenceIndex != mReferenceIndex) { - if (referenceIndex < 0 || - referenceIndex > mReferenceIndex) { - return null; - } - // If before this reference sequence, continue - continue; - } - if (mRegionStart == 0 && mRegionEnd == Integer.MAX_VALUE) { - // Quick exit to avoid expensive alignment end calculation - return record; - } - final int alignmentStart = record.getAlignmentStart(); - // If read is unmapped but has a coordinate, return it if the coordinate is within - // the query region, regardless of whether the mapped mate will be returned. - final int alignmentEnd; - if (mQueryType == QueryType.STARTING_AT) { - alignmentEnd = -1; - } else { - alignmentEnd = (record.getAlignmentEnd() != SAMRecord.NO_ALIGNMENT_START? - record.getAlignmentEnd(): alignmentStart); - } - - if (alignmentStart > mRegionEnd) { - // If scanned beyond target region, end iteration - return null; - } - // Filter for overlap with region - if (mQueryType == QueryType.CONTAINED) { - if (alignmentStart >= mRegionStart && alignmentEnd <= mRegionEnd) { - return record; - } - } else if (mQueryType == QueryType.OVERLAPPING) { - if (alignmentEnd >= mRegionStart && alignmentStart <= mRegionEnd) { - return record; - } - } else { - if (alignmentStart == mRegionStart) { - return record; - } - } - } - } - } - - private class BAMFileIndexUnmappedIterator extends BAMFileIterator { - private BAMFileIndexUnmappedIterator() { - while (this.hasNext() && peek().getReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) { - advance(); - } - } - } - -} diff --git a/public/java/src/net/sf/samtools/GATKChunk.java b/public/java/src/net/sf/samtools/GATKChunk.java index 5d349e72e..e9335a86d 100644 --- a/public/java/src/net/sf/samtools/GATKChunk.java +++ b/public/java/src/net/sf/samtools/GATKChunk.java @@ -40,6 +40,10 @@ public class GATKChunk extends Chunk { super(start,stop); } + public GATKChunk(final long blockStart, final int blockOffsetStart, final long blockEnd, final int blockOffsetEnd) { + super(blockStart << 16 | blockOffsetStart,blockEnd << 16 | blockOffsetEnd); + } + public GATKChunk(final Chunk chunk) { super(chunk.getChunkStart(),chunk.getChunkEnd()); } diff --git a/public/java/src/net/sf/samtools/PicardNamespaceUtils.java b/public/java/src/net/sf/samtools/PicardNamespaceUtils.java new file mode 100644 index 000000000..b645f8fdc --- /dev/null +++ b/public/java/src/net/sf/samtools/PicardNamespaceUtils.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2012, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package net.sf.samtools; + +/** + * Utils that insist on being in the same package as Picard. + */ +public class PicardNamespaceUtils { + /** + * Private constructor only. Do not instantiate. + */ + private PicardNamespaceUtils() {} + + public static void setFileSource(final SAMRecord read, final SAMFileSource fileSource) { + read.setFileSource(fileSource); + } +} diff --git a/public/java/src/net/sf/samtools/util/BAMInputStream.java b/public/java/src/net/sf/samtools/util/BAMInputStream.java deleted file mode 100644 index d825c23d5..000000000 --- a/public/java/src/net/sf/samtools/util/BAMInputStream.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2011, The Broad Institute - * - * Permission is hereby granted, free of charge, to any person - * obtaining a copy of this software and associated documentation - * files (the "Software"), to deal in the Software without - * restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following - * conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES - * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. - */ - -package net.sf.samtools.util; - -import java.io.IOException; - -/** - * An input stream formulated for use reading BAM files. Supports - */ -public interface BAMInputStream { - /** - * Seek to the given position in the file. Note that pos is a special virtual file pointer, - * not an actual byte offset. - * - * @param pos virtual file pointer - */ - public void seek(final long pos) throws IOException; - - /** - * @return virtual file pointer that can be passed to seek() to return to the current position. This is - * not an actual byte offset, so arithmetic on file pointers cannot be done to determine the distance between - * the two. - */ - public long getFilePointer(); - - /** - * Determines whether or not the inflater will re-calculated the CRC on the decompressed data - * and check it against the value stored in the GZIP header. CRC checking is an expensive - * operation and should be used accordingly. - */ - public void setCheckCrcs(final boolean check); - - public int read() throws java.io.IOException; - - public int read(byte[] bytes) throws java.io.IOException; - - public int read(byte[] bytes, int i, int i1) throws java.io.IOException; - - public long skip(long l) throws java.io.IOException; - - public int available() throws java.io.IOException; - - public void close() throws java.io.IOException; - - public void mark(int i); - - public void reset() throws java.io.IOException; - - public boolean markSupported(); -} diff --git a/public/java/src/net/sf/samtools/util/BlockCompressedInputStream.java b/public/java/src/net/sf/samtools/util/BlockCompressedInputStream.java deleted file mode 100755 index fae2fc89b..000000000 --- a/public/java/src/net/sf/samtools/util/BlockCompressedInputStream.java +++ /dev/null @@ -1,483 +0,0 @@ -/* - * The MIT License - * - * Copyright (c) 2009 The Broad Institute - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -package net.sf.samtools.util; - - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.RandomAccessFile; -import java.net.URL; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.Arrays; - -import net.sf.samtools.FileTruncatedException; - -/* - * Utility class for reading BGZF block compressed files. The caller can treat this file like any other InputStream. - * It probably is not necessary to wrap this stream in a buffering stream, because there is internal buffering. - * The advantage of BGZF over conventional GZip format is that BGZF allows for seeking without having to read the - * entire file up to the location being sought. Note that seeking is only possible if the ctor(File) is used. - * - * c.f. http://samtools.sourceforge.net/SAM1.pdf for details of BGZF format - */ -public class BlockCompressedInputStream extends InputStream implements BAMInputStream { - private InputStream mStream = null; - private SeekableStream mFile = null; - private byte[] mFileBuffer = null; - private byte[] mCurrentBlock = null; - private int mCurrentOffset = 0; - private long mBlockAddress = 0; - private int mLastBlockLength = 0; - private final BlockGunzipper blockGunzipper = new BlockGunzipper(); - - - /** - * Note that seek() is not supported if this ctor is used. - */ - public BlockCompressedInputStream(final InputStream stream) { - mStream = IOUtil.toBufferedStream(stream); - mFile = null; - } - - /** - * Use this ctor if you wish to call seek() - */ - public BlockCompressedInputStream(final File file) - throws IOException { - mFile = new SeekableFileStream(file); - mStream = null; - - } - - public BlockCompressedInputStream(final URL url) { - mFile = new SeekableBufferedStream(new SeekableHTTPStream(url)); - mStream = null; - } - - /** - * For providing some arbitrary data source. No additional buffering is - * provided, so if the underlying source is not buffered, wrap it in a - * SeekableBufferedStream before passing to this ctor. - */ - public BlockCompressedInputStream(final SeekableStream strm) { - mFile = strm; - mStream = null; - } - - /** - * Determines whether or not the inflater will re-calculated the CRC on the decompressed data - * and check it against the value stored in the GZIP header. CRC checking is an expensive - * operation and should be used accordingly. - */ - public void setCheckCrcs(final boolean check) { - this.blockGunzipper.setCheckCrcs(check); - } - - /** - * @return the number of bytes that can be read (or skipped over) from this input stream without blocking by the - * next caller of a method for this input stream. The next caller might be the same thread or another thread. - * Note that although the next caller can read this many bytes without blocking, the available() method call itself - * may block in order to fill an internal buffer if it has been exhausted. - */ - public int available() - throws IOException { - if (mCurrentBlock == null || mCurrentOffset == mCurrentBlock.length) { - readBlock(); - } - if (mCurrentBlock == null) { - return 0; - } - return mCurrentBlock.length - mCurrentOffset; - } - - /** - * Closes the underlying InputStream or RandomAccessFile - */ - public void close() - throws IOException { - if (mFile != null) { - mFile.close(); - mFile = null; - } else if (mStream != null) { - mStream.close(); - mStream = null; - } - // Encourage garbage collection - mFileBuffer = null; - mCurrentBlock = null; - } - - /** - * Reads the next byte of data from the input stream. The value byte is returned as an int in the range 0 to 255. - * If no byte is available because the end of the stream has been reached, the value -1 is returned. - * This method blocks until input data is available, the end of the stream is detected, or an exception is thrown. - - * @return the next byte of data, or -1 if the end of the stream is reached. - */ - public int read() - throws IOException { - return (available() > 0) ? mCurrentBlock[mCurrentOffset++] : -1; - } - - /** - * Reads some number of bytes from the input stream and stores them into the buffer array b. The number of bytes - * actually read is returned as an integer. This method blocks until input data is available, end of file is detected, - * or an exception is thrown. - * - * read(buf) has the same effect as read(buf, 0, buf.length). - * - * @param buffer the buffer into which the data is read. - * @return the total number of bytes read into the buffer, or -1 is there is no more data because the end of - * the stream has been reached. - */ - public int read(final byte[] buffer) - throws IOException { - return read(buffer, 0, buffer.length); - } - - private volatile ByteArrayOutputStream buf = null; - private static final byte eol = '\n'; - private static final byte eolCr = '\r'; - - /** - * Reads a whole line. A line is considered to be terminated by either a line feed ('\n'), - * carriage return ('\r') or carriage return followed by a line feed ("\r\n"). - * - * @return A String containing the contents of the line, excluding the line terminating - * character, or null if the end of the stream has been reached - * - * @exception IOException If an I/O error occurs - * @ - */ - public String readLine() throws IOException { - int available = available(); - if (available == 0) { - return null; - } - if(null == buf){ // lazy initialisation - buf = new ByteArrayOutputStream(8192); - } - buf.reset(); - boolean done = false; - boolean foundCr = false; // \r found flag - while (!done) { - int linetmpPos = mCurrentOffset; - int bCnt = 0; - while((available-- > 0)){ - final byte c = mCurrentBlock[linetmpPos++]; - if(c == eol){ // found \n - done = true; - break; - } else if(foundCr){ // previous char was \r - --linetmpPos; // current char is not \n so put it back - done = true; - break; - } else if(c == eolCr){ // found \r - foundCr = true; - continue; // no ++bCnt - } - ++bCnt; - } - if(mCurrentOffset < linetmpPos){ - buf.write(mCurrentBlock, mCurrentOffset, bCnt); - mCurrentOffset = linetmpPos; - } - available = available(); - if(available == 0){ - // EOF - done = true; - } - } - return buf.toString(); - } - - /** - * Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to read - * as many as len bytes, but a smaller number may be read. The number of bytes actually read is returned as an integer. - * - * This method blocks until input data is available, end of file is detected, or an exception is thrown. - * - * @param buffer buffer into which data is read. - * @param offset the start offset in array b at which the data is written. - * @param length the maximum number of bytes to read. - * @return the total number of bytes read into the buffer, or -1 if there is no more data because the end of - * the stream has been reached. - */ - public int read(final byte[] buffer, int offset, int length) - throws IOException { - final int originalLength = length; - while (length > 0) { - final int available = available(); - if (available == 0) { - // Signal EOF to caller - if (originalLength == length) { - return -1; - } - break; - } - final int copyLength = Math.min(length, available); - System.arraycopy(mCurrentBlock, mCurrentOffset, buffer, offset, copyLength); - mCurrentOffset += copyLength; - offset += copyLength; - length -= copyLength; - } - return originalLength - length; - } - - /** - * Seek to the given position in the file. Note that pos is a special virtual file pointer, - * not an actual byte offset. - * - * @param pos virtual file pointer - */ - public void seek(final long pos) - throws IOException { - if (mFile == null) { - throw new IOException("Cannot seek on stream based file"); - } - // Decode virtual file pointer - // Upper 48 bits is the byte offset into the compressed stream of a block. - // Lower 16 bits is the byte offset into the uncompressed stream inside the block. - final long compressedOffset = BlockCompressedFilePointerUtil.getBlockAddress(pos); - final int uncompressedOffset = BlockCompressedFilePointerUtil.getBlockOffset(pos); - final int available; - if (mBlockAddress == compressedOffset && mCurrentBlock != null) { - available = mCurrentBlock.length; - } else { - mFile.seek(compressedOffset); - mBlockAddress = compressedOffset; - mLastBlockLength = 0; - readBlock(); - available = available(); - } - if (uncompressedOffset > available || - (uncompressedOffset == available && !eof())) { - throw new IOException("Invalid file pointer: " + pos); - } - mCurrentOffset = uncompressedOffset; - } - - private boolean eof() throws IOException { - if (mFile.eof()) { - return true; - } - // If the last remaining block is the size of the EMPTY_GZIP_BLOCK, this is the same as being at EOF. - return (mFile.length() - (mBlockAddress + mLastBlockLength) == BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); - } - - /** - * @return virtual file pointer that can be passed to seek() to return to the current position. This is - * not an actual byte offset, so arithmetic on file pointers cannot be done to determine the distance between - * the two. - */ - public long getFilePointer() { - if (mCurrentOffset == mCurrentBlock.length) { - // If current offset is at the end of the current block, file pointer should point - // to the beginning of the next block. - return BlockCompressedFilePointerUtil.makeFilePointer(mBlockAddress + mLastBlockLength, 0); - } - return BlockCompressedFilePointerUtil.makeFilePointer(mBlockAddress, mCurrentOffset); - } - - public static long getFileBlock(final long bgzfOffset) { - return BlockCompressedFilePointerUtil.getBlockAddress(bgzfOffset); - } - - /** - * @param stream Must be at start of file. Throws RuntimeException if !stream.markSupported(). - * @return true if the given file looks like a valid BGZF file. - */ - public static boolean isValidFile(final InputStream stream) - throws IOException { - if (!stream.markSupported()) { - throw new RuntimeException("Cannot test non-buffered stream"); - } - stream.mark(BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH); - final byte[] buffer = new byte[BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH]; - final int count = readBytes(stream, buffer, 0, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH); - stream.reset(); - return count == BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH && isValidBlockHeader(buffer); - } - - private static boolean isValidBlockHeader(final byte[] buffer) { - return (buffer[0] == BlockCompressedStreamConstants.GZIP_ID1 && - (buffer[1] & 0xFF) == BlockCompressedStreamConstants.GZIP_ID2 && - (buffer[3] & BlockCompressedStreamConstants.GZIP_FLG) != 0 && - buffer[10] == BlockCompressedStreamConstants.GZIP_XLEN && - buffer[12] == BlockCompressedStreamConstants.BGZF_ID1 && - buffer[13] == BlockCompressedStreamConstants.BGZF_ID2); - } - - private void readBlock() - throws IOException { - - if (mFileBuffer == null) { - mFileBuffer = new byte[BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE]; - } - int count = readBytes(mFileBuffer, 0, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH); - if (count == 0) { - // Handle case where there is no empty gzip block at end. - mCurrentOffset = 0; - mBlockAddress += mLastBlockLength; - mCurrentBlock = new byte[0]; - return; - } - if (count != BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH) { - throw new IOException("Premature end of file"); - } - final int blockLength = unpackInt16(mFileBuffer, BlockCompressedStreamConstants.BLOCK_LENGTH_OFFSET) + 1; - if (blockLength < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH || blockLength > mFileBuffer.length) { - throw new IOException("Unexpected compressed block length: " + blockLength); - } - final int remaining = blockLength - BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH; - count = readBytes(mFileBuffer, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH, remaining); - if (count != remaining) { - throw new FileTruncatedException("Premature end of file"); - } - inflateBlock(mFileBuffer, blockLength); - mCurrentOffset = 0; - mBlockAddress += mLastBlockLength; - mLastBlockLength = blockLength; - } - - private void inflateBlock(final byte[] compressedBlock, final int compressedLength) - throws IOException { - final int uncompressedLength = unpackInt32(compressedBlock, compressedLength-4); - byte[] buffer = mCurrentBlock; - mCurrentBlock = null; - if (buffer == null || buffer.length != uncompressedLength) { - try { - buffer = new byte[uncompressedLength]; - } catch (NegativeArraySizeException e) { - throw new RuntimeException("BGZF file has invalid uncompressedLength: " + uncompressedLength, e); - } - } - blockGunzipper.unzipBlock(buffer, compressedBlock, compressedLength); - mCurrentBlock = buffer; - } - - private int readBytes(final byte[] buffer, final int offset, final int length) - throws IOException { - if (mFile != null) { - return readBytes(mFile, buffer, offset, length); - } else if (mStream != null) { - return readBytes(mStream, buffer, offset, length); - } else { - return 0; - } - } - - private static int readBytes(final SeekableStream file, final byte[] buffer, final int offset, final int length) - throws IOException { - int bytesRead = 0; - while (bytesRead < length) { - final int count = file.read(buffer, offset + bytesRead, length - bytesRead); - if (count <= 0) { - break; - } - bytesRead += count; - } - return bytesRead; - } - - private static int readBytes(final InputStream stream, final byte[] buffer, final int offset, final int length) - throws IOException { - int bytesRead = 0; - while (bytesRead < length) { - final int count = stream.read(buffer, offset + bytesRead, length - bytesRead); - if (count <= 0) { - break; - } - bytesRead += count; - } - return bytesRead; - } - - private int unpackInt16(final byte[] buffer, final int offset) { - return ((buffer[offset] & 0xFF) | - ((buffer[offset+1] & 0xFF) << 8)); - } - - private int unpackInt32(final byte[] buffer, final int offset) { - return ((buffer[offset] & 0xFF) | - ((buffer[offset+1] & 0xFF) << 8) | - ((buffer[offset+2] & 0xFF) << 16) | - ((buffer[offset+3] & 0xFF) << 24)); - } - - public enum FileTermination {HAS_TERMINATOR_BLOCK, HAS_HEALTHY_LAST_BLOCK, DEFECTIVE} - - public static FileTermination checkTermination(final File file) - throws IOException { - final long fileSize = file.length(); - if (fileSize < BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length) { - return FileTermination.DEFECTIVE; - } - final RandomAccessFile raFile = new RandomAccessFile(file, "r"); - try { - raFile.seek(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); - byte[] buf = new byte[BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length]; - raFile.readFully(buf); - if (Arrays.equals(buf, BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) { - return FileTermination.HAS_TERMINATOR_BLOCK; - } - final int bufsize = (int)Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE); - buf = new byte[bufsize]; - raFile.seek(fileSize - bufsize); - raFile.read(buf); - for (int i = buf.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length; - i >= 0; --i) { - if (!preambleEqual(BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE, - buf, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) { - continue; - } - final ByteBuffer byteBuffer = ByteBuffer.wrap(buf, i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, 4); - byteBuffer.order(ByteOrder.LITTLE_ENDIAN); - final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF; - if (buf.length - i == totalBlockSizeMinusOne + 1) { - return FileTermination.HAS_HEALTHY_LAST_BLOCK; - } else { - return FileTermination.DEFECTIVE; - } - } - return FileTermination.DEFECTIVE; - } finally { - raFile.close(); - } - } - - private static boolean preambleEqual(final byte[] preamble, final byte[] buf, final int startOffset, final int length) { - for (int i = 0; i < length; ++i) { - if (preamble[i] != buf[i + startOffset]) { - return false; - } - } - return true; - } -} - - diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMReaderPosition.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BAMAccessPlan.java similarity index 58% rename from public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMReaderPosition.java rename to public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BAMAccessPlan.java index 0a6173c1e..164971365 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMReaderPosition.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BAMAccessPlan.java @@ -27,8 +27,10 @@ package org.broadinstitute.sting.gatk.datasources.reads; import net.sf.picard.util.PeekableIterator; import net.sf.samtools.GATKBAMFileSpan; import net.sf.samtools.GATKChunk; +import net.sf.samtools.util.BlockCompressedFilePointerUtil; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import java.util.LinkedList; import java.util.List; /** @@ -38,7 +40,7 @@ import java.util.List; * Time: 10:47 PM * To change this template use File | Settings | File Templates. */ -class SAMReaderPosition { +class BAMAccessPlan { private final SAMReaderID reader; private final BlockInputStream inputStream; @@ -51,7 +53,7 @@ class SAMReaderPosition { private long nextBlockAddress; - SAMReaderPosition(final SAMReaderID reader, final BlockInputStream inputStream, GATKBAMFileSpan fileSpan) { + BAMAccessPlan(final SAMReaderID reader, final BlockInputStream inputStream, GATKBAMFileSpan fileSpan) { this.reader = reader; this.inputStream = inputStream; @@ -84,11 +86,45 @@ class SAMReaderPosition { } /** - * Retrieves the last offset of interest in the block returned by getBlockAddress(). - * @return First block of interest in this segment. + * Gets the spans overlapping the given block; used to copy the contents of the block into the circular buffer. + * @param blockAddress Block address for which to search. + * @param filePosition Block address at which to terminate the last chunk if the last chunk goes beyond this span. + * @return list of chunks containing that block. */ - public int getLastOffsetInBlock() { - return (nextBlockAddress == positionIterator.peek().getBlockEnd()) ? positionIterator.peek().getBlockOffsetEnd() : 65536; + public List getSpansOverlappingBlock(long blockAddress, long filePosition) { + List spansOverlapping = new LinkedList(); + // While the position iterator overlaps the given block, pull out spans to report. + while(positionIterator.hasNext() && positionIterator.peek().getBlockStart() <= blockAddress) { + // Create a span over as much of the block as is covered by this chunk. + int blockOffsetStart = (blockAddress == positionIterator.peek().getBlockStart()) ? positionIterator.peek().getBlockOffsetStart() : 0; + + // Calculate the end of this span. If the span extends past this block, cap it using the current file position. + long blockEnd; + int blockOffsetEnd; + if(blockAddress < positionIterator.peek().getBlockEnd()) { + blockEnd = filePosition; + blockOffsetEnd = 0; + } + else { + blockEnd = positionIterator.peek().getBlockEnd(); + blockOffsetEnd = positionIterator.peek().getBlockOffsetEnd(); + } + + GATKChunk newChunk = new GATKChunk(blockAddress,blockOffsetStart,blockEnd,blockOffsetEnd); + + if(newChunk.getChunkStart() <= newChunk.getChunkEnd()) + spansOverlapping.add(new GATKChunk(blockAddress,blockOffsetStart,blockEnd,blockOffsetEnd)); + + // If the value currently stored in the position iterator ends past the current block, we must be done. Abort. + if(!positionIterator.hasNext() || positionIterator.peek().getBlockEnd() > blockAddress) + break; + + // If the position iterator ends before the block ends, pull the position iterator forward. + if(positionIterator.peek().getBlockEnd() <= blockAddress) + positionIterator.next(); + } + + return spansOverlapping; } public void reset() { @@ -111,20 +147,16 @@ class SAMReaderPosition { * @param filePosition The current position within the file. */ void advancePosition(final long filePosition) { - nextBlockAddress = filePosition >> 16; + nextBlockAddress = BlockCompressedFilePointerUtil.getBlockAddress(filePosition); // Check the current file position against the iterator; if the iterator is before the current file position, // draw the iterator forward. Remember when performing the check that coordinates are half-open! - while(positionIterator.hasNext() && isFilePositionPastEndOfChunk(filePosition,positionIterator.peek())) { + while(positionIterator.hasNext() && isFilePositionPastEndOfChunk(filePosition,positionIterator.peek())) positionIterator.next(); - // If the block iterator has shot past the file pointer, bring the file pointer flush with the start of the current block. - if(positionIterator.hasNext() && filePosition < positionIterator.peek().getChunkStart()) { - nextBlockAddress = positionIterator.peek().getBlockStart(); - //System.out.printf("SAMReaderPosition: next block address advanced to %d%n",nextBlockAddress); - break; - } - } + // If the block iterator has shot past the file pointer, bring the file pointer flush with the start of the current block. + if(positionIterator.hasNext() && filePosition < positionIterator.peek().getChunkStart()) + nextBlockAddress = positionIterator.peek().getBlockStart(); // If we've shot off the end of the block pointer, notify consumers that iteration is complete. if(!positionIterator.hasNext()) diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BGZFBlockLoadingDispatcher.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BGZFBlockLoadingDispatcher.java index f468d2020..d75e91bf3 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BGZFBlockLoadingDispatcher.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BGZFBlockLoadingDispatcher.java @@ -44,12 +44,12 @@ public class BGZFBlockLoadingDispatcher { private final ExecutorService threadPool; - private final Queue inputQueue; + private final Queue inputQueue; public BGZFBlockLoadingDispatcher(final int numThreads, final int numFileHandles) { threadPool = Executors.newFixedThreadPool(numThreads); fileHandleCache = new FileHandleCache(numFileHandles); - inputQueue = new LinkedList(); + inputQueue = new LinkedList(); threadPool.execute(new BlockLoader(this,fileHandleCache,true)); } @@ -58,7 +58,7 @@ public class BGZFBlockLoadingDispatcher { * Initiates a request for a new block load. * @param readerPosition Position at which to load. */ - void queueBlockLoad(final SAMReaderPosition readerPosition) { + void queueBlockLoad(final BAMAccessPlan readerPosition) { synchronized(inputQueue) { inputQueue.add(readerPosition); inputQueue.notify(); @@ -69,7 +69,7 @@ public class BGZFBlockLoadingDispatcher { * Claims the next work request from the queue. * @return The next work request, or null if none is available. */ - SAMReaderPosition claimNextWorkRequest() { + BAMAccessPlan claimNextWorkRequest() { synchronized(inputQueue) { while(inputQueue.isEmpty()) { try { diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockInputStream.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockInputStream.java index cb37bad31..fda5d818c 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockInputStream.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockInputStream.java @@ -26,24 +26,21 @@ package org.broadinstitute.sting.gatk.datasources.reads; import net.sf.samtools.GATKBAMFileSpan; import net.sf.samtools.GATKChunk; -import net.sf.samtools.util.BAMInputStream; -import net.sf.samtools.util.BlockCompressedFilePointerUtil; import net.sf.samtools.util.BlockCompressedInputStream; -import net.sf.samtools.util.RuntimeEOFException; -import net.sf.samtools.util.SeekableStream; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; -import java.util.Iterator; import java.util.LinkedList; +import java.util.List; /** * Presents decompressed blocks to the SAMFileReader. */ -public class BlockInputStream extends SeekableStream implements BAMInputStream { +public class BlockInputStream extends InputStream { /** * Mechanism for triggering block loads. */ @@ -65,9 +62,9 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { private Throwable error; /** - * Current position. + * Current accessPlan. */ - private SAMReaderPosition position; + private BAMAccessPlan accessPlan; /** * A stream of compressed data blocks. @@ -94,11 +91,6 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { */ private final BlockCompressedInputStream validatingInputStream; - /** - * Has the buffer been filled since last request? - */ - private boolean bufferFilled = false; - /** * Create a new block presenting input stream with a dedicated buffer. * @param dispatcher the block loading messenger. @@ -118,7 +110,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { this.dispatcher = dispatcher; // TODO: Kill the region when all we want to do is start at the beginning of the stream and run to the end of the stream. - this.position = new SAMReaderPosition(reader,this,new GATKBAMFileSpan(new GATKChunk(0,Long.MAX_VALUE))); + this.accessPlan = new BAMAccessPlan(reader,this,new GATKBAMFileSpan(new GATKChunk(0,Long.MAX_VALUE))); // The block offsets / block positions guarantee that the ending offset/position in the data structure maps to // the point in the file just following the last read. These two arrays should never be empty; initializing @@ -151,7 +143,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { synchronized(lock) { // Find the current block within the input stream. int blockIndex; - for(blockIndex = 0; blockIndex+1 < blockOffsets.size() && buffer.position() >= blockOffsets.get(blockIndex + 1); blockIndex++) + for(blockIndex = 0; blockIndex+1 < blockOffsets.size() && buffer.position() > blockOffsets.get(blockIndex+1); blockIndex++) ; filePointer = blockPositions.get(blockIndex) + (buffer.position()-blockOffsets.get(blockIndex)); } @@ -164,51 +156,8 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { return filePointer; } - public void seek(long target) { - //System.out.printf("Thread %s, BlockInputStream %s: seeking to block %d, offset %d%n",Thread.currentThread().getId(),this,BlockCompressedFilePointerUtil.getBlockAddress(target),BlockCompressedFilePointerUtil.getBlockOffset(target)); - synchronized(lock) { - clearBuffers(); - - // Ensure that the position filled in by submitAccessPlan() is in sync with the seek target just specified. - position.advancePosition(target); - - // If the position advances past the end of the target, that must mean that we seeked to a point at the end - // of one of the chunk list's subregions. Make a note of our current position and punt on loading any data. - if(target < position.getBlockAddress() << 16) { - blockOffsets.clear(); - blockOffsets.add(0); - blockPositions.clear(); - blockPositions.add(target); - } - else { - waitForBufferFill(); - // A buffer fill will load the relevant data from the shard, but the buffer position still needs to be - // advanced as appropriate. - Iterator blockOffsetIterator = blockOffsets.descendingIterator(); - Iterator blockPositionIterator = blockPositions.descendingIterator(); - while(blockOffsetIterator.hasNext() && blockPositionIterator.hasNext()) { - final int blockOffset = blockOffsetIterator.next(); - final long blockPosition = blockPositionIterator.next(); - if((blockPosition >> 16) == (target >> 16) && (blockPosition&0xFFFF) < (target&0xFFFF)) { - buffer.position(blockOffset + (int)(target&0xFFFF)-(int)(blockPosition&0xFFFF)); - break; - } - } - } - - if(validatingInputStream != null) { - try { - validatingInputStream.seek(target); - } - catch(IOException ex) { - throw new ReviewedStingException("Unable to validate against Picard input stream",ex); - } - } - } - } - private void clearBuffers() { - this.position.reset(); + this.accessPlan.reset(); // Buffer semantics say that outside of a lock, buffer should always be prepared for reading. // Indicate no data to be read. @@ -225,29 +174,41 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { public boolean eof() { synchronized(lock) { // TODO: Handle multiple empty BGZF blocks at end of the file. - return position != null && (position.getBlockAddress() < 0 || position.getBlockAddress() >= length); + return accessPlan != null && (accessPlan.getBlockAddress() < 0 || accessPlan.getBlockAddress() >= length); } } - public void setCheckCrcs(final boolean check) { - // TODO: Implement - } - /** - * Submits a new access plan for the given dataset. - * @param position The next seek point for BAM data in this reader. + * Submits a new access plan for the given dataset and seeks to the given point. + * @param accessPlan The next seek point for BAM data in this reader. */ - public void submitAccessPlan(final SAMReaderPosition position) { + public void submitAccessPlan(final BAMAccessPlan accessPlan) { //System.out.printf("Thread %s: submitting access plan for block at position: %d%n",Thread.currentThread().getId(),position.getBlockAddress()); - synchronized(lock) { - // Assume that the access plan is going to tell us to start where we are and move forward. - // If this isn't the case, we'll soon receive a seek request and the buffer will be forced to reset. - if(this.position != null && position.getBlockAddress() < this.position.getBlockAddress()) - position.advancePosition(this.position.getBlockAddress() << 16); + this.accessPlan = accessPlan; + accessPlan.reset(); + + clearBuffers(); + + // Pull the iterator past any oddball chunks at the beginning of the shard (chunkEnd < chunkStart, empty chunks, etc). + // TODO: Don't pass these empty chunks in. + accessPlan.advancePosition(makeFilePointer(accessPlan.getBlockAddress(),0)); + + if(accessPlan.getBlockAddress() >= 0) { + waitForBufferFill(); } - this.position = position; + + if(validatingInputStream != null) { + try { + validatingInputStream.seek(makeFilePointer(accessPlan.getBlockAddress(),0)); + } + catch(IOException ex) { + throw new ReviewedStingException("Unable to validate against Picard input stream",ex); + } + } + } + private void compactBuffer() { // Compact buffer to maximize storage space. int bytesToRemove = 0; @@ -286,27 +247,14 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { * Push contents of incomingBuffer into the end of this buffer. * MUST be called from a thread that is NOT the reader thread. * @param incomingBuffer The data being pushed into this input stream. - * @param position target position for the data. + * @param accessPlan target access plan for the data. * @param filePosition the current position of the file pointer */ - public void copyIntoBuffer(final ByteBuffer incomingBuffer, final SAMReaderPosition position, final long filePosition) { + public void copyIntoBuffer(final ByteBuffer incomingBuffer, final BAMAccessPlan accessPlan, final long filePosition) { synchronized(lock) { try { - compactBuffer(); - // Open up the buffer for more reading. - buffer.limit(buffer.capacity()); - - // Advance the position to take the most recent read into account. - final long lastBlockAddress = position.getBlockAddress(); - final int blockOffsetStart = position.getFirstOffsetInBlock(); - final int blockOffsetEnd = position.getLastOffsetInBlock(); - - // Where did this read end? It either ended in the middle of a block (for a bounding chunk) or it ended at the start of the next block. - final long endOfRead = (blockOffsetEnd < incomingBuffer.remaining()) ? (lastBlockAddress << 16) | blockOffsetEnd : filePosition << 16; - - byte[] validBytes = null; if(validatingInputStream != null) { - validBytes = new byte[incomingBuffer.remaining()]; + byte[] validBytes = new byte[incomingBuffer.remaining()]; byte[] currentBytes = new byte[incomingBuffer.remaining()]; int pos = incomingBuffer.position(); @@ -317,7 +265,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { incomingBuffer.position(pos); long currentFilePointer = validatingInputStream.getFilePointer(); - validatingInputStream.seek(lastBlockAddress << 16); + validatingInputStream.seek(makeFilePointer(accessPlan.getBlockAddress(), 0)); validatingInputStream.read(validBytes); validatingInputStream.seek(currentFilePointer); @@ -325,33 +273,41 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { throw new ReviewedStingException(String.format("Bytes being inserted into BlockInputStream %s are incorrect",this)); } - this.position = position; - position.advancePosition(filePosition << 16); + compactBuffer(); + // Open up the buffer for more reading. + buffer.limit(buffer.capacity()); - if(buffer.remaining() < incomingBuffer.remaining()) { - //System.out.printf("Thread %s: waiting for available space in buffer; buffer remaining = %d, incoming buffer remaining = %d%n",Thread.currentThread().getId(),buffer.remaining(),incomingBuffer.remaining()); + // Get the spans overlapping this particular block... + List spansOverlapping = accessPlan.getSpansOverlappingBlock(accessPlan.getBlockAddress(),filePosition); + + // ...and advance the block + this.accessPlan = accessPlan; + accessPlan.advancePosition(makeFilePointer(filePosition, 0)); + + if(buffer.remaining() < incomingBuffer.remaining()) lock.wait(); - //System.out.printf("Thread %s: waited for available space in buffer; buffer remaining = %d, incoming buffer remaining = %d%n", Thread.currentThread().getId(), buffer.remaining(), incomingBuffer.remaining()); + + final int bytesInIncomingBuffer = incomingBuffer.limit(); + + for(GATKChunk spanOverlapping: spansOverlapping) { + // Clear out the endcap tracking state and add in the starting position for this transfer. + blockOffsets.removeLast(); + blockOffsets.add(buffer.position()); + blockPositions.removeLast(); + blockPositions.add(spanOverlapping.getChunkStart()); + + // Stream the buffer into the data stream. + incomingBuffer.limit((spanOverlapping.getBlockEnd() > spanOverlapping.getBlockStart()) ? bytesInIncomingBuffer : spanOverlapping.getBlockOffsetEnd()); + incomingBuffer.position(spanOverlapping.getBlockOffsetStart()); + buffer.put(incomingBuffer); + + // Add the endcap for this transfer. + blockOffsets.add(buffer.position()); + blockPositions.add(spanOverlapping.getChunkEnd()); } - // Remove the last position in the list and add in the last read position, in case the two are different. - blockOffsets.removeLast(); - blockOffsets.add(buffer.position()); - blockPositions.removeLast(); - blockPositions.add(lastBlockAddress << 16 | blockOffsetStart); - - // Stream the buffer into the data stream. - incomingBuffer.position(blockOffsetStart); - incomingBuffer.limit(Math.min(incomingBuffer.limit(),blockOffsetEnd)); - buffer.put(incomingBuffer); - - // Then, add the last position read to the very end of the list, just past the end of the last buffer. - blockOffsets.add(buffer.position()); - blockPositions.add(endOfRead); - // Set up the buffer for reading. buffer.flip(); - bufferFilled = true; lock.notify(); } @@ -447,12 +403,8 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { if(remaining < length) return length - remaining; - // Otherwise, if at eof(), return -1. - else if(eof()) - return -1; - - // Otherwise, we must've hit a bug in the system. - throw new ReviewedStingException("BUG: read returned no data, but eof() reports false."); + // Otherwise, return -1. + return -1; } public void close() { @@ -472,20 +424,26 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream { private void waitForBufferFill() { synchronized(lock) { - bufferFilled = false; if(buffer.remaining() == 0 && !eof()) { //System.out.printf("Thread %s is waiting for a buffer fill from position %d to buffer %s%n",Thread.currentThread().getId(),position.getBlockAddress(),this); - dispatcher.queueBlockLoad(position); + dispatcher.queueBlockLoad(accessPlan); try { lock.wait(); } catch(InterruptedException ex) { throw new ReviewedStingException("Interrupt occurred waiting for buffer to fill",ex); } - - if(bufferFilled && buffer.remaining() == 0) - throw new RuntimeEOFException("No more data left in InputStream"); } } } + + /** + * Create an encoded BAM file pointer given the address of a BGZF block and an offset. + * @param blockAddress Physical address on disk of a BGZF block. + * @param blockOffset Offset into the uncompressed data stored in the BGZF block. + * @return 64-bit pointer encoded according to the BAM spec. + */ + public static long makeFilePointer(final long blockAddress, final int blockOffset) { + return blockAddress << 16 | blockOffset; + } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockLoader.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockLoader.java index ab4299802..81a37e53c 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockLoader.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/BlockLoader.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011, The Broad Institute + * Copyright (c) 2012, The Broad Institute * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation @@ -70,29 +70,29 @@ class BlockLoader implements Runnable { public void run() { for(;;) { - SAMReaderPosition readerPosition = null; + BAMAccessPlan accessPlan = null; try { - readerPosition = dispatcher.claimNextWorkRequest(); - FileInputStream inputStream = fileHandleCache.claimFileInputStream(readerPosition.getReader()); + accessPlan = dispatcher.claimNextWorkRequest(); + FileInputStream inputStream = fileHandleCache.claimFileInputStream(accessPlan.getReader()); - long blockAddress = readerPosition.getBlockAddress(); + //long blockAddress = readerPosition.getBlockAddress(); //System.out.printf("Thread %s: BlockLoader: copying bytes from %s at position %d into %s%n",Thread.currentThread().getId(),inputStream,blockAddress,readerPosition.getInputStream()); - ByteBuffer compressedBlock = readBGZFBlock(inputStream,readerPosition.getBlockAddress()); + ByteBuffer compressedBlock = readBGZFBlock(inputStream,accessPlan.getBlockAddress()); long nextBlockAddress = position(inputStream); - fileHandleCache.releaseFileInputStream(readerPosition.getReader(),inputStream); + fileHandleCache.releaseFileInputStream(accessPlan.getReader(),inputStream); ByteBuffer block = decompress ? decompressBGZFBlock(compressedBlock) : compressedBlock; int bytesCopied = block.remaining(); - BlockInputStream bamInputStream = readerPosition.getInputStream(); - bamInputStream.copyIntoBuffer(block,readerPosition,nextBlockAddress); + BlockInputStream bamInputStream = accessPlan.getInputStream(); + bamInputStream.copyIntoBuffer(block,accessPlan,nextBlockAddress); //System.out.printf("Thread %s: BlockLoader: copied %d bytes from %s at position %d into %s%n",Thread.currentThread().getId(),bytesCopied,inputStream,blockAddress,readerPosition.getInputStream()); } catch(Throwable error) { - if(readerPosition != null && readerPosition.getInputStream() != null) - readerPosition.getInputStream().reportException(error); + if(accessPlan != null && accessPlan.getInputStream() != null) + accessPlan.getInputStream().reportException(error); } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java index 8d73b1b15..96b55674a 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java @@ -36,7 +36,7 @@ import java.util.Map; */ public class ReadShard extends Shard { /** - * What is the maximum number of reads which should go into a read shard. + * What is the maximum number of reads per BAM file which should go into a read shard. */ public static int MAX_READS = 10000; diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java index c040b53c4..a4681cffd 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java @@ -567,9 +567,14 @@ public class SAMDataSource { if(threadAllocation.getNumIOThreads() > 0) { BlockInputStream inputStream = readers.getInputStream(id); - inputStream.submitAccessPlan(new SAMReaderPosition(id,inputStream,(GATKBAMFileSpan)shard.getFileSpans().get(id))); + inputStream.submitAccessPlan(new BAMAccessPlan(id, inputStream, (GATKBAMFileSpan) shard.getFileSpans().get(id))); + BAMRecordCodec codec = new BAMRecordCodec(getHeader(id),factory); + codec.setInputStream(inputStream); + iterator = new BAMCodecIterator(inputStream,readers.getReader(id),codec); + } + else { + iterator = readers.getReader(id).iterator(shard.getFileSpans().get(id)); } - iterator = readers.getReader(id).iterator(shard.getFileSpans().get(id)); if(shard.getGenomeLocs().size() > 0) iterator = new IntervalOverlapFilteringIterator(iterator,shard.getGenomeLocs()); iteratorMap.put(readers.getReader(id), iterator); @@ -577,8 +582,6 @@ public class SAMDataSource { MergingSamRecordIterator mergingIterator = readers.createMergingIterator(iteratorMap); - - return applyDecoratingIterators(shard.getReadMetrics(), enableVerification, readProperties.useOriginalBaseQualities(), @@ -592,6 +595,49 @@ public class SAMDataSource { readProperties.defaultBaseQualities()); } + private class BAMCodecIterator implements CloseableIterator { + private final BlockInputStream inputStream; + private final SAMFileReader reader; + private final BAMRecordCodec codec; + private SAMRecord nextRead; + + private BAMCodecIterator(final BlockInputStream inputStream, final SAMFileReader reader, final BAMRecordCodec codec) { + this.inputStream = inputStream; + this.reader = reader; + this.codec = codec; + advance(); + } + + public boolean hasNext() { + return nextRead != null; + } + + public SAMRecord next() { + if(!hasNext()) + throw new NoSuchElementException("Unable to retrieve next record from BAMCodecIterator; input stream is empty"); + SAMRecord currentRead = nextRead; + advance(); + return currentRead; + } + + public void close() { + // NO-OP. + } + + public void remove() { + throw new UnsupportedOperationException("Unable to remove from BAMCodecIterator"); + } + + private void advance() { + final long startCoordinate = inputStream.getFilePointer(); + nextRead = codec.decode(); + final long stopCoordinate = inputStream.getFilePointer(); + + if(reader != null && nextRead != null) + PicardNamespaceUtils.setFileSource(nextRead,new SAMFileSource(reader,new GATKBAMFileSpan(new GATKChunk(startCoordinate,stopCoordinate)))); + } + } + /** * Filter reads based on user-specified criteria. * @@ -871,12 +917,9 @@ public class SAMDataSource { public ReaderInitializer call() { final File indexFile = findIndexFile(readerID.samFile); try { - if (threadAllocation.getNumIOThreads() > 0) { + if (threadAllocation.getNumIOThreads() > 0) blockInputStream = new BlockInputStream(dispatcher,readerID,false); - reader = new SAMFileReader(blockInputStream,indexFile,false); - } - else - reader = new SAMFileReader(readerID.samFile,indexFile,false); + reader = new SAMFileReader(readerID.samFile,indexFile,false); } catch ( RuntimeIOException e ) { if ( e.getCause() != null && e.getCause() instanceof FileNotFoundException ) throw new UserException.CouldNotReadInputFile(readerID.samFile, e);