diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java
index 5c6aa6a35..39b541944 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java
@@ -3,6 +3,8 @@ package org.broadinstitute.sting.utils.nanoScheduler;
 import com.google.java.contract.Ensures;
 import com.google.java.contract.Requires;
 import org.apache.log4j.Logger;
+import org.broadinstitute.sting.utils.AutoFormattingTime;
+import org.broadinstitute.sting.utils.SimpleTimer;
 import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
 
 import java.util.Iterator;
@@ -44,6 +46,7 @@ import java.util.concurrent.*;
 public class NanoScheduler {
     private final static Logger logger = Logger.getLogger(NanoScheduler.class);
     private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
+    private final static boolean TIME_CALLS = true;
 
     final int bufferSize;
     final int nThreads;
@@ -51,6 +54,10 @@ public class NanoScheduler {
     boolean shutdown = false;
     boolean debug = false;
 
+    final SimpleTimer inputTimer = new SimpleTimer();
+    final SimpleTimer mapTimer = new SimpleTimer();
+    final SimpleTimer reduceTimer = new SimpleTimer();
+
     /**
      * Create a new nanoschedule with the desire characteristics requested by the argument
      *
@@ -97,6 +104,26 @@ public class NanoScheduler {
                 throw new IllegalStateException("Remaining tasks found in the executor, unexpected behavior!");
         }
         shutdown = true;
+
+        if (TIME_CALLS) {
+            printTimerInfo("Input time", inputTimer);
+            printTimerInfo("Map time", mapTimer);
+            printTimerInfo("Reduce time", reduceTimer);
+        }
+    }
+
+    /**
+     * Log the elapsed time of one timer and its share of the combined input + map + reduce time.
+     *
+     * @param label human-readable name printed in front of the timing
+     * @param timer the timer to report; its elapsed time is compared against the sum of all three timers
+     */
+    private void printTimerInfo(final String label, final SimpleTimer timer) {
+        final double total = inputTimer.getElapsedTime() + mapTimer.getElapsedTime() + reduceTimer.getElapsedTime();
+        final double myTimeInSec = timer.getElapsedTime();
+        // if no time has been recorded at all the ratio would be 0/0 = NaN; report 0% instead
+        final double myTimePercent = total > 0 ? myTimeInSec / total * 100 : 0.0;
+        logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent));
     }
 
     /**
@@ -134,10 +161,10 @@ public class NanoScheduler {
      * It is safe to call this function repeatedly on a single nanoScheduler, at least until the
      * shutdown method is called.
      *
-     * @param inputReader
-     * @param map
-     * @param reduce
-     * @return
+     * @param inputReader an iterator providing us with the input data to nanoSchedule map/reduce over
+     * @param map the map function from input type -> map type, will be applied in parallel to each input
+     * @param reduce the reduce function from map type + reduce type -> reduce type to be applied in order to map results
+     * @return the last reduce value
      */
     public ReduceType execute(final Iterator inputReader,
                               final MapFunction map,
@@ -213,7 +240,10 @@ public class NanoScheduler {
         // while mapQueue has something in it to reduce
         for ( final Future future : mapQueue ) {
             final MapType value = future.get(); // block until we get the values for this task
+
+            if ( TIME_CALLS) reduceTimer.restart();
             sum = reduce.apply(value, sum);
+            if ( TIME_CALLS) reduceTimer.stop();
         }
 
         return sum;
@@ -229,11 +259,15 @@ public class NanoScheduler {
     private List readInputs(final Iterator inputReader) {
         int n = 0;
         final List inputs = new LinkedList();
+
+        if ( TIME_CALLS) inputTimer.restart();
         while ( inputReader.hasNext() && n < getBufferSize() ) {
             final InputType input = inputReader.next();
             inputs.add(input);
             n++;
         }
+        if ( TIME_CALLS) inputTimer.stop();
+
         return inputs;
     }
 
@@ -266,7 +300,10 @@ public class NanoScheduler {
         }
 
         @Override public MapType call() throws Exception {
-            return map.apply(input);
+            if ( TIME_CALLS) mapTimer.restart();
+            final MapType result = map.apply(input);
+            if ( TIME_CALLS) mapTimer.stop();
+            return result;
         }
     }
 }