diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index f17450247..bee25dc2f 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -588,7 +588,10 @@ public class GenomeAnalysisEngine { else return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), readShardBalancer); case ACTIVEREGIONSHARD: - throw new UserException.CommandLineException("Not implemented."); + if(intervals == null) + return readsDataSource.createShardIteratorOverMappedReads(referenceDataSource.getReference().getSequenceDictionary(),new ActiveRegionShardBalancer()); + else + return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new ActiveRegionShardBalancer()); default: throw new UserException.CommandLineException("Invalid active region shard type."); } diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/ActiveRegionShardDataProvider.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/ActiveRegionShardDataProvider.java new file mode 100644 index 000000000..55e51f934 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/ActiveRegionShardDataProvider.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2012, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.gatk.datasources.providers; + +import net.sf.picard.reference.IndexedFastaSequenceFile; +import org.broadinstitute.sting.gatk.ReadProperties; +import org.broadinstitute.sting.gatk.datasources.reads.Shard; +import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; +import org.broadinstitute.sting.gatk.iterators.LocusIterator; +import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.GenomeLocParser; + +import java.util.Collection; + +/** + * @author Joel Thibault + */ +public class ActiveRegionShardDataProvider extends ShardDataProvider { + final private ReadShardDataProvider readProvider; + final private LocusShardDataProvider locusProvider; + + public ActiveRegionShardDataProvider(Shard shard, ReadProperties sourceInfo, GenomeLocParser genomeLocParser, StingSAMIterator reads, GenomeLoc locus, LocusIterator locusIterator, IndexedFastaSequenceFile reference, Collection rods) { + super(shard, genomeLocParser, reference, rods); // TODO: necessary? + readProvider = new ReadShardDataProvider(shard, genomeLocParser, reads, reference, rods); + locusProvider = new LocusShardDataProvider(shard, sourceInfo, genomeLocParser, locus, locusIterator, reference, rods); + } + + public ReadShardDataProvider getReadShardDataProvider() { + return readProvider; + } + + public LocusShardDataProvider getLocusShardDataProvider(LocusIterator iterator) { + return locusProvider; + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShard.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShard.java new file mode 100755 index 000000000..381b193e9 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShard.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2012, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.gatk.datasources.reads; + +import net.sf.samtools.SAMFileSpan; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.GenomeLocParser; + +import java.util.List; +import java.util.Map; + +/** + * @author Joel Thibault + */ +public class ActiveRegionShard extends ReadShard { + public ActiveRegionShard(GenomeLocParser parser, SAMDataSource readsDataSource, Map fileSpans, List loci, boolean isUnmapped) { + super(parser, readsDataSource, fileSpans, loci, isUnmapped); + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancer.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancer.java new file mode 100644 index 000000000..338dd1bdf --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancer.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2012, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.gatk.datasources.reads; + +/** + * @author Joel Thibault + */ +public class ActiveRegionShardBalancer extends ReadShardBalancer { + // TODO ? +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/Shard.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/Shard.java index e22a7a54d..314156af6 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/Shard.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/Shard.java @@ -40,7 +40,9 @@ import java.util.Map; */ public abstract class Shard implements HasGenomeLocation { public enum ShardType { - READ, LOCUS + READ, + LOCUS, + ACTIVEREGION // Used only by ExperimentalActiveRegionShardTraverseActiveRegions } protected final GenomeLocParser parser; // incredibly annoying! diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 84d975879..44f9978a6 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -2,6 +2,7 @@ package org.broadinstitute.sting.gatk.executive; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; +import org.broadinstitute.sting.gatk.datasources.providers.ActiveRegionShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; @@ -80,6 +81,18 @@ public class LinearMicroScheduler extends MicroScheduler { } windowMaker.close(); } + else if(shard.getShardType() == Shard.ShardType.ACTIVEREGION) { + WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), + getReadIterator(shard), shard.getGenomeLocs(), SampleUtils.getSAMFileSamples(engine)); + for(WindowMaker.WindowMakerIterator iterator: windowMaker) { + ShardDataProvider dataProvider = new ActiveRegionShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),getReadIterator(shard),iterator.getLocus(),iterator,reference,rods); + Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); + accumulator.accumulate(dataProvider,result); + dataProvider.close(); + if ( walker.isDone() ) break; + } + windowMaker.close(); + } else { ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalActiveRegionShardTraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalActiveRegionShardTraverseActiveRegions.java index 71cb89ad9..45d132678 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalActiveRegionShardTraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalActiveRegionShardTraverseActiveRegions.java @@ -1,32 +1,35 @@ package org.broadinstitute.sting.gatk.traversals; +import net.sf.samtools.SAMFileHeader; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.WalkerManager; import org.broadinstitute.sting.gatk.contexts.AlignmentContext; import org.broadinstitute.sting.gatk.contexts.ReferenceContext; import org.broadinstitute.sting.gatk.datasources.providers.*; +import org.broadinstitute.sting.gatk.datasources.reads.Shard; +import org.broadinstitute.sting.gatk.executive.WindowMaker; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension; import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker; import org.broadinstitute.sting.gatk.walkers.DataSource; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.SampleUtils; import org.broadinstitute.sting.utils.activeregion.ActiveRegion; import org.broadinstitute.sting.utils.activeregion.ActivityProfile; import org.broadinstitute.sting.utils.activeregion.ActivityProfileResult; -import org.broadinstitute.sting.utils.pileup.PileupElement; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; import java.util.*; -public class ExperimentalActiveRegionShardTraverseActiveRegions extends TraversalEngine,LocusShardDataProvider> { +public class ExperimentalActiveRegionShardTraverseActiveRegions extends TraversalEngine,ActiveRegionShardDataProvider> { /** * our log, which we want to capture anything from this class */ protected final static Logger logger = Logger.getLogger(TraversalEngine.class); private final LinkedList workQueue = new LinkedList(); - private final LinkedHashSet myReads = new LinkedHashSet(); + private final LinkedList myReads = new LinkedList(); @Override public String getTraversalUnits() { @@ -35,71 +38,65 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions extends Tra @Override public T traverse( final ActiveRegionWalker walker, - final LocusShardDataProvider dataProvider, + final ActiveRegionShardDataProvider dataProvider, T sum) { - logger.debug(String.format("TraverseActiveRegion.traverse: Shard is %s", dataProvider)); + logger.debug(String.format("ExperimentalActiveRegionShardTraverseActiveRegions.traverse: Shard is %s", dataProvider)); - final LocusView locusView = new AllLocusView(dataProvider); + ReadShardDataProvider readDataProvider = dataProvider.getReadShardDataProvider(); - final LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider ); final int activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension(); final int maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion(); - int minStart = Integer.MAX_VALUE; + final ReadView readView = new ReadView(readDataProvider); + final List activeRegions = new LinkedList(); - ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions() ); + ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions()); - ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView); + Shard readShard = readDataProvider.getShard(); + SAMFileHeader header = readShard.getReadProperties().getHeader(); + WindowMaker windowMaker = new WindowMaker(readShard, engine.getGenomeLocParser(), + readView.iterator(), readShard.getGenomeLocs(), SampleUtils.getSAMFileSamples(header)); - // We keep processing while the next reference location is within the interval - GenomeLoc prevLoc = null; - while( locusView.hasNext() ) { - final AlignmentContext locus = locusView.next(); - final GenomeLoc location = locus.getLocation(); + for(WindowMaker.WindowMakerIterator iterator: windowMaker) { + LocusShardDataProvider locusDataProvider = dataProvider.getLocusShardDataProvider(iterator); + final LocusView locusView = new AllLocusView(locusDataProvider); + final LocusReferenceView referenceView = new LocusReferenceView( walker, locusDataProvider ); + ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, locusDataProvider, locusView); - // Grab all the previously unseen reads from this pileup and add them to the massive read list - // Note that this must occur before we leave because we are outside the intervals because - // reads may occur outside our intervals but overlap them in the future - // TODO -- this whole HashSet logic should be changed to a linked list of reads with - // TODO -- subsequent pass over them to find the ones overlapping the active regions - for( final PileupElement p : locus.getBasePileup() ) { - final GATKSAMRecord read = p.getRead(); - if( !myReads.contains(read) ) { - myReads.add(read); + // We keep processing while the next reference location is within the interval + GenomeLoc prevLoc = null; + while( locusView.hasNext() ) { + final AlignmentContext locus = locusView.next(); + final GenomeLoc location = locus.getLocation(); + + if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) { + // we've move across some interval boundary, restart profile + profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize); } - // If this is the last pileup for this shard calculate the minimum alignment start so that we know - // which active regions in the work queue are now safe to process - minStart = Math.min(minStart, read.getAlignmentStart()); + readDataProvider.getShard().getReadMetrics().incrementNumIterations(); + + // create reference context. Note that if we have a pileup of "extended events", the context will + // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup). + final ReferenceContext refContext = referenceView.getReferenceContext(location); + + // Iterate forward to get all reference ordered data covering this location + final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext); + + // Call the walkers isActive function for this locus and add them to the list to be integrated later + profile.add(walkerActiveProb(walker, tracker, refContext, locus, location)); + + prevLoc = location; + + printProgress(locus.getLocation()); } - // skip this location -- it's not part of our engine intervals - if ( outsideEngineIntervals(location) ) - continue; - - if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) { - // we've move across some interval boundary, restart profile - profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize); - } - - dataProvider.getShard().getReadMetrics().incrementNumIterations(); - - // create reference context. Note that if we have a pileup of "extended events", the context will - // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup). - final ReferenceContext refContext = referenceView.getReferenceContext(location); - - // Iterate forward to get all reference ordered data covering this location - final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext); - - // Call the walkers isActive function for this locus and add them to the list to be integrated later - profile.add(walkerActiveProb(walker, tracker, refContext, locus, location)); - - prevLoc = location; - - printProgress(locus.getLocation()); + locusDataProvider.close(); } - updateCumulativeMetrics(dataProvider.getShard()); + windowMaker.close(); + + updateCumulativeMetrics(readDataProvider.getShard()); if ( ! profile.isEmpty() ) incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize); @@ -113,7 +110,7 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions extends Tra if( last.isActive == first.isActive && last.getLocation().contiguousP(first.getLocation()) && last.getLocation().size() + first.getLocation().size() <= maxRegionSize ) { workQueue.removeLast(); activeRegions.remove(first); - workQueue.add( new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension) ); + workQueue.addLast(new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension)); } } workQueue.addAll( activeRegions ); @@ -121,21 +118,13 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions extends Tra logger.debug("Integrated " + profile.size() + " isActive calls into " + activeRegions.size() + " regions." ); - // now go and process all of the active regions - sum = processActiveRegions(walker, sum, minStart, dataProvider.getLocus().getContig()); + // now process the active regions, where possible + boolean emptyQueue = false; + sum = processActiveRegions(walker, sum, emptyQueue); return sum; } - /** - * Is the loc outside of the intervals being requested for processing by the GATK? - * @param loc - * @return - */ - private boolean outsideEngineIntervals(final GenomeLoc loc) { - return engine.getIntervals() != null && ! engine.getIntervals().overlaps(loc); - } - /** * Take the individual isActive calls and integrate them into contiguous active regions and * add these blocks of work to the work queue @@ -191,12 +180,12 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions extends Tra // // -------------------------------------------------------------------------------- - private T processActiveRegions( final ActiveRegionWalker walker, T sum, final int minStart, final String currentContig ) { + private T processActiveRegions( final ActiveRegionWalker walker, T sum, boolean emptyQueue ) { if( walker.activeRegionOutStream != null ) { writeActiveRegionsToStream(walker); return sum; } else { - return callWalkerMapOnActiveRegions(walker, sum, minStart, currentContig); + return callWalkerMapOnActiveRegions(walker, sum, emptyQueue); } } @@ -214,70 +203,99 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions extends Tra } } - private T callWalkerMapOnActiveRegions( final ActiveRegionWalker walker, T sum, final int minStart, final String currentContig ) { - // Since we've traversed sufficiently past this point (or this contig!) in the workQueue we can unload those regions and process them + private T callWalkerMapOnActiveRegions( final ActiveRegionWalker walker, T sum, boolean emptyQueue ) { + final int lastRegionStart = workQueue.getLast().getLocation().getStart(); + final String lastRegionContig = workQueue.getLast().getLocation().getContig(); + + // If we've traversed sufficiently past the beginning of the workQueue we can unload those regions and process them // TODO can implement parallel traversal here - while( workQueue.peek() != null ) { - final GenomeLoc extendedLoc = workQueue.peek().getExtendedLoc(); - if ( extendedLoc.getStop() < minStart || (currentContig != null && !workQueue.peek().getExtendedLoc().getContig().equals(currentContig))) { - final ActiveRegion activeRegion = workQueue.remove(); - sum = processActiveRegion(activeRegion, sum, walker); - } else { - break; + while( workQueue.peekFirst() != null ) { + ActiveRegion firstRegion = workQueue.getFirst(); + final String firstRegionContig = firstRegion.getLocation().getContig(); + if (emptyQueue || firstRegionContig != lastRegionContig) { + sum = processFirstActiveRegion(sum, walker); + } + else { + final int firstRegionMaxReadStop = walker.wantsExtendedReads() ? firstRegion.getMaxReadStop() : firstRegion.getExtendedMaxReadStop(); + if (lastRegionStart > firstRegionMaxReadStop) { + sum = processFirstActiveRegion( sum, walker ); + } + else { + break; + } } } return sum; } - private T processActiveRegion( final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker walker ) { - final ArrayList placedReads = new ArrayList(); - for( final GATKSAMRecord read : myReads ) { - final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read ); - if( activeRegion.getLocation().overlapsP( readLoc ) ) { + /** + * Process the first active region and all remaining reads which overlap + * + * Remove the first active region from the queue + * (NB: some reads associated with this active region may have already been processed) + * + * Remove all of these reads from the queue + * (NB: some may be associated with other active regions) + * + * @param sum + * @param walker + * @return + */ + private T processFirstActiveRegion( final T sum, final ActiveRegionWalker walker ) { + final ActiveRegion firstRegion = workQueue.removeFirst(); + + GATKSAMRecord firstRead = myReads.peekFirst(); // don't remove because it may not be placed here + GenomeLoc firstReadLoc = this.engine.getGenomeLocParser().createGenomeLoc( firstRead ); + + while ( firstRegion.getLocation().overlapsP( firstReadLoc ) || + (walker.wantsExtendedReads() && firstRegion.getExtendedLoc().overlapsP( firstReadLoc ))) { + if( firstRegion.getLocation().overlapsP( firstReadLoc ) ) { // The region which the highest amount of overlap is chosen as the primary region for the read (tie breaking is done as right most region) - long maxOverlap = activeRegion.getLocation().sizeOfOverlap( readLoc ); - ActiveRegion bestRegion = activeRegion; + long maxOverlap = firstRegion.getLocation().sizeOfOverlap( firstReadLoc ); + ActiveRegion bestRegion = firstRegion; for( final ActiveRegion otherRegionToTest : workQueue ) { - if( otherRegionToTest.getLocation().sizeOfOverlap(readLoc) >= maxOverlap ) { - maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( readLoc ); + if( otherRegionToTest.getLocation().sizeOfOverlap(firstReadLoc) >= maxOverlap ) { + maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( firstReadLoc ); bestRegion = otherRegionToTest; } } - bestRegion.add( read ); + bestRegion.add( firstRead ); // The read is also added to all other regions in which it overlaps but marked as non-primary if( walker.wantsNonPrimaryReads() ) { - if( !bestRegion.equals(activeRegion) ) { - activeRegion.add( read ); + if( !bestRegion.equals(firstRegion) ) { + firstRegion.add(firstRead); } for( final ActiveRegion otherRegionToTest : workQueue ) { if( !bestRegion.equals(otherRegionToTest) ) { // check for non-primary vs. extended - if ( otherRegionToTest.getLocation().overlapsP( readLoc ) ) { - otherRegionToTest.add( read ); - } else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( readLoc ) ) { - otherRegionToTest.add( read ); + if ( otherRegionToTest.getLocation().overlapsP( firstReadLoc ) ) { + otherRegionToTest.add( firstRead ); + } else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( firstReadLoc ) ) { + otherRegionToTest.add( firstRead ); } } } } - placedReads.add( read ); - // check for non-primary vs. extended - } else if( activeRegion.getLocation().overlapsP( readLoc ) ) { - if ( walker.wantsNonPrimaryReads() ) { - activeRegion.add( read ); - } - } else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) { - activeRegion.add( read ); - } - } - myReads.removeAll( placedReads ); // remove all the reads which have been placed into their active region - // WARNING: This hashset relies on reads being exactly equal when they are placed in the list as when they are removed. So the ActiveRegionWalker can't modify the reads in any way. - logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReferenceLoc()); - final M x = walker.map(activeRegion, null); - return walker.reduce( x, sum ); + // check for non-primary vs. extended + } else if( firstRegion.getLocation().overlapsP( firstReadLoc ) ) { + if ( walker.wantsNonPrimaryReads() ) { + firstRegion.add( firstRead ); + } + } else if( walker.wantsExtendedReads() && firstRegion.getExtendedLoc().overlapsP( firstReadLoc )) { + firstRegion.add( firstRead ); + } + + myReads.removeFirst(); + firstRead = myReads.peekFirst(); + firstReadLoc = this.engine.getGenomeLocParser().createGenomeLoc( firstRead ); + } + + logger.debug(">> Map call with " + firstRegion.getReads().size() + " " + (firstRegion.isActive ? "active" : "inactive") + " reads @ " + firstRegion.getLocation() + " with full extent: " + firstRegion.getReferenceLoc()); + final M x = walker.map( firstRegion, null ); + return walker.reduce(x, sum); } /** @@ -285,6 +303,7 @@ public class ExperimentalActiveRegionShardTraverseActiveRegions extends Tra * Ugly for now but will be cleaned up when we push this functionality more into the engine */ public T endTraversal( final Walker walker, T sum) { - return processActiveRegions((ActiveRegionWalker)walker, sum, Integer.MAX_VALUE, null); + boolean emptyQueue = true; + return processActiveRegions((ActiveRegionWalker)walker, sum, emptyQueue); } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalReadShardTraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalReadShardTraverseActiveRegions.java index 672d37f7f..299ee4f56 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalReadShardTraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalReadShardTraverseActiveRegions.java @@ -40,7 +40,7 @@ public class ExperimentalReadShardTraverseActiveRegions extends TraversalE public T traverse( final ActiveRegionWalker walker, final ReadShardDataProvider readDataProvider, T sum) { - logger.debug(String.format("TraverseActiveRegion.traverse: Read Shard is %s", readDataProvider)); + logger.debug(String.format("ExperimentalReadShardTraverseActiveRegions.traverse: Read Shard is %s", readDataProvider)); final int activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension(); final int maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java index 33323ba67..2562fcccd 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java @@ -43,7 +43,7 @@ public class TraverseActiveRegions extends TraversalEngine walker, final LocusShardDataProvider dataProvider, T sum) { - logger.debug(String.format("TraverseActiveRegion.traverse: Shard is %s", dataProvider)); + logger.debug(String.format("TraverseActiveRegions.traverse: Shard is %s", dataProvider)); final LocusView locusView = new AllLocusView(dataProvider); diff --git a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java index 1204e639e..5051bc35f 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java @@ -4,6 +4,7 @@ import com.google.java.contract.PreconditionError; import net.sf.samtools.*; import org.broadinstitute.sting.commandline.Tags; import org.broadinstitute.sting.gatk.arguments.GATKArgumentCollection; +import org.broadinstitute.sting.gatk.datasources.providers.ActiveRegionShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; @@ -32,6 +33,7 @@ import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile; import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils; import org.broadinstitute.sting.utils.sam.ReadUtils; import org.testng.Assert; +import org.testng.TestException; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -513,7 +515,15 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { providers.add(new ReadShardDataProvider(shard, genomeLocParser, shard.iterator(), reference, new ArrayList())); } break; - default: // other types not implemented + case ACTIVEREGIONSHARD: + activeRegionShardTraverse.initialize(engine); + for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new ActiveRegionShardBalancer())) { + for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs())) { + providers.add(new ActiveRegionShardDataProvider(shard, shard.getReadProperties(), genomeLocParser, shard.iterator(), window.getLocus(), window, reference, new ArrayList())); + } + } + break; + default: throw new TestException("Invalid shard type"); } return providers; @@ -527,7 +537,10 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { case READSHARD: readShardTraverse.traverse(walker, (ReadShardDataProvider) dataProvider, i); break; - default: // other types not implemented + case ACTIVEREGIONSHARD: + activeRegionShardTraverse.traverse(walker, (ActiveRegionShardDataProvider) dataProvider, i); + break; + default: throw new TestException("Invalid shard type"); } } @@ -539,7 +552,10 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { case READSHARD: readShardTraverse.endTraversal(walker, i); break; - default: // other types not implemented + case ACTIVEREGIONSHARD: + activeRegionShardTraverse.endTraversal(walker, i); + break; + default: throw new TestException("Invalid shard type"); } }