All read and loci traversals go through NanoScheduler now

-- The NanoScheduler does a good job of tracking important information, such as the time spent in map/reduce/input.
-- Can be disabled with the static boolean USE_NANOSCHEDULER_FOR_EVERYTHING in MicroScheduler if we have problems
-- See GSA-515 (Nanoscheduler) and GSA-549 (Retire TraverseReads and TraverseLoci after testing confirms that the nano scheduler version is fine in single-threaded mode)
This commit is contained in:
Mark DePristo 2012-09-05 16:38:21 -04:00
parent dddf148a59
commit c5f1ceaa95
2 changed files with 22 additions and 3 deletions

View File

@ -59,6 +59,8 @@ import java.util.Collection;
/** Shards and schedules data in manageable chunks. */
public abstract class MicroScheduler implements MicroSchedulerMBean {
// TODO -- remove me and retire non nano scheduled versions of traversals
private final static boolean USE_NANOSCHEDULER_FOR_EVERYTHING = true;
protected static final Logger logger = Logger.getLogger(MicroScheduler.class);
/**
@ -101,7 +103,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
*/
public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, ThreadAllocation threadAllocation) {
if ( threadAllocation.isRunningInParallelMode() )
logger.info(String.format("Running the GATK in parallel mode with %d CPU threads for each of %d data threads",
logger.info(String.format("Running the GATK in parallel mode with %d CPU thread(s) for each of %d data thread(s)",
threadAllocation.getNumCPUThreadsPerDataThread(), threadAllocation.getNumDataThreads()));
if ( threadAllocation.getNumDataThreads() > 1 ) {
@ -147,11 +149,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
this.rods = rods;
if (walker instanceof ReadWalker) {
traversalEngine = threadAllocation.getNumCPUThreadsPerDataThread() > 1
traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1
? new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread())
: new TraverseReads();
} else if (walker instanceof LocusWalker) {
traversalEngine = threadAllocation.getNumCPUThreadsPerDataThread() > 1
traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1
? new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread())
: new TraverseLociLinear();
} else if (walker instanceof DuplicateWalker) {

View File

@ -193,6 +193,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null");
outsideSchedulerTimer.stop();
ReduceType result;
if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) {
result = executeSingleThreaded(inputReader, map, initialValue, reduce);
@ -214,13 +215,29 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final NanoSchedulerReduceFunction<MapType, ReduceType> reduce) {
ReduceType sum = initialValue;
int i = 0;
// start timer to ensure that both hasNext and next are caught by the timer
if ( TIME_CALLS ) inputTimer.restart();
while ( inputReader.hasNext() ) {
final InputType input = inputReader.next();
if ( TIME_CALLS ) inputTimer.stop();
// map
if ( TIME_CALLS ) mapTimer.restart();
final MapType mapValue = map.apply(input);
if ( TIME_CALLS ) mapTimer.stop();
if ( i++ % bufferSize == 0 && progressFunction != null )
progressFunction.progress(input);
// reduce
if ( TIME_CALLS ) reduceTimer.restart();
sum = reduce.apply(mapValue, sum);
if ( TIME_CALLS ) reduceTimer.stop();
if ( TIME_CALLS ) inputTimer.restart();
}
return sum;
}