Retiring Picard customizations for async I/O and cleaning up parts of the code to use common Picard utilities I recently discovered.
Also embedded bug fix for issues reading sparse shards and did some cleanup based on comments during BAM reading code transition meetings.
This commit is contained in:
parent
f596377e73
commit
5b58fe741a
|
|
@ -1,762 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011, The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
package net.sf.samtools;
|
||||
|
||||
|
||||
import net.sf.samtools.util.*;
|
||||
import net.sf.samtools.SAMFileReader.ValidationStringency;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
/**
|
||||
* Internal class for reading and querying BAM files.
|
||||
*/
|
||||
class BAMFileReader extends SAMFileReader.ReaderImplementation {
|
||||
// True if reading from a File rather than an InputStream
|
||||
private boolean mIsSeekable = false;
|
||||
|
||||
// For converting bytes into other primitive types
|
||||
private BinaryCodec mStream = null;
|
||||
|
||||
// Underlying compressed data stream.
|
||||
private final BAMInputStream mInputStream;
|
||||
private SAMFileHeader mFileHeader = null;
|
||||
|
||||
// Populated if the file is seekable and an index exists
|
||||
private File mIndexFile;
|
||||
private BAMIndex mIndex = null;
|
||||
private long mFirstRecordPointer = 0;
|
||||
private CloseableIterator<SAMRecord> mCurrentIterator = null;
|
||||
|
||||
// If true, all SAMRecords are fully decoded as they are read.
|
||||
private final boolean eagerDecode;
|
||||
|
||||
// For error-checking.
|
||||
private ValidationStringency mValidationStringency;
|
||||
|
||||
// For creating BAMRecords
|
||||
private SAMRecordFactory samRecordFactory;
|
||||
|
||||
/**
|
||||
* Use the caching index reader implementation rather than the disk-hit-per-file model.
|
||||
*/
|
||||
private boolean mEnableIndexCaching = false;
|
||||
|
||||
/**
|
||||
* Use the traditional memory-mapped implementation for BAM file indexes rather than regular I/O.
|
||||
*/
|
||||
private boolean mEnableIndexMemoryMapping = true;
|
||||
|
||||
/**
|
||||
* Add information about the origin (reader and position) to SAM records.
|
||||
*/
|
||||
private SAMFileReader mFileReader = null;
|
||||
|
||||
/**
|
||||
* Prepare to read BAM from a stream (not seekable)
|
||||
* @param stream source of bytes.
|
||||
* @param eagerDecode if true, decode all BAM fields as reading rather than lazily.
|
||||
* @param validationStringency Controls how to handle invalidate reads or header lines.
|
||||
*/
|
||||
BAMFileReader(final InputStream stream,
|
||||
final File indexFile,
|
||||
final boolean eagerDecode,
|
||||
final ValidationStringency validationStringency,
|
||||
final SAMRecordFactory factory)
|
||||
throws IOException {
|
||||
mIndexFile = indexFile;
|
||||
mIsSeekable = false;
|
||||
mInputStream = stream instanceof BAMInputStream ? (BAMInputStream)stream : new BlockCompressedInputStream(stream);
|
||||
mStream = new BinaryCodec(new DataInputStream((InputStream)mInputStream));
|
||||
this.eagerDecode = eagerDecode;
|
||||
this.mValidationStringency = validationStringency;
|
||||
this.samRecordFactory = factory;
|
||||
readHeader(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare to read BAM from a file (seekable)
|
||||
* @param file source of bytes.
|
||||
* @param eagerDecode if true, decode all BAM fields as reading rather than lazily.
|
||||
* @param validationStringency Controls how to handle invalidate reads or header lines.
|
||||
*/
|
||||
BAMFileReader(final File file,
|
||||
final File indexFile,
|
||||
final boolean eagerDecode,
|
||||
final ValidationStringency validationStringency,
|
||||
final SAMRecordFactory factory)
|
||||
throws IOException {
|
||||
this(new BlockCompressedInputStream(file), indexFile!=null ? indexFile : findIndexFile(file), eagerDecode, file.getAbsolutePath(), validationStringency, factory);
|
||||
if (mIndexFile != null && mIndexFile.lastModified() < file.lastModified()) {
|
||||
System.err.println("WARNING: BAM index file " + mIndexFile.getAbsolutePath() +
|
||||
" is older than BAM " + file.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
BAMFileReader(final SeekableStream strm,
|
||||
final File indexFile,
|
||||
final boolean eagerDecode,
|
||||
final ValidationStringency validationStringency,
|
||||
final SAMRecordFactory factory)
|
||||
throws IOException {
|
||||
this(strm instanceof BAMInputStream ? (BAMInputStream)strm : new BlockCompressedInputStream(strm),
|
||||
indexFile,
|
||||
eagerDecode,
|
||||
strm.getSource(),
|
||||
validationStringency,
|
||||
factory);
|
||||
}
|
||||
|
||||
private BAMFileReader(final BAMInputStream inputStream,
|
||||
final File indexFile,
|
||||
final boolean eagerDecode,
|
||||
final String source,
|
||||
final ValidationStringency validationStringency,
|
||||
final SAMRecordFactory factory)
|
||||
throws IOException {
|
||||
mIndexFile = indexFile;
|
||||
mIsSeekable = true;
|
||||
mInputStream = inputStream;
|
||||
mStream = new BinaryCodec(new DataInputStream((InputStream)inputStream));
|
||||
this.eagerDecode = eagerDecode;
|
||||
this.mValidationStringency = validationStringency;
|
||||
this.samRecordFactory = factory;
|
||||
readHeader(source);
|
||||
mFirstRecordPointer = inputStream.getFilePointer();
|
||||
}
|
||||
|
||||
/**
|
||||
* If true, writes the source of every read into the source SAMRecords.
|
||||
* @param enabled true to write source information into each SAMRecord.
|
||||
*/
|
||||
void enableFileSource(final SAMFileReader reader, final boolean enabled) {
|
||||
this.mFileReader = enabled ? reader : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* If true, uses the caching version of the index reader.
|
||||
* @param enabled true to write source information into each SAMRecord.
|
||||
*/
|
||||
public void enableIndexCaching(final boolean enabled) {
|
||||
if(mIndex != null)
|
||||
throw new SAMException("Unable to turn on index caching; index file has already been loaded.");
|
||||
this.mEnableIndexCaching = enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* If false, disable the use of memory mapping for accessing index files (default behavior is to use memory mapping).
|
||||
* This is slower but more scalable when accessing large numbers of BAM files sequentially.
|
||||
* @param enabled True to use memory mapping, false to use regular I/O.
|
||||
*/
|
||||
public void enableIndexMemoryMapping(final boolean enabled) {
|
||||
if (mIndex != null) {
|
||||
throw new SAMException("Unable to change index memory mapping; index file has already been loaded.");
|
||||
}
|
||||
this.mEnableIndexMemoryMapping = enabled;
|
||||
}
|
||||
|
||||
@Override void enableCrcChecking(final boolean enabled) {
|
||||
this.mInputStream.setCheckCrcs(enabled);
|
||||
}
|
||||
|
||||
@Override void setSAMRecordFactory(final SAMRecordFactory factory) { this.samRecordFactory = factory; }
|
||||
|
||||
/**
|
||||
* @return true if ths is a BAM file, and has an index
|
||||
*/
|
||||
public boolean hasIndex() {
|
||||
return (mIndexFile != null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the index for the given file type. Ensure that the index is of the specified type.
|
||||
* @return An index of the given type.
|
||||
*/
|
||||
public BAMIndex getIndex() {
|
||||
if(mIndexFile == null)
|
||||
throw new SAMException("No index is available for this BAM file.");
|
||||
if(mIndex == null)
|
||||
mIndex = mEnableIndexCaching ? new CachingBAMFileIndex(mIndexFile, getFileHeader().getSequenceDictionary(), mEnableIndexMemoryMapping)
|
||||
: new DiskBasedBAMFileIndex(mIndexFile, getFileHeader().getSequenceDictionary(), mEnableIndexMemoryMapping);
|
||||
return mIndex;
|
||||
}
|
||||
|
||||
void close() {
|
||||
if (mStream != null) {
|
||||
mStream.close();
|
||||
}
|
||||
if (mIndex != null) {
|
||||
mIndex.close();
|
||||
}
|
||||
mStream = null;
|
||||
mFileHeader = null;
|
||||
mIndex = null;
|
||||
}
|
||||
|
||||
SAMFileHeader getFileHeader() {
|
||||
return mFileHeader;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set error-checking level for subsequent SAMRecord reads.
|
||||
*/
|
||||
void setValidationStringency(final SAMFileReader.ValidationStringency validationStringency) {
|
||||
this.mValidationStringency = validationStringency;
|
||||
}
|
||||
|
||||
SAMFileReader.ValidationStringency getValidationStringency() {
|
||||
return this.mValidationStringency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare to iterate through the SAMRecords in file order.
|
||||
* Only a single iterator on a BAM file can be extant at a time. If getIterator() or a query method has been called once,
|
||||
* that iterator must be closed before getIterator() can be called again.
|
||||
* A somewhat peculiar aspect of this method is that if the file is not seekable, a second call to
|
||||
* getIterator() begins its iteration where the last one left off. That is the best that can be
|
||||
* done in that situation.
|
||||
*/
|
||||
CloseableIterator<SAMRecord> getIterator() {
|
||||
if (mStream == null) {
|
||||
throw new IllegalStateException("File reader is closed");
|
||||
}
|
||||
if (mCurrentIterator != null) {
|
||||
throw new IllegalStateException("Iteration in progress");
|
||||
}
|
||||
if (mIsSeekable) {
|
||||
try {
|
||||
mInputStream.seek(mFirstRecordPointer);
|
||||
} catch (IOException exc) {
|
||||
throw new RuntimeException(exc.getMessage(), exc);
|
||||
}
|
||||
}
|
||||
mCurrentIterator = new BAMFileIterator();
|
||||
return mCurrentIterator;
|
||||
}
|
||||
|
||||
@Override
|
||||
CloseableIterator<SAMRecord> getIterator(final SAMFileSpan chunks) {
|
||||
if (mStream == null) {
|
||||
throw new IllegalStateException("File reader is closed");
|
||||
}
|
||||
if (mCurrentIterator != null) {
|
||||
throw new IllegalStateException("Iteration in progress");
|
||||
}
|
||||
if (!(chunks instanceof BAMFileSpan)) {
|
||||
throw new IllegalStateException("BAMFileReader cannot handle this type of file span.");
|
||||
}
|
||||
|
||||
// Create an iterator over the given chunk boundaries.
|
||||
mCurrentIterator = new BAMFileIndexIterator(((BAMFileSpan)chunks).toCoordinateArray());
|
||||
return mCurrentIterator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets an unbounded pointer to the first record in the BAM file. Because the reader doesn't necessarily know
|
||||
* when the file ends, the rightmost bound of the file pointer will not end exactly where the file ends. However,
|
||||
* the rightmost bound is guaranteed to be after the last read in the file.
|
||||
* @return An unbounded pointer to the first record in the BAM file.
|
||||
*/
|
||||
@Override
|
||||
SAMFileSpan getFilePointerSpanningReads() {
|
||||
return new BAMFileSpan(new Chunk(mFirstRecordPointer,Long.MAX_VALUE));
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare to iterate through the SAMRecords that match the given interval.
|
||||
* Only a single iterator on a BAMFile can be extant at a time. The previous one must be closed
|
||||
* before calling any of the methods that return an iterator.
|
||||
*
|
||||
* Note that an unmapped SAMRecord may still have a reference name and an alignment start for sorting
|
||||
* purposes (typically this is the coordinate of its mate), and will be found by this method if the coordinate
|
||||
* matches the specified interval.
|
||||
*
|
||||
* Note that this method is not necessarily efficient in terms of disk I/O. The index does not have perfect
|
||||
* resolution, so some SAMRecords may be read and then discarded because they do not match the specified interval.
|
||||
*
|
||||
* @param sequence Reference sequence sought.
|
||||
* @param start Desired SAMRecords must overlap or be contained in the interval specified by start and end.
|
||||
* A value of zero implies the start of the reference sequence.
|
||||
* @param end A value of zero implies the end of the reference sequence.
|
||||
* @param contained If true, the alignments for the SAMRecords must be completely contained in the interval
|
||||
* specified by start and end. If false, the SAMRecords need only overlap the interval.
|
||||
* @return Iterator for the matching SAMRecords
|
||||
*/
|
||||
CloseableIterator<SAMRecord> query(final String sequence, final int start, final int end, final boolean contained) {
|
||||
if (mStream == null) {
|
||||
throw new IllegalStateException("File reader is closed");
|
||||
}
|
||||
if (mCurrentIterator != null) {
|
||||
throw new IllegalStateException("Iteration in progress");
|
||||
}
|
||||
if (!mIsSeekable) {
|
||||
throw new UnsupportedOperationException("Cannot query stream-based BAM file");
|
||||
}
|
||||
mCurrentIterator = createIndexIterator(sequence, start, end, contained? QueryType.CONTAINED: QueryType.OVERLAPPING);
|
||||
return mCurrentIterator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare to iterate through the SAMRecords with the given alignment start.
|
||||
* Only a single iterator on a BAMFile can be extant at a time. The previous one must be closed
|
||||
* before calling any of the methods that return an iterator.
|
||||
*
|
||||
* Note that an unmapped SAMRecord may still have a reference name and an alignment start for sorting
|
||||
* purposes (typically this is the coordinate of its mate), and will be found by this method if the coordinate
|
||||
* matches the specified interval.
|
||||
*
|
||||
* Note that this method is not necessarily efficient in terms of disk I/O. The index does not have perfect
|
||||
* resolution, so some SAMRecords may be read and then discarded because they do not match the specified interval.
|
||||
*
|
||||
* @param sequence Reference sequence sought.
|
||||
* @param start Alignment start sought.
|
||||
* @return Iterator for the matching SAMRecords.
|
||||
*/
|
||||
CloseableIterator<SAMRecord> queryAlignmentStart(final String sequence, final int start) {
|
||||
if (mStream == null) {
|
||||
throw new IllegalStateException("File reader is closed");
|
||||
}
|
||||
if (mCurrentIterator != null) {
|
||||
throw new IllegalStateException("Iteration in progress");
|
||||
}
|
||||
if (!mIsSeekable) {
|
||||
throw new UnsupportedOperationException("Cannot query stream-based BAM file");
|
||||
}
|
||||
mCurrentIterator = createIndexIterator(sequence, start, -1, QueryType.STARTING_AT);
|
||||
return mCurrentIterator;
|
||||
}
|
||||
|
||||
public CloseableIterator<SAMRecord> queryUnmapped() {
|
||||
if (mStream == null) {
|
||||
throw new IllegalStateException("File reader is closed");
|
||||
}
|
||||
if (mCurrentIterator != null) {
|
||||
throw new IllegalStateException("Iteration in progress");
|
||||
}
|
||||
if (!mIsSeekable) {
|
||||
throw new UnsupportedOperationException("Cannot query stream-based BAM file");
|
||||
}
|
||||
try {
|
||||
final long startOfLastLinearBin = getIndex().getStartOfLastLinearBin();
|
||||
if (startOfLastLinearBin != -1) {
|
||||
mInputStream.seek(startOfLastLinearBin);
|
||||
} else {
|
||||
// No mapped reads in file, just start at the first read in file.
|
||||
mInputStream.seek(mFirstRecordPointer);
|
||||
}
|
||||
mCurrentIterator = new BAMFileIndexUnmappedIterator();
|
||||
return mCurrentIterator;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("IOException seeking to unmapped reads", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the header from the file or stream
|
||||
* @param source Note that this is used only for reporting errors.
|
||||
*/
|
||||
private void readHeader(final String source)
|
||||
throws IOException {
|
||||
|
||||
final byte[] buffer = new byte[4];
|
||||
mStream.readBytes(buffer);
|
||||
if (!Arrays.equals(buffer, BAMFileConstants.BAM_MAGIC)) {
|
||||
throw new IOException("Invalid BAM file header");
|
||||
}
|
||||
|
||||
final int headerTextLength = mStream.readInt();
|
||||
final String textHeader = mStream.readString(headerTextLength);
|
||||
final SAMTextHeaderCodec headerCodec = new SAMTextHeaderCodec();
|
||||
headerCodec.setValidationStringency(mValidationStringency);
|
||||
mFileHeader = headerCodec.decode(new StringLineReader(textHeader),
|
||||
source);
|
||||
|
||||
final int sequenceCount = mStream.readInt();
|
||||
if (mFileHeader.getSequenceDictionary().size() > 0) {
|
||||
// It is allowed to have binary sequences but no text sequences, so only validate if both are present
|
||||
if (sequenceCount != mFileHeader.getSequenceDictionary().size()) {
|
||||
throw new SAMFormatException("Number of sequences in text header (" +
|
||||
mFileHeader.getSequenceDictionary().size() +
|
||||
") != number of sequences in binary header (" + sequenceCount + ") for file " + source);
|
||||
}
|
||||
for (int i = 0; i < sequenceCount; i++) {
|
||||
final SAMSequenceRecord binarySequenceRecord = readSequenceRecord(source);
|
||||
final SAMSequenceRecord sequenceRecord = mFileHeader.getSequence(i);
|
||||
if (!sequenceRecord.getSequenceName().equals(binarySequenceRecord.getSequenceName())) {
|
||||
throw new SAMFormatException("For sequence " + i + ", text and binary have different names in file " +
|
||||
source);
|
||||
}
|
||||
if (sequenceRecord.getSequenceLength() != binarySequenceRecord.getSequenceLength()) {
|
||||
throw new SAMFormatException("For sequence " + i + ", text and binary have different lengths in file " +
|
||||
source);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// If only binary sequences are present, copy them into mFileHeader
|
||||
final List<SAMSequenceRecord> sequences = new ArrayList<SAMSequenceRecord>(sequenceCount);
|
||||
for (int i = 0; i < sequenceCount; i++) {
|
||||
sequences.add(readSequenceRecord(source));
|
||||
}
|
||||
mFileHeader.setSequenceDictionary(new SAMSequenceDictionary(sequences));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a single binary sequence record from the file or stream
|
||||
* @param source Note that this is used only for reporting errors.
|
||||
*/
|
||||
private SAMSequenceRecord readSequenceRecord(final String source) {
|
||||
final int nameLength = mStream.readInt();
|
||||
if (nameLength <= 1) {
|
||||
throw new SAMFormatException("Invalid BAM file header: missing sequence name in file " + source);
|
||||
}
|
||||
final String sequenceName = mStream.readString(nameLength - 1);
|
||||
// Skip the null terminator
|
||||
mStream.readByte();
|
||||
final int sequenceLength = mStream.readInt();
|
||||
return new SAMSequenceRecord(SAMSequenceRecord.truncateSequenceName(sequenceName), sequenceLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Iterator for non-indexed sequential iteration through all SAMRecords in file.
|
||||
* Starting point of iteration is wherever current file position is when the iterator is constructed.
|
||||
*/
|
||||
private class BAMFileIterator implements CloseableIterator<SAMRecord> {
|
||||
private SAMRecord mNextRecord = null;
|
||||
private final BAMRecordCodec bamRecordCodec;
|
||||
private long samRecordIndex = 0; // Records at what position (counted in records) we are at in the file
|
||||
|
||||
BAMFileIterator() {
|
||||
this(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param advance Trick to enable subclass to do more setup before advancing
|
||||
*/
|
||||
BAMFileIterator(final boolean advance) {
|
||||
this.bamRecordCodec = new BAMRecordCodec(getFileHeader(), samRecordFactory);
|
||||
this.bamRecordCodec.setInputStream(BAMFileReader.this.mStream.getInputStream());
|
||||
|
||||
if (advance) {
|
||||
advance();
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if (mCurrentIterator != null && this != mCurrentIterator) {
|
||||
throw new IllegalStateException("Attempt to close non-current iterator");
|
||||
}
|
||||
mCurrentIterator = null;
|
||||
}
|
||||
|
||||
public boolean hasNext() {
|
||||
return (mNextRecord != null);
|
||||
}
|
||||
|
||||
public SAMRecord next() {
|
||||
final SAMRecord result = mNextRecord;
|
||||
advance();
|
||||
return result;
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("Not supported: remove");
|
||||
}
|
||||
|
||||
void advance() {
|
||||
try {
|
||||
mNextRecord = getNextRecord();
|
||||
|
||||
if (mNextRecord != null) {
|
||||
++this.samRecordIndex;
|
||||
// Because some decoding is done lazily, the record needs to remember the validation stringency.
|
||||
mNextRecord.setValidationStringency(mValidationStringency);
|
||||
|
||||
if (mValidationStringency != ValidationStringency.SILENT) {
|
||||
final List<SAMValidationError> validationErrors = mNextRecord.isValid();
|
||||
SAMUtils.processValidationErrors(validationErrors,
|
||||
this.samRecordIndex, BAMFileReader.this.getValidationStringency());
|
||||
}
|
||||
}
|
||||
if (eagerDecode && mNextRecord != null) {
|
||||
mNextRecord.eagerDecode();
|
||||
}
|
||||
} catch (IOException exc) {
|
||||
throw new RuntimeException(exc.getMessage(), exc);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the next record from the input stream.
|
||||
*/
|
||||
SAMRecord getNextRecord() throws IOException {
|
||||
final long startCoordinate = mInputStream.getFilePointer();
|
||||
final SAMRecord next = bamRecordCodec.decode();
|
||||
final long stopCoordinate = mInputStream.getFilePointer();
|
||||
|
||||
if(mFileReader != null && next != null)
|
||||
next.setFileSource(new SAMFileSource(mFileReader,new BAMFileSpan(new Chunk(startCoordinate,stopCoordinate))));
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The record that will be return by the next call to next()
|
||||
*/
|
||||
protected SAMRecord peek() {
|
||||
return mNextRecord;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare to iterate through SAMRecords matching the target interval.
|
||||
* @param sequence Desired reference sequence.
|
||||
* @param start 1-based start of target interval, inclusive.
|
||||
* @param end 1-based end of target interval, inclusive.
|
||||
* @param queryType contained, overlapping, or starting-at query.
|
||||
*/
|
||||
private CloseableIterator<SAMRecord> createIndexIterator(final String sequence,
|
||||
final int start,
|
||||
final int end,
|
||||
final QueryType queryType) {
|
||||
long[] filePointers = null;
|
||||
|
||||
// Hit the index to determine the chunk boundaries for the required data.
|
||||
final SAMFileHeader fileHeader = getFileHeader();
|
||||
final int referenceIndex = fileHeader.getSequenceIndex(sequence);
|
||||
if (referenceIndex != -1) {
|
||||
final BAMIndex fileIndex = getIndex();
|
||||
final BAMFileSpan fileSpan = fileIndex.getSpanOverlapping(referenceIndex, start, end);
|
||||
filePointers = fileSpan != null ? fileSpan.toCoordinateArray() : null;
|
||||
}
|
||||
|
||||
// Create an iterator over the above chunk boundaries.
|
||||
final BAMFileIndexIterator iterator = new BAMFileIndexIterator(filePointers);
|
||||
|
||||
// Add some preprocessing filters for edge-case reads that don't fit into this
|
||||
// query type.
|
||||
return new BAMQueryFilteringIterator(iterator,sequence,start,end,queryType);
|
||||
}
|
||||
|
||||
enum QueryType {CONTAINED, OVERLAPPING, STARTING_AT}
|
||||
|
||||
/**
|
||||
* Look for BAM index file according to standard naming convention.
|
||||
*
|
||||
* @param dataFile BAM file name.
|
||||
* @return Index file name, or null if not found.
|
||||
*/
|
||||
private static File findIndexFile(final File dataFile) {
|
||||
// If input is foo.bam, look for foo.bai
|
||||
final String bamExtension = ".bam";
|
||||
File indexFile;
|
||||
final String fileName = dataFile.getName();
|
||||
if (fileName.endsWith(bamExtension)) {
|
||||
final String bai = fileName.substring(0, fileName.length() - bamExtension.length()) + BAMIndex.BAMIndexSuffix;
|
||||
indexFile = new File(dataFile.getParent(), bai);
|
||||
if (indexFile.exists()) {
|
||||
return indexFile;
|
||||
}
|
||||
}
|
||||
|
||||
// If foo.bai doesn't exist look for foo.bam.bai
|
||||
indexFile = new File(dataFile.getParent(), dataFile.getName() + ".bai");
|
||||
if (indexFile.exists()) {
|
||||
return indexFile;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private class BAMFileIndexIterator extends BAMFileIterator {
|
||||
|
||||
private long[] mFilePointers = null;
|
||||
private int mFilePointerIndex = 0;
|
||||
private long mFilePointerLimit = -1;
|
||||
|
||||
/**
|
||||
* Prepare to iterate through SAMRecords stored in the specified compressed blocks at the given offset.
|
||||
* @param filePointers the block / offset combination, stored in chunk format.
|
||||
*/
|
||||
BAMFileIndexIterator(final long[] filePointers) {
|
||||
super(false); // delay advance() until after construction
|
||||
mFilePointers = filePointers;
|
||||
advance();
|
||||
}
|
||||
|
||||
SAMRecord getNextRecord()
|
||||
throws IOException {
|
||||
// Advance to next file block if necessary
|
||||
while (mInputStream.getFilePointer() >= mFilePointerLimit) {
|
||||
if (mFilePointers == null ||
|
||||
mFilePointerIndex >= mFilePointers.length) {
|
||||
return null;
|
||||
}
|
||||
final long startOffset = mFilePointers[mFilePointerIndex++];
|
||||
final long endOffset = mFilePointers[mFilePointerIndex++];
|
||||
mInputStream.seek(startOffset);
|
||||
mFilePointerLimit = endOffset;
|
||||
}
|
||||
// Pull next record from stream
|
||||
return super.getNextRecord();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A decorating iterator that filters out records that are outside the bounds of the
|
||||
* given query parameters.
|
||||
*/
|
||||
private class BAMQueryFilteringIterator implements CloseableIterator<SAMRecord> {
|
||||
/**
|
||||
* The wrapped iterator.
|
||||
*/
|
||||
private final CloseableIterator<SAMRecord> wrappedIterator;
|
||||
|
||||
/**
|
||||
* The next record to be returned. Will be null if no such record exists.
|
||||
*/
|
||||
private SAMRecord mNextRecord;
|
||||
|
||||
private final int mReferenceIndex;
|
||||
private final int mRegionStart;
|
||||
private final int mRegionEnd;
|
||||
private final QueryType mQueryType;
|
||||
|
||||
public BAMQueryFilteringIterator(final CloseableIterator<SAMRecord> iterator,final String sequence, final int start, final int end, final QueryType queryType) {
|
||||
this.wrappedIterator = iterator;
|
||||
final SAMFileHeader fileHeader = getFileHeader();
|
||||
mReferenceIndex = fileHeader.getSequenceIndex(sequence);
|
||||
mRegionStart = start;
|
||||
if (queryType == QueryType.STARTING_AT) {
|
||||
mRegionEnd = mRegionStart;
|
||||
} else {
|
||||
mRegionEnd = (end <= 0) ? Integer.MAX_VALUE : end;
|
||||
}
|
||||
mQueryType = queryType;
|
||||
mNextRecord = advance();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if a next element exists; false otherwise.
|
||||
*/
|
||||
public boolean hasNext() {
|
||||
return mNextRecord != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the next record from the given iterator.
|
||||
* @return The next SAM record in the iterator.
|
||||
*/
|
||||
public SAMRecord next() {
|
||||
if(!hasNext())
|
||||
throw new NoSuchElementException("BAMQueryFilteringIterator: no next element available");
|
||||
final SAMRecord currentRead = mNextRecord;
|
||||
mNextRecord = advance();
|
||||
return currentRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes down the existing iterator.
|
||||
*/
|
||||
public void close() {
|
||||
if (this != mCurrentIterator) {
|
||||
throw new IllegalStateException("Attempt to close non-current iterator");
|
||||
}
|
||||
mCurrentIterator = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws UnsupportedOperationException always.
|
||||
*/
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("Not supported: remove");
|
||||
}
|
||||
|
||||
SAMRecord advance() {
|
||||
while (true) {
|
||||
// Pull next record from stream
|
||||
if(!wrappedIterator.hasNext())
|
||||
return null;
|
||||
|
||||
final SAMRecord record = wrappedIterator.next();
|
||||
// If beyond the end of this reference sequence, end iteration
|
||||
final int referenceIndex = record.getReferenceIndex();
|
||||
if (referenceIndex != mReferenceIndex) {
|
||||
if (referenceIndex < 0 ||
|
||||
referenceIndex > mReferenceIndex) {
|
||||
return null;
|
||||
}
|
||||
// If before this reference sequence, continue
|
||||
continue;
|
||||
}
|
||||
if (mRegionStart == 0 && mRegionEnd == Integer.MAX_VALUE) {
|
||||
// Quick exit to avoid expensive alignment end calculation
|
||||
return record;
|
||||
}
|
||||
final int alignmentStart = record.getAlignmentStart();
|
||||
// If read is unmapped but has a coordinate, return it if the coordinate is within
|
||||
// the query region, regardless of whether the mapped mate will be returned.
|
||||
final int alignmentEnd;
|
||||
if (mQueryType == QueryType.STARTING_AT) {
|
||||
alignmentEnd = -1;
|
||||
} else {
|
||||
alignmentEnd = (record.getAlignmentEnd() != SAMRecord.NO_ALIGNMENT_START?
|
||||
record.getAlignmentEnd(): alignmentStart);
|
||||
}
|
||||
|
||||
if (alignmentStart > mRegionEnd) {
|
||||
// If scanned beyond target region, end iteration
|
||||
return null;
|
||||
}
|
||||
// Filter for overlap with region
|
||||
if (mQueryType == QueryType.CONTAINED) {
|
||||
if (alignmentStart >= mRegionStart && alignmentEnd <= mRegionEnd) {
|
||||
return record;
|
||||
}
|
||||
} else if (mQueryType == QueryType.OVERLAPPING) {
|
||||
if (alignmentEnd >= mRegionStart && alignmentStart <= mRegionEnd) {
|
||||
return record;
|
||||
}
|
||||
} else {
|
||||
if (alignmentStart == mRegionStart) {
|
||||
return record;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class BAMFileIndexUnmappedIterator extends BAMFileIterator {
|
||||
private BAMFileIndexUnmappedIterator() {
|
||||
while (this.hasNext() && peek().getReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
|
||||
advance();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -40,6 +40,10 @@ public class GATKChunk extends Chunk {
|
|||
super(start,stop);
|
||||
}
|
||||
|
||||
public GATKChunk(final long blockStart, final int blockOffsetStart, final long blockEnd, final int blockOffsetEnd) {
|
||||
super(blockStart << 16 | blockOffsetStart,blockEnd << 16 | blockOffsetEnd);
|
||||
}
|
||||
|
||||
public GATKChunk(final Chunk chunk) {
|
||||
super(chunk.getChunkStart(),chunk.getChunkEnd());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright (c) 2012, The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package net.sf.samtools;
|
||||
|
||||
/**
|
||||
* Utils that insist on being in the same package as Picard.
|
||||
*/
|
||||
public class PicardNamespaceUtils {
|
||||
/**
|
||||
* Private constructor only. Do not instantiate.
|
||||
*/
|
||||
private PicardNamespaceUtils() {}
|
||||
|
||||
public static void setFileSource(final SAMRecord read, final SAMFileSource fileSource) {
|
||||
read.setFileSource(fileSource);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,72 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011, The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package net.sf.samtools.util;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An input stream formulated for use reading BAM files. Supports
|
||||
*/
|
||||
public interface BAMInputStream {
|
||||
/**
|
||||
* Seek to the given position in the file. Note that pos is a special virtual file pointer,
|
||||
* not an actual byte offset.
|
||||
*
|
||||
* @param pos virtual file pointer
|
||||
*/
|
||||
public void seek(final long pos) throws IOException;
|
||||
|
||||
/**
|
||||
* @return virtual file pointer that can be passed to seek() to return to the current position. This is
|
||||
* not an actual byte offset, so arithmetic on file pointers cannot be done to determine the distance between
|
||||
* the two.
|
||||
*/
|
||||
public long getFilePointer();
|
||||
|
||||
/**
|
||||
* Determines whether or not the inflater will re-calculated the CRC on the decompressed data
|
||||
* and check it against the value stored in the GZIP header. CRC checking is an expensive
|
||||
* operation and should be used accordingly.
|
||||
*/
|
||||
public void setCheckCrcs(final boolean check);
|
||||
|
||||
public int read() throws java.io.IOException;
|
||||
|
||||
public int read(byte[] bytes) throws java.io.IOException;
|
||||
|
||||
public int read(byte[] bytes, int i, int i1) throws java.io.IOException;
|
||||
|
||||
public long skip(long l) throws java.io.IOException;
|
||||
|
||||
public int available() throws java.io.IOException;
|
||||
|
||||
public void close() throws java.io.IOException;
|
||||
|
||||
public void mark(int i);
|
||||
|
||||
public void reset() throws java.io.IOException;
|
||||
|
||||
public boolean markSupported();
|
||||
}
|
||||
|
|
@ -1,483 +0,0 @@
|
|||
/*
|
||||
* The MIT License
|
||||
*
|
||||
* Copyright (c) 2009 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to deal
|
||||
* in the Software without restriction, including without limitation the rights
|
||||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
* THE SOFTWARE.
|
||||
*/
|
||||
package net.sf.samtools.util;
|
||||
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.Arrays;
|
||||
|
||||
import net.sf.samtools.FileTruncatedException;
|
||||
|
||||
/*
|
||||
* Utility class for reading BGZF block compressed files. The caller can treat this file like any other InputStream.
|
||||
* It probably is not necessary to wrap this stream in a buffering stream, because there is internal buffering.
|
||||
* The advantage of BGZF over conventional GZip format is that BGZF allows for seeking without having to read the
|
||||
* entire file up to the location being sought. Note that seeking is only possible if the ctor(File) is used.
|
||||
*
|
||||
* c.f. http://samtools.sourceforge.net/SAM1.pdf for details of BGZF format
|
||||
*/
|
||||
public class BlockCompressedInputStream extends InputStream implements BAMInputStream {
|
||||
private InputStream mStream = null;
|
||||
private SeekableStream mFile = null;
|
||||
private byte[] mFileBuffer = null;
|
||||
private byte[] mCurrentBlock = null;
|
||||
private int mCurrentOffset = 0;
|
||||
private long mBlockAddress = 0;
|
||||
private int mLastBlockLength = 0;
|
||||
private final BlockGunzipper blockGunzipper = new BlockGunzipper();
|
||||
|
||||
|
||||
/**
|
||||
* Note that seek() is not supported if this ctor is used.
|
||||
*/
|
||||
public BlockCompressedInputStream(final InputStream stream) {
|
||||
mStream = IOUtil.toBufferedStream(stream);
|
||||
mFile = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this ctor if you wish to call seek()
|
||||
*/
|
||||
public BlockCompressedInputStream(final File file)
|
||||
throws IOException {
|
||||
mFile = new SeekableFileStream(file);
|
||||
mStream = null;
|
||||
|
||||
}
|
||||
|
||||
public BlockCompressedInputStream(final URL url) {
|
||||
mFile = new SeekableBufferedStream(new SeekableHTTPStream(url));
|
||||
mStream = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* For providing some arbitrary data source. No additional buffering is
|
||||
* provided, so if the underlying source is not buffered, wrap it in a
|
||||
* SeekableBufferedStream before passing to this ctor.
|
||||
*/
|
||||
public BlockCompressedInputStream(final SeekableStream strm) {
|
||||
mFile = strm;
|
||||
mStream = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines whether or not the inflater will re-calculated the CRC on the decompressed data
|
||||
* and check it against the value stored in the GZIP header. CRC checking is an expensive
|
||||
* operation and should be used accordingly.
|
||||
*/
|
||||
public void setCheckCrcs(final boolean check) {
|
||||
this.blockGunzipper.setCheckCrcs(check);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of bytes that can be read (or skipped over) from this input stream without blocking by the
|
||||
* next caller of a method for this input stream. The next caller might be the same thread or another thread.
|
||||
* Note that although the next caller can read this many bytes without blocking, the available() method call itself
|
||||
* may block in order to fill an internal buffer if it has been exhausted.
|
||||
*/
|
||||
public int available()
|
||||
throws IOException {
|
||||
if (mCurrentBlock == null || mCurrentOffset == mCurrentBlock.length) {
|
||||
readBlock();
|
||||
}
|
||||
if (mCurrentBlock == null) {
|
||||
return 0;
|
||||
}
|
||||
return mCurrentBlock.length - mCurrentOffset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the underlying InputStream or RandomAccessFile
|
||||
*/
|
||||
public void close()
|
||||
throws IOException {
|
||||
if (mFile != null) {
|
||||
mFile.close();
|
||||
mFile = null;
|
||||
} else if (mStream != null) {
|
||||
mStream.close();
|
||||
mStream = null;
|
||||
}
|
||||
// Encourage garbage collection
|
||||
mFileBuffer = null;
|
||||
mCurrentBlock = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the next byte of data from the input stream. The value byte is returned as an int in the range 0 to 255.
|
||||
* If no byte is available because the end of the stream has been reached, the value -1 is returned.
|
||||
* This method blocks until input data is available, the end of the stream is detected, or an exception is thrown.
|
||||
|
||||
* @return the next byte of data, or -1 if the end of the stream is reached.
|
||||
*/
|
||||
public int read()
|
||||
throws IOException {
|
||||
return (available() > 0) ? mCurrentBlock[mCurrentOffset++] : -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads some number of bytes from the input stream and stores them into the buffer array b. The number of bytes
|
||||
* actually read is returned as an integer. This method blocks until input data is available, end of file is detected,
|
||||
* or an exception is thrown.
|
||||
*
|
||||
* read(buf) has the same effect as read(buf, 0, buf.length).
|
||||
*
|
||||
* @param buffer the buffer into which the data is read.
|
||||
* @return the total number of bytes read into the buffer, or -1 is there is no more data because the end of
|
||||
* the stream has been reached.
|
||||
*/
|
||||
public int read(final byte[] buffer)
|
||||
throws IOException {
|
||||
return read(buffer, 0, buffer.length);
|
||||
}
|
||||
|
||||
private volatile ByteArrayOutputStream buf = null;
|
||||
private static final byte eol = '\n';
|
||||
private static final byte eolCr = '\r';
|
||||
|
||||
/**
|
||||
* Reads a whole line. A line is considered to be terminated by either a line feed ('\n'),
|
||||
* carriage return ('\r') or carriage return followed by a line feed ("\r\n").
|
||||
*
|
||||
* @return A String containing the contents of the line, excluding the line terminating
|
||||
* character, or null if the end of the stream has been reached
|
||||
*
|
||||
* @exception IOException If an I/O error occurs
|
||||
* @
|
||||
*/
|
||||
public String readLine() throws IOException {
|
||||
int available = available();
|
||||
if (available == 0) {
|
||||
return null;
|
||||
}
|
||||
if(null == buf){ // lazy initialisation
|
||||
buf = new ByteArrayOutputStream(8192);
|
||||
}
|
||||
buf.reset();
|
||||
boolean done = false;
|
||||
boolean foundCr = false; // \r found flag
|
||||
while (!done) {
|
||||
int linetmpPos = mCurrentOffset;
|
||||
int bCnt = 0;
|
||||
while((available-- > 0)){
|
||||
final byte c = mCurrentBlock[linetmpPos++];
|
||||
if(c == eol){ // found \n
|
||||
done = true;
|
||||
break;
|
||||
} else if(foundCr){ // previous char was \r
|
||||
--linetmpPos; // current char is not \n so put it back
|
||||
done = true;
|
||||
break;
|
||||
} else if(c == eolCr){ // found \r
|
||||
foundCr = true;
|
||||
continue; // no ++bCnt
|
||||
}
|
||||
++bCnt;
|
||||
}
|
||||
if(mCurrentOffset < linetmpPos){
|
||||
buf.write(mCurrentBlock, mCurrentOffset, bCnt);
|
||||
mCurrentOffset = linetmpPos;
|
||||
}
|
||||
available = available();
|
||||
if(available == 0){
|
||||
// EOF
|
||||
done = true;
|
||||
}
|
||||
}
|
||||
return buf.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to read
|
||||
* as many as len bytes, but a smaller number may be read. The number of bytes actually read is returned as an integer.
|
||||
*
|
||||
* This method blocks until input data is available, end of file is detected, or an exception is thrown.
|
||||
*
|
||||
* @param buffer buffer into which data is read.
|
||||
* @param offset the start offset in array b at which the data is written.
|
||||
* @param length the maximum number of bytes to read.
|
||||
* @return the total number of bytes read into the buffer, or -1 if there is no more data because the end of
|
||||
* the stream has been reached.
|
||||
*/
|
||||
public int read(final byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
final int originalLength = length;
|
||||
while (length > 0) {
|
||||
final int available = available();
|
||||
if (available == 0) {
|
||||
// Signal EOF to caller
|
||||
if (originalLength == length) {
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
final int copyLength = Math.min(length, available);
|
||||
System.arraycopy(mCurrentBlock, mCurrentOffset, buffer, offset, copyLength);
|
||||
mCurrentOffset += copyLength;
|
||||
offset += copyLength;
|
||||
length -= copyLength;
|
||||
}
|
||||
return originalLength - length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Seek to the given position in the file. Note that pos is a special virtual file pointer,
|
||||
* not an actual byte offset.
|
||||
*
|
||||
* @param pos virtual file pointer
|
||||
*/
|
||||
public void seek(final long pos)
|
||||
throws IOException {
|
||||
if (mFile == null) {
|
||||
throw new IOException("Cannot seek on stream based file");
|
||||
}
|
||||
// Decode virtual file pointer
|
||||
// Upper 48 bits is the byte offset into the compressed stream of a block.
|
||||
// Lower 16 bits is the byte offset into the uncompressed stream inside the block.
|
||||
final long compressedOffset = BlockCompressedFilePointerUtil.getBlockAddress(pos);
|
||||
final int uncompressedOffset = BlockCompressedFilePointerUtil.getBlockOffset(pos);
|
||||
final int available;
|
||||
if (mBlockAddress == compressedOffset && mCurrentBlock != null) {
|
||||
available = mCurrentBlock.length;
|
||||
} else {
|
||||
mFile.seek(compressedOffset);
|
||||
mBlockAddress = compressedOffset;
|
||||
mLastBlockLength = 0;
|
||||
readBlock();
|
||||
available = available();
|
||||
}
|
||||
if (uncompressedOffset > available ||
|
||||
(uncompressedOffset == available && !eof())) {
|
||||
throw new IOException("Invalid file pointer: " + pos);
|
||||
}
|
||||
mCurrentOffset = uncompressedOffset;
|
||||
}
|
||||
|
||||
private boolean eof() throws IOException {
|
||||
if (mFile.eof()) {
|
||||
return true;
|
||||
}
|
||||
// If the last remaining block is the size of the EMPTY_GZIP_BLOCK, this is the same as being at EOF.
|
||||
return (mFile.length() - (mBlockAddress + mLastBlockLength) == BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return virtual file pointer that can be passed to seek() to return to the current position. This is
|
||||
* not an actual byte offset, so arithmetic on file pointers cannot be done to determine the distance between
|
||||
* the two.
|
||||
*/
|
||||
public long getFilePointer() {
|
||||
if (mCurrentOffset == mCurrentBlock.length) {
|
||||
// If current offset is at the end of the current block, file pointer should point
|
||||
// to the beginning of the next block.
|
||||
return BlockCompressedFilePointerUtil.makeFilePointer(mBlockAddress + mLastBlockLength, 0);
|
||||
}
|
||||
return BlockCompressedFilePointerUtil.makeFilePointer(mBlockAddress, mCurrentOffset);
|
||||
}
|
||||
|
||||
public static long getFileBlock(final long bgzfOffset) {
|
||||
return BlockCompressedFilePointerUtil.getBlockAddress(bgzfOffset);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param stream Must be at start of file. Throws RuntimeException if !stream.markSupported().
|
||||
* @return true if the given file looks like a valid BGZF file.
|
||||
*/
|
||||
public static boolean isValidFile(final InputStream stream)
|
||||
throws IOException {
|
||||
if (!stream.markSupported()) {
|
||||
throw new RuntimeException("Cannot test non-buffered stream");
|
||||
}
|
||||
stream.mark(BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH);
|
||||
final byte[] buffer = new byte[BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH];
|
||||
final int count = readBytes(stream, buffer, 0, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH);
|
||||
stream.reset();
|
||||
return count == BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH && isValidBlockHeader(buffer);
|
||||
}
|
||||
|
||||
private static boolean isValidBlockHeader(final byte[] buffer) {
|
||||
return (buffer[0] == BlockCompressedStreamConstants.GZIP_ID1 &&
|
||||
(buffer[1] & 0xFF) == BlockCompressedStreamConstants.GZIP_ID2 &&
|
||||
(buffer[3] & BlockCompressedStreamConstants.GZIP_FLG) != 0 &&
|
||||
buffer[10] == BlockCompressedStreamConstants.GZIP_XLEN &&
|
||||
buffer[12] == BlockCompressedStreamConstants.BGZF_ID1 &&
|
||||
buffer[13] == BlockCompressedStreamConstants.BGZF_ID2);
|
||||
}
|
||||
|
||||
private void readBlock()
|
||||
throws IOException {
|
||||
|
||||
if (mFileBuffer == null) {
|
||||
mFileBuffer = new byte[BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE];
|
||||
}
|
||||
int count = readBytes(mFileBuffer, 0, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH);
|
||||
if (count == 0) {
|
||||
// Handle case where there is no empty gzip block at end.
|
||||
mCurrentOffset = 0;
|
||||
mBlockAddress += mLastBlockLength;
|
||||
mCurrentBlock = new byte[0];
|
||||
return;
|
||||
}
|
||||
if (count != BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH) {
|
||||
throw new IOException("Premature end of file");
|
||||
}
|
||||
final int blockLength = unpackInt16(mFileBuffer, BlockCompressedStreamConstants.BLOCK_LENGTH_OFFSET) + 1;
|
||||
if (blockLength < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH || blockLength > mFileBuffer.length) {
|
||||
throw new IOException("Unexpected compressed block length: " + blockLength);
|
||||
}
|
||||
final int remaining = blockLength - BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH;
|
||||
count = readBytes(mFileBuffer, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH, remaining);
|
||||
if (count != remaining) {
|
||||
throw new FileTruncatedException("Premature end of file");
|
||||
}
|
||||
inflateBlock(mFileBuffer, blockLength);
|
||||
mCurrentOffset = 0;
|
||||
mBlockAddress += mLastBlockLength;
|
||||
mLastBlockLength = blockLength;
|
||||
}
|
||||
|
||||
private void inflateBlock(final byte[] compressedBlock, final int compressedLength)
|
||||
throws IOException {
|
||||
final int uncompressedLength = unpackInt32(compressedBlock, compressedLength-4);
|
||||
byte[] buffer = mCurrentBlock;
|
||||
mCurrentBlock = null;
|
||||
if (buffer == null || buffer.length != uncompressedLength) {
|
||||
try {
|
||||
buffer = new byte[uncompressedLength];
|
||||
} catch (NegativeArraySizeException e) {
|
||||
throw new RuntimeException("BGZF file has invalid uncompressedLength: " + uncompressedLength, e);
|
||||
}
|
||||
}
|
||||
blockGunzipper.unzipBlock(buffer, compressedBlock, compressedLength);
|
||||
mCurrentBlock = buffer;
|
||||
}
|
||||
|
||||
private int readBytes(final byte[] buffer, final int offset, final int length)
|
||||
throws IOException {
|
||||
if (mFile != null) {
|
||||
return readBytes(mFile, buffer, offset, length);
|
||||
} else if (mStream != null) {
|
||||
return readBytes(mStream, buffer, offset, length);
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private static int readBytes(final SeekableStream file, final byte[] buffer, final int offset, final int length)
|
||||
throws IOException {
|
||||
int bytesRead = 0;
|
||||
while (bytesRead < length) {
|
||||
final int count = file.read(buffer, offset + bytesRead, length - bytesRead);
|
||||
if (count <= 0) {
|
||||
break;
|
||||
}
|
||||
bytesRead += count;
|
||||
}
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
private static int readBytes(final InputStream stream, final byte[] buffer, final int offset, final int length)
|
||||
throws IOException {
|
||||
int bytesRead = 0;
|
||||
while (bytesRead < length) {
|
||||
final int count = stream.read(buffer, offset + bytesRead, length - bytesRead);
|
||||
if (count <= 0) {
|
||||
break;
|
||||
}
|
||||
bytesRead += count;
|
||||
}
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
private int unpackInt16(final byte[] buffer, final int offset) {
|
||||
return ((buffer[offset] & 0xFF) |
|
||||
((buffer[offset+1] & 0xFF) << 8));
|
||||
}
|
||||
|
||||
private int unpackInt32(final byte[] buffer, final int offset) {
|
||||
return ((buffer[offset] & 0xFF) |
|
||||
((buffer[offset+1] & 0xFF) << 8) |
|
||||
((buffer[offset+2] & 0xFF) << 16) |
|
||||
((buffer[offset+3] & 0xFF) << 24));
|
||||
}
|
||||
|
||||
public enum FileTermination {HAS_TERMINATOR_BLOCK, HAS_HEALTHY_LAST_BLOCK, DEFECTIVE}
|
||||
|
||||
public static FileTermination checkTermination(final File file)
|
||||
throws IOException {
|
||||
final long fileSize = file.length();
|
||||
if (fileSize < BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length) {
|
||||
return FileTermination.DEFECTIVE;
|
||||
}
|
||||
final RandomAccessFile raFile = new RandomAccessFile(file, "r");
|
||||
try {
|
||||
raFile.seek(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);
|
||||
byte[] buf = new byte[BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length];
|
||||
raFile.readFully(buf);
|
||||
if (Arrays.equals(buf, BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) {
|
||||
return FileTermination.HAS_TERMINATOR_BLOCK;
|
||||
}
|
||||
final int bufsize = (int)Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
|
||||
buf = new byte[bufsize];
|
||||
raFile.seek(fileSize - bufsize);
|
||||
raFile.read(buf);
|
||||
for (int i = buf.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length;
|
||||
i >= 0; --i) {
|
||||
if (!preambleEqual(BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE,
|
||||
buf, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) {
|
||||
continue;
|
||||
}
|
||||
final ByteBuffer byteBuffer = ByteBuffer.wrap(buf, i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, 4);
|
||||
byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
|
||||
final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF;
|
||||
if (buf.length - i == totalBlockSizeMinusOne + 1) {
|
||||
return FileTermination.HAS_HEALTHY_LAST_BLOCK;
|
||||
} else {
|
||||
return FileTermination.DEFECTIVE;
|
||||
}
|
||||
}
|
||||
return FileTermination.DEFECTIVE;
|
||||
} finally {
|
||||
raFile.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean preambleEqual(final byte[] preamble, final byte[] buf, final int startOffset, final int length) {
|
||||
for (int i = 0; i < length; ++i) {
|
||||
if (preamble[i] != buf[i + startOffset]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -27,8 +27,10 @@ package org.broadinstitute.sting.gatk.datasources.reads;
|
|||
import net.sf.picard.util.PeekableIterator;
|
||||
import net.sf.samtools.GATKBAMFileSpan;
|
||||
import net.sf.samtools.GATKChunk;
|
||||
import net.sf.samtools.util.BlockCompressedFilePointerUtil;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
|
@ -38,7 +40,7 @@ import java.util.List;
|
|||
* Time: 10:47 PM
|
||||
* To change this template use File | Settings | File Templates.
|
||||
*/
|
||||
class SAMReaderPosition {
|
||||
class BAMAccessPlan {
|
||||
private final SAMReaderID reader;
|
||||
private final BlockInputStream inputStream;
|
||||
|
||||
|
|
@ -51,7 +53,7 @@ class SAMReaderPosition {
|
|||
private long nextBlockAddress;
|
||||
|
||||
|
||||
SAMReaderPosition(final SAMReaderID reader, final BlockInputStream inputStream, GATKBAMFileSpan fileSpan) {
|
||||
BAMAccessPlan(final SAMReaderID reader, final BlockInputStream inputStream, GATKBAMFileSpan fileSpan) {
|
||||
this.reader = reader;
|
||||
this.inputStream = inputStream;
|
||||
|
||||
|
|
@ -84,11 +86,45 @@ class SAMReaderPosition {
|
|||
}
|
||||
|
||||
/**
|
||||
* Retrieves the last offset of interest in the block returned by getBlockAddress().
|
||||
* @return First block of interest in this segment.
|
||||
* Gets the spans overlapping the given block; used to copy the contents of the block into the circular buffer.
|
||||
* @param blockAddress Block address for which to search.
|
||||
* @param filePosition Block address at which to terminate the last chunk if the last chunk goes beyond this span.
|
||||
* @return list of chunks containing that block.
|
||||
*/
|
||||
public int getLastOffsetInBlock() {
|
||||
return (nextBlockAddress == positionIterator.peek().getBlockEnd()) ? positionIterator.peek().getBlockOffsetEnd() : 65536;
|
||||
public List<GATKChunk> getSpansOverlappingBlock(long blockAddress, long filePosition) {
|
||||
List<GATKChunk> spansOverlapping = new LinkedList<GATKChunk>();
|
||||
// While the position iterator overlaps the given block, pull out spans to report.
|
||||
while(positionIterator.hasNext() && positionIterator.peek().getBlockStart() <= blockAddress) {
|
||||
// Create a span over as much of the block as is covered by this chunk.
|
||||
int blockOffsetStart = (blockAddress == positionIterator.peek().getBlockStart()) ? positionIterator.peek().getBlockOffsetStart() : 0;
|
||||
|
||||
// Calculate the end of this span. If the span extends past this block, cap it using the current file position.
|
||||
long blockEnd;
|
||||
int blockOffsetEnd;
|
||||
if(blockAddress < positionIterator.peek().getBlockEnd()) {
|
||||
blockEnd = filePosition;
|
||||
blockOffsetEnd = 0;
|
||||
}
|
||||
else {
|
||||
blockEnd = positionIterator.peek().getBlockEnd();
|
||||
blockOffsetEnd = positionIterator.peek().getBlockOffsetEnd();
|
||||
}
|
||||
|
||||
GATKChunk newChunk = new GATKChunk(blockAddress,blockOffsetStart,blockEnd,blockOffsetEnd);
|
||||
|
||||
if(newChunk.getChunkStart() <= newChunk.getChunkEnd())
|
||||
spansOverlapping.add(new GATKChunk(blockAddress,blockOffsetStart,blockEnd,blockOffsetEnd));
|
||||
|
||||
// If the value currently stored in the position iterator ends past the current block, we must be done. Abort.
|
||||
if(!positionIterator.hasNext() || positionIterator.peek().getBlockEnd() > blockAddress)
|
||||
break;
|
||||
|
||||
// If the position iterator ends before the block ends, pull the position iterator forward.
|
||||
if(positionIterator.peek().getBlockEnd() <= blockAddress)
|
||||
positionIterator.next();
|
||||
}
|
||||
|
||||
return spansOverlapping;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
|
|
@ -111,20 +147,16 @@ class SAMReaderPosition {
|
|||
* @param filePosition The current position within the file.
|
||||
*/
|
||||
void advancePosition(final long filePosition) {
|
||||
nextBlockAddress = filePosition >> 16;
|
||||
nextBlockAddress = BlockCompressedFilePointerUtil.getBlockAddress(filePosition);
|
||||
|
||||
// Check the current file position against the iterator; if the iterator is before the current file position,
|
||||
// draw the iterator forward. Remember when performing the check that coordinates are half-open!
|
||||
while(positionIterator.hasNext() && isFilePositionPastEndOfChunk(filePosition,positionIterator.peek())) {
|
||||
while(positionIterator.hasNext() && isFilePositionPastEndOfChunk(filePosition,positionIterator.peek()))
|
||||
positionIterator.next();
|
||||
|
||||
// If the block iterator has shot past the file pointer, bring the file pointer flush with the start of the current block.
|
||||
if(positionIterator.hasNext() && filePosition < positionIterator.peek().getChunkStart()) {
|
||||
nextBlockAddress = positionIterator.peek().getBlockStart();
|
||||
//System.out.printf("SAMReaderPosition: next block address advanced to %d%n",nextBlockAddress);
|
||||
break;
|
||||
}
|
||||
}
|
||||
// If the block iterator has shot past the file pointer, bring the file pointer flush with the start of the current block.
|
||||
if(positionIterator.hasNext() && filePosition < positionIterator.peek().getChunkStart())
|
||||
nextBlockAddress = positionIterator.peek().getBlockStart();
|
||||
|
||||
// If we've shot off the end of the block pointer, notify consumers that iteration is complete.
|
||||
if(!positionIterator.hasNext())
|
||||
|
|
@ -44,12 +44,12 @@ public class BGZFBlockLoadingDispatcher {
|
|||
|
||||
private final ExecutorService threadPool;
|
||||
|
||||
private final Queue<SAMReaderPosition> inputQueue;
|
||||
private final Queue<BAMAccessPlan> inputQueue;
|
||||
|
||||
public BGZFBlockLoadingDispatcher(final int numThreads, final int numFileHandles) {
|
||||
threadPool = Executors.newFixedThreadPool(numThreads);
|
||||
fileHandleCache = new FileHandleCache(numFileHandles);
|
||||
inputQueue = new LinkedList<SAMReaderPosition>();
|
||||
inputQueue = new LinkedList<BAMAccessPlan>();
|
||||
|
||||
threadPool.execute(new BlockLoader(this,fileHandleCache,true));
|
||||
}
|
||||
|
|
@ -58,7 +58,7 @@ public class BGZFBlockLoadingDispatcher {
|
|||
* Initiates a request for a new block load.
|
||||
* @param readerPosition Position at which to load.
|
||||
*/
|
||||
void queueBlockLoad(final SAMReaderPosition readerPosition) {
|
||||
void queueBlockLoad(final BAMAccessPlan readerPosition) {
|
||||
synchronized(inputQueue) {
|
||||
inputQueue.add(readerPosition);
|
||||
inputQueue.notify();
|
||||
|
|
@ -69,7 +69,7 @@ public class BGZFBlockLoadingDispatcher {
|
|||
* Claims the next work request from the queue.
|
||||
* @return The next work request, or null if none is available.
|
||||
*/
|
||||
SAMReaderPosition claimNextWorkRequest() {
|
||||
BAMAccessPlan claimNextWorkRequest() {
|
||||
synchronized(inputQueue) {
|
||||
while(inputQueue.isEmpty()) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -26,24 +26,21 @@ package org.broadinstitute.sting.gatk.datasources.reads;
|
|||
|
||||
import net.sf.samtools.GATKBAMFileSpan;
|
||||
import net.sf.samtools.GATKChunk;
|
||||
import net.sf.samtools.util.BAMInputStream;
|
||||
import net.sf.samtools.util.BlockCompressedFilePointerUtil;
|
||||
import net.sf.samtools.util.BlockCompressedInputStream;
|
||||
import net.sf.samtools.util.RuntimeEOFException;
|
||||
import net.sf.samtools.util.SeekableStream;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Presents decompressed blocks to the SAMFileReader.
|
||||
*/
|
||||
public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
||||
public class BlockInputStream extends InputStream {
|
||||
/**
|
||||
* Mechanism for triggering block loads.
|
||||
*/
|
||||
|
|
@ -65,9 +62,9 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
private Throwable error;
|
||||
|
||||
/**
|
||||
* Current position.
|
||||
* Current accessPlan.
|
||||
*/
|
||||
private SAMReaderPosition position;
|
||||
private BAMAccessPlan accessPlan;
|
||||
|
||||
/**
|
||||
* A stream of compressed data blocks.
|
||||
|
|
@ -94,11 +91,6 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
*/
|
||||
private final BlockCompressedInputStream validatingInputStream;
|
||||
|
||||
/**
|
||||
* Has the buffer been filled since last request?
|
||||
*/
|
||||
private boolean bufferFilled = false;
|
||||
|
||||
/**
|
||||
* Create a new block presenting input stream with a dedicated buffer.
|
||||
* @param dispatcher the block loading messenger.
|
||||
|
|
@ -118,7 +110,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
|
||||
this.dispatcher = dispatcher;
|
||||
// TODO: Kill the region when all we want to do is start at the beginning of the stream and run to the end of the stream.
|
||||
this.position = new SAMReaderPosition(reader,this,new GATKBAMFileSpan(new GATKChunk(0,Long.MAX_VALUE)));
|
||||
this.accessPlan = new BAMAccessPlan(reader,this,new GATKBAMFileSpan(new GATKChunk(0,Long.MAX_VALUE)));
|
||||
|
||||
// The block offsets / block positions guarantee that the ending offset/position in the data structure maps to
|
||||
// the point in the file just following the last read. These two arrays should never be empty; initializing
|
||||
|
|
@ -151,7 +143,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
synchronized(lock) {
|
||||
// Find the current block within the input stream.
|
||||
int blockIndex;
|
||||
for(blockIndex = 0; blockIndex+1 < blockOffsets.size() && buffer.position() >= blockOffsets.get(blockIndex + 1); blockIndex++)
|
||||
for(blockIndex = 0; blockIndex+1 < blockOffsets.size() && buffer.position() > blockOffsets.get(blockIndex+1); blockIndex++)
|
||||
;
|
||||
filePointer = blockPositions.get(blockIndex) + (buffer.position()-blockOffsets.get(blockIndex));
|
||||
}
|
||||
|
|
@ -164,51 +156,8 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
return filePointer;
|
||||
}
|
||||
|
||||
public void seek(long target) {
|
||||
//System.out.printf("Thread %s, BlockInputStream %s: seeking to block %d, offset %d%n",Thread.currentThread().getId(),this,BlockCompressedFilePointerUtil.getBlockAddress(target),BlockCompressedFilePointerUtil.getBlockOffset(target));
|
||||
synchronized(lock) {
|
||||
clearBuffers();
|
||||
|
||||
// Ensure that the position filled in by submitAccessPlan() is in sync with the seek target just specified.
|
||||
position.advancePosition(target);
|
||||
|
||||
// If the position advances past the end of the target, that must mean that we seeked to a point at the end
|
||||
// of one of the chunk list's subregions. Make a note of our current position and punt on loading any data.
|
||||
if(target < position.getBlockAddress() << 16) {
|
||||
blockOffsets.clear();
|
||||
blockOffsets.add(0);
|
||||
blockPositions.clear();
|
||||
blockPositions.add(target);
|
||||
}
|
||||
else {
|
||||
waitForBufferFill();
|
||||
// A buffer fill will load the relevant data from the shard, but the buffer position still needs to be
|
||||
// advanced as appropriate.
|
||||
Iterator<Integer> blockOffsetIterator = blockOffsets.descendingIterator();
|
||||
Iterator<Long> blockPositionIterator = blockPositions.descendingIterator();
|
||||
while(blockOffsetIterator.hasNext() && blockPositionIterator.hasNext()) {
|
||||
final int blockOffset = blockOffsetIterator.next();
|
||||
final long blockPosition = blockPositionIterator.next();
|
||||
if((blockPosition >> 16) == (target >> 16) && (blockPosition&0xFFFF) < (target&0xFFFF)) {
|
||||
buffer.position(blockOffset + (int)(target&0xFFFF)-(int)(blockPosition&0xFFFF));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(validatingInputStream != null) {
|
||||
try {
|
||||
validatingInputStream.seek(target);
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to validate against Picard input stream",ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void clearBuffers() {
|
||||
this.position.reset();
|
||||
this.accessPlan.reset();
|
||||
|
||||
// Buffer semantics say that outside of a lock, buffer should always be prepared for reading.
|
||||
// Indicate no data to be read.
|
||||
|
|
@ -225,29 +174,41 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
public boolean eof() {
|
||||
synchronized(lock) {
|
||||
// TODO: Handle multiple empty BGZF blocks at end of the file.
|
||||
return position != null && (position.getBlockAddress() < 0 || position.getBlockAddress() >= length);
|
||||
return accessPlan != null && (accessPlan.getBlockAddress() < 0 || accessPlan.getBlockAddress() >= length);
|
||||
}
|
||||
}
|
||||
|
||||
public void setCheckCrcs(final boolean check) {
|
||||
// TODO: Implement
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits a new access plan for the given dataset.
|
||||
* @param position The next seek point for BAM data in this reader.
|
||||
* Submits a new access plan for the given dataset and seeks to the given point.
|
||||
* @param accessPlan The next seek point for BAM data in this reader.
|
||||
*/
|
||||
public void submitAccessPlan(final SAMReaderPosition position) {
|
||||
public void submitAccessPlan(final BAMAccessPlan accessPlan) {
|
||||
//System.out.printf("Thread %s: submitting access plan for block at position: %d%n",Thread.currentThread().getId(),position.getBlockAddress());
|
||||
synchronized(lock) {
|
||||
// Assume that the access plan is going to tell us to start where we are and move forward.
|
||||
// If this isn't the case, we'll soon receive a seek request and the buffer will be forced to reset.
|
||||
if(this.position != null && position.getBlockAddress() < this.position.getBlockAddress())
|
||||
position.advancePosition(this.position.getBlockAddress() << 16);
|
||||
this.accessPlan = accessPlan;
|
||||
accessPlan.reset();
|
||||
|
||||
clearBuffers();
|
||||
|
||||
// Pull the iterator past any oddball chunks at the beginning of the shard (chunkEnd < chunkStart, empty chunks, etc).
|
||||
// TODO: Don't pass these empty chunks in.
|
||||
accessPlan.advancePosition(makeFilePointer(accessPlan.getBlockAddress(),0));
|
||||
|
||||
if(accessPlan.getBlockAddress() >= 0) {
|
||||
waitForBufferFill();
|
||||
}
|
||||
this.position = position;
|
||||
|
||||
if(validatingInputStream != null) {
|
||||
try {
|
||||
validatingInputStream.seek(makeFilePointer(accessPlan.getBlockAddress(),0));
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to validate against Picard input stream",ex);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void compactBuffer() {
|
||||
// Compact buffer to maximize storage space.
|
||||
int bytesToRemove = 0;
|
||||
|
|
@ -286,27 +247,14 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
* Push contents of incomingBuffer into the end of this buffer.
|
||||
* MUST be called from a thread that is NOT the reader thread.
|
||||
* @param incomingBuffer The data being pushed into this input stream.
|
||||
* @param position target position for the data.
|
||||
* @param accessPlan target access plan for the data.
|
||||
* @param filePosition the current position of the file pointer
|
||||
*/
|
||||
public void copyIntoBuffer(final ByteBuffer incomingBuffer, final SAMReaderPosition position, final long filePosition) {
|
||||
public void copyIntoBuffer(final ByteBuffer incomingBuffer, final BAMAccessPlan accessPlan, final long filePosition) {
|
||||
synchronized(lock) {
|
||||
try {
|
||||
compactBuffer();
|
||||
// Open up the buffer for more reading.
|
||||
buffer.limit(buffer.capacity());
|
||||
|
||||
// Advance the position to take the most recent read into account.
|
||||
final long lastBlockAddress = position.getBlockAddress();
|
||||
final int blockOffsetStart = position.getFirstOffsetInBlock();
|
||||
final int blockOffsetEnd = position.getLastOffsetInBlock();
|
||||
|
||||
// Where did this read end? It either ended in the middle of a block (for a bounding chunk) or it ended at the start of the next block.
|
||||
final long endOfRead = (blockOffsetEnd < incomingBuffer.remaining()) ? (lastBlockAddress << 16) | blockOffsetEnd : filePosition << 16;
|
||||
|
||||
byte[] validBytes = null;
|
||||
if(validatingInputStream != null) {
|
||||
validBytes = new byte[incomingBuffer.remaining()];
|
||||
byte[] validBytes = new byte[incomingBuffer.remaining()];
|
||||
|
||||
byte[] currentBytes = new byte[incomingBuffer.remaining()];
|
||||
int pos = incomingBuffer.position();
|
||||
|
|
@ -317,7 +265,7 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
incomingBuffer.position(pos);
|
||||
|
||||
long currentFilePointer = validatingInputStream.getFilePointer();
|
||||
validatingInputStream.seek(lastBlockAddress << 16);
|
||||
validatingInputStream.seek(makeFilePointer(accessPlan.getBlockAddress(), 0));
|
||||
validatingInputStream.read(validBytes);
|
||||
validatingInputStream.seek(currentFilePointer);
|
||||
|
||||
|
|
@ -325,33 +273,41 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
throw new ReviewedStingException(String.format("Bytes being inserted into BlockInputStream %s are incorrect",this));
|
||||
}
|
||||
|
||||
this.position = position;
|
||||
position.advancePosition(filePosition << 16);
|
||||
compactBuffer();
|
||||
// Open up the buffer for more reading.
|
||||
buffer.limit(buffer.capacity());
|
||||
|
||||
if(buffer.remaining() < incomingBuffer.remaining()) {
|
||||
//System.out.printf("Thread %s: waiting for available space in buffer; buffer remaining = %d, incoming buffer remaining = %d%n",Thread.currentThread().getId(),buffer.remaining(),incomingBuffer.remaining());
|
||||
// Get the spans overlapping this particular block...
|
||||
List<GATKChunk> spansOverlapping = accessPlan.getSpansOverlappingBlock(accessPlan.getBlockAddress(),filePosition);
|
||||
|
||||
// ...and advance the block
|
||||
this.accessPlan = accessPlan;
|
||||
accessPlan.advancePosition(makeFilePointer(filePosition, 0));
|
||||
|
||||
if(buffer.remaining() < incomingBuffer.remaining())
|
||||
lock.wait();
|
||||
//System.out.printf("Thread %s: waited for available space in buffer; buffer remaining = %d, incoming buffer remaining = %d%n", Thread.currentThread().getId(), buffer.remaining(), incomingBuffer.remaining());
|
||||
|
||||
final int bytesInIncomingBuffer = incomingBuffer.limit();
|
||||
|
||||
for(GATKChunk spanOverlapping: spansOverlapping) {
|
||||
// Clear out the endcap tracking state and add in the starting position for this transfer.
|
||||
blockOffsets.removeLast();
|
||||
blockOffsets.add(buffer.position());
|
||||
blockPositions.removeLast();
|
||||
blockPositions.add(spanOverlapping.getChunkStart());
|
||||
|
||||
// Stream the buffer into the data stream.
|
||||
incomingBuffer.limit((spanOverlapping.getBlockEnd() > spanOverlapping.getBlockStart()) ? bytesInIncomingBuffer : spanOverlapping.getBlockOffsetEnd());
|
||||
incomingBuffer.position(spanOverlapping.getBlockOffsetStart());
|
||||
buffer.put(incomingBuffer);
|
||||
|
||||
// Add the endcap for this transfer.
|
||||
blockOffsets.add(buffer.position());
|
||||
blockPositions.add(spanOverlapping.getChunkEnd());
|
||||
}
|
||||
|
||||
// Remove the last position in the list and add in the last read position, in case the two are different.
|
||||
blockOffsets.removeLast();
|
||||
blockOffsets.add(buffer.position());
|
||||
blockPositions.removeLast();
|
||||
blockPositions.add(lastBlockAddress << 16 | blockOffsetStart);
|
||||
|
||||
// Stream the buffer into the data stream.
|
||||
incomingBuffer.position(blockOffsetStart);
|
||||
incomingBuffer.limit(Math.min(incomingBuffer.limit(),blockOffsetEnd));
|
||||
buffer.put(incomingBuffer);
|
||||
|
||||
// Then, add the last position read to the very end of the list, just past the end of the last buffer.
|
||||
blockOffsets.add(buffer.position());
|
||||
blockPositions.add(endOfRead);
|
||||
|
||||
// Set up the buffer for reading.
|
||||
buffer.flip();
|
||||
bufferFilled = true;
|
||||
|
||||
lock.notify();
|
||||
}
|
||||
|
|
@ -447,12 +403,8 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
if(remaining < length)
|
||||
return length - remaining;
|
||||
|
||||
// Otherwise, if at eof(), return -1.
|
||||
else if(eof())
|
||||
return -1;
|
||||
|
||||
// Otherwise, we must've hit a bug in the system.
|
||||
throw new ReviewedStingException("BUG: read returned no data, but eof() reports false.");
|
||||
// Otherwise, return -1.
|
||||
return -1;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
|
|
@ -472,20 +424,26 @@ public class BlockInputStream extends SeekableStream implements BAMInputStream {
|
|||
|
||||
private void waitForBufferFill() {
|
||||
synchronized(lock) {
|
||||
bufferFilled = false;
|
||||
if(buffer.remaining() == 0 && !eof()) {
|
||||
//System.out.printf("Thread %s is waiting for a buffer fill from position %d to buffer %s%n",Thread.currentThread().getId(),position.getBlockAddress(),this);
|
||||
dispatcher.queueBlockLoad(position);
|
||||
dispatcher.queueBlockLoad(accessPlan);
|
||||
try {
|
||||
lock.wait();
|
||||
}
|
||||
catch(InterruptedException ex) {
|
||||
throw new ReviewedStingException("Interrupt occurred waiting for buffer to fill",ex);
|
||||
}
|
||||
|
||||
if(bufferFilled && buffer.remaining() == 0)
|
||||
throw new RuntimeEOFException("No more data left in InputStream");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an encoded BAM file pointer given the address of a BGZF block and an offset.
|
||||
* @param blockAddress Physical address on disk of a BGZF block.
|
||||
* @param blockOffset Offset into the uncompressed data stored in the BGZF block.
|
||||
* @return 64-bit pointer encoded according to the BAM spec.
|
||||
*/
|
||||
public static long makeFilePointer(final long blockAddress, final int blockOffset) {
|
||||
return blockAddress << 16 | blockOffset;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (c) 2011, The Broad Institute
|
||||
* Copyright (c) 2012, The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
|
|
@ -70,29 +70,29 @@ class BlockLoader implements Runnable {
|
|||
|
||||
public void run() {
|
||||
for(;;) {
|
||||
SAMReaderPosition readerPosition = null;
|
||||
BAMAccessPlan accessPlan = null;
|
||||
try {
|
||||
readerPosition = dispatcher.claimNextWorkRequest();
|
||||
FileInputStream inputStream = fileHandleCache.claimFileInputStream(readerPosition.getReader());
|
||||
accessPlan = dispatcher.claimNextWorkRequest();
|
||||
FileInputStream inputStream = fileHandleCache.claimFileInputStream(accessPlan.getReader());
|
||||
|
||||
long blockAddress = readerPosition.getBlockAddress();
|
||||
//long blockAddress = readerPosition.getBlockAddress();
|
||||
//System.out.printf("Thread %s: BlockLoader: copying bytes from %s at position %d into %s%n",Thread.currentThread().getId(),inputStream,blockAddress,readerPosition.getInputStream());
|
||||
|
||||
ByteBuffer compressedBlock = readBGZFBlock(inputStream,readerPosition.getBlockAddress());
|
||||
ByteBuffer compressedBlock = readBGZFBlock(inputStream,accessPlan.getBlockAddress());
|
||||
long nextBlockAddress = position(inputStream);
|
||||
fileHandleCache.releaseFileInputStream(readerPosition.getReader(),inputStream);
|
||||
fileHandleCache.releaseFileInputStream(accessPlan.getReader(),inputStream);
|
||||
|
||||
ByteBuffer block = decompress ? decompressBGZFBlock(compressedBlock) : compressedBlock;
|
||||
int bytesCopied = block.remaining();
|
||||
|
||||
BlockInputStream bamInputStream = readerPosition.getInputStream();
|
||||
bamInputStream.copyIntoBuffer(block,readerPosition,nextBlockAddress);
|
||||
BlockInputStream bamInputStream = accessPlan.getInputStream();
|
||||
bamInputStream.copyIntoBuffer(block,accessPlan,nextBlockAddress);
|
||||
|
||||
//System.out.printf("Thread %s: BlockLoader: copied %d bytes from %s at position %d into %s%n",Thread.currentThread().getId(),bytesCopied,inputStream,blockAddress,readerPosition.getInputStream());
|
||||
}
|
||||
catch(Throwable error) {
|
||||
if(readerPosition != null && readerPosition.getInputStream() != null)
|
||||
readerPosition.getInputStream().reportException(error);
|
||||
if(accessPlan != null && accessPlan.getInputStream() != null)
|
||||
accessPlan.getInputStream().reportException(error);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ import java.util.Map;
|
|||
*/
|
||||
public class ReadShard extends Shard {
|
||||
/**
|
||||
* What is the maximum number of reads which should go into a read shard.
|
||||
* What is the maximum number of reads per BAM file which should go into a read shard.
|
||||
*/
|
||||
public static int MAX_READS = 10000;
|
||||
|
||||
|
|
|
|||
|
|
@ -567,9 +567,14 @@ public class SAMDataSource {
|
|||
|
||||
if(threadAllocation.getNumIOThreads() > 0) {
|
||||
BlockInputStream inputStream = readers.getInputStream(id);
|
||||
inputStream.submitAccessPlan(new SAMReaderPosition(id,inputStream,(GATKBAMFileSpan)shard.getFileSpans().get(id)));
|
||||
inputStream.submitAccessPlan(new BAMAccessPlan(id, inputStream, (GATKBAMFileSpan) shard.getFileSpans().get(id)));
|
||||
BAMRecordCodec codec = new BAMRecordCodec(getHeader(id),factory);
|
||||
codec.setInputStream(inputStream);
|
||||
iterator = new BAMCodecIterator(inputStream,readers.getReader(id),codec);
|
||||
}
|
||||
else {
|
||||
iterator = readers.getReader(id).iterator(shard.getFileSpans().get(id));
|
||||
}
|
||||
iterator = readers.getReader(id).iterator(shard.getFileSpans().get(id));
|
||||
if(shard.getGenomeLocs().size() > 0)
|
||||
iterator = new IntervalOverlapFilteringIterator(iterator,shard.getGenomeLocs());
|
||||
iteratorMap.put(readers.getReader(id), iterator);
|
||||
|
|
@ -577,8 +582,6 @@ public class SAMDataSource {
|
|||
|
||||
MergingSamRecordIterator mergingIterator = readers.createMergingIterator(iteratorMap);
|
||||
|
||||
|
||||
|
||||
return applyDecoratingIterators(shard.getReadMetrics(),
|
||||
enableVerification,
|
||||
readProperties.useOriginalBaseQualities(),
|
||||
|
|
@ -592,6 +595,49 @@ public class SAMDataSource {
|
|||
readProperties.defaultBaseQualities());
|
||||
}
|
||||
|
||||
private class BAMCodecIterator implements CloseableIterator<SAMRecord> {
|
||||
private final BlockInputStream inputStream;
|
||||
private final SAMFileReader reader;
|
||||
private final BAMRecordCodec codec;
|
||||
private SAMRecord nextRead;
|
||||
|
||||
private BAMCodecIterator(final BlockInputStream inputStream, final SAMFileReader reader, final BAMRecordCodec codec) {
|
||||
this.inputStream = inputStream;
|
||||
this.reader = reader;
|
||||
this.codec = codec;
|
||||
advance();
|
||||
}
|
||||
|
||||
public boolean hasNext() {
|
||||
return nextRead != null;
|
||||
}
|
||||
|
||||
public SAMRecord next() {
|
||||
if(!hasNext())
|
||||
throw new NoSuchElementException("Unable to retrieve next record from BAMCodecIterator; input stream is empty");
|
||||
SAMRecord currentRead = nextRead;
|
||||
advance();
|
||||
return currentRead;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
// NO-OP.
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("Unable to remove from BAMCodecIterator");
|
||||
}
|
||||
|
||||
private void advance() {
|
||||
final long startCoordinate = inputStream.getFilePointer();
|
||||
nextRead = codec.decode();
|
||||
final long stopCoordinate = inputStream.getFilePointer();
|
||||
|
||||
if(reader != null && nextRead != null)
|
||||
PicardNamespaceUtils.setFileSource(nextRead,new SAMFileSource(reader,new GATKBAMFileSpan(new GATKChunk(startCoordinate,stopCoordinate))));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter reads based on user-specified criteria.
|
||||
*
|
||||
|
|
@ -871,12 +917,9 @@ public class SAMDataSource {
|
|||
public ReaderInitializer call() {
|
||||
final File indexFile = findIndexFile(readerID.samFile);
|
||||
try {
|
||||
if (threadAllocation.getNumIOThreads() > 0) {
|
||||
if (threadAllocation.getNumIOThreads() > 0)
|
||||
blockInputStream = new BlockInputStream(dispatcher,readerID,false);
|
||||
reader = new SAMFileReader(blockInputStream,indexFile,false);
|
||||
}
|
||||
else
|
||||
reader = new SAMFileReader(readerID.samFile,indexFile,false);
|
||||
reader = new SAMFileReader(readerID.samFile,indexFile,false);
|
||||
} catch ( RuntimeIOException e ) {
|
||||
if ( e.getCause() != null && e.getCause() instanceof FileNotFoundException )
|
||||
throw new UserException.CouldNotReadInputFile(readerID.samFile, e);
|
||||
|
|
|
|||
Loading…
Reference in New Issue