From c5f1ceaa95d17b9aedd9b2e9a33d7d516fee95b8 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 5 Sep 2012 16:38:21 -0400 Subject: [PATCH] All read and loci traversals go through NanoScheduler now -- The NanoScheduler is doing a good job at tracking important information like time spent in map/reduce/input etc. -- Can be disabled with static boolean in MicroScheduler if we have problems -- See GSA-515 Nanoscheduler GSA-549 Retire TraverseReads and TraverseLoci after testing confirms nano scheduler version in single threaded version is fine --- .../sting/gatk/executive/MicroScheduler.java | 8 +++++--- .../utils/nanoScheduler/NanoScheduler.java | 17 +++++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index bc0d5da96..490f44470 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -59,6 +59,8 @@ import java.util.Collection; /** Shards and schedules data in manageable chunks. */ public abstract class MicroScheduler implements MicroSchedulerMBean { + // TODO -- remove me and retire non nano scheduled versions of traversals + private final static boolean USE_NANOSCHEDULER_FOR_EVERYTHING = true; protected static final Logger logger = Logger.getLogger(MicroScheduler.class); /** @@ -101,7 +103,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, ThreadAllocation threadAllocation) { if ( threadAllocation.isRunningInParallelMode() ) - logger.info(String.format("Running the GATK in parallel mode with %d CPU threads for each of %d data threads", + logger.info(String.format("Running the GATK in parallel mode with %d CPU thread(s) for each of %d data thread(s)", threadAllocation.getNumCPUThreadsPerDataThread(), threadAllocation.getNumDataThreads())); if ( threadAllocation.getNumDataThreads() > 1 ) { @@ -147,11 +149,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { this.rods = rods; if (walker instanceof ReadWalker) { - traversalEngine = threadAllocation.getNumCPUThreadsPerDataThread() > 1 + traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ? new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread()) : new TraverseReads(); } else if (walker instanceof LocusWalker) { - traversalEngine = threadAllocation.getNumCPUThreadsPerDataThread() > 1 + traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ? new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread()) : new TraverseLociLinear(); } else if (walker instanceof DuplicateWalker) { diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 89e44ce93..ade6dcaf5 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -193,6 +193,7 @@ public class NanoScheduler { if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null"); outsideSchedulerTimer.stop(); + ReduceType result; if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) { result = executeSingleThreaded(inputReader, map, initialValue, reduce); @@ -214,13 +215,29 @@ public class NanoScheduler { final NanoSchedulerReduceFunction reduce) { ReduceType sum = initialValue; int i = 0; + + // start timer to ensure that both hasNext and next are caught by the timer + if ( TIME_CALLS ) inputTimer.restart(); while ( inputReader.hasNext() ) { final InputType input = inputReader.next(); + if ( TIME_CALLS ) inputTimer.stop(); + + // map + if ( TIME_CALLS ) mapTimer.restart(); final MapType mapValue = map.apply(input); + if ( TIME_CALLS ) mapTimer.stop(); + if ( i++ % bufferSize == 0 && progressFunction != null ) progressFunction.progress(input); + + // reduce + if ( TIME_CALLS ) reduceTimer.restart(); sum = reduce.apply(mapValue, sum); + if ( TIME_CALLS ) reduceTimer.stop(); + + if ( TIME_CALLS ) inputTimer.restart(); } + return sum; }