diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java index 2ada8bbfa..4bb700c37 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -40,27 +40,28 @@ import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; import org.broadinstitute.sting.utils.nanoScheduler.ReduceFunction; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; /** - * @author aaron + * A nano-scheduling version of TraverseReads. + * + * Implements the traversal of a walker that accepts individual reads, the reference, and + * RODs per map call. Directly supports shared memory parallelism via NanoScheduler + * + * @author depristo * @version 1.0 - * @date Apr 24, 2009 - *

- * Class TraverseReads - *

- * This class handles traversing by reads in the new shardable style + * @date 9/2/2012 */ public class TraverseReadsNano extends TraversalEngine,ReadShardDataProvider> { /** our log, which we want to capture anything from this class */ protected static final Logger logger = Logger.getLogger(TraverseReadsNano.class); private static final boolean DEBUG = false; - final NanoScheduler nanoScheduler; + final NanoScheduler nanoScheduler; public TraverseReadsNano(int nThreads) { final int bufferSize = ReadShard.getReadBufferSize() + 1; // actually has 1 more than max - nanoScheduler = new NanoScheduler(bufferSize, nThreads); + nanoScheduler = new NanoScheduler(bufferSize, nThreads); } @Override @@ -95,18 +96,23 @@ public class TraverseReadsNano extends TraversalEngine, final GenomeLoc locus = engine.getGenomeLocParser().createGenomeLoc(lastRead); printProgress(dataProvider.getShard(), locus, aggregatedInputs.size()); - // TODO -- how can I get done value? - // done = walker.isDone(); - return result; } + /** + * Aggregate all of the inputs for all map calls into MapData, to be provided + * to NanoScheduler for Map/Reduce + * + * @param dataProvider the source of our data + * @return a linked list of MapData objects holding the read, ref, and ROD info for every map/reduce + * should execute + */ private List aggregateMapData(final ReadShardDataProvider dataProvider) { final ReadView reads = new ReadView(dataProvider); final ReadReferenceView reference = new ReadReferenceView(dataProvider); final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider); - final List mapData = new ArrayList(); // TODO -- need size of reads + final List mapData = new LinkedList(); for ( final SAMRecord read : reads ) { final ReferenceContext refContext = ! read.getReadUnmappedFlag() ? reference.getReferenceContext(read) @@ -132,19 +138,9 @@ public class TraverseReadsNano extends TraversalEngine, super.printOnTraversalDone(); } - private class TraverseReadsReduce implements ReduceFunction { - final ReadWalker walker; - - private TraverseReadsReduce(ReadWalker walker) { - this.walker = walker; - } - - @Override - public T apply(M one, T sum) { - return walker.reduce(one, sum); - } - } - + /** + * The input data needed for each map call. The read, the reference, and the RODs + */ private class MapData { final GATKSAMRecord read; final ReferenceContext refContext; @@ -157,7 +153,43 @@ public class TraverseReadsNano extends TraversalEngine, } } - private class TraverseReadsMap implements MapFunction { + /** + * Contains the results of a map call, indicating whether the call was good, filtered, or done + */ + private class MapResult { + final M value; + final boolean reduceMe; + + /** + * Create a MapResult with value that should be reduced + * + * @param value the value to reduce + */ + private MapResult(final M value) { + this.value = value; + this.reduceMe = true; + } + + /** + * Create a MapResult that shouldn't be reduced + */ + private MapResult() { + this.value = null; + this.reduceMe = false; + } + } + + /** + * A static object that tells reduce that the result of map should be skipped (filtered or done) + */ + private final MapResult SKIP_REDUCE = new MapResult(); + + /** + * MapFunction for TraverseReads meeting NanoScheduler interface requirements + * + * Applies walker.map to MapData, returning a MapResult object containing the result + */ + private class TraverseReadsMap implements MapFunction { final ReadWalker walker; private TraverseReadsMap(ReadWalker walker) { @@ -165,16 +197,36 @@ public class TraverseReadsNano extends TraversalEngine, } @Override - public M apply(final MapData data) { + public MapResult apply(final MapData data) { if ( ! walker.isDone() ) { final boolean keepMeP = walker.filter(data.refContext, data.read); - if (keepMeP) { - return walker.map(data.refContext, data.read, data.tracker); - } + if (keepMeP) + return new MapResult(walker.map(data.refContext, data.read, data.tracker)); } - // TODO -- how can we cleanly support done and filtered. Need to return - // TODO -- a MapResult object that says the status - return null; + + return SKIP_REDUCE; + } + } + + /** + * ReduceFunction for TraverseReads meeting NanoScheduler interface requirements + * + * Takes a MapResult object and applies the walkers reduce function to each map result, when applicable + */ + private class TraverseReadsReduce implements ReduceFunction { + final ReadWalker walker; + + private TraverseReadsReduce(ReadWalker walker) { + this.walker = walker; + } + + @Override + public T apply(MapResult one, T sum) { + if ( one.reduceMe ) + // only run reduce on values that aren't DONE or FAILED + return walker.reduce(one.value, sum); + else + return sum; } } }