Initial updates for ActiveRegionShard

This commit is contained in:
Joel Thibault 2013-01-03 15:49:40 -05:00
parent e7553545ef
commit 319d651e4a
10 changed files with 298 additions and 114 deletions

View File

@ -588,7 +588,10 @@ public class GenomeAnalysisEngine {
else else
return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), readShardBalancer); return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), readShardBalancer);
case ACTIVEREGIONSHARD: case ACTIVEREGIONSHARD:
throw new UserException.CommandLineException("Not implemented."); if(intervals == null)
return readsDataSource.createShardIteratorOverMappedReads(referenceDataSource.getReference().getSequenceDictionary(),new ActiveRegionShardBalancer());
else
return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new ActiveRegionShardBalancer());
default: default:
throw new UserException.CommandLineException("Invalid active region shard type."); throw new UserException.CommandLineException("Invalid active region shard type.");
} }

View File

@ -0,0 +1,58 @@
/*
* Copyright (c) 2012, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.providers;
import net.sf.picard.reference.IndexedFastaSequenceFile;
import org.broadinstitute.sting.gatk.ReadProperties;
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
import org.broadinstitute.sting.gatk.iterators.LocusIterator;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import java.util.Collection;
/**
* @author Joel Thibault
*/
public class ActiveRegionShardDataProvider extends ShardDataProvider {
final private ReadShardDataProvider readProvider;
final private LocusShardDataProvider locusProvider;
public ActiveRegionShardDataProvider(Shard shard, ReadProperties sourceInfo, GenomeLocParser genomeLocParser, StingSAMIterator reads, GenomeLoc locus, LocusIterator locusIterator, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods) {
super(shard, genomeLocParser, reference, rods); // TODO: necessary?
readProvider = new ReadShardDataProvider(shard, genomeLocParser, reads, reference, rods);
locusProvider = new LocusShardDataProvider(shard, sourceInfo, genomeLocParser, locus, locusIterator, reference, rods);
}
public ReadShardDataProvider getReadShardDataProvider() {
return readProvider;
}
public LocusShardDataProvider getLocusShardDataProvider(LocusIterator iterator) {
return locusProvider;
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright (c) 2012, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.samtools.SAMFileSpan;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import java.util.List;
import java.util.Map;
/**
* @author Joel Thibault
*/
public class ActiveRegionShard extends ReadShard {
public ActiveRegionShard(GenomeLocParser parser, SAMDataSource readsDataSource, Map<SAMReaderID,SAMFileSpan> fileSpans, List<GenomeLoc> loci, boolean isUnmapped) {
super(parser, readsDataSource, fileSpans, loci, isUnmapped);
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2012, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.reads;
/**
* @author Joel Thibault
*/
public class ActiveRegionShardBalancer extends ReadShardBalancer {
// TODO ?
}

View File

@ -40,7 +40,9 @@ import java.util.Map;
*/ */
public abstract class Shard implements HasGenomeLocation { public abstract class Shard implements HasGenomeLocation {
public enum ShardType { public enum ShardType {
READ, LOCUS READ,
LOCUS,
ACTIVEREGION // Used only by ExperimentalActiveRegionShardTraverseActiveRegions
} }
protected final GenomeLocParser parser; // incredibly annoying! protected final GenomeLocParser parser; // incredibly annoying!

View File

@ -2,6 +2,7 @@ package org.broadinstitute.sting.gatk.executive;
import net.sf.picard.reference.IndexedFastaSequenceFile; import net.sf.picard.reference.IndexedFastaSequenceFile;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.gatk.datasources.providers.ActiveRegionShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
@ -80,6 +81,18 @@ public class LinearMicroScheduler extends MicroScheduler {
} }
windowMaker.close(); windowMaker.close();
} }
else if(shard.getShardType() == Shard.ShardType.ACTIVEREGION) {
WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(),
getReadIterator(shard), shard.getGenomeLocs(), SampleUtils.getSAMFileSamples(engine));
for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
ShardDataProvider dataProvider = new ActiveRegionShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),getReadIterator(shard),iterator.getLocus(),iterator,reference,rods);
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
accumulator.accumulate(dataProvider,result);
dataProvider.close();
if ( walker.isDone() ) break;
}
windowMaker.close();
}
else { else {
ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods);
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());

View File

@ -1,32 +1,35 @@
package org.broadinstitute.sting.gatk.traversals; package org.broadinstitute.sting.gatk.traversals;
import net.sf.samtools.SAMFileHeader;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.WalkerManager; import org.broadinstitute.sting.gatk.WalkerManager;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext; import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext; import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.datasources.providers.*; import org.broadinstitute.sting.gatk.datasources.providers.*;
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.executive.WindowMaker;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension; import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker; import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker;
import org.broadinstitute.sting.gatk.walkers.DataSource; import org.broadinstitute.sting.gatk.walkers.DataSource;
import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.SampleUtils;
import org.broadinstitute.sting.utils.activeregion.ActiveRegion; import org.broadinstitute.sting.utils.activeregion.ActiveRegion;
import org.broadinstitute.sting.utils.activeregion.ActivityProfile; import org.broadinstitute.sting.utils.activeregion.ActivityProfile;
import org.broadinstitute.sting.utils.activeregion.ActivityProfileResult; import org.broadinstitute.sting.utils.activeregion.ActivityProfileResult;
import org.broadinstitute.sting.utils.pileup.PileupElement;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord; import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.*; import java.util.*;
public class ExperimentalActiveRegionShardTraverseActiveRegions<M,T> extends TraversalEngine<M,T,ActiveRegionWalker<M,T>,LocusShardDataProvider> { public class ExperimentalActiveRegionShardTraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegionWalker<M,T>,ActiveRegionShardDataProvider> {
/** /**
* our log, which we want to capture anything from this class * our log, which we want to capture anything from this class
*/ */
protected final static Logger logger = Logger.getLogger(TraversalEngine.class); protected final static Logger logger = Logger.getLogger(TraversalEngine.class);
private final LinkedList<ActiveRegion> workQueue = new LinkedList<ActiveRegion>(); private final LinkedList<ActiveRegion> workQueue = new LinkedList<ActiveRegion>();
private final LinkedHashSet<GATKSAMRecord> myReads = new LinkedHashSet<GATKSAMRecord>(); private final LinkedList<GATKSAMRecord> myReads = new LinkedList<GATKSAMRecord>();
@Override @Override
public String getTraversalUnits() { public String getTraversalUnits() {
@ -35,71 +38,65 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions<M,T> extends Tra
@Override @Override
public T traverse( final ActiveRegionWalker<M,T> walker, public T traverse( final ActiveRegionWalker<M,T> walker,
final LocusShardDataProvider dataProvider, final ActiveRegionShardDataProvider dataProvider,
T sum) { T sum) {
logger.debug(String.format("TraverseActiveRegion.traverse: Shard is %s", dataProvider)); logger.debug(String.format("ExperimentalActiveRegionShardTraverseActiveRegions.traverse: Shard is %s", dataProvider));
final LocusView locusView = new AllLocusView(dataProvider); ReadShardDataProvider readDataProvider = dataProvider.getReadShardDataProvider();
final LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider );
final int activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension(); final int activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension();
final int maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion(); final int maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion();
int minStart = Integer.MAX_VALUE; final ReadView readView = new ReadView(readDataProvider);
final List<ActiveRegion> activeRegions = new LinkedList<ActiveRegion>(); final List<ActiveRegion> activeRegions = new LinkedList<ActiveRegion>();
ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions() ); ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions());
ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView); Shard readShard = readDataProvider.getShard();
SAMFileHeader header = readShard.getReadProperties().getHeader();
WindowMaker windowMaker = new WindowMaker(readShard, engine.getGenomeLocParser(),
readView.iterator(), readShard.getGenomeLocs(), SampleUtils.getSAMFileSamples(header));
// We keep processing while the next reference location is within the interval for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
GenomeLoc prevLoc = null; LocusShardDataProvider locusDataProvider = dataProvider.getLocusShardDataProvider(iterator);
while( locusView.hasNext() ) { final LocusView locusView = new AllLocusView(locusDataProvider);
final AlignmentContext locus = locusView.next(); final LocusReferenceView referenceView = new LocusReferenceView( walker, locusDataProvider );
final GenomeLoc location = locus.getLocation(); ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, locusDataProvider, locusView);
// Grab all the previously unseen reads from this pileup and add them to the massive read list // We keep processing while the next reference location is within the interval
// Note that this must occur before we leave because we are outside the intervals because GenomeLoc prevLoc = null;
// reads may occur outside our intervals but overlap them in the future while( locusView.hasNext() ) {
// TODO -- this whole HashSet logic should be changed to a linked list of reads with final AlignmentContext locus = locusView.next();
// TODO -- subsequent pass over them to find the ones overlapping the active regions final GenomeLoc location = locus.getLocation();
for( final PileupElement p : locus.getBasePileup() ) {
final GATKSAMRecord read = p.getRead(); if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) {
if( !myReads.contains(read) ) { // we've move across some interval boundary, restart profile
myReads.add(read); profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
} }
// If this is the last pileup for this shard calculate the minimum alignment start so that we know readDataProvider.getShard().getReadMetrics().incrementNumIterations();
// which active regions in the work queue are now safe to process
minStart = Math.min(minStart, read.getAlignmentStart()); // create reference context. Note that if we have a pileup of "extended events", the context will
// hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
final ReferenceContext refContext = referenceView.getReferenceContext(location);
// Iterate forward to get all reference ordered data covering this location
final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext);
// Call the walkers isActive function for this locus and add them to the list to be integrated later
profile.add(walkerActiveProb(walker, tracker, refContext, locus, location));
prevLoc = location;
printProgress(locus.getLocation());
} }
// skip this location -- it's not part of our engine intervals locusDataProvider.close();
if ( outsideEngineIntervals(location) )
continue;
if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) {
// we've move across some interval boundary, restart profile
profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
}
dataProvider.getShard().getReadMetrics().incrementNumIterations();
// create reference context. Note that if we have a pileup of "extended events", the context will
// hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
final ReferenceContext refContext = referenceView.getReferenceContext(location);
// Iterate forward to get all reference ordered data covering this location
final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext);
// Call the walkers isActive function for this locus and add them to the list to be integrated later
profile.add(walkerActiveProb(walker, tracker, refContext, locus, location));
prevLoc = location;
printProgress(locus.getLocation());
} }
updateCumulativeMetrics(dataProvider.getShard()); windowMaker.close();
updateCumulativeMetrics(readDataProvider.getShard());
if ( ! profile.isEmpty() ) if ( ! profile.isEmpty() )
incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize); incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
@ -113,7 +110,7 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions<M,T> extends Tra
if( last.isActive == first.isActive && last.getLocation().contiguousP(first.getLocation()) && last.getLocation().size() + first.getLocation().size() <= maxRegionSize ) { if( last.isActive == first.isActive && last.getLocation().contiguousP(first.getLocation()) && last.getLocation().size() + first.getLocation().size() <= maxRegionSize ) {
workQueue.removeLast(); workQueue.removeLast();
activeRegions.remove(first); activeRegions.remove(first);
workQueue.add( new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension) ); workQueue.addLast(new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension));
} }
} }
workQueue.addAll( activeRegions ); workQueue.addAll( activeRegions );
@ -121,21 +118,13 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions<M,T> extends Tra
logger.debug("Integrated " + profile.size() + " isActive calls into " + activeRegions.size() + " regions." ); logger.debug("Integrated " + profile.size() + " isActive calls into " + activeRegions.size() + " regions." );
// now go and process all of the active regions // now process the active regions, where possible
sum = processActiveRegions(walker, sum, minStart, dataProvider.getLocus().getContig()); boolean emptyQueue = false;
sum = processActiveRegions(walker, sum, emptyQueue);
return sum; return sum;
} }
/**
* Is the loc outside of the intervals being requested for processing by the GATK?
* @param loc
* @return
*/
private boolean outsideEngineIntervals(final GenomeLoc loc) {
return engine.getIntervals() != null && ! engine.getIntervals().overlaps(loc);
}
/** /**
* Take the individual isActive calls and integrate them into contiguous active regions and * Take the individual isActive calls and integrate them into contiguous active regions and
* add these blocks of work to the work queue * add these blocks of work to the work queue
@ -191,12 +180,12 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions<M,T> extends Tra
// //
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
private T processActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, final int minStart, final String currentContig ) { private T processActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, boolean emptyQueue ) {
if( walker.activeRegionOutStream != null ) { if( walker.activeRegionOutStream != null ) {
writeActiveRegionsToStream(walker); writeActiveRegionsToStream(walker);
return sum; return sum;
} else { } else {
return callWalkerMapOnActiveRegions(walker, sum, minStart, currentContig); return callWalkerMapOnActiveRegions(walker, sum, emptyQueue);
} }
} }
@ -214,70 +203,99 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions<M,T> extends Tra
} }
} }
private T callWalkerMapOnActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, final int minStart, final String currentContig ) { private T callWalkerMapOnActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, boolean emptyQueue ) {
// Since we've traversed sufficiently past this point (or this contig!) in the workQueue we can unload those regions and process them final int lastRegionStart = workQueue.getLast().getLocation().getStart();
final String lastRegionContig = workQueue.getLast().getLocation().getContig();
// If we've traversed sufficiently past the beginning of the workQueue we can unload those regions and process them
// TODO can implement parallel traversal here // TODO can implement parallel traversal here
while( workQueue.peek() != null ) { while( workQueue.peekFirst() != null ) {
final GenomeLoc extendedLoc = workQueue.peek().getExtendedLoc(); ActiveRegion firstRegion = workQueue.getFirst();
if ( extendedLoc.getStop() < minStart || (currentContig != null && !workQueue.peek().getExtendedLoc().getContig().equals(currentContig))) { final String firstRegionContig = firstRegion.getLocation().getContig();
final ActiveRegion activeRegion = workQueue.remove(); if (emptyQueue || firstRegionContig != lastRegionContig) {
sum = processActiveRegion(activeRegion, sum, walker); sum = processFirstActiveRegion(sum, walker);
} else { }
break; else {
final int firstRegionMaxReadStop = walker.wantsExtendedReads() ? firstRegion.getMaxReadStop() : firstRegion.getExtendedMaxReadStop();
if (lastRegionStart > firstRegionMaxReadStop) {
sum = processFirstActiveRegion( sum, walker );
}
else {
break;
}
} }
} }
return sum; return sum;
} }
private T processActiveRegion( final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker<M,T> walker ) { /**
final ArrayList<GATKSAMRecord> placedReads = new ArrayList<GATKSAMRecord>(); * Process the first active region and all remaining reads which overlap
for( final GATKSAMRecord read : myReads ) { *
final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read ); * Remove the first active region from the queue
if( activeRegion.getLocation().overlapsP( readLoc ) ) { * (NB: some reads associated with this active region may have already been processed)
*
* Remove all of these reads from the queue
* (NB: some may be associated with other active regions)
*
* @param sum
* @param walker
* @return
*/
private T processFirstActiveRegion( final T sum, final ActiveRegionWalker<M,T> walker ) {
final ActiveRegion firstRegion = workQueue.removeFirst();
GATKSAMRecord firstRead = myReads.peekFirst(); // don't remove because it may not be placed here
GenomeLoc firstReadLoc = this.engine.getGenomeLocParser().createGenomeLoc( firstRead );
while ( firstRegion.getLocation().overlapsP( firstReadLoc ) ||
(walker.wantsExtendedReads() && firstRegion.getExtendedLoc().overlapsP( firstReadLoc ))) {
if( firstRegion.getLocation().overlapsP( firstReadLoc ) ) {
// The region which the highest amount of overlap is chosen as the primary region for the read (tie breaking is done as right most region) // The region which the highest amount of overlap is chosen as the primary region for the read (tie breaking is done as right most region)
long maxOverlap = activeRegion.getLocation().sizeOfOverlap( readLoc ); long maxOverlap = firstRegion.getLocation().sizeOfOverlap( firstReadLoc );
ActiveRegion bestRegion = activeRegion; ActiveRegion bestRegion = firstRegion;
for( final ActiveRegion otherRegionToTest : workQueue ) { for( final ActiveRegion otherRegionToTest : workQueue ) {
if( otherRegionToTest.getLocation().sizeOfOverlap(readLoc) >= maxOverlap ) { if( otherRegionToTest.getLocation().sizeOfOverlap(firstReadLoc) >= maxOverlap ) {
maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( readLoc ); maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( firstReadLoc );
bestRegion = otherRegionToTest; bestRegion = otherRegionToTest;
} }
} }
bestRegion.add( read ); bestRegion.add( firstRead );
// The read is also added to all other regions in which it overlaps but marked as non-primary // The read is also added to all other regions in which it overlaps but marked as non-primary
if( walker.wantsNonPrimaryReads() ) { if( walker.wantsNonPrimaryReads() ) {
if( !bestRegion.equals(activeRegion) ) { if( !bestRegion.equals(firstRegion) ) {
activeRegion.add( read ); firstRegion.add(firstRead);
} }
for( final ActiveRegion otherRegionToTest : workQueue ) { for( final ActiveRegion otherRegionToTest : workQueue ) {
if( !bestRegion.equals(otherRegionToTest) ) { if( !bestRegion.equals(otherRegionToTest) ) {
// check for non-primary vs. extended // check for non-primary vs. extended
if ( otherRegionToTest.getLocation().overlapsP( readLoc ) ) { if ( otherRegionToTest.getLocation().overlapsP( firstReadLoc ) ) {
otherRegionToTest.add( read ); otherRegionToTest.add( firstRead );
} else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( readLoc ) ) { } else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( firstReadLoc ) ) {
otherRegionToTest.add( read ); otherRegionToTest.add( firstRead );
} }
} }
} }
} }
placedReads.add( read );
// check for non-primary vs. extended
} else if( activeRegion.getLocation().overlapsP( readLoc ) ) {
if ( walker.wantsNonPrimaryReads() ) {
activeRegion.add( read );
}
} else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) {
activeRegion.add( read );
}
}
myReads.removeAll( placedReads ); // remove all the reads which have been placed into their active region
// WARNING: This hashset relies on reads being exactly equal when they are placed in the list as when they are removed. So the ActiveRegionWalker can't modify the reads in any way.
logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReferenceLoc()); // check for non-primary vs. extended
final M x = walker.map(activeRegion, null); } else if( firstRegion.getLocation().overlapsP( firstReadLoc ) ) {
return walker.reduce( x, sum ); if ( walker.wantsNonPrimaryReads() ) {
firstRegion.add( firstRead );
}
} else if( walker.wantsExtendedReads() && firstRegion.getExtendedLoc().overlapsP( firstReadLoc )) {
firstRegion.add( firstRead );
}
myReads.removeFirst();
firstRead = myReads.peekFirst();
firstReadLoc = this.engine.getGenomeLocParser().createGenomeLoc( firstRead );
}
logger.debug(">> Map call with " + firstRegion.getReads().size() + " " + (firstRegion.isActive ? "active" : "inactive") + " reads @ " + firstRegion.getLocation() + " with full extent: " + firstRegion.getReferenceLoc());
final M x = walker.map( firstRegion, null );
return walker.reduce(x, sum);
} }
/** /**
@ -285,6 +303,7 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions<M,T> extends Tra
* Ugly for now but will be cleaned up when we push this functionality more into the engine * Ugly for now but will be cleaned up when we push this functionality more into the engine
*/ */
public T endTraversal( final Walker<M,T> walker, T sum) { public T endTraversal( final Walker<M,T> walker, T sum) {
return processActiveRegions((ActiveRegionWalker<M,T>)walker, sum, Integer.MAX_VALUE, null); boolean emptyQueue = true;
return processActiveRegions((ActiveRegionWalker<M,T>)walker, sum, emptyQueue);
} }
} }

View File

@ -40,7 +40,7 @@ public class ExperimentalReadShardTraverseActiveRegions <M,T> extends TraversalE
public T traverse( final ActiveRegionWalker<M,T> walker, public T traverse( final ActiveRegionWalker<M,T> walker,
final ReadShardDataProvider readDataProvider, final ReadShardDataProvider readDataProvider,
T sum) { T sum) {
logger.debug(String.format("TraverseActiveRegion.traverse: Read Shard is %s", readDataProvider)); logger.debug(String.format("ExperimentalReadShardTraverseActiveRegions.traverse: Read Shard is %s", readDataProvider));
final int activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension(); final int activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension();
final int maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion(); final int maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion();

View File

@ -43,7 +43,7 @@ public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegio
public T traverse( final ActiveRegionWalker<M,T> walker, public T traverse( final ActiveRegionWalker<M,T> walker,
final LocusShardDataProvider dataProvider, final LocusShardDataProvider dataProvider,
T sum) { T sum) {
logger.debug(String.format("TraverseActiveRegion.traverse: Shard is %s", dataProvider)); logger.debug(String.format("TraverseActiveRegions.traverse: Shard is %s", dataProvider));
final LocusView locusView = new AllLocusView(dataProvider); final LocusView locusView = new AllLocusView(dataProvider);

View File

@ -4,6 +4,7 @@ import com.google.java.contract.PreconditionError;
import net.sf.samtools.*; import net.sf.samtools.*;
import org.broadinstitute.sting.commandline.Tags; import org.broadinstitute.sting.commandline.Tags;
import org.broadinstitute.sting.gatk.arguments.GATKArgumentCollection; import org.broadinstitute.sting.gatk.arguments.GATKArgumentCollection;
import org.broadinstitute.sting.gatk.datasources.providers.ActiveRegionShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
@ -32,6 +33,7 @@ import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils; import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.ReadUtils; import org.broadinstitute.sting.utils.sam.ReadUtils;
import org.testng.Assert; import org.testng.Assert;
import org.testng.TestException;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@ -513,7 +515,15 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
providers.add(new ReadShardDataProvider(shard, genomeLocParser, shard.iterator(), reference, new ArrayList<ReferenceOrderedDataSource>())); providers.add(new ReadShardDataProvider(shard, genomeLocParser, shard.iterator(), reference, new ArrayList<ReferenceOrderedDataSource>()));
} }
break; break;
default: // other types not implemented case ACTIVEREGIONSHARD:
activeRegionShardTraverse.initialize(engine);
for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new ActiveRegionShardBalancer())) {
for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs())) {
providers.add(new ActiveRegionShardDataProvider(shard, shard.getReadProperties(), genomeLocParser, shard.iterator(), window.getLocus(), window, reference, new ArrayList<ReferenceOrderedDataSource>()));
}
}
break;
default: throw new TestException("Invalid shard type");
} }
return providers; return providers;
@ -527,7 +537,10 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
case READSHARD: case READSHARD:
readShardTraverse.traverse(walker, (ReadShardDataProvider) dataProvider, i); readShardTraverse.traverse(walker, (ReadShardDataProvider) dataProvider, i);
break; break;
default: // other types not implemented case ACTIVEREGIONSHARD:
activeRegionShardTraverse.traverse(walker, (ActiveRegionShardDataProvider) dataProvider, i);
break;
default: throw new TestException("Invalid shard type");
} }
} }
@ -539,7 +552,10 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
case READSHARD: case READSHARD:
readShardTraverse.endTraversal(walker, i); readShardTraverse.endTraversal(walker, i);
break; break;
default: // other types not implemented case ACTIVEREGIONSHARD:
activeRegionShardTraverse.endTraversal(walker, i);
break;
default: throw new TestException("Invalid shard type");
} }
} }