Optimization to TraverseReadsNano
-- Don't read all inputs into a list and then hand out an iterator over that list; instead, build a real iterator that produces each input on demand, so the NanoScheduler input thread can contribute meaningfully to the workload
-- Use the NanoScheduler progress function instead of a home-grown updater
This commit is contained in:
parent
b33f804cdc
commit
16eb1c5436
|
|
@ -33,14 +33,13 @@ import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider
|
||||||
import org.broadinstitute.sting.gatk.datasources.providers.ReadView;
|
import org.broadinstitute.sting.gatk.datasources.providers.ReadView;
|
||||||
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
|
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
|
||||||
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
|
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
|
||||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
|
||||||
import org.broadinstitute.sting.utils.nanoScheduler.NSMapFunction;
|
import org.broadinstitute.sting.utils.nanoScheduler.NSMapFunction;
|
||||||
|
import org.broadinstitute.sting.utils.nanoScheduler.NSProgressFunction;
|
||||||
import org.broadinstitute.sting.utils.nanoScheduler.NSReduceFunction;
|
import org.broadinstitute.sting.utils.nanoScheduler.NSReduceFunction;
|
||||||
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
|
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
|
||||||
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
|
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
|
||||||
|
|
||||||
import java.util.LinkedList;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A nano-scheduling version of TraverseReads.
|
* A nano-scheduling version of TraverseReads.
|
||||||
|
|
@ -60,6 +59,13 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
|
||||||
|
|
||||||
public TraverseReadsNano(int nThreads) {
|
public TraverseReadsNano(int nThreads) {
|
||||||
nanoScheduler = new NanoScheduler<MapData, MapResult, T>(nThreads);
|
nanoScheduler = new NanoScheduler<MapData, MapResult, T>(nThreads);
|
||||||
|
nanoScheduler.setProgressFunction(new NSProgressFunction<MapData>() {
|
||||||
|
@Override
|
||||||
|
public void progress(MapData lastProcessedMap) {
|
||||||
|
if ( lastProcessedMap.refContext != null )
|
||||||
|
printProgress(lastProcessedMap.refContext.getLocus());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -88,14 +94,10 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
|
||||||
final TraverseReadsMap myMap = new TraverseReadsMap(walker);
|
final TraverseReadsMap myMap = new TraverseReadsMap(walker);
|
||||||
final TraverseReadsReduce myReduce = new TraverseReadsReduce(walker);
|
final TraverseReadsReduce myReduce = new TraverseReadsReduce(walker);
|
||||||
|
|
||||||
final List<MapData> aggregatedInputs = aggregateMapData(dataProvider);
|
final Iterator<MapData> aggregatedInputs = aggregateMapData(dataProvider);
|
||||||
final T result = nanoScheduler.execute(aggregatedInputs.iterator(), myMap, sum, myReduce);
|
final T result = nanoScheduler.execute(aggregatedInputs, myMap, sum, myReduce);
|
||||||
|
|
||||||
final GATKSAMRecord lastRead = aggregatedInputs.get(aggregatedInputs.size() - 1).read;
|
|
||||||
final GenomeLoc locus = engine.getGenomeLocParser().createGenomeLoc(lastRead);
|
|
||||||
|
|
||||||
updateCumulativeMetrics(dataProvider.getShard());
|
updateCumulativeMetrics(dataProvider.getShard());
|
||||||
printProgress(locus);
|
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
@ -108,29 +110,37 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
|
||||||
* @return a linked list of MapData objects holding the read, ref, and ROD info for every map/reduce
|
* @return an iterator over MapData objects holding the read, ref, and ROD info for every map/reduce
|
||||||
* should execute
|
* should execute
|
||||||
*/
|
*/
|
||||||
private List<MapData> aggregateMapData(final ReadShardDataProvider dataProvider) {
|
private Iterator<MapData> aggregateMapData(final ReadShardDataProvider dataProvider) {
|
||||||
final ReadView reads = new ReadView(dataProvider);
|
return new Iterator<MapData>() {
|
||||||
final ReadReferenceView reference = new ReadReferenceView(dataProvider);
|
final ReadView reads = new ReadView(dataProvider);
|
||||||
final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
|
final ReadReferenceView reference = new ReadReferenceView(dataProvider);
|
||||||
|
final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
|
||||||
|
final Iterator<SAMRecord> readIterator = reads.iterator();
|
||||||
|
|
||||||
final List<MapData> mapData = new LinkedList<MapData>();
|
@Override public boolean hasNext() { return readIterator.hasNext(); }
|
||||||
for ( final SAMRecord read : reads ) {
|
|
||||||
final ReferenceContext refContext = ! read.getReadUnmappedFlag()
|
|
||||||
? reference.getReferenceContext(read)
|
|
||||||
: null;
|
|
||||||
|
|
||||||
// if the read is mapped, create a metadata tracker
|
@Override
|
||||||
final RefMetaDataTracker tracker = read.getReferenceIndex() >= 0
|
public MapData next() {
|
||||||
? rodView.getReferenceOrderedDataForRead(read)
|
final SAMRecord read = readIterator.next();
|
||||||
: null;
|
final ReferenceContext refContext = ! read.getReadUnmappedFlag()
|
||||||
|
? reference.getReferenceContext(read)
|
||||||
|
: null;
|
||||||
|
|
||||||
// update the number of reads we've seen
|
// if the read is mapped, create a metadata tracker
|
||||||
dataProvider.getShard().getReadMetrics().incrementNumIterations();
|
final RefMetaDataTracker tracker = read.getReferenceIndex() >= 0
|
||||||
|
? rodView.getReferenceOrderedDataForRead(read)
|
||||||
|
: null;
|
||||||
|
|
||||||
mapData.add(new MapData((GATKSAMRecord)read, refContext, tracker));
|
// update the number of reads we've seen
|
||||||
}
|
dataProvider.getShard().getReadMetrics().incrementNumIterations();
|
||||||
|
|
||||||
return mapData;
|
return new MapData((GATKSAMRecord)read, refContext, tracker);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void remove() {
|
||||||
|
throw new UnsupportedOperationException("Remove not supported");
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue