Initial working version of new ActiveRegionTraversal based on the LocusIteratorByState read stream

-- Implemented as a subclass of TraverseActiveRegions
-- Passes all unit tests
-- Will be very slow -- needs logical fixes
This commit is contained in:
Mark DePristo 2013-01-10 15:18:17 -05:00
parent 8b83f4d6c7
commit 9b2be795a7
10 changed files with 555 additions and 255 deletions

View File

@ -52,7 +52,6 @@ import org.broadinstitute.sting.gatk.refdata.utils.RMDTriplet;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.gatk.samples.SampleDB;
import org.broadinstitute.sting.gatk.samples.SampleDBBuilder;
import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions;
import org.broadinstitute.sting.gatk.walkers.*;
import org.broadinstitute.sting.utils.*;
import org.broadinstitute.sting.utils.classloader.PluginManager;
@ -843,7 +842,7 @@ public class GenomeAnalysisEngine {
if (argCollection.keepProgramRecords)
removeProgramRecords = false;
final boolean keepReadsInLIBS = walker instanceof ActiveRegionWalker && TraverseActiveRegions.KEEP_READS_IN_LIBS;
final boolean keepReadsInLIBS = walker instanceof ActiveRegionWalker && argCollection.newART;
return new SAMDataSource(
samReaderIDs,

View File

@ -448,5 +448,10 @@ public class GATKArgumentCollection {
@Hidden
public boolean generateShadowBCF = false;
// TODO -- remove all code tagged with TODO -- remove me when argument generateShadowBCF is removed
@Hidden
@Argument(fullName="newART", shortName = "newART", doc = "use the new ART traversal", required=false)
public boolean newART = false;
}

View File

@ -31,6 +31,7 @@ import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.utils.locusiterator.LocusIterator;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.locusiterator.LocusIteratorByState;
import java.util.Arrays;
import java.util.Collection;
@ -212,4 +213,10 @@ public abstract class LocusView extends LocusIterator implements View {
private boolean isContainedInShard(GenomeLoc location) {
return locus.containsP(location);
}
// TODO -- remove me
@Override
public LocusIteratorByState getLIBS() {
return loci.getLIBS();
}
}

View File

@ -114,7 +114,7 @@ public class LinearMicroScheduler extends MicroScheduler {
}
// Special function call to empty out the work queue. Ugly for now but will be cleaned up when we eventually push this functionality more into the engine
if( traversalEngine instanceof TraverseActiveRegions ) {
if( traversalEngine instanceof TraverseActiveRegions) {
final Object result = ((TraverseActiveRegions) traversalEngine).endTraversal(walker, accumulator.getReduceInit());
accumulator.accumulate(null, result); // Assumes only used with StandardAccumulator
}

View File

@ -245,7 +245,12 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
} else if (walker instanceof ReadPairWalker) {
return new TraverseReadPairs();
} else if (walker instanceof ActiveRegionWalker) {
return new TraverseActiveRegions();
if ( engine.getArguments().newART ) {
// todo -- create optimized traversal
return new TraverseActiveRegionsOptimized();
} else {
return new TraverseActiveRegionsOriginal();
}
} else {
throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type.");
}

View File

@ -104,16 +104,17 @@ public class WindowMaker implements Iterable<WindowMaker.WindowMakerIterator>, I
* @param sampleNames The complete set of sample names in the reads in shard
*/
private final LocusIteratorByState libs;
public WindowMaker(Shard shard, GenomeLocParser genomeLocParser, StingSAMIterator iterator, List<GenomeLoc> intervals, Collection<String> sampleNames) {
this.sourceInfo = shard.getReadProperties();
this.readIterator = iterator;
// Use the legacy version of LocusIteratorByState if legacy downsampling was requested:
this.sourceIterator = sourceInfo.getDownsamplingMethod().useLegacyDownsampler ?
new PeekableIterator<AlignmentContext>(new LegacyLocusIteratorByState(iterator,sourceInfo,genomeLocParser,sampleNames))
:
new PeekableIterator<AlignmentContext>(new LocusIteratorByState(iterator,sourceInfo,genomeLocParser,sampleNames));
libs = ! sourceInfo.getDownsamplingMethod().useLegacyDownsampler ? new LocusIteratorByState(iterator,sourceInfo,genomeLocParser,sampleNames) : null;
this.sourceIterator = sourceInfo.getDownsamplingMethod().useLegacyDownsampler
? new PeekableIterator<AlignmentContext>(new LegacyLocusIteratorByState(iterator,sourceInfo,genomeLocParser,sampleNames))
: new PeekableIterator<AlignmentContext>(libs);
this.intervalIterator = intervals.size()>0 ? new PeekableIterator<GenomeLoc>(intervals.iterator()) : null;
}
@ -209,5 +210,10 @@ public class WindowMaker implements Iterable<WindowMaker.WindowMakerIterator>, I
throw new ReviewedStingException("BUG: filtering locus does not contain, is not before, and is not past the given alignment context");
}
}
@Override
public LocusIteratorByState getLIBS() {
return libs;
}
}
}

View File

@ -39,136 +39,42 @@ import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.activeregion.ActiveRegion;
import org.broadinstitute.sting.utils.activeregion.ActivityProfile;
import org.broadinstitute.sting.utils.activeregion.ActivityProfileResult;
import org.broadinstitute.sting.utils.pileup.PileupElement;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.*;
import java.util.LinkedList;
import java.util.List;
/**
* Created by IntelliJ IDEA.
* User: rpoplin
* Date: 12/9/11
* Created with IntelliJ IDEA.
* User: depristo
* Date: 1/9/13
* Time: 4:45 PM
* To change this template use File | Settings | File Templates.
*/
public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegionWalker<M,T>,LocusShardDataProvider> {
// TODO
// TODO -- remove me when ART uses the LIBS traversal
// TODO
public static final boolean KEEP_READS_IN_LIBS = false;
public abstract class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegionWalker<M,T>,LocusShardDataProvider> {
// set by the tranversal
protected int activeRegionExtension = -1;
protected int maxRegionSize = -1;
/**
* our log, which we want to capture anything from this class
*/
protected final static Logger logger = Logger.getLogger(TraversalEngine.class);
protected final LinkedList<ActiveRegion> workQueue = new LinkedList<ActiveRegion>();
private final LinkedList<ActiveRegion> workQueue = new LinkedList<ActiveRegion>();
private final LinkedHashSet<GATKSAMRecord> myReads = new LinkedHashSet<GATKSAMRecord>();
abstract protected T processActiveRegion(final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker<M, T> walker);
@Override
public String getTraversalUnits() {
return "active regions";
}
@Override
public T traverse( final ActiveRegionWalker<M,T> walker,
final LocusShardDataProvider dataProvider,
T sum) {
logger.debug(String.format("TraverseActiveRegions.traverse: Shard is %s", dataProvider));
final LocusView locusView = new AllLocusView(dataProvider);
final LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider );
final int activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension();
final int maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion();
int minStart = Integer.MAX_VALUE;
final List<ActiveRegion> activeRegions = new LinkedList<ActiveRegion>();
ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions() );
ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView);
// We keep processing while the next reference location is within the interval
GenomeLoc prevLoc = null;
while( locusView.hasNext() ) {
final AlignmentContext locus = locusView.next();
final GenomeLoc location = locus.getLocation();
// Grab all the previously unseen reads from this pileup and add them to the massive read list
// Note that this must occur before we leave because we are outside the intervals because
// reads may occur outside our intervals but overlap them in the future
// TODO -- this whole HashSet logic should be changed to a linked list of reads with
// TODO -- subsequent pass over them to find the ones overlapping the active regions
for( final PileupElement p : locus.getBasePileup() ) {
final GATKSAMRecord read = p.getRead();
if( !myReads.contains(read) ) {
myReads.add(read);
}
// If this is the last pileup for this shard calculate the minimum alignment start so that we know
// which active regions in the work queue are now safe to process
minStart = Math.min(minStart, read.getAlignmentStart());
}
// skip this location -- it's not part of our engine intervals
if ( outsideEngineIntervals(location) )
continue;
if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) {
// we've move across some interval boundary, restart profile
profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
}
dataProvider.getShard().getReadMetrics().incrementNumIterations();
// create reference context. Note that if we have a pileup of "extended events", the context will
// hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
final ReferenceContext refContext = referenceView.getReferenceContext(location);
// Iterate forward to get all reference ordered data covering this location
final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext);
// Call the walkers isActive function for this locus and add them to the list to be integrated later
profile.add(walkerActiveProb(walker, tracker, refContext, locus, location));
prevLoc = location;
printProgress(locus.getLocation());
}
updateCumulativeMetrics(dataProvider.getShard());
if ( ! profile.isEmpty() )
incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
// add active regions to queue of regions to process
// first check if can merge active regions over shard boundaries
if( !activeRegions.isEmpty() ) {
if( !workQueue.isEmpty() ) {
final ActiveRegion last = workQueue.getLast();
final ActiveRegion first = activeRegions.get(0);
if( last.isActive == first.isActive && last.getLocation().contiguousP(first.getLocation()) && last.getLocation().size() + first.getLocation().size() <= maxRegionSize ) {
workQueue.removeLast();
activeRegions.remove(first);
workQueue.add( new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension) );
}
}
workQueue.addAll( activeRegions );
}
logger.debug("Integrated " + profile.size() + " isActive calls into " + activeRegions.size() + " regions." );
// now go and process all of the active regions
sum = processActiveRegions(walker, sum, minStart, dataProvider.getLocus().getContig());
return sum;
}
/**
* Is the loc outside of the intervals being requested for processing by the GATK?
* @param loc
* @return
*/
private boolean outsideEngineIntervals(final GenomeLoc loc) {
protected boolean outsideEngineIntervals(final GenomeLoc loc) {
return engine.getIntervals() != null && ! engine.getIntervals().overlaps(loc);
}
@ -183,10 +89,10 @@ public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegio
* @param maxRegionSize
* @return
*/
private ActivityProfile incorporateActiveRegions(final ActivityProfile profile,
final List<ActiveRegion> activeRegions,
final int activeRegionExtension,
final int maxRegionSize) {
protected ActivityProfile incorporateActiveRegions(final ActivityProfile profile,
final List<ActiveRegion> activeRegions,
final int activeRegionExtension,
final int maxRegionSize) {
if ( profile.isEmpty() )
throw new IllegalStateException("trying to incorporate an empty active profile " + profile);
@ -195,16 +101,9 @@ public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegio
return new ActivityProfile( engine.getGenomeLocParser(), profile.hasPresetRegions() );
}
// --------------------------------------------------------------------------------
//
// simple utility functions
//
// --------------------------------------------------------------------------------
private final ActivityProfileResult walkerActiveProb(final ActiveRegionWalker<M,T> walker,
final RefMetaDataTracker tracker, final ReferenceContext refContext,
final AlignmentContext locus, final GenomeLoc location) {
protected final ActivityProfileResult walkerActiveProb(final ActiveRegionWalker<M, T> walker,
final RefMetaDataTracker tracker, final ReferenceContext refContext,
final AlignmentContext locus, final GenomeLoc location) {
if ( walker.hasPresetActiveRegions() ) {
return new ActivityProfileResult(location, walker.presetActiveRegions.overlaps(location) ? 1.0 : 0.0);
} else {
@ -212,27 +111,21 @@ public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegio
}
}
private ReferenceOrderedView getReferenceOrderedView( final ActiveRegionWalker<M,T> walker,
final LocusShardDataProvider dataProvider,
final LocusView locusView) {
protected ReferenceOrderedView getReferenceOrderedView(final ActiveRegionWalker<M, T> walker,
final LocusShardDataProvider dataProvider,
final LocusView locusView) {
if ( WalkerManager.getWalkerDataSource(walker) != DataSource.REFERENCE_ORDERED_DATA )
return new ManagingReferenceOrderedView( dataProvider );
else
return (RodLocusView)locusView;
}
// --------------------------------------------------------------------------------
//
// code to handle processing active regions
//
// --------------------------------------------------------------------------------
private T processActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, final int minStart, final String currentContig ) {
protected T processActiveRegions(final ActiveRegionWalker<M, T> walker, T sum, final boolean forceRegionsToBeActive) {
if( walker.activeRegionOutStream != null ) {
writeActiveRegionsToStream(walker);
return sum;
} else {
return callWalkerMapOnActiveRegions(walker, sum, minStart, currentContig);
return callWalkerMapOnActiveRegions(walker, sum, forceRegionsToBeActive);
}
}
@ -241,7 +134,7 @@ public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegio
*
* @param walker
*/
private void writeActiveRegionsToStream( final ActiveRegionWalker<M,T> walker ) {
private void writeActiveRegionsToStream( final ActiveRegionWalker<M, T> walker ) {
// Just want to output the active regions to a file, not actually process them
for( final ActiveRegion activeRegion : workQueue ) {
if( activeRegion.isActive ) {
@ -250,13 +143,36 @@ public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegio
}
}
private T callWalkerMapOnActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, final int minStart, final String currentContig ) {
private GenomeLoc startOfLiveRegion = null;
protected void notifyOfCurrentPosition(final GATKSAMRecord read) {
notifyOfCurrentPosition(engine.getGenomeLocParser().createGenomeLoc(read));
}
protected void notifyOfCurrentPosition(final GenomeLoc currentLocation) {
if ( startOfLiveRegion == null )
startOfLiveRegion = currentLocation;
else
startOfLiveRegion = startOfLiveRegion.max(currentLocation.getStartLocation());
}
protected GenomeLoc getStartOfLiveRegion() {
return startOfLiveRegion;
}
protected boolean regionCompletelyWithinDeadZone(final GenomeLoc region, final boolean includeExtension) {
return (region.getStop() < (getStartOfLiveRegion().getStart() - (includeExtension ? activeRegionExtension : 0)))
|| ! region.onSameContig(getStartOfLiveRegion());
}
private T callWalkerMapOnActiveRegions(final ActiveRegionWalker<M, T> walker, T sum, final boolean forceRegionsToBeActive) {
// Since we've traversed sufficiently past this point (or this contig!) in the workQueue we can unload those regions and process them
// TODO can implement parallel traversal here
while( workQueue.peek() != null ) {
final GenomeLoc extendedLoc = workQueue.peek().getExtendedLoc();
if ( extendedLoc.getStop() < minStart || (currentContig != null && !workQueue.peek().getExtendedLoc().getContig().equals(currentContig))) {
if ( forceRegionsToBeActive || regionCompletelyWithinDeadZone(extendedLoc, false) ) {
final ActiveRegion activeRegion = workQueue.remove();
logger.warn("Processing active region " + activeRegion + " dead zone " + getStartOfLiveRegion());
sum = processActiveRegion( activeRegion, sum, walker );
} else {
break;
@ -266,61 +182,23 @@ public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegio
return sum;
}
private T processActiveRegion( final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker<M,T> walker ) {
final ArrayList<GATKSAMRecord> placedReads = new ArrayList<GATKSAMRecord>();
for( final GATKSAMRecord read : myReads ) {
final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read );
if( activeRegion.getLocation().overlapsP( readLoc ) ) {
// The region which the highest amount of overlap is chosen as the primary region for the read (tie breaking is done as right most region)
long maxOverlap = activeRegion.getLocation().sizeOfOverlap( readLoc );
ActiveRegion bestRegion = activeRegion;
for( final ActiveRegion otherRegionToTest : workQueue ) {
if( otherRegionToTest.getLocation().sizeOfOverlap(readLoc) >= maxOverlap ) {
maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( readLoc );
bestRegion = otherRegionToTest;
}
}
bestRegion.add( read );
// The read is also added to all other regions in which it overlaps but marked as non-primary
if( walker.wantsNonPrimaryReads() ) {
if( !bestRegion.equals(activeRegion) ) {
activeRegion.add( read );
}
for( final ActiveRegion otherRegionToTest : workQueue ) {
if( !bestRegion.equals(otherRegionToTest) ) {
// check for non-primary vs. extended
if ( otherRegionToTest.getLocation().overlapsP( readLoc ) ) {
otherRegionToTest.add( read );
} else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( readLoc ) ) {
otherRegionToTest.add( read );
}
}
}
}
placedReads.add( read );
// check for non-primary vs. extended
} else if( activeRegion.getLocation().overlapsP( readLoc ) ) {
if ( walker.wantsNonPrimaryReads() ) {
activeRegion.add( read );
}
} else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) {
activeRegion.add( read );
}
}
myReads.removeAll( placedReads ); // remove all the reads which have been placed into their active region
// WARNING: This hashset relies on reads being exactly equal when they are placed in the list as when they are removed. So the ActiveRegionWalker can't modify the reads in any way.
logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReferenceLoc());
final M x = walker.map( activeRegion, null );
return walker.reduce( x, sum );
}
/**
* Special function called in LinearMicroScheduler to empty out the work queue.
* Ugly for now but will be cleaned up when we push this functionality more into the engine
*/
public T endTraversal( final Walker<M,T> walker, T sum) {
return processActiveRegions((ActiveRegionWalker<M,T>)walker, sum, Integer.MAX_VALUE, null);
public T endTraversal(final Walker<M, T> walker, T sum) {
return processActiveRegions((ActiveRegionWalker<M, T>)walker, sum, true);
}
protected ActiveRegion getBestRegion(final ActiveRegion activeRegion, final GenomeLoc readLoc) {
ActiveRegion bestRegion = activeRegion;
long maxOverlap = activeRegion.getLocation().sizeOfOverlap( readLoc );
for( final ActiveRegion otherRegionToTest : workQueue ) {
if( otherRegionToTest.getLocation().sizeOfOverlap(readLoc) >= maxOverlap ) {
maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( readLoc );
bestRegion = otherRegionToTest;
}
}
return bestRegion;
}
}

View File

@ -0,0 +1,194 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.traversals;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.datasources.providers.*;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.activeregion.ActiveRegion;
import org.broadinstitute.sting.utils.activeregion.ActivityProfile;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.*;
/**
* Created by IntelliJ IDEA.
* User: rpoplin
* Date: 12/9/11
*/
public class TraverseActiveRegionsOptimized<M,T> extends TraverseActiveRegions<M,T> {
private LinkedList<GATKSAMRecord> myReads = new LinkedList<GATKSAMRecord>();
@Override
public T traverse( final ActiveRegionWalker<M,T> walker,
final LocusShardDataProvider dataProvider,
T sum) {
logger.debug(String.format("TraverseActiveRegions.traverse: Shard is %s", dataProvider));
final LocusView locusView = new AllLocusView(dataProvider);
final LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider );
activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension();
maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion();
final List<ActiveRegion> activeRegions = new LinkedList<ActiveRegion>();
ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions() );
ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView);
// We keep processing while the next reference location is within the interval
GenomeLoc prevLoc = null;
while( locusView.hasNext() ) {
final AlignmentContext locus = locusView.next();
final GenomeLoc location = locus.getLocation();
// Grab all the previously unseen reads from this pileup and add them to the massive read list
// Note that this must occur before we leave because we are outside the intervals because
// reads may occur outside our intervals but overlap them in the future
final Collection<SAMRecord> reads = locusView.getLIBS().transferReadsFromAllPreviousPileups();
for( final SAMRecord read : reads ) {
notifyOfCurrentPosition((GATKSAMRecord)read);
myReads.add((GATKSAMRecord)read);
}
// skip this location -- it's not part of our engine intervals
if ( outsideEngineIntervals(location) )
continue;
if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) {
// we've move across some interval boundary, restart profile
profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
}
dataProvider.getShard().getReadMetrics().incrementNumIterations();
// create reference context. Note that if we have a pileup of "extended events", the context will
// hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
final ReferenceContext refContext = referenceView.getReferenceContext(location);
// Iterate forward to get all reference ordered data covering this location
final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext);
// Call the walkers isActive function for this locus and add them to the list to be integrated later
profile.add(walkerActiveProb(walker, tracker, refContext, locus, location));
prevLoc = location;
printProgress(locus.getLocation());
}
updateCumulativeMetrics(dataProvider.getShard());
if ( ! profile.isEmpty() )
incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
// add active regions to queue of regions to process
// first check if can merge active regions over shard boundaries
if( !activeRegions.isEmpty() ) {
if( !workQueue.isEmpty() ) {
final ActiveRegion last = workQueue.getLast();
final ActiveRegion first = activeRegions.get(0);
if( last.isActive == first.isActive && last.getLocation().contiguousP(first.getLocation()) && last.getLocation().size() + first.getLocation().size() <= maxRegionSize ) {
workQueue.removeLast();
activeRegions.remove(first);
workQueue.add( new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension) );
}
}
workQueue.addAll( activeRegions );
}
logger.debug("Integrated " + profile.size() + " isActive calls into " + activeRegions.size() + " regions." );
// now go and process all of the active regions
sum = processActiveRegions(walker, sum, false);
return sum;
}
@Override
public String toString() {
return "TraverseActiveRegionsOptimized";
}
// TODO -- remove me when we fix the traversal
private final void addToRegion(final ActiveRegion region, final GATKSAMRecord read) {
if ( ! region.getReads().contains(read) )
region.add(read);
}
@Override
protected T processActiveRegion(final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker<M, T> walker) {
final Iterator<GATKSAMRecord> liveReads = myReads.iterator();
while ( liveReads.hasNext() ) {
final GATKSAMRecord read = liveReads.next();
final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read );
if( activeRegion.getLocation().overlapsP( readLoc ) ) {
// TODO -- this test assumes that we've successfully defined all regions that might be
// TODO -- the primary home for read. Doesn't seem safe to me
// The region which the highest amount of overlap is chosen as the primary region for the read (tie breaking is done as right most region)
final ActiveRegion bestRegion = getBestRegion(activeRegion, readLoc);
addToRegion(bestRegion, read);
// The read is also added to all other regions in which it overlaps but marked as non-primary
if( walker.wantsNonPrimaryReads() ) {
if( !bestRegion.equals(activeRegion) ) {
addToRegion(activeRegion, read);
}
for( final ActiveRegion otherRegionToTest : workQueue ) {
if( !bestRegion.equals(otherRegionToTest) ) {
// check for non-primary vs. extended
if ( otherRegionToTest.getLocation().overlapsP( readLoc ) ) {
addToRegion(otherRegionToTest, read);
} else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( readLoc ) ) {
addToRegion(otherRegionToTest, read);
}
}
}
}
// check for non-primary vs. extended
} else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) {
activeRegion.add( read );
}
if ( regionCompletelyWithinDeadZone(readLoc, true) ) {
logger.info("Removing read " + read.getReadName() + " at " + readLoc + " with dead zone start " + getStartOfLiveRegion());
liveReads.remove();
}
}
logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReferenceLoc());
final M x = walker.map(activeRegion, null);
return walker.reduce( x, sum );
}
}

View File

@ -0,0 +1,177 @@
package org.broadinstitute.sting.gatk.traversals;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.datasources.providers.*;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.activeregion.ActiveRegion;
import org.broadinstitute.sting.utils.activeregion.ActivityProfile;
import org.broadinstitute.sting.utils.pileup.PileupElement;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.*;
/**
* Created by IntelliJ IDEA.
* User: rpoplin
* Date: 12/9/11
*/
public class TraverseActiveRegionsOriginal<M,T> extends TraverseActiveRegions<M,T> {
private final LinkedHashSet<GATKSAMRecord> myReads = new LinkedHashSet<GATKSAMRecord>();
protected Collection<GATKSAMRecord> getReadsInCurrentRegion() {
return myReads;
}
protected void removeReadsFromCurrentRegion(final List<GATKSAMRecord> placedReads) {
myReads.removeAll( placedReads ); // remove all the reads which have been placed into their active region
}
@Override
public T traverse( final ActiveRegionWalker<M,T> walker,
final LocusShardDataProvider dataProvider,
T sum) {
logger.debug(String.format("TraverseActiveRegions.traverse: Shard is %s", dataProvider));
final LocusView locusView = new AllLocusView(dataProvider);
final LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider );
activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension();
maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion();
int minStart = Integer.MAX_VALUE;
final List<ActiveRegion> activeRegions = new LinkedList<ActiveRegion>();
ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions() );
ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView);
// We keep processing while the next reference location is within the interval
GenomeLoc prevLoc = null;
while( locusView.hasNext() ) {
final AlignmentContext locus = locusView.next();
final GenomeLoc location = locus.getLocation();
// Grab all the previously unseen reads from this pileup and add them to the massive read list
// Note that this must occur before we leave because we are outside the intervals because
// reads may occur outside our intervals but overlap them in the future
// TODO -- this whole HashSet logic should be changed to a linked list of reads with
// TODO -- subsequent pass over them to find the ones overlapping the active regions
for( final PileupElement p : locus.getBasePileup() ) {
final GATKSAMRecord read = p.getRead();
if( !myReads.contains(read) ) {
myReads.add(read);
}
// If this is the last pileup for this shard calculate the minimum alignment start so that we know
// which active regions in the work queue are now safe to process
minStart = Math.min(minStart, read.getAlignmentStart());
}
// skip this location -- it's not part of our engine intervals
if ( outsideEngineIntervals(location) )
continue;
if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) {
// we've move across some interval boundary, restart profile
profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
}
dataProvider.getShard().getReadMetrics().incrementNumIterations();
// create reference context. Note that if we have a pileup of "extended events", the context will
// hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
final ReferenceContext refContext = referenceView.getReferenceContext(location);
// Iterate forward to get all reference ordered data covering this location
final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext);
// Call the walkers isActive function for this locus and add them to the list to be integrated later
profile.add(walkerActiveProb(walker, tracker, refContext, locus, location));
prevLoc = location;
printProgress(locus.getLocation());
}
updateCumulativeMetrics(dataProvider.getShard());
if ( ! profile.isEmpty() )
incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
// add active regions to queue of regions to process
// first check if can merge active regions over shard boundaries
if( !activeRegions.isEmpty() ) {
if( !workQueue.isEmpty() ) {
final ActiveRegion last = workQueue.getLast();
final ActiveRegion first = activeRegions.get(0);
if( last.isActive == first.isActive && last.getLocation().contiguousP(first.getLocation()) && last.getLocation().size() + first.getLocation().size() <= maxRegionSize ) {
workQueue.removeLast();
activeRegions.remove(first);
workQueue.add( new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension) );
}
}
workQueue.addAll( activeRegions );
}
logger.debug("Integrated " + profile.size() + " isActive calls into " + activeRegions.size() + " regions." );
// set the dead zone to the min. This is incorrect but necessary because of the way we handle things in processActiveRegion
notifyOfCurrentPosition(engine.getGenomeLocParser().createGenomeLoc(dataProvider.getLocus().getContig(), minStart));
// now go and process all of the active regions
sum = processActiveRegions(walker, sum, false);
return sum;
}
@Override
public String toString() {
return "TraverseActiveRegionsOriginal";
}
@Override
protected T processActiveRegion(final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker<M, T> walker) {
final ArrayList<GATKSAMRecord> placedReads = new ArrayList<GATKSAMRecord>();
for( final GATKSAMRecord read : getReadsInCurrentRegion() ) {
final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read );
if( activeRegion.getLocation().overlapsP( readLoc ) ) {
// The region which the highest amount of overlap is chosen as the primary region for the read (tie breaking is done as right most region)
final ActiveRegion bestRegion = getBestRegion(activeRegion, readLoc);
bestRegion.add( read );
// The read is also added to all other regions in which it overlaps but marked as non-primary
if( walker.wantsNonPrimaryReads() ) {
if( !bestRegion.equals(activeRegion) ) {
activeRegion.add( read );
}
for( final ActiveRegion otherRegionToTest : workQueue ) {
if( !bestRegion.equals(otherRegionToTest) ) {
// check for non-primary vs. extended
if ( otherRegionToTest.getLocation().overlapsP( readLoc ) ) {
otherRegionToTest.add( read );
} else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( readLoc ) ) {
otherRegionToTest.add( read );
}
}
}
}
placedReads.add( read );
// check for non-primary vs. extended
} else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) {
activeRegion.add( read );
}
}
removeReadsFromCurrentRegion(placedReads);
// WARNING: This hashset relies on reads being exactly equal when they are placed in the list as when they are removed. So the ActiveRegionWalker can't modify the reads in any way.
logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReferenceLoc());
final M x = walker.map(activeRegion, null);
return walker.reduce( x, sum );
}
}

View File

@ -1,34 +1,38 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.traversals;
import com.google.java.contract.PreconditionError;
import net.sf.samtools.*;
import org.broadinstitute.sting.commandline.Tags;
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
import org.broadinstitute.sting.gatk.datasources.reads.*;
import org.broadinstitute.sting.gatk.downsampling.DownsamplingMethod;
import org.broadinstitute.sting.gatk.filters.ReadFilter;
import org.broadinstitute.sting.gatk.iterators.ReadTransformer;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.activeregion.ActiveRegionReadState;
@ -54,6 +58,7 @@ import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.ReadUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@ -71,6 +76,10 @@ import java.util.*;
* http://iwww.broadinstitute.org/gsa/wiki/index.php/Active_Region_Traversal_Contract
*/
public class TraverseActiveRegionsUnitTest extends BaseTest {
private final static boolean INCLUDE_OLD = false;
private final static boolean INCLUDE_NEW = true;
private final static boolean ENFORCE_CONTRACTS = false;
private final static boolean DEBUG = false;
private class DummyActiveRegionWalker extends ActiveRegionWalker<Integer, Integer> {
private final double prob;
@ -120,7 +129,13 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
}
}
private final TraverseActiveRegions<Integer, Integer> t = new TraverseActiveRegions<Integer, Integer>();
@DataProvider(name = "TraversalEngineProvider")
public Object[][] makeTraversals() {
final List<Object[]> traversals = new LinkedList<Object[]>();
if ( INCLUDE_OLD ) traversals.add(new Object[]{new TraverseActiveRegionsOriginal<Integer, Integer>()});
if ( INCLUDE_NEW ) traversals.add(new Object[]{new TraverseActiveRegionsOptimized<Integer, Integer>()});
return traversals.toArray(new Object[][]{});
}
private IndexedFastaSequenceFile reference;
private SAMSequenceDictionary dictionary;
@ -187,18 +202,18 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
out.close();
}
@Test
public void testAllBasesSeen() {
@Test(enabled = true && ! DEBUG, dataProvider = "TraversalEngineProvider")
public void testAllBasesSeen(TraverseActiveRegions t) {
DummyActiveRegionWalker walker = new DummyActiveRegionWalker();
List<GenomeLoc> activeIntervals = getIsActiveIntervals(walker, intervals);
List<GenomeLoc> activeIntervals = getIsActiveIntervals(t, walker, intervals);
// Contract: Every genome position in the analysis interval(s) is processed by the walker's isActive() call
verifyEqualIntervals(intervals, activeIntervals);
}
private List<GenomeLoc> getIsActiveIntervals(DummyActiveRegionWalker walker, List<GenomeLoc> intervals) {
private List<GenomeLoc> getIsActiveIntervals(final TraverseActiveRegions t, DummyActiveRegionWalker walker, List<GenomeLoc> intervals) {
List<GenomeLoc> activeIntervals = new ArrayList<GenomeLoc>();
for (LocusShardDataProvider dataProvider : createDataProviders(intervals, testBAM)) {
for (LocusShardDataProvider dataProvider : createDataProviders(t, intervals, testBAM)) {
t.traverse(walker, dataProvider, 0);
activeIntervals.addAll(walker.isActiveCalls);
}
@ -206,23 +221,23 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
return activeIntervals;
}
@Test (expectedExceptions = PreconditionError.class)
public void testIsActiveRangeLow () {
@Test (enabled = ENFORCE_CONTRACTS, dataProvider = "TraversalEngineProvider", expectedExceptions = PreconditionError.class)
public void testIsActiveRangeLow (TraverseActiveRegions t) {
DummyActiveRegionWalker walker = new DummyActiveRegionWalker(-0.1);
getActiveRegions(walker, intervals).values();
getActiveRegions(t, walker, intervals).values();
}
@Test (expectedExceptions = PreconditionError.class)
public void testIsActiveRangeHigh () {
@Test (enabled = ENFORCE_CONTRACTS, dataProvider = "TraversalEngineProvider", expectedExceptions = PreconditionError.class)
public void testIsActiveRangeHigh (TraverseActiveRegions t) {
DummyActiveRegionWalker walker = new DummyActiveRegionWalker(1.1);
getActiveRegions(walker, intervals).values();
getActiveRegions(t, walker, intervals).values();
}
@Test
public void testActiveRegionCoverage() {
@Test(enabled = true && ! DEBUG, dataProvider = "TraversalEngineProvider")
public void testActiveRegionCoverage(TraverseActiveRegions t) {
DummyActiveRegionWalker walker = new DummyActiveRegionWalker();
Collection<ActiveRegion> activeRegions = getActiveRegions(walker, intervals).values();
Collection<ActiveRegion> activeRegions = getActiveRegions(t, walker, intervals).values();
verifyActiveRegionCoverage(intervals, activeRegions);
}
@ -268,11 +283,11 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
Assert.assertEquals(intervalStops.size(), 0, "Interval stop location does not match an active region stop location");
}
@Test
public void testActiveRegionExtensionOnContig() {
@Test(enabled = true && ! DEBUG, dataProvider = "TraversalEngineProvider")
public void testActiveRegionExtensionOnContig(TraverseActiveRegions t) {
DummyActiveRegionWalker walker = new DummyActiveRegionWalker();
Collection<ActiveRegion> activeRegions = getActiveRegions(walker, intervals).values();
Collection<ActiveRegion> activeRegions = getActiveRegions(t, walker, intervals).values();
for (ActiveRegion activeRegion : activeRegions) {
GenomeLoc loc = activeRegion.getExtendedLoc();
@ -283,8 +298,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
}
}
@Test
public void testPrimaryReadMapping() {
@Test(enabled = true && ! DEBUG, dataProvider = "TraversalEngineProvider")
public void testPrimaryReadMapping(TraverseActiveRegions t) {
DummyActiveRegionWalker walker = new DummyActiveRegionWalker();
// Contract: Each read has the Primary state in a single region (or none)
@ -304,7 +319,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
// shard_boundary_equal: Non-Primary in 1:14908-16384, Primary in 1:16385-16927
// simple20: Primary in 20:10000-10100
Map<GenomeLoc, ActiveRegion> activeRegions = getActiveRegions(walker, intervals);
Map<GenomeLoc, ActiveRegion> activeRegions = getActiveRegions(t, walker, intervals);
ActiveRegion region;
region = activeRegions.get(genomeLocParser.createGenomeLoc("1", 1, 999));
@ -326,8 +341,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
verifyReadMapping(region, "simple20");
}
@Test
public void testNonPrimaryReadMapping() {
@Test(enabled = true && ! DEBUG, dataProvider = "TraversalEngineProvider")
public void testNonPrimaryReadMapping(TraverseActiveRegions t) {
DummyActiveRegionWalker walker = new DummyActiveRegionWalker(
EnumSet.of(ActiveRegionReadState.PRIMARY, ActiveRegionReadState.NONPRIMARY));
@ -350,7 +365,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
// shard_boundary_equal: Non-Primary in 1:14908-16384, Primary in 1:16385-16927
// simple20: Primary in 20:10000-10100
Map<GenomeLoc, ActiveRegion> activeRegions = getActiveRegions(walker, intervals);
Map<GenomeLoc, ActiveRegion> activeRegions = getActiveRegions(t, walker, intervals);
ActiveRegion region;
region = activeRegions.get(genomeLocParser.createGenomeLoc("1", 1, 999));
@ -372,8 +387,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
verifyReadMapping(region, "simple20");
}
@Test
public void testExtendedReadMapping() {
@Test(enabled = true, dataProvider = "TraversalEngineProvider")
public void testExtendedReadMapping(TraverseActiveRegions t) {
DummyActiveRegionWalker walker = new DummyActiveRegionWalker(
EnumSet.of(ActiveRegionReadState.PRIMARY, ActiveRegionReadState.NONPRIMARY, ActiveRegionReadState.EXTENDED));
@ -397,7 +412,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
// shard_boundary_equal: Non-Primary in 1:14908-16384, Primary in 1:16385-16927
// simple20: Primary in 20:10000-10100
Map<GenomeLoc, ActiveRegion> activeRegions = getActiveRegions(walker, intervals);
Map<GenomeLoc, ActiveRegion> activeRegions = getActiveRegions(t, walker, intervals);
ActiveRegion region;
region = activeRegions.get(genomeLocParser.createGenomeLoc("1", 1, 999));
@ -419,24 +434,30 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
verifyReadMapping(region, "simple20");
}
@Test
public void testUnmappedReads() {
@Test(enabled = true && ! DEBUG, dataProvider = "TraversalEngineProvider")
public void testUnmappedReads(TraverseActiveRegions t) {
// TODO
}
private void verifyReadMapping(ActiveRegion region, String... reads) {
final Set<String> regionReads = new HashSet<String>();
for (SAMRecord read : region.getReads()) {
Assert.assertFalse(regionReads.contains(read.getReadName()), "Duplicate reads detected in region " + region + " read " + read.getReadName());
regionReads.add(read.getReadName());
}
Collection<String> wantReads = new ArrayList<String>(Arrays.asList(reads));
for (SAMRecord read : region.getReads()) {
String regionReadName = read.getReadName();
Assert.assertTrue(wantReads.contains(regionReadName), "Read " + regionReadName + " assigned to active region " + region);
Assert.assertTrue(wantReads.contains(regionReadName), "Read " + regionReadName + " incorrectly assigned to active region " + region);
wantReads.remove(regionReadName);
}
Assert.assertTrue(wantReads.isEmpty(), "Reads missing in active region " + region);
Assert.assertTrue(wantReads.isEmpty(), "Reads missing in active region " + region + ", wanted " + (wantReads.isEmpty() ? "" : wantReads.iterator().next()));
}
private Map<GenomeLoc, ActiveRegion> getActiveRegions(DummyActiveRegionWalker walker, List<GenomeLoc> intervals) {
for (LocusShardDataProvider dataProvider : createDataProviders(intervals, testBAM))
private Map<GenomeLoc, ActiveRegion> getActiveRegions(TraverseActiveRegions t, DummyActiveRegionWalker walker, List<GenomeLoc> intervals) {
for (LocusShardDataProvider dataProvider : createDataProviders(t, intervals, testBAM))
t.traverse(walker, dataProvider, 0);
t.endTraversal(walker, 0);
@ -500,7 +521,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
return record;
}
private List<LocusShardDataProvider> createDataProviders(List<GenomeLoc> intervals, String bamFile) {
private List<LocusShardDataProvider> createDataProviders(TraverseActiveRegions t, List<GenomeLoc> intervals, String bamFile) {
GenomeAnalysisEngine engine = new GenomeAnalysisEngine();
engine.setGenomeLocParser(genomeLocParser);
t.initialize(engine);
@ -509,7 +530,15 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
SAMReaderID readerID = new SAMReaderID(new File(bamFile), new Tags());
samFiles.add(readerID);
SAMDataSource dataSource = new SAMDataSource(samFiles, new ThreadAllocation(), null, genomeLocParser);
SAMDataSource dataSource = new SAMDataSource(samFiles, new ThreadAllocation(), null, genomeLocParser,
false,
SAMFileReader.ValidationStringency.STRICT,
null,
null,
new ValidationExclusion(),
new ArrayList<ReadFilter>(),
new ArrayList<ReadTransformer>(),
false, (byte)30, false, t instanceof TraverseActiveRegionsOptimized);
List<LocusShardDataProvider> providers = new ArrayList<LocusShardDataProvider>();
for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) {