diff --git a/java/src/org/broadinstitute/sting/gatk/executive/Accumulator.java b/java/src/org/broadinstitute/sting/gatk/executive/Accumulator.java
new file mode 100755
index 000000000..f8243f378
--- /dev/null
+++ b/java/src/org/broadinstitute/sting/gatk/executive/Accumulator.java
@@ -0,0 +1,148 @@
+package org.broadinstitute.sting.gatk.executive;
+
+import org.broadinstitute.sting.gatk.walkers.Walker;
+import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
+import org.broadinstitute.sting.utils.GenomeLoc;
+import org.broadinstitute.sting.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * User: hanna
+ * Date: May 18, 2009
+ * Time: 2:27:17 PM
+ * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
+ * Software and documentation are copyright 2005 by the Broad Institute.
+ * All rights are reserved.
+ *
+ * Users acknowledge that this software is supplied without any warranty or support.
+ * The Broad Institute is not responsible for its use, misuse, or
+ * functionality.
+ */
+
+/**
+ * Manages the accumulation of per-shard traversal results on behalf of a walker.
+ */
+
+public abstract class Accumulator {
+    /**
+     * The walker for which to accumulate.
+     */
+    protected final Walker walker;
+
+    /**
+     * Create a new Accumulator.  Forbid outside classes from performing this operation.
+     * @param walker Walker whose traversal results will be accumulated.
+     */
+    protected Accumulator( Walker walker ) {
+        this.walker = walker;
+    }
+
+    /**
+     * Creates an accumulator suitable for accumulating results of the given walker.
+     * @param walker Walker for which to build an accumulator.
+     * @return Accumulator suitable for this walker.
+     */
+    public static Accumulator create( Walker walker ) {
+        if( walker.isReduceByInterval() )
+            return new IntervalAccumulator( walker );
+        else
+            return new StandardAccumulator( walker );
+    }
+
+    /**
+     * Gets the appropriate reduce initializer for this accumulator.
+     * @return Traversal reduce init to feed into traversal engine.
+     */
+    public abstract Object getReduceInit();
+
+    /**
+     * Roll this traversal result into the accumulator.
+     * @param shard Shard whose traversal produced the result.
+     * @param result Result of the most recent traversal.
+     */
+    public abstract void accumulate( Shard shard, Object result );
+
+    /**
+     * Finishes off the traversal.  Submits accumulated results to
+     * the walker and returns them.
+     * TODO: Its a bit funky to delegate the finishing of the traversal
+     *       to an accumulator, but we're doing it for type safety so the
+     *       right Walker override gets called.  Clean this up.
+     * @return Final result of accumulation.
+     */
+    public abstract Object finishTraversal();
+
+    /**
+     * Accumulates in the 'standard' fashion; basically funnels
+     * the reduce result back into the reduce init and relies on
+     * the user-supplied reduce to handle the accumulation.
+     */
+    private static class StandardAccumulator extends Accumulator {
+        private Object accumulator = null;
+        private boolean initialized = false;
+
+        protected StandardAccumulator( Walker walker ) {
+            super(walker);
+        }
+
+        /**
+         * Standard accumulator returns reduceInit first, then the
+         * results of the previous accumulation.
+         */
+        public Object getReduceInit() {
+            if( !initialized ) {
+                initialized = true;
+                return walker.reduceInit();
+            }
+            else
+                return accumulator;
+        }
+
+        /**
+         * The result of the accumulator in a non-intervals walker
+         * already takes the accumulation into account.  Store the result as-is.
+         */
+        public void accumulate( Shard shard, Object result ) { this.accumulator = result; }
+
+        /**
+         * Hands the last stored result to the walker's onTraversalDone and returns it.
+         */
+        public Object finishTraversal() {
+            walker.onTraversalDone(accumulator);
+            return this.accumulator;
+        }
+    }
+
+    /**
+     * An interval-based accumulator.  Treats each reduce result independently,
+     * and aggregates those results into a single list.
+     */
+    private static class IntervalAccumulator extends Accumulator {
+        private final List<Pair<GenomeLoc,Object>> intervalAccumulator = new ArrayList<Pair<GenomeLoc,Object>>();
+
+        protected IntervalAccumulator( Walker walker ) {
+            super(walker);
+        }
+
+        /**
+         * Interval accumulator always feeds reduceInit into every new traversal.
+         */
+        public Object getReduceInit() { return walker.reduceInit(); }
+
+        /**
+         * Pairs the result with the genome location of its shard and appends the pair to the list.
+         */
+        public void accumulate( Shard shard, Object result ) {
+            intervalAccumulator.add( new Pair<GenomeLoc,Object>( shard.getGenomeLoc(), result ) );
+        }
+
+        /**
+         * The result of the traversal is the list of accumulated intervals.
+         */
+        public Object finishTraversal() {
+            walker.onTraversalDone(intervalAccumulator);
+            return intervalAccumulator;
+        }
+    }
+}
diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java
index 4a4d6075c..c5acce291 100755
--- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java
+++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java
@@ -71,7 +71,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
         if( !(walker instanceof TreeReducible) )
             throw new IllegalArgumentException("Hierarchical microscheduler only works with TreeReducible walkers");
 
-        ShardStrategy shardStrategy = getShardStrategy( reference, intervals );
+        ShardStrategy shardStrategy = getShardStrategy( walker, reference, intervals );
         ReduceTree reduceTree = new ReduceTree( this );
 
         walker.initialize();
diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java
index d01f5afcc..ce6db40b0 100644
--- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java
+++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java
@@ -32,20 +32,23 @@ public class LinearMicroScheduler extends MicroScheduler {
      * @param locations Subset of the dataset over which to walk.
      */
     public Object execute(Walker walker, List locations) {
-        ShardStrategy shardStrategy = getShardStrategy(reference, locations);
+        ShardStrategy shardStrategy = getShardStrategy(walker, reference, locations);
 
         walker.initialize();
-        Object accumulator = walker.reduceInit();
+        Accumulator accumulator = Accumulator.create(walker);
 
         for (Shard shard : shardStrategy) {
             ShardDataProvider dataProvider = getShardDataProvider( shard );
-            accumulator = traversalEngine.traverse(walker, shard, dataProvider, accumulator);
+
+            Object result = traversalEngine.traverse(walker, shard, dataProvider, accumulator.getReduceInit());
+            accumulator.accumulate( shard, result );
+
             dataProvider.close();
         }
 
-        traversalEngine.printOnTraversalDone(accumulator);
+        Object result = accumulator.finishTraversal();
 
-        walker.onTraversalDone(accumulator);
+        traversalEngine.printOnTraversalDone(result);
 
-        return accumulator;
+        return result;
     }
diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java
index e147efb21..0e5984167 100755
--- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java
+++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java
@@ -13,6 +13,7 @@ import org.broadinstitute.sting.gatk.traversals.TraverseLoci;
 import org.broadinstitute.sting.gatk.walkers.TreeReducible;
 import org.broadinstitute.sting.gatk.walkers.Walker;
 import org.broadinstitute.sting.gatk.walkers.ReadWalker;
+import org.broadinstitute.sting.gatk.walkers.LocusWalker;
 import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
 import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
 import org.broadinstitute.sting.gatk.Reads;
@@ -98,26 +99,40 @@ public abstract class MicroScheduler {
/** * Get the sharding strategy given a driving data source. + * @param walker Walker for which to infer sharding strategy. * @param drivingDataSource Data on which to shard. * @param intervals Intervals to use when limiting sharding. * @return Sharding strategy for this driving data source. */ - protected ShardStrategy getShardStrategy( ReferenceSequenceFile drivingDataSource, List intervals ) { + protected ShardStrategy getShardStrategy( Walker walker, ReferenceSequenceFile drivingDataSource, List intervals ) { ShardStrategy shardStrategy = null; - ShardStrategyFactory.SHATTER_STRATEGY shardType = (traversalEngine instanceof TraverseReads) ? - ShardStrategyFactory.SHATTER_STRATEGY.READS : ShardStrategyFactory.SHATTER_STRATEGY.LINEAR; - - if( intervals != null && shardType != ShardStrategyFactory.SHATTER_STRATEGY.READS) - shardStrategy = ShardStrategyFactory.shatter( shardType, - drivingDataSource.getSequenceDictionary(), - SHARD_SIZE, - intervals ); - else - shardStrategy = ShardStrategyFactory.shatter( shardType, + + if( walker instanceof LocusWalker ) { + if( intervals != null ) { + ShardStrategyFactory.SHATTER_STRATEGY shardType = (walker.isReduceByInterval()) ? 
+ ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL : + ShardStrategyFactory.SHATTER_STRATEGY.LINEAR; + + shardStrategy = ShardStrategyFactory.shatter( shardType, + drivingDataSource.getSequenceDictionary(), + SHARD_SIZE, + intervals ); + } + else + shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, + drivingDataSource.getSequenceDictionary(), + SHARD_SIZE ); + + } + else if( walker instanceof ReadWalker ) { + shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.READS, drivingDataSource.getSequenceDictionary(), SHARD_SIZE ); + } + else + throw new StingException("Unable to support walker of type" + walker.getClass().getName()); - return shardStrategy; + return shardStrategy; } /** diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/PrintLocusContextWalker.java b/java/src/org/broadinstitute/sting/gatk/walkers/PrintLocusContextWalker.java index 0e1646268..12a48f659 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/PrintLocusContextWalker.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/PrintLocusContextWalker.java @@ -17,7 +17,6 @@ import net.sf.samtools.SAMRecord; * Time: 11:23:14 AM * To change this template use File | Settings | File Templates. */ -@By(DataSource.REFERENCE) public class PrintLocusContextWalker extends LocusWalker implements TreeReducible { public LocusContext map(RefMetaDataTracker tracker, char ref, LocusContext context) { out.printf( "In map: ref = %c, loc = %s, reads = %s%n", ref,