Reintroduce support for interval-based traversals.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@749 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2009-05-18 22:54:18 +00:00
parent e9f85ef920
commit ff798fe483
5 changed files with 184 additions and 19 deletions

View File

@ -0,0 +1,148 @@
package org.broadinstitute.sting.gatk.executive;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.Pair;
import java.util.ArrayList;
import java.util.List;
/**
* User: hanna
* Date: May 18, 2009
* Time: 2:27:17 PM
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
* Software and documentation are copyright 2005 by the Broad Institute.
* All rights are reserved.
*
* Users acknowledge that this software is supplied without any warranty or support.
* The Broad Institute is not responsible for its use, misuse, or
* functionality.
*/
/**
* Manages the
*/
public abstract class Accumulator {
/**
* The walker for which to accumulate.
*/
protected final Walker walker;
/**
* Create a new Accumulator. Forbid outside classes from performing this operation.
* @param walker
*/
protected Accumulator( Walker walker ) {
this.walker = walker;
}
/**
* Creates an accumulator suitable for accumulating results of the given walker.
* @param walker Walker for which to build an accumulator.
* @return Accumulator suitable for this walker.s
*/
public static Accumulator create( Walker walker ) {
if( walker.isReduceByInterval() )
return new IntervalAccumulator( walker );
else
return new StandardAccumulator( walker );
}
/**
* Gets the appropriate reduce initializer for this accumulator.
* @return Traversal reduce init to feed into traversal engine.
*/
public abstract Object getReduceInit();
/**
* Roll this traversal result into the given accumulator.
* @param result Result of the most recent accumulation.
* @return the newest accumulation of the given data.
*/
public abstract void accumulate( Shard shard, Object result );
/**
* Finishes off the traversal. Submits accumulated results to
* the walker and returns them.
* TODO: Its a bit funky to delegate the finishing of the traversal
* to an accumulator, but we're doing it for type safety so the
* right Walker override gets called. Clean this up.
* @return Final result of accumulation.
*/
public abstract Object finishTraversal();
/**
* Accumulates in the 'standard' fashion; basically funnels
* the reduce result back into the reduce init and relies on
* the user-supplied reduce to handle the accumulation.
*/
private static class StandardAccumulator extends Accumulator {
private Object accumulator = null;
private boolean initialized = false;
protected StandardAccumulator( Walker walker ) {
super(walker);
}
/**
* Standard accumulator returns reduceInit first, then the
* results of the previous accumulation.
*/
public Object getReduceInit() {
if( !initialized ) {
initialized = true;
return walker.reduceInit();
}
else
return accumulator;
}
/**
* The result of the accumulator in a non-intervals walker
* already takes the accumulation into account. return the result.
*/
public void accumulate( Shard shard, Object result ) { this.accumulator = result; }
/**
* The result of the traversal is the list of accumulated intervals.
*/
public Object finishTraversal() {
walker.onTraversalDone(accumulator);
return this.accumulator;
}
}
/**
* An interval-based accumulator. Treats each reduce result independently,
* and aggregates those results into a single list.
*/
private static class IntervalAccumulator extends Accumulator {
private List<Pair<GenomeLoc,Object>> intervalAccumulator = new ArrayList<Pair<GenomeLoc,Object>>();
protected IntervalAccumulator( Walker walker ) {
super(walker);
}
/**
* Interval accumulator always feeds reduceInit into every new traversal.
*/
public Object getReduceInit() { return walker.reduceInit(); }
/**
* Create a holder for interval results if none exists. Add the result to the holder.
*/
public void accumulate( Shard shard, Object result ) {
intervalAccumulator.add( new Pair<GenomeLoc,Object>( shard.getGenomeLoc(), result ) );
}
/**
* The result of the traversal is the list of accumulated intervals.
*/
public Object finishTraversal() {
walker.onTraversalDone(intervalAccumulator);
return intervalAccumulator;
}
}
}

View File

@ -71,7 +71,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
if( !(walker instanceof TreeReducible) )
throw new IllegalArgumentException("Hierarchical microscheduler only works with TreeReducible walkers");
ShardStrategy shardStrategy = getShardStrategy( reference, intervals );
ShardStrategy shardStrategy = getShardStrategy( walker, reference, intervals );
ReduceTree reduceTree = new ReduceTree( this );
walker.initialize();

View File

@ -32,20 +32,23 @@ public class LinearMicroScheduler extends MicroScheduler {
* @param locations Subset of the dataset over which to walk.
*/
public Object execute(Walker walker, List<GenomeLoc> locations) {
ShardStrategy shardStrategy = getShardStrategy(reference, locations);
ShardStrategy shardStrategy = getShardStrategy(walker, reference, locations);
walker.initialize();
Object accumulator = walker.reduceInit();
Accumulator accumulator = Accumulator.create(walker);
for (Shard shard : shardStrategy) {
ShardDataProvider dataProvider = getShardDataProvider( shard );
accumulator = traversalEngine.traverse(walker, shard, dataProvider, accumulator);
Object result = traversalEngine.traverse(walker, shard, dataProvider, accumulator.getReduceInit());
accumulator.accumulate( shard, result );
dataProvider.close();
}
traversalEngine.printOnTraversalDone(accumulator);
Object result = accumulator.finishTraversal();
walker.onTraversalDone(accumulator);
traversalEngine.printOnTraversalDone(result);
return accumulator;
}

View File

@ -13,6 +13,7 @@ import org.broadinstitute.sting.gatk.traversals.TraverseLoci;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
import org.broadinstitute.sting.gatk.Reads;
@ -98,26 +99,40 @@ public abstract class MicroScheduler {
/**
* Get the sharding strategy given a driving data source.
* @param walker Walker for which to infer sharding strategy.
* @param drivingDataSource Data on which to shard.
* @param intervals Intervals to use when limiting sharding.
* @return Sharding strategy for this driving data source.
*/
protected ShardStrategy getShardStrategy( ReferenceSequenceFile drivingDataSource, List<GenomeLoc> intervals ) {
protected ShardStrategy getShardStrategy( Walker walker, ReferenceSequenceFile drivingDataSource, List<GenomeLoc> intervals ) {
ShardStrategy shardStrategy = null;
ShardStrategyFactory.SHATTER_STRATEGY shardType = (traversalEngine instanceof TraverseReads) ?
ShardStrategyFactory.SHATTER_STRATEGY.READS : ShardStrategyFactory.SHATTER_STRATEGY.LINEAR;
if( intervals != null && shardType != ShardStrategyFactory.SHATTER_STRATEGY.READS)
shardStrategy = ShardStrategyFactory.shatter( shardType,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE,
intervals );
else
shardStrategy = ShardStrategyFactory.shatter( shardType,
if( walker instanceof LocusWalker ) {
if( intervals != null ) {
ShardStrategyFactory.SHATTER_STRATEGY shardType = (walker.isReduceByInterval()) ?
ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL :
ShardStrategyFactory.SHATTER_STRATEGY.LINEAR;
shardStrategy = ShardStrategyFactory.shatter( shardType,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE,
intervals );
}
else
shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE );
}
else if( walker instanceof ReadWalker ) {
shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.READS,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE );
}
else
throw new StingException("Unable to support walker of type" + walker.getClass().getName());
return shardStrategy;
return shardStrategy;
}
/**

View File

@ -17,7 +17,6 @@ import net.sf.samtools.SAMRecord;
* Time: 11:23:14 AM
* To change this template use File | Settings | File Templates.
*/
@By(DataSource.REFERENCE)
public class PrintLocusContextWalker extends LocusWalker<LocusContext, Integer> implements TreeReducible<Integer> {
public LocusContext map(RefMetaDataTracker tracker, char ref, LocusContext context) {
out.printf( "In map: ref = %c, loc = %s, reads = %s%n", ref,