Pooling of unmapped reads -- improves runtime of files with tons of unmapped reads by an order of magnitude.

Desperately needs cleanup.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@1080 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2009-06-23 23:48:06 +00:00
parent dfa2efbcf5
commit ef546868bf
11 changed files with 541 additions and 245 deletions

View File

@ -0,0 +1,280 @@
package org.broadinstitute.sting.gatk.datasources.simpleDataSources;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2;
import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter;
import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator;
import org.broadinstitute.sting.utils.StingException;
import net.sf.picard.sam.SamFileHeaderMerger;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMFileReader;
import net.sf.samtools.SAMReadGroupRecord;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.util.CloseableIterator;
import java.util.List;
import java.util.ArrayList;
import java.io.File;
/**
* User: hanna
* Date: Jun 23, 2009
* Time: 6:49:04 PM
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
* Software and documentation are copyright 2005 by the Broad Institute.
* All rights are reserved.
*
* Users acknowledge that this software is supplied without any warranty or support.
* The Broad Institute is not responsible for its use, misuse, or
* functionality.
*/
/**
 * Identifies which portion of a read stream a {@link ReadStreamPointer} currently sits in:
 * the mapped reads or the unmapped reads.
 */
enum MappingType { MAPPED, UNMAPPED }

/**
 * Maintains a pointer into a stream of reads.  Tracks state between mapped and unmapped.
 * For mapped, assumes that the user will query directly to where they want; a fresh query iterator
 * is created for every request.
 * For unmapped, assumes that the user will walk through the entire stream.  A single iterator is kept
 * open across requests and is only released by {@link #close()}.
 */
class ReadStreamPointer {
    /** our log, which we want to capture anything from this class */
    protected static Logger logger = Logger.getLogger(ReadStreamPointer.class);

    /**
     * Describes the source of reads data.
     */
    private final Reads sourceInfo;

    /**
     * Open handles to the reads info.
     */
    private final SamFileHeaderMerger headerMerger;

    /**
     * The (possibly merged) header for the input fileset.
     */
    private final SAMFileHeader header;

    /**
     * In which bucket of reads does this pointer live?
     */
    private MappingType streamPosition = MappingType.MAPPED;

    /**
     * A pointer to the current position of this iterator in the unmapped portion of the read stream.
     * Null until the first unmapped segment has been requested.
     */
    private PositionTrackingIterator unmappedIterator = null;

    /**
     * Opens the given read sources and merges their headers, requesting coordinate sort order.
     * @param sourceInfo Describes the files to read and how to read them.
     */
    public ReadStreamPointer( Reads sourceInfo ) {
        this.sourceInfo = sourceInfo;
        this.headerMerger = createHeaderMerger(sourceInfo, SAMFileHeader.SortOrder.coordinate);
        this.header = this.headerMerger.getMergedHeader();
    }

    /**
     * Gets the header information for the read stream.
     * @return Header information for the read stream.
     */
    public SAMFileHeader getHeader() {
        return header;
    }

    /**
     * Can this pointer be efficiently used to access the given segment?
     * A pointer still in the mapped state accepts every segment.  Once the pointer has moved into
     * the unmapped reads it rejects mapped segments, and accepts an unmapped segment only if that
     * segment starts at or after the pointer's current position (the stream cannot be rewound).
     * @param segment Segment to inspect.
     * @return True if the segment can be accessed efficiently, false otherwise.
     * @throws StingException if the segment is of an unsupported type.
     */
    public boolean canAccessSegmentEfficiently( DataStreamSegment segment ) {
        switch( streamPosition ) {
            case MAPPED:
                return true;
            case UNMAPPED:
                if( segment instanceof MappedStreamSegment )
                    return false;
                else if( segment instanceof UnmappedStreamSegment ) {
                    UnmappedStreamSegment unmappedSegment = (UnmappedStreamSegment)segment;
                    return unmappedIterator.position <= unmappedSegment.position;
                }
                else
                    throw new StingException("Unsupported stream segment type: " + segment.getClass());
            default:
                throw new StingException("Pointer has hit illegal stream position; current position is " + streamPosition);
        }
    }

    /**
     * Releases every resource held by this pointer: the long-lived unmapped iterator (if one was
     * ever opened) followed by all of the underlying file readers.
     */
    public void close() {
        // PositionTrackingIterator.close() is deliberately a no-op so that pooled consumers cannot
        // tear down the shared stream; the wrapped iterator therefore has to be released explicitly.
        if( unmappedIterator != null )
            unmappedIterator.closeUnderlyingIterator();
        for (SAMFileReader reader : headerMerger.getReaders())
            reader.close();
    }

    /**
     * Get a stream of all the reads that overlap a given segment.
     * @param segment Segment to check for overlaps.
     * @return An iterator over all reads overlapping the given segment.
     */
    public StingSAMIterator getReadsOverlapping( MappedStreamSegment segment ) {
        MergingSamRecordIterator2 mergingIterator = new MergingSamRecordIterator2( headerMerger, sourceInfo );
        mergingIterator.queryOverlapping( segment.locus.getContig(),
                                          (int)segment.locus.getStart(),
                                          (int)segment.locus.getStop());
        return StingSAMIteratorAdapter.adapt(sourceInfo,mergingIterator);
    }

    /**
     * Get a stream of the reads contained by the given segment.
     * For a mapped segment a new 'contained' query is issued against the segment's locus.
     * For an unmapped segment the shared unmapped iterator is opened on first use, advanced to the
     * segment's starting position, and returned wrapped so that at most <code>size</code> reads are produced.
     * @param segment Segment describing the reads of interest.
     * @return An iterator over the reads belonging to the segment.
     * @throws StingException if the segment type is unsupported or the pointer is in an inconsistent state.
     */
    public StingSAMIterator getReadsContainedBy( DataStreamSegment segment ) {
        if( segment instanceof MappedStreamSegment ) {
            MappedStreamSegment mappedSegment = (MappedStreamSegment)segment;
            MergingSamRecordIterator2 mergingIterator = new MergingSamRecordIterator2( headerMerger, sourceInfo );
            mergingIterator.queryContained( mappedSegment.locus.getContig(),
                                            (int)mappedSegment.locus.getStart(),
                                            (int)mappedSegment.locus.getStop());
            return StingSAMIteratorAdapter.adapt(sourceInfo,mergingIterator);
        }
        else if( segment instanceof UnmappedStreamSegment ) {
            UnmappedStreamSegment unmappedSegment = (UnmappedStreamSegment)segment;

            // If the stream position has not flipped over to the unmapped state, do some initialization.
            if( streamPosition == MappingType.MAPPED ) {
                MergingSamRecordIterator2 mergingIterator = new MergingSamRecordIterator2( headerMerger, sourceInfo );
                mergingIterator.queryUnmappedReads();
                unmappedIterator = new PositionTrackingIterator( sourceInfo, mergingIterator, 0L );
                streamPosition = MappingType.UNMAPPED;
            }
            else if( streamPosition != MappingType.UNMAPPED || unmappedIterator == null )
                throw new StingException("Illegal state: iterator has fetched all mapped reads but has not properly transition to unmapped reads");

            // Force the iterator to the next pending position.  This must also happen for a freshly
            // opened iterator: a pointer in the mapped state can be handed a segment that does not
            // start at the first unmapped read.  Stop early if the stream runs dry so that the caller
            // simply receives an empty iterator.
            while( unmappedIterator.position < unmappedSegment.position && unmappedIterator.hasNext() )
                unmappedIterator.next();

            return new BoundedReadIterator(StingSAMIteratorAdapter.adapt(sourceInfo,unmappedIterator), unmappedSegment.size);
        }
        else
            throw new StingException("Unable to handle stream segment of type " + segment.getClass());
    }

    /**
     * A private function that, given the internal file list, generates a merging construct for
     * all available files.  Files that declare no read group are given a synthetic one named
     * after the file.
     * @param reads source information about the reads.
     * @param SORT_ORDER sort order for the reads.
     * @return a header merger wrapping one open SAMFileReader per input file.
     * @throws SimpleDataSourceLoadException declared for callers; not raised directly by this method.
     */
    protected SamFileHeaderMerger createHeaderMerger( Reads reads, SAMFileHeader.SortOrder SORT_ORDER )
            throws SimpleDataSourceLoadException {
        // right now this is pretty damn heavy, it copies the file list into a reader list every time
        List<SAMFileReader> lst = new ArrayList<SAMFileReader>();
        try {
            for (File f : reads.getReadsFiles()) {
                SAMFileReader reader = new SAMFileReader(f, true);
                // Register the reader immediately so that it is released if anything below fails.
                lst.add(reader);
                reader.setValidationStringency(reads.getValidationStringency());

                final SAMFileHeader fileHeader = reader.getFileHeader();
                // Plain concatenation: the message must not be interpreted as a format string.
                logger.debug("Sort order is: " + fileHeader.getSortOrder());

                if (fileHeader.getReadGroups().size() < 1) {
                    //logger.warn("Setting header in reader " + f.getName());
                    SAMReadGroupRecord rec = new SAMReadGroupRecord(f.getName());
                    rec.setLibrary(f.getName());
                    rec.setSample(f.getName());
                    fileHeader.addReadGroup(rec);
                }
            }
            return new SamFileHeaderMerger(lst,SORT_ORDER,true);
        }
        catch (RuntimeException ex) {
            // Do not leak the file handles that were opened before the failure.
            closeReadersQuietly(lst);
            throw ex;
        }
    }

    /**
     * Best-effort close of a set of readers; used only while unwinding from a failed load.
     * @param readers Readers to close.
     */
    private static void closeReadersQuietly( List<SAMFileReader> readers ) {
        for (SAMFileReader reader : readers) {
            try {
                reader.close();
            }
            catch (RuntimeException ignored) {
                // The failure that triggered the cleanup is the one worth reporting.
            }
        }
    }

    /**
     * Wraps an iterator and counts how many reads have been pulled from it.
     * Declared static because it needs no access to the enclosing pointer.
     */
    private static class PositionTrackingIterator implements StingSAMIterator {
        /**
         * Source information about the reads.
         */
        private final Reads sourceInfo;

        /**
         * The iterator being tracked.
         */
        private final CloseableIterator<SAMRecord> iterator;

        /**
         * Current position within the tracked iterator.
         */
        private long position;

        /**
         * Create a new iterator wrapping the given iterator, assuming that the reader is <code>position</code> reads
         * into the sequence.
         * @param sourceInfo Information about where these reads came from.
         * @param iterator Iterator to wrap.
         * @param position Non-negative position where the iterator currently sits.
         */
        public PositionTrackingIterator( Reads sourceInfo, CloseableIterator<SAMRecord> iterator, long position ) {
            this.sourceInfo = sourceInfo;
            this.iterator = iterator;
            this.position = position;
        }

        /**
         * {@inheritDoc}
         */
        public Reads getSourceInfo() {
            return sourceInfo;
        }

        /**
         * Retrieves the current position of the iterator.  The 'current position' of the iterator is defined as
         * the coordinate of the read that will be returned if next() is called.
         * @return The current position of the iterator.
         */
        public long getPosition() {
            return position;
        }

        /**
         * {@inheritDoc}
         */
        public boolean hasNext() {
            return iterator.hasNext();
        }

        /**
         * Try to get the next read in the list.  If a next read is available, increment the position.
         * If the wrapped iterator throws, the position is left untouched.
         * @return next read in the list, if available.
         */
        public SAMRecord next() {
            SAMRecord read = iterator.next();
            position++;
            return read;
        }

        /**
         * {@inheritDoc}
         */
        public StingSAMIterator iterator() {
            return this;
        }

        /**
         * {@inheritDoc}
         */
        public void close() {
            // Position tracking iterators are constant through the life of the traversal.  Don't close them.
            // TODO: This is an artifact of the fact that pooled query iterators need to be closed, but pooled unmapped
            // TODO: iterators must not be.  Clean this up!
        }

        /**
         * Really closes the wrapped iterator.  Only the owning pointer calls this, when it is itself closed.
         */
        private void closeUnderlyingIterator() {
            iterator.close();
        }

        /**
         * {@inheritDoc}
         */
        public void remove() { throw new UnsupportedOperationException("Cannot remove from a StingSAMIterator"); }
    }
}

View File

@ -5,6 +5,7 @@ import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
import org.broadinstitute.sting.gatk.refdata.RODIterator;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.StingException;
import java.util.Iterator;
import java.util.List;
@ -58,7 +59,7 @@ public class ReferenceOrderedDataSource implements SimpleDataSource {
* @return Iterator through the data.
*/
public Iterator seek( Shard shard ) {
RODIterator iterator = iteratorPool.iterator(shard.getGenomeLoc());
RODIterator iterator = iteratorPool.iterator( new MappedStreamSegment(shard.getGenomeLoc()) );
return iterator;
}
@ -85,21 +86,25 @@ class ReferenceOrderedDataPool extends ResourcePool<RODIterator,RODIterator> {
/**
* Create a new iterator from the existing reference-ordered data. This new iterator is expected
* to be completely independent of any other iterator.
* @param position {@inheritDoc}
* @return The newly created resource.
*/
public RODIterator createNewResource( GenomeLoc position ) {
public RODIterator createNewResource() {
return rod.iterator();
}
/**
* Finds the best existing ROD iterator from the pool. In this case, the best existing ROD is defined as
* the first one encountered that is at or before the given position.
* @param position {@inheritDoc}
* @param segment {@inheritDoc}
* @param resources {@inheritDoc}
* @return {@inheritDoc}
*/
public RODIterator selectBestExistingResource( GenomeLoc position, List<RODIterator> resources ) {
public RODIterator selectBestExistingResource( DataStreamSegment segment, List<RODIterator> resources ) {
if( !(segment instanceof MappedStreamSegment) )
throw new StingException("Reference-ordered data cannot utilitize unmapped segments.");
GenomeLoc position = ((MappedStreamSegment)segment).locus;
for( RODIterator iterator: resources ) {
if( (iterator.position() == null && iterator.hasNext()) ||
(iterator.position() != null && iterator.position().isBefore(position)) )
@ -111,7 +116,7 @@ class ReferenceOrderedDataPool extends ResourcePool<RODIterator,RODIterator> {
/**
* In this case, the iterator is the resource. Pass it through.
*/
public RODIterator createIteratorFromResource( GenomeLoc position, RODIterator resource ) {
public RODIterator createIteratorFromResource( DataStreamSegment segment, RODIterator resource ) {
return resource;
}

View File

@ -42,29 +42,28 @@ abstract class ResourcePool <T,I extends Iterator> {
/**
* Get an iterator whose position is before the specified location. Create a new one if none exists.
* @param position Target position for the iterator.
* @param segment Target position for the iterator.
* @return An iterator that can traverse the selected region. Should be able to iterate concurrently with other
* iterators from this pool.
*/
public I iterator( GenomeLoc position ) {
public I iterator( DataStreamSegment segment ) {
// Grab the first iterator in the list whose position is before the requested position.
T selectedResource = null;
synchronized(this) {
selectedResource = selectBestExistingResource( position, availableResources );
selectedResource = selectBestExistingResource( segment, availableResources );
// No iterator found? Create another. It is expected that
// each iterator created will have its own file handle.
if( selectedResource == null ) {
selectedResource = createNewResource();
addNewResource( selectedResource );
}
// Remove the iterator from the list of available iterators.
if( selectedResource != null )
availableResources.remove(selectedResource);
availableResources.remove(selectedResource);
}
// No iterator found? Create another. It is expected that
// each iterator created will have its own file handle.
if( selectedResource == null ) {
selectedResource = createNewResource(position);
addNewResource( selectedResource );
}
I iterator = createIteratorFromResource( position, selectedResource );
I iterator = createIteratorFromResource( segment, selectedResource );
// Make a note of this assignment for proper releasing later.
resourceAssignments.put( iterator, selectedResource );
@ -97,32 +96,33 @@ abstract class ResourcePool <T,I extends Iterator> {
protected void addNewResource( T resource ) {
synchronized(this) {
allResources.add(resource);
availableResources.add(resource);
}
}
/**
* If no appropriate resources are found in the pool, the system can create a new resource.
* Delegate the creation of the resource to the subclass.
* @param position Position for the new resource. This information may or may not inform the new resource.
* @return The new resource created.
*/
protected abstract T createNewResource( GenomeLoc position );
protected abstract T createNewResource();
/**
* Find the most appropriate resource to acquire the specified data.
* @param position The data over which the resource is required.
* @param segment The data over which the resource is required.
* @param availableResources A list of candidate resources to evaluate.
* @return The best choice of the availableResources, or null if no resource meets the criteria.
*/
protected abstract T selectBestExistingResource( GenomeLoc position, List<T> availableResources );
protected abstract T selectBestExistingResource( DataStreamSegment segment, List<T> availableResources );
/**
* Create an iterator over the specified resource.
* @param position The bounds of iteration. The first element of the iterator through the last element should all
* be in the range described by position.
* @param resource The resource from which to derive the iterator.
* @return A new iterator over the given data.
*/
protected abstract I createIteratorFromResource( GenomeLoc position, T resource );
protected abstract I createIteratorFromResource( DataStreamSegment position, T resource );
/**
* Retire this resource from service.
@ -149,3 +149,44 @@ abstract class ResourcePool <T,I extends Iterator> {
}
}
/**
* Marker interface that represents an arbitrary consecutive segment within a data stream.
* It declares no members; consumers determine the concrete kind of segment
* (for example MappedStreamSegment or UnmappedStreamSegment) with instanceof checks.
*/
interface DataStreamSegment {
}
/**
 * A segment of a GATK input data stream that is addressed by a genomic locus.
 */
class MappedStreamSegment implements DataStreamSegment {
    /**
     * The locus that this segment refers to.
     */
    public final GenomeLoc locus;

    /**
     * Create a new mapped segment for the given locus.
     * @param locus The locus that the segment refers to; stored as given.
     */
    public MappedStreamSegment( final GenomeLoc locus ) {
        this.locus = locus;
    }
}
/**
 * A segment within the unmapped reads of a GATK input data stream, described by a starting
 * read index and a read count.
 */
class UnmappedStreamSegment implements DataStreamSegment {
    /**
     * Index of the first read in this region; 0 is the position of the first unmapped read.
     */
    public final long position;

    /**
     * Width of this region, in reads.  This size is generally treated as an upper bound.
     */
    public final long size;

    /**
     * Describe a target region in an unmapped read stream.
     * @param position The 0-based index into the unmapped reads.  Position 0 represents the first unmapped read.
     * @param size the size of the segment.
     */
    public UnmappedStreamSegment( final long position, final long size ) {
        this.size = size;
        this.position = position;
    }
}

View File

@ -1,9 +1,6 @@
package org.broadinstitute.sting.gatk.datasources.simpleDataSources;
import net.sf.picard.sam.SamFileHeaderMerger;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMFileReader;
import net.sf.samtools.SAMReadGroupRecord;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.util.CloseableIterator;
import org.apache.log4j.Logger;
@ -17,9 +14,7 @@ import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLocParser;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
/*
* Copyright (c) 2009 The Broad Institute
@ -121,7 +116,7 @@ public class SAMDataSource implements SimpleDataSource {
* @return an iterator for that region
*/
public StingSAMIterator seekLocus( GenomeLoc location ) throws SimpleDataSourceLoadException {
return iteratorPool.iterator(location);
return iteratorPool.iterator( new MappedStreamSegment(location) );
}
/**
@ -178,35 +173,32 @@ public class SAMDataSource implements SimpleDataSource {
*
* @return an iterator for that region
*/
private BoundedReadIterator seekRead( ReadShard shard ) throws SimpleDataSourceLoadException {
BoundedReadIterator bound = null;
private StingSAMIterator seekRead( ReadShard shard ) throws SimpleDataSourceLoadException {
StingSAMIterator iter = null;
if (!intoUnmappedReads) {
if (lastReadPos == null) {
lastReadPos = GenomeLocParser.createGenomeLoc(getHeader().getSequenceDictionary().getSequence(0).getSequenceIndex(), 0, Integer.MAX_VALUE);
iter = iteratorPool.iterator(lastReadPos);
iter = iteratorPool.iterator(new MappedStreamSegment(lastReadPos));
return InitialReadIterator(shard.getSize(), iter);
} else {
lastReadPos.setStop(-1);
iter = iteratorPool.iterator(lastReadPos);
bound = fastMappedReadSeek(shard.getSize(), StingSAMIteratorAdapter.adapt(reads, iter));
iter = fastMappedReadSeek(shard.getSize(), StingSAMIteratorAdapter.adapt(reads, iteratorPool.iterator(new MappedStreamSegment(lastReadPos))));
}
if( intoUnmappedReads && !includeUnmappedReads )
shard.signalDone();
}
if (( bound == null || intoUnmappedReads ) && includeUnmappedReads) {
if (iter != null) {
if (intoUnmappedReads && includeUnmappedReads) {
if( iter != null )
iter.close();
}
iter = iteratorPool.iterator(null);
bound = toUnmappedReads(shard.getSize(), (QueryIterator) iter);
iter = toUnmappedReads( shard.getSize() );
if( !iter.hasNext() )
shard.signalDone();
}
if (bound == null) {
shard.signalDone();
bound = new BoundedReadIterator(StingSAMIteratorAdapter.adapt(reads, iter), 0);
}
return bound;
return iter;
}
/**
@ -222,47 +214,28 @@ public class SAMDataSource implements SimpleDataSource {
}
/**
* Seek, if we want unmapped reads. This method will be faster then the unmapped read method, but you cannot extract the
* unmapped reads.
*
* Retrieve unmapped reads.
* @param readCount how many reads to retrieve
* @param iter the iterator to use
*
* @return the bounded iterator that you can use to get the intervaled reads from
* @throws SimpleDataSourceLoadException
*/
BoundedReadIterator toUnmappedReads( long readCount, QueryIterator iter ) throws SimpleDataSourceLoadException {
iter.queryUnmappedReads();
int count = 0;
// now walk until we've taken the unmapped read count
while (iter.hasNext() && count < this.readsTaken) {
iter.next();
count++;
}
// check to see what happened, did we run out of reads?
if (!iter.hasNext()) {
return null;
}
// we're not out of unmapped reads, so increment our read cout
this.readsTaken += readCount;
return new BoundedReadIterator(StingSAMIteratorAdapter.adapt(reads, iter), readCount);
StingSAMIterator toUnmappedReads( long readCount ) {
StingSAMIterator iter = iteratorPool.iterator( new UnmappedStreamSegment( readsTaken,readCount) );
readsTaken += readCount;
return iter;
}
/**
* A seek function for unmapped reads.
* A seek function for mapped reads.
*
* @param readCount how many reads to retrieve
* @param iter the iterator to use, seeked to the correct start location
*
* @return the bounded iterator that you can use to get the intervaled reads from
* @return the bounded iterator that you can use to get the intervaled reads from. Will be a zero-length
* iterator if no reads are available.
* @throws SimpleDataSourceLoadException
*/
BoundedReadIterator fastMappedReadSeek( long readCount, StingSAMIterator iter ) throws SimpleDataSourceLoadException {
StingSAMIterator fastMappedReadSeek( long readCount, StingSAMIterator iter ) throws SimpleDataSourceLoadException {
BoundedReadIterator bound;
correctForReadPileupSeek(iter);
if (readsTaken == 0) {
@ -280,18 +253,22 @@ public class SAMDataSource implements SimpleDataSource {
lastPos = rec.getAlignmentStart();
++x;
} else {
iter.close();
// jump contigs
lastReadPos = GenomeLocParser.toNextContig(lastReadPos);
if (lastReadPos == null) {
// check to see if we're using unmapped reads, if not return, we're done
readsTaken = 0;
intoUnmappedReads = true;
return null;
// fastMappedReadSeek must return an iterator, even if that iterator iterates through nothing.
return new NullSAMIterator(reads);
} else {
readsTaken = readCount;
readsSeenAtLastPos = 0;
lastReadPos.setStop(-1);
CloseableIterator<SAMRecord> ret = iteratorPool.iterator(lastReadPos);
CloseableIterator<SAMRecord> ret = iteratorPool.iterator(new MappedStreamSegment(lastReadPos));
return new BoundedReadIterator(StingSAMIteratorAdapter.adapt(reads, ret), readCount);
}
}
@ -360,7 +337,7 @@ public class SAMDataSource implements SimpleDataSource {
}
class SAMIteratorPool extends ResourcePool<SamFileHeaderMerger, QueryIterator> {
class SAMIteratorPool extends ResourcePool<ReadStreamPointer, StingSAMIterator> {
/** Source information about the reads. */
protected Reads reads;
@ -377,10 +354,10 @@ class SAMIteratorPool extends ResourcePool<SamFileHeaderMerger, QueryIterator> {
this.reads = reads;
this.byReads = byReads;
SamFileHeaderMerger merger = createNewResource(null);
this.header = merger.getMergedHeader();
ReadStreamPointer streamPointer = createNewResource();
this.header = streamPointer.getHeader();
// Add this resource to the pool.
this.addNewResource(merger);
this.addNewResource(streamPointer);
}
/** Get the combined header for all files in the iterator pool. */
@ -388,137 +365,52 @@ class SAMIteratorPool extends ResourcePool<SamFileHeaderMerger, QueryIterator> {
return header;
}
protected SamFileHeaderMerger selectBestExistingResource( GenomeLoc position, List<SamFileHeaderMerger> mergers ) {
if (mergers.size() == 0)
return null;
return mergers.get(0);
protected ReadStreamPointer selectBestExistingResource( DataStreamSegment segment, List<ReadStreamPointer> pointers ) {
for( ReadStreamPointer pointer: pointers ) {
if( pointer.canAccessSegmentEfficiently( segment ) ) {
return pointer;
}
}
return null;
}
protected SamFileHeaderMerger createNewResource( GenomeLoc position ) {
return createHeaderMerger(reads, SAMFileHeader.SortOrder.coordinate);
protected ReadStreamPointer createNewResource() {
return new ReadStreamPointer( reads );
}
protected QueryIterator createIteratorFromResource( GenomeLoc loc, SamFileHeaderMerger headerMerger ) {
final MergingSamRecordIterator2 iterator = new MergingSamRecordIterator2(headerMerger, reads);
protected StingSAMIterator createIteratorFromResource( DataStreamSegment segment, ReadStreamPointer streamPointer ) {
StingSAMIterator iterator = null;
if (loc != null) {
if (byReads)
iterator.queryContained(loc.getContig(), (int) loc.getStart(), (int) loc.getStop());
else
iterator.queryOverlapping(loc.getContig(), (int) loc.getStart(), (int) loc.getStop());
if( byReads )
iterator = streamPointer.getReadsContainedBy( segment );
else {
if( !(segment instanceof MappedStreamSegment) )
throw new StingException("Segment is unmapped; true overlaps cannot be determined.");
iterator = streamPointer.getReadsOverlapping( (MappedStreamSegment)segment );
}
return new QueryIterator() {
public Reads getSourceInfo() {
return reads;
}
public void close() {
iterator.close();
release(this);
}
public Iterator<SAMRecord> iterator() {
return this;
}
public boolean hasNext() {
return iterator.hasNext();
}
public SAMRecord next() {
return iterator.next();
}
public void remove() {
throw new UnsupportedOperationException("Can't remove from a StingSAMIterator");
}
public SAMRecord peek() {
return iterator.peek();
}
public void queryOverlapping( String contig, int start, int stop ) {
iterator.queryOverlapping(contig, start, stop);
}
public void query( String contig, int start, int stop, boolean contained ) {
iterator.query(contig, start, stop, contained);
}
public void queryUnmappedReads() {
iterator.queryUnmappedReads();
}
public void queryContained( String contig, int start, int stop ) {
iterator.queryContained(contig, start, stop);
}
};
return new ReleasingIterator(iterator);
}
protected void closeResource( SamFileHeaderMerger resource ) {
for (SAMFileReader reader : resource.getReaders())
reader.close();
protected void closeResource( ReadStreamPointer resource ) {
resource.close();
}
/**
* Load a SAM/BAM, given an input file.
*
* @param samFile the file name
*
* @return a SAMFileReader for the file, null if we're attempting to read a list
*/
protected SAMFileReader initializeSAMFile( final File samFile, SAMFileReader.ValidationStringency strictness ) {
if (samFile.toString().endsWith(".list")) {
return null;
} else {
SAMFileReader samReader = new SAMFileReader(samFile, true);
samReader.setValidationStringency(strictness);
private class ReleasingIterator implements StingSAMIterator {
private final StingSAMIterator wrappedIterator;
final SAMFileHeader header = samReader.getFileHeader();
logger.debug(String.format("Sort order is: " + header.getSortOrder()));
public Reads getSourceInfo() { return wrappedIterator.getSourceInfo(); }
return samReader;
public ReleasingIterator( StingSAMIterator wrapped ) { this.wrappedIterator = wrapped; }
public ReleasingIterator iterator() { return this; }
public void remove() { throw new UnsupportedOperationException("Can't remove from a StingSAMIterator"); }
public void close() {
wrappedIterator.close();
release(this);
}
public boolean hasNext() { return wrappedIterator.hasNext(); }
public SAMRecord next() { return wrappedIterator.next(); }
}
/**
* A private function that, given the internal file list, generates a SamFileReader
* list of validated files.
*
* @return a list of SAMFileReaders that represent the stored file names
* @throws SimpleDataSourceLoadException if there's a problem loading the files
*/
protected List<SAMFileReader> GetReaderList( Reads reads ) throws SimpleDataSourceLoadException {
// right now this is pretty damn heavy, it copies the file list into a reader list every time
List<SAMFileReader> lst = new ArrayList<SAMFileReader>();
for (File f : reads.getReadsFiles()) {
SAMFileReader reader = initializeSAMFile(f, reads.getValidationStringency());
if (reader.getFileHeader().getReadGroups().size() < 1) {
//logger.warn("Setting header in reader " + f.getName());
SAMReadGroupRecord rec = new SAMReadGroupRecord(f.getName());
rec.setLibrary(f.getName());
rec.setSample(f.getName());
reader.getFileHeader().addReadGroup(rec);
}
if (reader == null) {
throw new SimpleDataSourceLoadException("SAMDataSource: Unable to load file: " + f);
}
lst.add(reader);
}
return lst;
}
/**
* create the merging header.
*
* @return a SamFileHeaderMerger that includes the set of SAM files we were created with
*/
protected SamFileHeaderMerger createHeaderMerger( Reads reads, SAMFileHeader.SortOrder SORT_ORDER ) {
List<SAMFileReader> lst = GetReaderList(reads);
return new SamFileHeaderMerger(lst, SORT_ORDER, true);
}
}
}

View File

@ -55,7 +55,10 @@ public class BoundedReadIterator implements StingSAMIterator {
// our unmapped read flag
private boolean doNotUseThatUnmappedReadPile = false;
// the next read we've buffered
/**
* The next read that we've buffered. Null indicates that there's
* nothing in the buffer (not that there isn't a next read).
*/
private SAMRecord record = null;
/**
@ -96,6 +99,9 @@ public class BoundedReadIterator implements StingSAMIterator {
* @return
*/
public boolean hasNext() {
if( record != null )
return true;
if (iterator.hasNext() && currentCount < readCount) {
record = iterator.next();
++currentCount;
@ -113,7 +119,9 @@ public class BoundedReadIterator implements StingSAMIterator {
* @return SAMRecord representing the next read
*/
public SAMRecord next() {
return record;
SAMRecord cached = record;
record = null;
return cached;
}
/**

View File

@ -108,13 +108,13 @@ public class ArtificialPatternedSAMIterator extends ArtificialSAMIterator {
}
// check for end condition, have we finished the chromosome listing, and have no unmapped reads
if (currentChromo >= eChromosomeCount) {
if (uMappedReadCount < 1) {
if (unmappedRemaining < 1) {
this.next = null;
return false;
} else {
++totalReadCount;
this.next = ArtificialSAMUtils.createArtificialRead(this.header, String.valueOf(totalReadCount), -1, -1, 50);
--uMappedReadCount;
--unmappedRemaining;
return true;
}
}

View File

@ -1,12 +1,9 @@
package org.broadinstitute.sting.utils.sam;
import org.broadinstitute.sting.gatk.iterators.QueryIterator;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.Reads;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.util.CloseableIterator;
import net.sf.picard.util.PeekableIterator;
import java.util.Iterator;
@ -43,6 +40,7 @@ public class ArtificialSAMIterator implements StingSAMIterator {
protected int currentChromo = 0;
protected int currentRead = 1;
protected int totalReadCount = 0;
protected int unmappedRemaining = 0;
protected boolean done = false;
// the next record
protected SAMRecord next = null;
@ -52,11 +50,16 @@ public class ArtificialSAMIterator implements StingSAMIterator {
protected final int sChr;
protected final int eChromosomeCount;
protected final int rCount;
protected int uMappedReadCount;
protected final int unmappedReadCount;
// let us know to make a read, we need this to help out the fake sam query iterator
private boolean initialized = false;
/**
* Is this iterator currently open or closed? Closed iterators can be reused.
*/
protected boolean open = false;
/**
* create the fake iterator, given the mapping of chromosomes and read counts
*
@ -70,8 +73,18 @@ public class ArtificialSAMIterator implements StingSAMIterator {
eChromosomeCount = (endingChr - startingChr) + 1;
rCount = readCount;
this.header = header;
unmappedReadCount = 0;
reset();
}
protected void reset() {
this.currentChromo = 0;
uMappedReadCount = 0;
this.currentRead = 1;
this.totalReadCount = 0;
this.done = false;
this.next = null;
this.initialized = false;
this.unmappedRemaining = unmappedReadCount;
}
/**
@ -88,7 +101,8 @@ public class ArtificialSAMIterator implements StingSAMIterator {
rCount = readCount;
this.header = header;
this.currentChromo = 0;
this.uMappedReadCount = unmappedReadCount;
this.unmappedReadCount = unmappedReadCount;
reset();
}
@ -97,11 +111,12 @@ public class ArtificialSAMIterator implements StingSAMIterator {
}
public void close() {
// done
currentChromo = Integer.MAX_VALUE;
open = false;
}
public boolean hasNext() {
open = true;
if (!initialized){
initialized = true;
createNextRead();
@ -119,13 +134,13 @@ public class ArtificialSAMIterator implements StingSAMIterator {
}
// check for end condition, have we finished the chromosome listing, and have no unmapped reads
if (currentChromo >= eChromosomeCount) {
if (uMappedReadCount < 1) {
if (unmappedRemaining < 1) {
this.next = null;
return false;
} else {
++totalReadCount;
this.next = ArtificialSAMUtils.createArtificialRead(this.header, String.valueOf(totalReadCount), -1, -1, 50);
--uMappedReadCount;
--unmappedRemaining;
return true;
}
}
@ -137,6 +152,8 @@ public class ArtificialSAMIterator implements StingSAMIterator {
public SAMRecord next() {
open = true;
SAMRecord ret = next;
createNextRead();
return ret;

View File

@ -65,6 +65,19 @@ public class ArtificialSAMQueryIterator extends ArtificialSAMIterator implements
this.startingChr = startingChr;
}
/**
 * Clears the query-specific state (interval bounds, contig index, starting chromosome
 * and seek marker) and then runs the base-class reset.
 */
@Override
protected void reset() {
    // The overlapping flag is intentionally left untouched: its state is relied on later.
    // TODO: Make this a bit more direct.
    // NOTE(review): startingChr is zeroed here although it is also assigned from an
    // argument elsewhere in this class -- confirm that dropping that value is intended.
    seeked = false;
    startingChr = 0;
    contigIndex = -1;
    finalPos = 0;
    startPos = 0;
    super.reset();
}
/**
* query containing - get reads contained by the specified interval
*
@ -89,7 +102,6 @@ public class ArtificialSAMQueryIterator extends ArtificialSAMIterator implements
initialize(contig, start, stop);
}
@Override
public void query( String contig, int start, int stop, boolean contained ) {
if (contained)
queryContained(contig, start, stop);
@ -97,18 +109,19 @@ public class ArtificialSAMQueryIterator extends ArtificialSAMIterator implements
queryOverlapping(contig, start, stop);
}
/** Moves this iterator to the unmapped reads by delegating to {@link #initializeUnmapped()}. */
@Override
public void queryUnmappedReads() {
    this.initializeUnmapped();
}
/**
* initialize the iterator to an unmapped read position
*/
public void initializeUnmapped() {
// throw away data from the previous invocation, if one exists.
ensureUntouched();
while (super.hasNext() && this.peek().getReferenceIndex() >= 0) {
reset();
while (super.hasNext() && this.peek().getReferenceIndex() >= 0) {
super.next();
}
// sanity check that we have an actual matching read next
@ -131,7 +144,10 @@ public class ArtificialSAMQueryIterator extends ArtificialSAMIterator implements
* @param stop the stop position
*/
private void initialize( String contig, int start, int stop ) {
// throw away data from the previous invocation, if one exists.
ensureUntouched();
reset();
finalPos = stop;
startPos = start;
if (finalPos < 0) {
@ -213,7 +229,7 @@ public class ArtificialSAMQueryIterator extends ArtificialSAMIterator implements
/**
* Make sure we haven't been used as an iterator yet; this is to mirror the MergingSamIterator2 action.
*
* @throws UnsupportedOperationException if this iterator has already been used for iteration
*/
public void ensureUntouched() {
if (this.currentChromo != 0 || this.currentRead > 1) {
if (open) {
throw new UnsupportedOperationException("We've already been used as an iterator; you can't query after that");
}
}

View File

@ -6,6 +6,7 @@ import java.io.File;
import java.util.*;
import org.broadinstitute.sting.gatk.iterators.QueryIterator;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.utils.StingException;
/**
@ -219,7 +220,7 @@ public class ArtificialSAMUtils {
*
* @return StingSAMIterator representing the specified amount of fake data
*/
public static QueryIterator unmappedReadIterator( int startingChr, int endingChr, int readCount, int unmappedReadCount ) {
public static ArtificialSAMIterator unmappedReadIterator( int startingChr, int endingChr, int readCount, int unmappedReadCount ) {
SAMFileHeader header = createArtificialSamHeader(( endingChr - startingChr ) + 1, startingChr, readCount + DEFAULT_READ_LENGTH);
return new ArtificialSAMQueryIterator(startingChr, endingChr, readCount, unmappedReadCount, header);

View File

@ -58,7 +58,7 @@ public class ReferenceOrderedDataPoolTest extends BaseTest {
@Test
public void testCreateSingleIterator() {
ResourcePool iteratorPool = new ReferenceOrderedDataPool(rod);
RODIterator iterator = (RODIterator)iteratorPool.iterator( testSite1 );
RODIterator iterator = (RODIterator)iteratorPool.iterator( new MappedStreamSegment(testSite1) );
Assert.assertEquals("Number of iterators in the pool is incorrect", 1, iteratorPool.numIterators());
Assert.assertEquals("Number of available iterators in the pool is incorrect", 0, iteratorPool.numAvailableIterators());
@ -79,10 +79,10 @@ public class ReferenceOrderedDataPoolTest extends BaseTest {
@Test
public void testCreateMultipleIterators() {
ReferenceOrderedDataPool iteratorPool = new ReferenceOrderedDataPool(rod);
RODIterator iterator1 = (RODIterator)iteratorPool.iterator( testSite1 );
RODIterator iterator1 = iteratorPool.iterator( new MappedStreamSegment(testSite1) );
// Create a new iterator at position 2.
RODIterator iterator2 = iteratorPool.iterator( testSite2 );
RODIterator iterator2 = iteratorPool.iterator( new MappedStreamSegment(testSite2) );
Assert.assertEquals("Number of iterators in the pool is incorrect", 2, iteratorPool.numIterators());
Assert.assertEquals("Number of available iterators in the pool is incorrect", 0, iteratorPool.numAvailableIterators());
@ -129,7 +129,7 @@ public class ReferenceOrderedDataPoolTest extends BaseTest {
@Test
public void testIteratorConservation() {
ReferenceOrderedDataPool iteratorPool = new ReferenceOrderedDataPool(rod);
RODIterator iterator = (RODIterator)iteratorPool.iterator( testSite1 );
RODIterator iterator = (RODIterator)iteratorPool.iterator( new MappedStreamSegment(testSite1) );
Assert.assertEquals("Number of iterators in the pool is incorrect", 1, iteratorPool.numIterators());
Assert.assertEquals("Number of available iterators in the pool is incorrect", 0, iteratorPool.numAvailableIterators());
@ -143,7 +143,7 @@ public class ReferenceOrderedDataPoolTest extends BaseTest {
iteratorPool.release(iterator);
// Create another iterator after the current iterator.
iterator = iteratorPool.iterator(testSite3);
iterator = iteratorPool.iterator( new MappedStreamSegment(testSite3) );
// Make sure that the previously acquired iterator was reused.
Assert.assertEquals("Number of iterators in the pool is incorrect", 1, iteratorPool.numIterators());
@ -164,7 +164,7 @@ public class ReferenceOrderedDataPoolTest extends BaseTest {
@Test
public void testIteratorCreation() {
ReferenceOrderedDataPool iteratorPool = new ReferenceOrderedDataPool(rod);
RODIterator iterator = (RODIterator)iteratorPool.iterator( testSite3 );
RODIterator iterator = (RODIterator)iteratorPool.iterator( new MappedStreamSegment(testSite3) );
Assert.assertEquals("Number of iterators in the pool is incorrect", 1, iteratorPool.numIterators());
Assert.assertEquals("Number of available iterators in the pool is incorrect", 0, iteratorPool.numAvailableIterators());
@ -178,7 +178,7 @@ public class ReferenceOrderedDataPoolTest extends BaseTest {
iteratorPool.release(iterator);
// Create another iterator after the current iterator.
iterator = iteratorPool.iterator(testSite1);
iterator = iteratorPool.iterator(new MappedStreamSegment(testSite1) );
// Make sure that the previously acquired iterator was reused.
Assert.assertEquals("Number of iterators in the pool is incorrect", 2, iteratorPool.numIterators());

View File

@ -9,10 +9,11 @@ import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator;
import org.broadinstitute.sting.gatk.iterators.*;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.ArtificialSAMQueryIterator;
import org.broadinstitute.sting.utils.sam.ArtificialSAMIterator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
@ -76,16 +77,20 @@ public class SAMByReadsTest extends BaseTest {
/** Test out that we can shard the file and iterate over every read */
@Test
public void testToUnmappedReads() {
ArtificialResourcePool gen = new ArtificialResourcePool(1,10,100,1000);
ArtificialResourcePool gen = new ArtificialResourcePool(createArtificialSamHeader(1,10,100,1000),
ArtificialSAMUtils.unmappedReadIterator(1, 100, 10, 1000) );
GenomeLocParser.setupRefContigOrdering(gen.getHeader().getSequenceDictionary());
try {
int unmappedReadsSeen = 0;
int iterations = 0;
SAMDataSource data = new SAMDataSource(reads,true);
data.setResourcePool(gen);
for (int x = 0; x < 10; x++) {
++iterations;
QueryIterator iter = ArtificialSAMUtils.unmappedReadIterator(1, 100, 10, 1000);
BoundedReadIterator ret = data.toUnmappedReads(100, iter);
StingSAMIterator ret = data.toUnmappedReads(100);
// count the reads we've gotten back
if (ret == null) {
fail("On iteration " + iterations + " we were returned a null pointer, after seeing " + unmappedReadsSeen + " reads out of a 1000");
@ -109,7 +114,8 @@ public class SAMByReadsTest extends BaseTest {
/** Test out that we can shard the file and iterate over every read */
@Test
public void testShardingOfReadsSize14() {
ArtificialResourcePool gen = new ArtificialResourcePool(1,10,100,1000);
ArtificialResourcePool gen = new ArtificialResourcePool(createArtificialSamHeader(1,10,100,1000),
ArtificialSAMUtils.queryReadIterator(1,10,100,1000) );
GenomeLocParser.setupRefContigOrdering(gen.getHeader().getSequenceDictionary());
targetReadCount = 14;
try {
@ -117,18 +123,22 @@ public class SAMByReadsTest extends BaseTest {
int readCount = 0;
SAMDataSource data = new SAMDataSource(reads,true);
ArrayList<Integer> readsPerShard = new ArrayList<Integer>();
data.setResourcePool(gen);
shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS, gen.getHeader().getSequenceDictionary(), targetReadCount);
while (shardStrategy.hasNext()) {
int initialReadCount = readCount;
BoundedReadIterator ret = (BoundedReadIterator)data.seek(shardStrategy.next());
StingSAMIterator ret = data.seek(shardStrategy.next());
assertTrue(ret != null);
while (ret.hasNext()) {
ret.next();
readCount++;
}
readsPerShard.add(readCount-initialReadCount);
ret.close();
iterations++;
}
@ -159,7 +169,8 @@ public class SAMByReadsTest extends BaseTest {
/** Test out that we can shard the file and iterate over every read */
@Test
public void testShardingOfReadsSize25() {
ArtificialResourcePool gen = new ArtificialResourcePool(1,10,100,1000);
ArtificialResourcePool gen = new ArtificialResourcePool(createArtificialSamHeader(1,10,100,1000),
ArtificialSAMUtils.queryReadIterator(1,10,100,1000) );
GenomeLocParser.setupRefContigOrdering(gen.getHeader().getSequenceDictionary());
targetReadCount = 25;
try {
@ -206,7 +217,11 @@ public class SAMByReadsTest extends BaseTest {
}
/**
 * Builds an artificial SAM header through ArtificialSAMUtils for
 * (endingChr - startingChr) + 1 chromosomes.
 *
 * @param startingChr index of the first chromosome; forwarded unchanged
 * @param endingChr   index of the last chromosome; only used to derive the chromosome count
 * @param readCount   summed with readSize to form the third argument
 *                    (presumably the chromosome length -- TODO confirm against ArtificialSAMUtils)
 * @param readSize    summed with readCount to form the third argument
 * @return the header returned by ArtificialSAMUtils.createArtificialSamHeader
 */
private SAMFileHeader createArtificialSamHeader(int startingChr, int endingChr, int readCount, int readSize) {
    final int chromosomeCount = endingChr - startingChr + 1;
    final int sizeArgument = readCount + readSize;
    return ArtificialSAMUtils.createArtificialSamHeader(chromosomeCount, startingChr, sizeArgument);
}
}
/**
@ -218,21 +233,42 @@ class ArtificialResourcePool extends SAMIteratorPool {
// the header
private SAMFileHeader header;
private final SAMFileHeader.SortOrder sortOrder = SAMFileHeader.SortOrder.coordinate;
private ArtificialSAMIterator iterator;
public ArtificialResourcePool( int startingChr, int endingChr, int readCount, int readSize) {
/**
* Track the iterator to see whether it's venturing into unmapped reads for the first
* time. If so, query straight there. Only works for query iterators.
*
* TODO: Clean up.
*/
private boolean intoUnmappedReads = false;
public ArtificialResourcePool( SAMFileHeader header, ArtificialSAMIterator iterator ) {
super( new Reads(Collections.<File>emptyList()),true );
header = ArtificialSAMUtils.createArtificialSamHeader(( endingChr - startingChr ) + 1, startingChr, readCount + readSize);
this.header = header;
this.iterator = iterator;
}
@Override
public QueryIterator iterator( GenomeLoc loc ) {
ArtificialSAMQueryIterator iter = ArtificialSAMUtils.queryReadIterator(1, 10, 100, 1000);
if (loc != null) {
iter.queryContained(loc.getContig(), (int)loc.getStart(), (int)loc.getStop());
public StingSAMIterator iterator( DataStreamSegment segment ) {
if (segment instanceof MappedStreamSegment && iterator instanceof ArtificialSAMQueryIterator) {
ArtificialSAMQueryIterator queryIterator = (ArtificialSAMQueryIterator)iterator;
MappedStreamSegment mappedSegment = (MappedStreamSegment)segment;
queryIterator.queryContained(mappedSegment.locus.getContig(), (int)mappedSegment.locus.getStart(), (int)mappedSegment.locus.getStop());
return queryIterator;
}
return iter;
else if (segment instanceof UnmappedStreamSegment) {
if( !intoUnmappedReads ) {
if( iterator instanceof ArtificialSAMQueryIterator ) {
ArtificialSAMQueryIterator queryIterator = (ArtificialSAMQueryIterator)iterator;
queryIterator.queryUnmappedReads();
}
intoUnmappedReads = true;
}
return new BoundedReadIterator(iterator,((UnmappedStreamSegment)segment).size);
}
else
throw new StingException("Unsupported segment type passed to test");
}
/**
@ -243,4 +279,4 @@ class ArtificialResourcePool extends SAMIteratorPool {
public SAMFileHeader getHeader() {
    // plain accessor for this pool's header field
    return header;
}
}
}