NanoScheduler tracks time within input, map, and reduce

-- Helpful for understanding how much time is spent in each part of the code (input, map, reduce).
-- Controlled by a local static boolean (TIME_CALLS), so that the potential timing overhead can be avoided in general.
This commit is contained in:
Mark DePristo 2012-09-01 12:29:59 -04:00
parent 7087b22ea3
commit 800a27c3a7
1 changed files with 35 additions and 5 deletions

View File

@ -3,6 +3,8 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures; import com.google.java.contract.Ensures;
import com.google.java.contract.Requires; import com.google.java.contract.Requires;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.AutoFormattingTime;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.Iterator; import java.util.Iterator;
@ -44,6 +46,7 @@ import java.util.concurrent.*;
public class NanoScheduler<InputType, MapType, ReduceType> { public class NanoScheduler<InputType, MapType, ReduceType> {
private final static Logger logger = Logger.getLogger(NanoScheduler.class); private final static Logger logger = Logger.getLogger(NanoScheduler.class);
private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
// Static switch for timing instrumentation: when true, the input, map and reduce calls are
// bracketed by timers and a per-phase summary is logged when the scheduler is shut down.
private final static boolean TIME_CALLS = true;
final int bufferSize; final int bufferSize;
final int nThreads; final int nThreads;
@ -51,6 +54,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
boolean shutdown = false; boolean shutdown = false;
boolean debug = false; boolean debug = false;
// Timers used only when TIME_CALLS is set.
// inputTimer brackets the read loop in readInputs().
final SimpleTimer inputTimer = new SimpleTimer();
// mapTimer brackets each map.apply() call made from a map task's call().
// NOTE(review): map tasks appear to run via an executor; if several run concurrently they share
// this one timer, so the accumulated value may be unreliable -- confirm SimpleTimer's thread-safety.
final SimpleTimer mapTimer = new SimpleTimer();
// reduceTimer brackets each reduce.apply() call in the reduce loop.
final SimpleTimer reduceTimer = new SimpleTimer();
/** /**
* Create a new nanoschedule with the desire characteristics requested by the argument * Create a new nanoschedule with the desire characteristics requested by the argument
* *
@ -97,6 +104,19 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
throw new IllegalStateException("Remaining tasks found in the executor, unexpected behavior!"); throw new IllegalStateException("Remaining tasks found in the executor, unexpected behavior!");
} }
shutdown = true; shutdown = true;
if (TIME_CALLS) {
printTimerInfo("Input time", inputTimer);
printTimerInfo("Map time", mapTimer);
printTimerInfo("Reduce time", reduceTimer);
}
}
private void printTimerInfo(final String label, final SimpleTimer timer) {
final double total = inputTimer.getElapsedTime() + mapTimer.getElapsedTime() + reduceTimer.getElapsedTime();
final double myTimeInSec = timer.getElapsedTime();
final double myTimePercent = myTimeInSec / total * 100;
logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent));
} }
/** /**
@ -134,10 +154,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
* It is safe to call this function repeatedly on a single nanoScheduler, at least until the * It is safe to call this function repeatedly on a single nanoScheduler, at least until the
* shutdown method is called. * shutdown method is called.
* *
* @param inputReader * @param inputReader an iterator providing us with the input data to nanoSchedule map/reduce over
* @param map * @param map the map function from input type -> map type, will be applied in parallel to each input
* @param reduce * @param reduce the reduce function from map type + reduce type -> reduce type to be applied in order to map results
* @return * @return the last reduce value
*/ */
public ReduceType execute(final Iterator<InputType> inputReader, public ReduceType execute(final Iterator<InputType> inputReader,
final MapFunction<InputType, MapType> map, final MapFunction<InputType, MapType> map,
@ -213,7 +233,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// while mapQueue has something in it to reduce // while mapQueue has something in it to reduce
for ( final Future<MapType> future : mapQueue ) { for ( final Future<MapType> future : mapQueue ) {
final MapType value = future.get(); // block until we get the values for this task final MapType value = future.get(); // block until we get the values for this task
if ( TIME_CALLS) reduceTimer.restart();
sum = reduce.apply(value, sum); sum = reduce.apply(value, sum);
if ( TIME_CALLS) reduceTimer.stop();
} }
return sum; return sum;
@ -229,11 +252,15 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
private List<InputType> readInputs(final Iterator<InputType> inputReader) { private List<InputType> readInputs(final Iterator<InputType> inputReader) {
int n = 0; int n = 0;
final List<InputType> inputs = new LinkedList<InputType>(); final List<InputType> inputs = new LinkedList<InputType>();
if ( TIME_CALLS) inputTimer.restart();
while ( inputReader.hasNext() && n < getBufferSize() ) { while ( inputReader.hasNext() && n < getBufferSize() ) {
final InputType input = inputReader.next(); final InputType input = inputReader.next();
inputs.add(input); inputs.add(input);
n++; n++;
} }
if ( TIME_CALLS) inputTimer.stop();
return inputs; return inputs;
} }
@ -266,7 +293,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
} }
@Override public MapType call() throws Exception { @Override public MapType call() throws Exception {
return map.apply(input); if ( TIME_CALLS) mapTimer.restart();
final MapType result = map.apply(input);
if ( TIME_CALLS) mapTimer.stop();
return result;
} }
} }
} }