TraverseReadsNano supports walker.filter and walker.done

-- Instead of returning directly the result of map(), returns a MapResult object with the value and a reduceMe flag.
-- Reduce function respects the reduceMe flag
-- Code cleanup and more documentation
This commit is contained in:
Mark DePristo 2012-09-02 12:16:56 -04:00
parent 1a8f5fc374
commit 9823102c0c
1 changed files with 86 additions and 34 deletions

View File

@ -40,27 +40,28 @@ import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
import org.broadinstitute.sting.utils.nanoScheduler.ReduceFunction;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
/**
* @author aaron
* A nano-scheduling version of TraverseReads.
*
* Implements the traversal of a walker that accepts individual reads, the reference, and
* RODs per map call. Directly supports shared memory parallelism via NanoScheduler
*
* @author depristo
* @version 1.0
* @date Apr 24, 2009
* <p/>
* Class TraverseReads
* <p/>
* This class handles traversing by reads in the new shardable style
* @date 9/2/2012
*/
public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,ReadShardDataProvider> {
/** our log, which we want to capture anything from this class */
protected static final Logger logger = Logger.getLogger(TraverseReadsNano.class);
private static final boolean DEBUG = false;
final NanoScheduler<MapData, M, T> nanoScheduler;
final NanoScheduler<MapData, MapResult, T> nanoScheduler;
public TraverseReadsNano(int nThreads) {
final int bufferSize = ReadShard.getReadBufferSize() + 1; // actually has 1 more than max
nanoScheduler = new NanoScheduler<MapData, M, T>(bufferSize, nThreads);
nanoScheduler = new NanoScheduler<MapData, MapResult, T>(bufferSize, nThreads);
}
@Override
@ -95,18 +96,23 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
final GenomeLoc locus = engine.getGenomeLocParser().createGenomeLoc(lastRead);
printProgress(dataProvider.getShard(), locus, aggregatedInputs.size());
// TODO -- how can I get done value?
// done = walker.isDone();
return result;
}
/**
* Aggregate all of the inputs for all map calls into MapData, to be provided
* to NanoScheduler for Map/Reduce
*
* @param dataProvider the source of our data
* @return a linked list of MapData objects holding the read, ref, and ROD info for every map/reduce
* should execute
*/
private List<MapData> aggregateMapData(final ReadShardDataProvider dataProvider) {
final ReadView reads = new ReadView(dataProvider);
final ReadReferenceView reference = new ReadReferenceView(dataProvider);
final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
final List<MapData> mapData = new ArrayList<MapData>(); // TODO -- need size of reads
final List<MapData> mapData = new LinkedList<MapData>();
for ( final SAMRecord read : reads ) {
final ReferenceContext refContext = ! read.getReadUnmappedFlag()
? reference.getReferenceContext(read)
@ -132,19 +138,9 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
super.printOnTraversalDone();
}
private class TraverseReadsReduce implements ReduceFunction<M, T> {
final ReadWalker<M,T> walker;
private TraverseReadsReduce(ReadWalker<M, T> walker) {
this.walker = walker;
}
@Override
public T apply(M one, T sum) {
return walker.reduce(one, sum);
}
}
/**
* The input data needed for each map call. The read, the reference, and the RODs
*/
private class MapData {
final GATKSAMRecord read;
final ReferenceContext refContext;
@ -157,7 +153,43 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
}
}
private class TraverseReadsMap implements MapFunction<MapData, M> {
/**
* Contains the results of a map call, indicating whether the call was good, filtered, or done
*/
private class MapResult {
final M value;
final boolean reduceMe;
/**
* Create a MapResult with value that should be reduced
*
* @param value the value to reduce
*/
private MapResult(final M value) {
this.value = value;
this.reduceMe = true;
}
/**
* Create a MapResult that shouldn't be reduced
*/
private MapResult() {
this.value = null;
this.reduceMe = false;
}
}
/**
* A static object that tells reduce that the result of map should be skipped (filtered or done)
*/
private final MapResult SKIP_REDUCE = new MapResult();
/**
* MapFunction for TraverseReads meeting NanoScheduler interface requirements
*
* Applies walker.map to MapData, returning a MapResult object containing the result
*/
private class TraverseReadsMap implements MapFunction<MapData, MapResult> {
final ReadWalker<M,T> walker;
private TraverseReadsMap(ReadWalker<M, T> walker) {
@ -165,16 +197,36 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
}
@Override
public M apply(final MapData data) {
public MapResult apply(final MapData data) {
if ( ! walker.isDone() ) {
final boolean keepMeP = walker.filter(data.refContext, data.read);
if (keepMeP) {
return walker.map(data.refContext, data.read, data.tracker);
}
if (keepMeP)
return new MapResult(walker.map(data.refContext, data.read, data.tracker));
}
// TODO -- how can we cleanly support done and filtered. Need to return
// TODO -- a MapResult object that says the status
return null;
return SKIP_REDUCE;
}
}
/**
* ReduceFunction for TraverseReads meeting NanoScheduler interface requirements
*
* Takes a MapResult object and applies the walkers reduce function to each map result, when applicable
*/
private class TraverseReadsReduce implements ReduceFunction<MapResult, T> {
final ReadWalker<M,T> walker;
private TraverseReadsReduce(ReadWalker<M, T> walker) {
this.walker = walker;
}
@Override
public T apply(MapResult one, T sum) {
if ( one.reduceMe )
// only run reduce on values that aren't DONE or FAILED
return walker.reduce(one.value, sum);
else
return sum;
}
}
}