Experimental, downsampler-friendly read shard balancer

-Only used when experimental downsampling is enabled

-Persists read iterators across shards, creating a new set only when we've exhausted
the current BAM file region(s). This prevents the engine from revisiting regions already
discarded by the downsamplers / filters, as could happen in the old implementation (see
the sketch after these notes).

-SAMDataSource no longer tracks low-level file positions in experimental mode; all
related code can be stripped out once the engine fork is collapsed.

-Defensive implementation that assumes BAM file regions coming out of the BAMScheduler
can overlap; we should be able to improve performance if we can prove they never
overlap.

-Tests are a bit on the extreme side (~8 minute runtime) for now; we'll scale them back
once we've gained confidence in the code
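
For illustration only (not part of this commit): a self-contained toy, with integers
standing in for reads, showing the property the new balancer relies on. Because one
iterator persists per region, consecutive shards partition the read stream instead of
re-opening it:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PersistentIteratorSketch {
    public static void main(String[] args) {
        List<Integer> reads = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        // One iterator per region, created once and shared by every shard cut from
        // that region: reads consumed (or discarded by a downsampler / filter) while
        // filling one shard are never revisited when filling the next.
        Iterator<Integer> readsIterator = reads.iterator();
        final int shardSize = 3;
        while (readsIterator.hasNext()) {
            StringBuilder shard = new StringBuilder("shard:");
            for (int i = 0; i < shardSize && readsIterator.hasNext(); i++) {
                shard.append(' ').append(readsIterator.next());
            }
            System.out.println(shard); // shard: 1 2 3 / shard: 4 5 6 / shard: 7 8
        }
    }
}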
David Roazen 2012-09-12 13:00:29 -04:00
parent ab8fa8f359
commit 133085469f
15 changed files with 630 additions and 222 deletions

View File

@@ -125,6 +125,37 @@ public class GATKBAMFileSpan extends BAMFileSpan {
return size;
}
/**
* Get a GATKChunk representing the "extent" of this file span, from the start of the first
* chunk to the end of the last chunk. The chunks list must be sorted in order to use this method.
*
* @return a GATKChunk representing the extent of this file span, or a GATKChunk representing
* a span of size 0 if there are no chunks
*/
public GATKChunk getExtent() {
validateSorted(); // TODO: defensive measure: may be unnecessary
List<Chunk> chunks = getChunks();
if ( chunks.isEmpty() ) {
return new GATKChunk(0L, 0L);
}
return new GATKChunk(chunks.get(0).getChunkStart(), chunks.get(chunks.size() - 1).getChunkEnd());
}
/**
* Validates the list of chunks to ensure that they appear in sorted order.
*/
private void validateSorted() {
List<Chunk> chunks = getChunks();
for ( int i = 1; i < chunks.size(); i++ ) {
if ( chunks.get(i).getChunkStart() < chunks.get(i-1).getChunkEnd() ) {
throw new ReviewedStingException(String.format("Chunk list is unsorted; chunk %s is before chunk %s", chunks.get(i-1), chunks.get(i)));
}
}
}
/**
* Computes the union of two FileSpans.
* @param other FileSpan to union with this one.

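Illustration only, not part of the commit (and it assumes GATKBAMFileSpan's varargs
GATKChunk constructor, which this diff doesn't show): the extent collapses a sorted
chunk list into one chunk running from the first chunk's start to the last chunk's end.

GATKBAMFileSpan span = new GATKBAMFileSpan(new GATKChunk(10L, 20L), new GATKChunk(30L, 40L));
GATKChunk extent = span.getExtent(); // a single chunk covering virtual offsets 10 through 40
// Note the extent also covers the gap (20, 30), so extent-based overlap detection
// (see FilePointer.hasFileSpansOverlappingWith below) can report overlap even when no
// individual chunks intersect. That conservative behavior suits the defensive trimming.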
View File

@@ -548,6 +548,7 @@ public class GenomeAnalysisEngine {
*/
protected Iterable<Shard> getShardStrategy(SAMDataSource readsDataSource, ReferenceSequenceFile drivingDataSource, GenomeLocSortedSet intervals) {
ValidationExclusion exclusions = (readsDataSource != null ? readsDataSource.getReadsInfo().getValidationExclusionList() : null);
DownsamplingMethod downsamplingMethod = readsDataSource != null ? readsDataSource.getReadsInfo().getDownsamplingMethod() : null;
ReferenceDataSource referenceDataSource = this.getReferenceDataSource();
// If reads are present, assume that accessing the reads is always the dominant factor and shard based on that supposition.
@@ -582,10 +583,15 @@
throw new UserException.CommandLineException("Pairs traversal cannot be used in conjunction with intervals.");
}
// Use the experimental ReadShardBalancer if experimental downsampling is enabled
ShardBalancer readShardBalancer = downsamplingMethod != null && downsamplingMethod.useExperimentalDownsampling ?
new ExperimentalReadShardBalancer() :
new ReadShardBalancer();
if(intervals == null)
-return readsDataSource.createShardIteratorOverAllReads(new ReadShardBalancer());
+return readsDataSource.createShardIteratorOverAllReads(readShardBalancer);
else
-return readsDataSource.createShardIteratorOverIntervals(intervals,new ReadShardBalancer());
+return readsDataSource.createShardIteratorOverIntervals(intervals, readShardBalancer);
}
else
throw new ReviewedStingException("Unable to determine walker type for walker " + walker.getClass().getName());

View File

@@ -30,6 +30,7 @@ import java.util.List;
public class ReadProperties {
private final Collection<SAMReaderID> readers;
private final SAMFileHeader header;
private final SAMFileHeader.SortOrder sortOrder;
private final SAMFileReader.ValidationStringency validationStringency;
private final DownsamplingMethod downsamplingMethod;
private final ValidationExclusion exclusionList;
@@ -64,6 +65,14 @@
return header;
}
/**
* Gets the sort order of the reads
* @return the sort order of the reads
*/
public SAMFileHeader.SortOrder getSortOrder() {
return sortOrder;
}
/**
* How strict should validation be?
* @return Stringency of validation.
@@ -130,6 +139,7 @@
*/
public ReadProperties( Collection<SAMReaderID> samFiles,
SAMFileHeader header,
SAMFileHeader.SortOrder sortOrder,
boolean useOriginalBaseQualities,
SAMFileReader.ValidationStringency strictness,
DownsamplingMethod downsamplingMethod,
@@ -140,6 +150,7 @@
byte defaultBaseQualities) {
this.readers = samFiles;
this.header = header;
this.sortOrder = sortOrder;
this.validationStringency = strictness;
this.downsamplingMethod = downsamplingMethod == null ? DownsamplingMethod.NONE : downsamplingMethod;
this.exclusionList = exclusionList == null ? new ValidationExclusion() : exclusionList;

View File

@@ -124,7 +124,18 @@ public class BAMScheduler implements Iterator<FilePointer> {
*/
private FilePointer generatePointerOverEntireFileset() {
FilePointer filePointer = new FilePointer();
-Map<SAMReaderID,GATKBAMFileSpan> currentPosition = dataSource.getCurrentPosition();
+Map<SAMReaderID,GATKBAMFileSpan> currentPosition;
// Only use the deprecated SAMDataSource.getCurrentPosition() if we're not using experimental downsampling
// TODO: clean this up once the experimental downsampling engine fork collapses
if ( dataSource.getReadsInfo().getDownsamplingMethod() != null && dataSource.getReadsInfo().getDownsamplingMethod().useExperimentalDownsampling ) {
currentPosition = dataSource.getInitialReaderPositions();
}
else {
currentPosition = dataSource.getCurrentPosition();
}
for(SAMReaderID reader: dataSource.getReaderIDs())
filePointer.addFileSpans(reader,createSpanToEndOfFile(currentPosition.get(reader).getGATKChunks().get(0).getChunkStart()));
return filePointer;

View File

@@ -0,0 +1,179 @@
/*
* Copyright (c) 2012, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.picard.util.PeekableIterator;
import net.sf.samtools.SAMFileSpan;
import net.sf.samtools.SAMRecord;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import java.util.*;
/**
* Convert from an unbalanced iterator over FilePointers to a balanced iterator over Shards
*
* @author David Roazen
*/
public class ExperimentalReadShardBalancer extends ShardBalancer {
private static Logger logger = Logger.getLogger(ExperimentalReadShardBalancer.class);
/**
* Convert iterators of file pointers into balanced iterators of shards.
* @return An iterator over balanced shards.
*/
public Iterator<Shard> iterator() {
return new Iterator<Shard>() {
/**
* The cached shard to be returned next. Prefetched in the peekable iterator style.
*/
private Shard nextShard = null;
/**
* The file pointer currently being processed.
*/
private FilePointer currentFilePointer = null;
/**
* Iterator over the reads from the current file pointer. The same iterator will be
* used to fill all shards associated with a given file pointer
*/
private PeekableIterator<SAMRecord> currentFilePointerReadsIterator = null;
{
if ( filePointers.hasNext() )
currentFilePointer = filePointers.next();
advance();
}
public boolean hasNext() {
return nextShard != null;
}
public Shard next() {
if ( ! hasNext() )
throw new NoSuchElementException("No next read shard available");
Shard currentShard = nextShard;
advance();
return currentShard;
}
private void advance() {
nextShard = null;
// May need multiple iterations to fill the next shard if all reads in current file spans get filtered/downsampled away
while ( nextShard == null && currentFilePointer != null ) {
// If we've exhausted the current file pointer of reads, move to the next file pointer (if there is one):
if ( currentFilePointerReadsIterator != null && ! currentFilePointerReadsIterator.hasNext() ) {
do {
advanceFilePointer();
} while ( currentFilePointer != null && isEmpty(currentFilePointer.fileSpans) ); // skip empty file pointers
// We'll need to create a fresh iterator for this file pointer when we create the first
// shard for it below.
currentFilePointerReadsIterator = null;
}
// At this point if currentFilePointer is non-null we know it is also non-empty. Our
// currentFilePointerReadsIterator may be null or non-null depending on whether or not
// this is our first shard for this file pointer.
if ( currentFilePointer != null ) {
Shard shard = new ReadShard(parser,readsDataSource,currentFilePointer.fileSpans,currentFilePointer.locations,currentFilePointer.isRegionUnmapped);
// Create a new reads iterator only when we've just advanced to a new file pointer. It's
// essential that the iterators persist across all shards that share the same file pointer
// to allow the downsampling to work properly.
if ( currentFilePointerReadsIterator == null ) {
currentFilePointerReadsIterator = new PeekableIterator<SAMRecord>(readsDataSource.getIterator(shard));
}
if ( currentFilePointerReadsIterator.hasNext() ) {
shard.fill(currentFilePointerReadsIterator);
nextShard = shard;
}
}
}
}
private void advanceFilePointer() {
FilePointer previousFilePointer = currentFilePointer;
currentFilePointer = filePointers.hasNext() ? filePointers.next() : null;
// TODO: This is a purely defensive measure to guard against the possibility of overlap
// TODO: between FilePointers. When overlap is detected, remove the overlapping regions from
// TODO: the newly-current FilePointer.
// TODO: If we later discover that overlap is theoretically impossible, this step becomes
// TODO: unnecessary and should be removed.
if ( currentFilePointer != null && previousFilePointer != null &&
previousFilePointer.hasFileSpansOverlappingWith(currentFilePointer) ) {
logger.debug(String.format("%s: found consecutive overlapping FilePointers [%s] and [%s]", getClass().getSimpleName(), previousFilePointer, currentFilePointer));
Map<SAMReaderID, SAMFileSpan> previousFileSpans = previousFilePointer.getFileSpans();
Map<SAMReaderID, SAMFileSpan> trimmedFileSpans = new HashMap<SAMReaderID, SAMFileSpan>(currentFilePointer.getFileSpans().size());
for ( Map.Entry<SAMReaderID, SAMFileSpan> fileSpanEntry : currentFilePointer.getFileSpans().entrySet() ) {
// find the corresponding file span from the previous FilePointer
SAMFileSpan previousFileSpan = previousFileSpans.get(fileSpanEntry.getKey());
if ( previousFileSpan == null ) {
// no match, so no trimming required
trimmedFileSpans.put(fileSpanEntry.getKey(), fileSpanEntry.getValue());
}
else {
// match, so remove any overlapping regions (regions before the start of the
// region immediately following the previous file span)
SAMFileSpan trimmedSpan = fileSpanEntry.getValue().removeContentsBefore(previousFileSpan.getContentsFollowing());
trimmedFileSpans.put(fileSpanEntry.getKey(), trimmedSpan);
}
}
// Replace the current file pointer with its trimmed equivalent
currentFilePointer = new FilePointer(trimmedFileSpans, currentFilePointer.locations);
}
}
/**
* Detects whether the list of file spans contain any read data.
* @param selectedSpans Mapping of readers to file spans.
* @return True if file spans are completely empty; false otherwise.
*/
private boolean isEmpty(Map<SAMReaderID,SAMFileSpan> selectedSpans) {
for(SAMFileSpan fileSpan: selectedSpans.values()) {
if(!fileSpan.isEmpty())
return false;
}
return true;
}
public void remove() {
throw new UnsupportedOperationException("Unable to remove from shard balancing iterator");
}
};
}
}
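For context, a sketch of how this balancer is selected and consumed, condensed from the
GenomeAnalysisEngine change above and the unit test below:

ShardBalancer balancer = downsamplingMethod != null && downsamplingMethod.useExperimentalDownsampling
        ? new ExperimentalReadShardBalancer()
        : new ReadShardBalancer();
for (Shard shard : readsDataSource.createShardIteratorOverAllReads(balancer)) {
    // every shard cut from a given FilePointer was filled from that FilePointer's
    // single persistent reads iterator
}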

View File

@@ -50,16 +50,37 @@ public class FilePointer {
public FilePointer(final GenomeLoc... locations) {
this.locations.addAll(Arrays.asList(locations));
this.isRegionUnmapped = checkUnmappedStatus();
}
public FilePointer( Map<SAMReaderID,SAMFileSpan> fileSpans, List<GenomeLoc> locations ) {
this.fileSpans.putAll(fileSpans);
this.locations.addAll(locations);
this.isRegionUnmapped = checkUnmappedStatus();
}
private boolean checkUnmappedStatus() {
boolean foundMapped = false, foundUnmapped = false;
-for(GenomeLoc location: locations) {
-if(GenomeLoc.isUnmapped(location))
+for( GenomeLoc location: locations ) {
+if ( GenomeLoc.isUnmapped(location) )
foundUnmapped = true;
else
foundMapped = true;
}
-if(foundMapped && foundUnmapped)
+if ( foundMapped && foundUnmapped )
throw new ReviewedStingException("BUG: File pointers cannot be mixed mapped/unmapped.");
-this.isRegionUnmapped = foundUnmapped;
+return foundUnmapped;
}
/**
* Returns an immutable view of this FilePointer's file spans
*
* @return an immutable view of this FilePointer's file spans
*/
public Map<SAMReaderID, SAMFileSpan> getFileSpans() {
return Collections.unmodifiableMap(fileSpans);
}
/**
@@ -98,7 +119,13 @@
}
public void addLocation(final GenomeLoc location) {
-locations.add(location);
+this.locations.add(location);
+checkUnmappedStatus();
}
public void addLocations( final List<GenomeLoc> locations ) {
this.locations.addAll(locations);
checkUnmappedStatus();
}
public void addFileSpans(final SAMReaderID id, final SAMFileSpan fileSpan) {
@@ -216,6 +243,32 @@
combined.addFileSpans(initialElement.getKey(),fileSpan);
}
/**
* Returns true if any of the file spans in this FilePointer overlap their counterparts in
* the other FilePointer. "Overlap" is defined as having an overlapping extent (the region
* from the start of the first chunk to the end of the last chunk).
*
* @param other the FilePointer against which to check overlap with this FilePointer
* @return true if any file spans overlap their counterparts in other, otherwise false
*/
public boolean hasFileSpansOverlappingWith( FilePointer other ) {
for ( Map.Entry<SAMReaderID, SAMFileSpan> thisFilePointerEntry : fileSpans.entrySet() ) {
GATKBAMFileSpan thisFileSpan = new GATKBAMFileSpan(thisFilePointerEntry.getValue());
SAMFileSpan otherEntry = other.fileSpans.get(thisFilePointerEntry.getKey());
if ( otherEntry == null ) {
continue; // no counterpart for this file span in other
}
GATKBAMFileSpan otherFileSpan = new GATKBAMFileSpan(otherEntry);
if ( thisFileSpan.getExtent().overlaps(otherFileSpan.getExtent()) ) {
return true;
}
}
return false;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();

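A worked example of the defensive trimming this overlap check drives (the offsets and
the previousSpan/currentSpan variables are illustrative, not from the commit): suppose
one reader's span in the previous FilePointer has extent [100, 200] while its span in
the current FilePointer has extent [150, 250]. The extents overlap, so
ExperimentalReadShardBalancer.advanceFilePointer() above trims the current span:

SAMFileSpan keepFrom = previousSpan.getContentsFollowing();       // the region starting at offset 200
SAMFileSpan trimmed = currentSpan.removeContentsBefore(keepFrom); // current span now covers [200, 250]
// A new FilePointer built from the trimmed spans replaces the current one, so no file
// region is handed to the downsamplers twice.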
View File

@@ -1,17 +1,15 @@
package org.broadinstitute.sting.gatk.datasources.reads;
-import net.sf.samtools.SAMFileSpan;
-import net.sf.samtools.SAMRecord;
+import net.sf.picard.util.PeekableIterator;
+import net.sf.samtools.*;
import net.sf.samtools.util.CloseableIterator;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
*
@@ -103,6 +101,67 @@ public class ReadShard extends Shard {
reads.add(read);
}
/**
* Fills this shard's buffer with reads from the iterator passed in
*
* @param readIter Iterator from which to draw the reads to fill the shard
*/
@Override
public void fill( PeekableIterator<SAMRecord> readIter ) {
if( ! buffersReads() )
throw new ReviewedStingException("Attempting to fill a non-buffering shard.");
SAMFileHeader.SortOrder sortOrder = getReadProperties().getSortOrder();
SAMRecord read = null;
while( ! isBufferFull() && readIter.hasNext() ) {
final SAMRecord nextRead = readIter.peek();
if ( read == null || (nextRead.getReferenceIndex().equals(read.getReferenceIndex())) ) {
// only add reads to the shard if they are on the same contig
read = readIter.next();
addRead(read);
} else {
break;
}
}
// If the reads are sorted in coordinate order, ensure that all reads
// having the same alignment start become part of the same shard, to allow
// downsampling to work better across shard boundaries. Note that because our
// read stream has already been fed through the positional downsampler, which
// ensures that at each alignment start position there are no more than dcov
// reads, we're in no danger of accidentally creating a disproportionately huge
// shard
if ( sortOrder == SAMFileHeader.SortOrder.coordinate ) {
while ( readIter.hasNext() ) {
SAMRecord additionalRead = readIter.peek();
// Stop filling the shard as soon as we encounter a read having a different
// alignment start or contig from the last read added in the earlier loop
// above, or an unmapped read
if ( read == null ||
additionalRead.getReadUnmappedFlag() ||
! additionalRead.getReferenceIndex().equals(read.getReferenceIndex()) ||
additionalRead.getAlignmentStart() != read.getAlignmentStart() ) {
break;
}
addRead(readIter.next());
}
}
// If the reads are sorted in queryname order, ensure that all reads
// having the same queryname become part of the same shard.
if( sortOrder == SAMFileHeader.SortOrder.queryname ) {
while( readIter.hasNext() ) {
SAMRecord nextRead = readIter.peek();
if( read == null || ! read.getReadName().equals(nextRead.getReadName()) )
break;
addRead(readIter.next());
}
}
}
/**
* Creates an iterator over reads stored in this shard's read cache.
* @return

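The expansion in fill() above is bounded by the downsampling target: the positional
downsampler emits at most dcov reads per alignment start, and at least one read of the
boundary stack is already in the shard, so the trailing loop adds fewer than dcov extra
reads. With illustrative numbers, a 10,000-read shard limit and -dcov 50 give a
worst-case shard of 10,049 reads.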
View File

@@ -34,6 +34,8 @@ import java.util.NoSuchElementException;
/**
* Divide up large file pointers containing reads into more manageable subcomponents.
*
* TODO: delete this class once the experimental downsampling engine fork collapses
*/
public class ReadShardBalancer extends ShardBalancer {
/**

View File

@@ -99,6 +99,8 @@ public class SAMDataSource {
/**
* How far along is each reader?
*
* TODO: delete this once the experimental downsampling engine fork collapses
*/
private final Map<SAMReaderID,GATKBAMFileSpan> readerPositions = new HashMap<SAMReaderID,GATKBAMFileSpan>();
@@ -154,8 +156,6 @@
*/
private final ThreadAllocation threadAllocation;
private final boolean expandShardsForDownsampling;
/**
* Create a new SAM data source given the supplied read metadata.
* @param samFiles list of reads files.
@@ -297,6 +297,7 @@
readProperties = new ReadProperties(
samFiles,
mergedHeader,
sortOrder,
useOriginalBaseQualities,
strictness,
downsamplingMethod,
@@ -306,11 +307,6 @@
includeReadsWithDeletionAtLoci,
defaultBaseQualities);
expandShardsForDownsampling = readProperties.getDownsamplingMethod() != null &&
readProperties.getDownsamplingMethod().useExperimentalDownsampling &&
readProperties.getDownsamplingMethod().type != DownsampleType.NONE &&
readProperties.getDownsamplingMethod().toCoverage != null;
// cache the read group id (original) -> read group id (merged)
// and read group id (merged) -> read group id (original) mappings.
for(SAMReaderID id: readerIDs) {
@@ -384,7 +380,10 @@
/**
* Retrieves the current position within the BAM file.
* @return A mapping of reader to current position.
*
* TODO: delete this once the experimental downsampling engine fork collapses
*/
@Deprecated
public Map<SAMReaderID,GATKBAMFileSpan> getCurrentPosition() {
return readerPositions;
}
@@ -467,19 +466,15 @@
}
/**
-* Are we expanding shards as necessary to prevent shard boundaries from occurring at improper places?
+* Legacy method to fill the given buffering shard with reads.
+*
+* Shard.fill() is used instead of this method when experimental downsampling is enabled
+*
+* TODO: delete this method once the experimental downsampling engine fork collapses
+*
-* @return true if we are using expanded shards, otherwise false
-*/
-public boolean usingExpandedShards() {
-return expandShardsForDownsampling;
-}
-/**
-* Fill the given buffering shard with reads.
* @param shard Shard to fill.
*/
+@Deprecated
public void fillShard(Shard shard) {
if(!shard.buffersReads())
throw new ReviewedStingException("Attempting to fill a non-buffering shard.");
@@ -503,31 +498,6 @@
}
}
// If the reads are sorted in coordinate order, ensure that all reads
// having the same alignment start become part of the same shard, to allow
// downsampling to work better across shard boundaries. Note that because our
// read stream has already been fed through the positional downsampler, which
// ensures that at each alignment start position there are no more than dcov
// reads, we're in no danger of accidentally creating a disproportionately huge
// shard
if ( expandShardsForDownsampling && sortOrder == SAMFileHeader.SortOrder.coordinate ) {
while ( iterator.hasNext() ) {
SAMRecord additionalRead = iterator.next();
// Stop filling the shard as soon as we encounter a read having a different
// alignment start or contig from the last read added in the earlier loop
// above, or an unmapped read
if ( read == null ||
additionalRead.getReadUnmappedFlag() ||
! additionalRead.getReferenceIndex().equals(read.getReferenceIndex()) ||
additionalRead.getAlignmentStart() != read.getAlignmentStart() ) {
break;
}
shard.addRead(additionalRead);
noteFilePositionUpdate(positionUpdates, additionalRead);
}
}
// If the reads are sorted in queryname order, ensure that all reads
// having the same queryname become part of the same shard.
if(sortOrder == SAMFileHeader.SortOrder.queryname) {
@@ -547,6 +517,10 @@
readerPositions.put(readers.getReaderID(positionUpdate.getKey()),positionUpdate.getValue());
}
/*
* TODO: delete this method once the experimental downsampling engine fork collapses
*/
@Deprecated
private void noteFilePositionUpdate(Map<SAMFileReader,GATKBAMFileSpan> positionMapping, SAMRecord read) {
GATKBAMFileSpan endChunk = new GATKBAMFileSpan(read.getFileSource().getFilePointer().getContentsFollowing());
positionMapping.put(read.getFileSource().getReader(),endChunk);
@@ -557,8 +531,7 @@
return shard.iterator();
}
else {
-SAMReaders readers = resourcePool.getAvailableReaders();
-return getIterator(readers,shard,shard instanceof ReadShard);
+return getIterator(shard);
}
}
@@ -578,13 +551,44 @@
/**
* Initialize the current reader positions
*
* TODO: delete this once the experimental downsampling engine fork collapses
*
* @param readers
*/
@Deprecated
private void initializeReaderPositions(SAMReaders readers) {
for(SAMReaderID id: getReaderIDs())
readerPositions.put(id,new GATKBAMFileSpan(readers.getReader(id).getFilePointerSpanningReads()));
}
/**
* Get the initial reader positions across all BAM files
*
* @return the start positions of the first chunk of reads for all BAM files
*/
public Map<SAMReaderID, GATKBAMFileSpan> getInitialReaderPositions() {
Map<SAMReaderID, GATKBAMFileSpan> initialPositions = new HashMap<SAMReaderID, GATKBAMFileSpan>();
SAMReaders readers = resourcePool.getAvailableReaders();
for ( SAMReaderID id: getReaderIDs() ) {
initialPositions.put(id, new GATKBAMFileSpan(readers.getReader(id).getFilePointerSpanningReads()));
}
resourcePool.releaseReaders(readers);
return initialPositions;
}
/**
* Get an iterator over the data types specified in the shard.
*
* @param shard The shard specifying the data limits.
* @return An iterator over the selected data.
*/
public StingSAMIterator getIterator( Shard shard ) {
return getIterator(resourcePool.getAvailableReaders(), shard, shard instanceof ReadShard);
}
/**
* Get an iterator over the data types specified in the shard.
* @param readers Readers from which to load data.

View File

@@ -1,5 +1,6 @@
package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.picard.util.PeekableIterator;
import net.sf.samtools.SAMFileSpan;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.gatk.ReadMetrics;
@@ -203,6 +204,12 @@
*/
public void addRead(SAMRecord read) { throw new UnsupportedOperationException("This shard does not buffer reads."); }
/**
* Fills the shard with reads. Can only do this with shards that buffer reads
* @param readIter Iterator from which to draw the reads to fill the shard
*/
public void fill( PeekableIterator<SAMRecord> readIter ) { throw new UnsupportedOperationException("This shard does not buffer reads."); }
/**
* Gets the iterator over the elements cached in the shard.
* @return

View File

@@ -26,6 +26,7 @@ package org.broadinstitute.sting.gatk.datasources.reads;
import com.google.caliper.Param;
import net.sf.picard.filter.FilteringIterator;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMFileReader;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.commandline.Tags;
@@ -71,6 +72,7 @@ public class DownsamplerBenchmark extends ReadProcessingBenchmark {
SAMFileReader reader = new SAMFileReader(inputFile);
ReadProperties readProperties = new ReadProperties(Collections.<SAMReaderID>singletonList(new SAMReaderID(inputFile,new Tags())),
reader.getFileHeader(),
SAMFileHeader.SortOrder.coordinate,
false,
SAMFileReader.ValidationStringency.SILENT,
downsampling.create(),

View File

@@ -0,0 +1,203 @@
/*
* Copyright (c) 2012, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.samtools.*;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.commandline.Tags;
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
import org.broadinstitute.sting.gatk.downsampling.DownsampleType;
import org.broadinstitute.sting.gatk.downsampling.DownsamplingMethod;
import org.broadinstitute.sting.gatk.filters.ReadFilter;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.ArtificialSingleSampleReadStream;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.Assert;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
public class ExperimentalReadShardBalancerUnitTest extends BaseTest {
/**
* Tests to ensure that ExperimentalReadShardBalancer works as expected and does not place shard boundaries
* at inappropriate places, such as within an alignment start position
*/
private static class ExperimentalReadShardBalancerTest extends TestDataProvider {
private int numContigs;
private int numStacksPerContig;
private int stackSize;
private int numUnmappedReads;
private DownsamplingMethod downsamplingMethod;
private int expectedReadCount;
private SAMFileHeader header;
private SAMReaderID testBAM;
public ExperimentalReadShardBalancerTest( int numContigs,
int numStacksPerContig,
int stackSize,
int numUnmappedReads,
int downsamplingTargetCoverage ) {
super(ExperimentalReadShardBalancerTest.class);
this.numContigs = numContigs;
this.numStacksPerContig = numStacksPerContig;
this.stackSize = stackSize;
this.numUnmappedReads = numUnmappedReads;
this.downsamplingMethod = new DownsamplingMethod(DownsampleType.BY_SAMPLE, downsamplingTargetCoverage, null, true);
this.expectedReadCount = Math.min(stackSize, downsamplingTargetCoverage) * numStacksPerContig * numContigs + numUnmappedReads;
setName(String.format("%s: numContigs=%d numStacksPerContig=%d stackSize=%d numUnmappedReads=%d downsamplingTargetCoverage=%d",
getClass().getSimpleName(), numContigs, numStacksPerContig, stackSize, numUnmappedReads, downsamplingTargetCoverage));
}
public void run() {
createTestBAM();
SAMDataSource dataSource = new SAMDataSource(Arrays.asList(testBAM),
new ThreadAllocation(),
null,
new GenomeLocParser(header.getSequenceDictionary()),
false,
SAMFileReader.ValidationStringency.SILENT,
null,
downsamplingMethod,
new ValidationExclusion(),
new ArrayList<ReadFilter>(),
false);
Iterable<Shard> shardIterator = dataSource.createShardIteratorOverAllReads(new ExperimentalReadShardBalancer());
SAMRecord readAtEndOfLastShard = null;
int totalReadsSeen = 0;
for ( Shard shard : shardIterator ) {
int numContigsThisShard = 0;
SAMRecord lastRead = null;
for ( SAMRecord read : shard.iterator() ) {
totalReadsSeen++;
if ( lastRead == null ) {
numContigsThisShard = 1;
}
else if ( ! read.getReadUnmappedFlag() && ! lastRead.getReferenceIndex().equals(read.getReferenceIndex()) ) {
numContigsThisShard++;
}
// If the last read from the previous shard is not unmapped, we have to make sure
// that no reads in this shard start at the same position
if ( readAtEndOfLastShard != null && ! readAtEndOfLastShard.getReadUnmappedFlag() ) {
Assert.assertFalse(readAtEndOfLastShard.getReferenceIndex().equals(read.getReferenceIndex()) &&
readAtEndOfLastShard.getAlignmentStart() == read.getAlignmentStart(),
String.format("Reads from alignment start position %d:%d are split across multiple shards",
read.getReferenceIndex(), read.getAlignmentStart()));
}
lastRead = read;
}
// There should never be reads from more than 1 contig in a shard (ignoring unmapped reads)
Assert.assertTrue(numContigsThisShard == 1, "found a shard with reads from multiple contigs");
readAtEndOfLastShard = lastRead;
}
Assert.assertEquals(totalReadsSeen, expectedReadCount, "did not encounter the expected number of reads");
}
private void createTestBAM() {
header = ArtificialSAMUtils.createArtificialSamHeader(numContigs, 1, 100000);
SAMReadGroupRecord readGroup = new SAMReadGroupRecord("foo");
readGroup.setSample("testSample");
header.addReadGroup(readGroup);
ArtificialSingleSampleReadStream artificialReads = new ArtificialSingleSampleReadStream(header,
"foo",
numContigs,
numStacksPerContig,
stackSize,
stackSize,
1,
100,
50,
150,
numUnmappedReads);
File testBAMFile;
try {
testBAMFile = File.createTempFile("SAMDataSourceFillShardBoundaryTest", ".bam");
testBAMFile.deleteOnExit();
}
catch ( IOException e ) {
throw new ReviewedStingException(String.format("Failed to create temp bam file for test %s. %s", this, e.getMessage()));
}
SAMFileWriter bamWriter = new SAMFileWriterFactory().setCreateIndex(true).makeBAMWriter(header, true, testBAMFile);
for ( SAMRecord read : artificialReads ) {
bamWriter.addAlignment(read);
}
bamWriter.close();
testBAM = new SAMReaderID(testBAMFile, new Tags());
new File(testBAM.getSamFilePath().replace(".bam", ".bai")).deleteOnExit();
new File(testBAM.getSamFilePath() + ".bai").deleteOnExit();
}
}
@DataProvider(name = "ExperimentalReadShardBalancerTestDataProvider")
public Object[][] createExperimentalReadShardBalancerTests() {
for ( int numContigs = 1; numContigs <= 3; numContigs++ ) {
for ( int numStacksPerContig : Arrays.asList(1, 2, 4) ) {
// Use crucial read shard boundary values as the stack sizes
for ( int stackSize : Arrays.asList(ReadShard.MAX_READS / 2, ReadShard.MAX_READS / 2 + 10, ReadShard.MAX_READS, ReadShard.MAX_READS - 1, ReadShard.MAX_READS + 1, ReadShard.MAX_READS * 2) ) {
for ( int numUnmappedReads : Arrays.asList(0, ReadShard.MAX_READS / 2, ReadShard.MAX_READS * 2) ) {
// The first value will result in no downsampling at all, the others in some downsampling
for ( int downsamplingTargetCoverage : Arrays.asList(ReadShard.MAX_READS * 10, ReadShard.MAX_READS, ReadShard.MAX_READS / 2) ) {
new ExperimentalReadShardBalancerTest(numContigs, numStacksPerContig, stackSize, numUnmappedReads, downsamplingTargetCoverage);
}
}
}
}
}
return ExperimentalReadShardBalancerTest.getTests(ExperimentalReadShardBalancerTest.class);
}
@Test(dataProvider = "ExperimentalReadShardBalancerTestDataProvider")
public void runExperimentalReadShardBalancerTest( ExperimentalReadShardBalancerTest test ) {
logger.warn("Running test: " + test);
test.run();
}
}

View File

@@ -29,30 +29,21 @@ import net.sf.samtools.*;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.commandline.Tags;
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
import org.broadinstitute.sting.gatk.downsampling.DownsampleType;
import org.broadinstitute.sting.gatk.downsampling.DownsamplingMethod;
import org.broadinstitute.sting.gatk.filters.ReadFilter;
import org.broadinstitute.sting.gatk.iterators.ReadTransformer;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.ArtificialSingleSampleReadStream;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.Assert;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -66,165 +57,12 @@ import static org.testng.Assert.*;
*/
public class SAMDataSourceUnitTest extends BaseTest {
// TODO: These legacy tests should really be replaced with a more comprehensive suite of tests for SAMDataSource
private List<SAMReaderID> readers;
private IndexedFastaSequenceFile seq;
private GenomeLocParser genomeLocParser;
/***********************************
* Tests for the fillShard() method
***********************************/
/**
* Tests to ensure that the fillShard() method does not place shard boundaries at inappropriate places,
* such as within an alignment start position
*/
private static class SAMDataSourceFillShardBoundaryTest extends TestDataProvider {
private int numContigs;
private int numStacksPerContig;
private int stackSize;
private int numUnmappedReads;
private DownsamplingMethod downsamplingMethod;
private SAMFileHeader header;
public SAMDataSourceFillShardBoundaryTest( int numContigs,
int numStacksPerContig,
int stackSize,
int numUnmappedReads,
int downsamplingTargetCoverage ) {
super(SAMDataSourceFillShardBoundaryTest.class);
this.numContigs = numContigs;
this.numStacksPerContig = numStacksPerContig;
this.stackSize = stackSize;
this.numUnmappedReads = numUnmappedReads;
this.downsamplingMethod = new DownsamplingMethod(DownsampleType.BY_SAMPLE, downsamplingTargetCoverage, null, true);
setName(String.format("%s: numContigs=%d numStacksPerContig=%d stackSize=%d numUnmappedReads=%d downsamplingTargetCoverage=%d",
getClass().getSimpleName(), numContigs, numStacksPerContig, stackSize, numUnmappedReads, downsamplingTargetCoverage));
}
public void run() {
SAMDataSource dataSource = new SAMDataSource(Arrays.asList(createTestBAM()),
new ThreadAllocation(),
null,
new GenomeLocParser(header.getSequenceDictionary()),
false,
SAMFileReader.ValidationStringency.SILENT,
null,
downsamplingMethod,
new ValidationExclusion(),
new ArrayList<ReadFilter>(),
false);
Assert.assertTrue(dataSource.usingExpandedShards());
Iterable<Shard> shardIterator = dataSource.createShardIteratorOverAllReads(new ReadShardBalancer());
SAMRecord readAtEndOfLastShard = null;
for ( Shard shard : shardIterator ) {
int numContigsThisShard = 0;
SAMRecord lastRead = null;
for ( SAMRecord read : shard.iterator() ) {
if ( lastRead == null ) {
numContigsThisShard = 1;
}
else if ( ! read.getReadUnmappedFlag() && ! lastRead.getReferenceIndex().equals(read.getReferenceIndex()) ) {
numContigsThisShard++;
}
// If the last read from the previous shard is not unmapped, we have to make sure
// that no reads in this shard start at the same position
if ( readAtEndOfLastShard != null && ! readAtEndOfLastShard.getReadUnmappedFlag() ) {
Assert.assertFalse(readAtEndOfLastShard.getReferenceIndex().equals(read.getReferenceIndex()) &&
readAtEndOfLastShard.getAlignmentStart() == read.getAlignmentStart(),
String.format("Reads from alignment start position %d:%d are split across multiple shards",
read.getReferenceIndex(), read.getAlignmentStart()));
}
lastRead = read;
}
// There should never be reads from more than 1 contig in a shard (ignoring unmapped reads)
Assert.assertTrue(numContigsThisShard == 1, "found a shard with reads from multiple contigs");
readAtEndOfLastShard = lastRead;
}
}
private SAMReaderID createTestBAM() {
header = ArtificialSAMUtils.createArtificialSamHeader(numContigs, 1, 100000);
SAMReadGroupRecord readGroup = new SAMReadGroupRecord("foo");
readGroup.setSample("testSample");
header.addReadGroup(readGroup);
ArtificialSingleSampleReadStream artificialReads = new ArtificialSingleSampleReadStream(header,
"foo",
numContigs,
numStacksPerContig,
stackSize,
stackSize,
1,
100,
50,
150,
numUnmappedReads);
File testBAMFile;
try {
testBAMFile = File.createTempFile("SAMDataSourceFillShardBoundaryTest", ".bam");
testBAMFile.deleteOnExit();
}
catch ( IOException e ) {
throw new ReviewedStingException(String.format("Failed to create temp bam file for test %s. %s", this, e.getMessage()));
}
SAMFileWriter bamWriter = new SAMFileWriterFactory().setCreateIndex(true).makeBAMWriter(header, true, testBAMFile);
for ( SAMRecord read : artificialReads ) {
bamWriter.addAlignment(read);
}
bamWriter.close();
return new SAMReaderID(testBAMFile, new Tags());
}
}
@DataProvider(name = "SAMDataSourceFillShardTestDataProvider")
public Object[][] createSAMDataSourceFillShardBoundaryTests() {
// Take downsampling out of the equation for these tests -- we are only interested in whether the
// shard boundaries occur at the right places in the read stream, and removing downsampling as a
// factor simplifies that task (note that we still need to provide a specific downsampling method with
// experimental downsampling enabled to trigger the shard expansion behavior, for now)
int downsamplingTargetCoverage = ReadShard.MAX_READS * 10;
for ( int numContigs = 1; numContigs <= 3; numContigs++ ) {
for ( int numStacksPerContig : Arrays.asList(1, 2, 4) ) {
// Use crucial read shard boundary values as the stack sizes
for ( int stackSize : Arrays.asList(ReadShard.MAX_READS / 2, ReadShard.MAX_READS / 2 + 10, ReadShard.MAX_READS, ReadShard.MAX_READS - 1, ReadShard.MAX_READS + 1, ReadShard.MAX_READS * 2) ) {
for ( int numUnmappedReads : Arrays.asList(0, ReadShard.MAX_READS / 2, ReadShard.MAX_READS * 2) ) {
new SAMDataSourceFillShardBoundaryTest(numContigs, numStacksPerContig, stackSize, numUnmappedReads, downsamplingTargetCoverage);
}
}
}
}
return SAMDataSourceFillShardBoundaryTest.getTests(SAMDataSourceFillShardBoundaryTest.class);
}
// TODO: re-enable these tests once the issues with filepointer ordering + the downsamplers are worked out
@Test(dataProvider = "SAMDataSourceFillShardTestDataProvider", enabled = false)
public void testSAMDataSourceFillShard( SAMDataSourceFillShardBoundaryTest test ) {
logger.warn("Running test: " + test);
test.run();
}
// TODO: the legacy tests below should really be replaced with a more comprehensive suite of tests for SAMDataSource
/**
* This function does the setup of our parser, before each method call.
* <p/>

View File

@@ -502,6 +502,7 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest {
return new ReadProperties(
Collections.<SAMReaderID>emptyList(),
new SAMFileHeader(),
SAMFileHeader.SortOrder.coordinate,
false,
SAMFileReader.ValidationStringency.STRICT,
downsamplingMethod,

View File

@@ -333,6 +333,7 @@ public class LocusIteratorByStateUnitTest extends BaseTest {
return new ReadProperties(
Collections.<SAMReaderID>emptyList(),
new SAMFileHeader(),
SAMFileHeader.SortOrder.coordinate,
false,
SAMFileReader.ValidationStringency.STRICT,
null,