diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 030f8d0f2..a78ab4375 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -42,6 +42,7 @@ import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.utils.MathUtils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; @@ -315,6 +316,9 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { for ( final TraversalEngine te : allCreatedTraversalEngines) te.shutdown(); + // horrible hack to print nano scheduling information across all nano schedulers, if any were used + NanoScheduler.printCombinedRuntimeProfile(); + allCreatedTraversalEngines.clear(); availableTraversalEngines.clear(); } diff --git a/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java b/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java index b3a9986c5..4c54d4126 100644 --- a/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java +++ b/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java @@ -145,4 +145,13 @@ public class SimpleTimer { public synchronized long getElapsedTimeNano() { return running ? (currentTimeNano() - startTimeNano + elapsedTimeNano) : elapsedTimeNano; } + + /** + * Add the elapsed time from toAdd to this elapsed time + * + * @param toAdd the timer whose elapsed time we want to add to this timer + */ + public synchronized void addElapsed(final SimpleTimer toAdd) { + elapsedTimeNano += toAdd.getElapsedTimeNano(); + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index 29dddbc49..f5eb53456 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -29,6 +29,7 @@ class InputProducer implements Runnable { final SimpleTimer inputTimer, final BlockingQueue outputQueue) { if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); + if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null"); if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null"); this.inputReader = inputReader; @@ -38,11 +39,16 @@ class InputProducer implements Runnable { public void run() { try { - while ( inputReader.hasNext() ) { - if ( inputTimer != null ) inputTimer.restart(); - final InputType input = inputReader.next(); - if ( inputTimer != null ) inputTimer.stop(); - outputQueue.put(new InputValue(input)); + while ( true ) { + inputTimer.restart(); + if ( ! inputReader.hasNext() ) { + inputTimer.stop(); + break; + } else { + final InputType input = inputReader.next(); + inputTimer.stop(); + outputQueue.put(new InputValue(input)); + } } // add the EOF object so our consumer knows we are done in all inputs diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java new file mode 100644 index 000000000..874434eae --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java @@ -0,0 +1,69 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.AutoFormattingTime; +import org.broadinstitute.sting.utils.SimpleTimer; + +/** + * Holds runtime profile (input, read, map) times as tracked by NanoScheduler + * + * User: depristo + * Date: 9/10/12 + * Time: 8:31 PM + */ +public class NSRuntimeProfile { + final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside"); + final SimpleTimer inputTimer = new SimpleTimer("input"); + final SimpleTimer mapTimer = new SimpleTimer("map"); + final SimpleTimer reduceTimer = new SimpleTimer("reduce"); + + /** + * Combine the elapsed time information from other with this profile + * + * @param other a non-null profile + */ + public void combine(final NSRuntimeProfile other) { + outsideSchedulerTimer.addElapsed(other.outsideSchedulerTimer); + inputTimer.addElapsed(other.inputTimer); + mapTimer.addElapsed(other.mapTimer); + reduceTimer.addElapsed(other.reduceTimer); + } + + /** + * Print the runtime profiling to logger + * + * @param logger + */ + public void log(final Logger logger) { + log1(logger, "Input time", inputTimer); + log1(logger, "Map time", mapTimer); + log1(logger, "Reduce time", reduceTimer); + log1(logger, "Outside time", outsideSchedulerTimer); + } + + /** + * @return the total runtime for all functions of this nano scheduler + */ + @Ensures("result >= 0.0") + public double totalRuntimeInSeconds() { + return inputTimer.getElapsedTime() + + mapTimer.getElapsedTime() + + reduceTimer.getElapsedTime() + + outsideSchedulerTimer.getElapsedTime(); + } + + /** + * Print to logger.info timing information from timer, with name label + * + * @param label the name of the timer to display. Should be human readable + * @param timer the timer whose elapsed time we will display + */ + @Requires({"label != null", "timer != null"}) + private void log1(final Logger logger, final String label, final SimpleTimer timer) { + final double myTimeInSec = timer.getElapsedTime(); + final double myTimePercent = myTimeInSec / totalRuntimeInSeconds() * 100; + logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent)); + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 664fb7b9b..bb9afa879 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -3,8 +3,6 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import com.google.java.contract.Requires; import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.AutoFormattingTime; -import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.threading.NamedThreadFactory; @@ -46,7 +44,6 @@ public class NanoScheduler { private final static Logger logger = Logger.getLogger(NanoScheduler.class); private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; private final static boolean LOG_MAP_TIMES = false; - private final static boolean TIME_CALLS = true; private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100; @@ -61,10 +58,15 @@ public class NanoScheduler { boolean debug = false; private NSProgressFunction progressFunction = null; - final SimpleTimer outsideSchedulerTimer = TIME_CALLS ? new SimpleTimer("outside") : null; - final SimpleTimer inputTimer = TIME_CALLS ? new SimpleTimer("input") : null; - final SimpleTimer mapTimer = TIME_CALLS ? new SimpleTimer("map") : null; - final SimpleTimer reduceTimer = TIME_CALLS ? new SimpleTimer("reduce") : null; + /** + * Tracks the combined runtime profiles across all created nano schedulers + */ + final static private NSRuntimeProfile combinedNSRuntimeProfiler = new NSRuntimeProfile(); + + /** + * The profile specific to this nano scheduler + */ + final private NSRuntimeProfile myNSRuntimeProfile = new NSRuntimeProfile(); /** * Create a new nanoscheduler with the desire characteristics requested by the argument @@ -92,7 +94,7 @@ public class NanoScheduler { } // start timing the time spent outside of the nanoScheduler - outsideSchedulerTimer.start(); + myNSRuntimeProfile.outsideSchedulerTimer.start(); } /** @@ -119,21 +121,31 @@ public class NanoScheduler { * After this call, execute cannot be invoked without throwing an error */ public void shutdown() { - outsideSchedulerTimer.stop(); + myNSRuntimeProfile.outsideSchedulerTimer.stop(); + + // add my timing information to the combined NS runtime profile + combinedNSRuntimeProfiler.combine(myNSRuntimeProfile); if ( nThreads > 1 ) { shutdownExecutor("inputExecutor", inputExecutor); shutdownExecutor("mapExecutor", mapExecutor); shutdownExecutor("reduceExecutor", reduceExecutor); } - shutdown = true; - if (TIME_CALLS) { - printTimerInfo("Input time", inputTimer); - printTimerInfo("Map time", mapTimer); - printTimerInfo("Reduce time", reduceTimer); - printTimerInfo("Outside time", outsideSchedulerTimer); - } + shutdown = true; + } + + public void printRuntimeProfile() { + myNSRuntimeProfile.log(logger); + } + + public static void printCombinedRuntimeProfile() { + if ( combinedNSRuntimeProfiler.totalRuntimeInSeconds() > 0.1 ) + combinedNSRuntimeProfiler.log(logger); + } + + protected double getTotalRuntime() { + return myNSRuntimeProfile.totalRuntimeInSeconds(); } /** @@ -154,21 +166,6 @@ public class NanoScheduler { throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!"); } - /** - * Print to logger.info timing information from timer, with name label - * - * @param label the name of the timer to display. Should be human readable - * @param timer the timer whose elapsed time we will display - */ - @Requires({"label != null", "timer != null"}) - private void printTimerInfo(final String label, final SimpleTimer timer) { - final double total = inputTimer.getElapsedTime() + mapTimer.getElapsedTime() - + reduceTimer.getElapsedTime() + outsideSchedulerTimer.getElapsedTime(); - final double myTimeInSec = timer.getElapsedTime(); - final double myTimePercent = myTimeInSec / total * 100; - logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent)); - } - /** * @return true if this nanoScheduler is shutdown, or false if its still open for business */ @@ -246,7 +243,7 @@ public class NanoScheduler { if ( map == null ) throw new IllegalArgumentException("map function cannot be null"); if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null"); - outsideSchedulerTimer.stop(); + myNSRuntimeProfile.outsideSchedulerTimer.stop(); ReduceType result; if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) { @@ -255,7 +252,7 @@ public class NanoScheduler { result = executeMultiThreaded(inputReader, map, initialValue, reduce); } - outsideSchedulerTimer.restart(); + myNSRuntimeProfile.outsideSchedulerTimer.restart(); return result; } @@ -272,28 +269,31 @@ public class NanoScheduler { ReduceType sum = initialValue; int i = 0; - // start timer to ensure that both hasNext and next are caught by the timer - if ( TIME_CALLS ) inputTimer.restart(); - while ( inputReader.hasNext() ) { - final InputType input = inputReader.next(); - if ( TIME_CALLS ) inputTimer.stop(); + while ( true ) { + // start timer to ensure that both hasNext and next are caught by the timer + myNSRuntimeProfile.inputTimer.restart(); + if ( ! inputReader.hasNext() ) { + myNSRuntimeProfile.inputTimer.stop(); + break; + } else { + final InputType input = inputReader.next(); + myNSRuntimeProfile.inputTimer.stop(); - // map - if ( TIME_CALLS ) mapTimer.restart(); - final long preMapTime = LOG_MAP_TIMES ? 0 : mapTimer.currentTimeNano(); - final MapType mapValue = map.apply(input); - if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (mapTimer.currentTimeNano() - preMapTime)); - if ( TIME_CALLS ) mapTimer.stop(); + // map + myNSRuntimeProfile.mapTimer.restart(); + final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano(); + final MapType mapValue = map.apply(input); + if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime)); + myNSRuntimeProfile.mapTimer.stop(); - if ( i++ % inputBufferSize == 0 && progressFunction != null ) - progressFunction.progress(input); + if ( i++ % inputBufferSize == 0 && progressFunction != null ) + progressFunction.progress(input); - // reduce - if ( TIME_CALLS ) reduceTimer.restart(); - sum = reduce.apply(mapValue, sum); - if ( TIME_CALLS ) reduceTimer.stop(); - - if ( TIME_CALLS ) inputTimer.restart(); + // reduce + myNSRuntimeProfile.reduceTimer.restart(); + sum = reduce.apply(mapValue, sum); + myNSRuntimeProfile.reduceTimer.stop(); + } } return sum; @@ -321,11 +321,11 @@ public class NanoScheduler { new LinkedBlockingDeque>>(mapBufferSize); // Start running the input reader thread - inputExecutor.submit(new InputProducer(inputReader, inputTimer, inputQueue)); + inputExecutor.submit(new InputProducer(inputReader, myNSRuntimeProfile.inputTimer, inputQueue)); // Start running the reducer thread final ReducerThread reducer - = new ReducerThread(reduce, reduceTimer, initialValue, mapResultQueue); + = new ReducerThread(reduce, myNSRuntimeProfile.reduceTimer, initialValue, mapResultQueue); final Future reduceResult = reduceExecutor.submit(reducer); try { @@ -382,10 +382,10 @@ public class NanoScheduler { @Override public MapResult call() { - if ( TIME_CALLS ) mapTimer.restart(); if ( debug ) debugPrint("\t\tmap " + input); + myNSRuntimeProfile.mapTimer.restart(); final MapType result = map.apply(input); - if ( TIME_CALLS ) mapTimer.stop(); + myNSRuntimeProfile.mapTimer.stop(); return new MapResult(result, id); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java index 506e45453..dcdba3490 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java @@ -29,6 +29,7 @@ class ReducerThread implements Callable { final ReduceType sum, final BlockingQueue>> mapResultQueue) { if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null"); + if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null"); if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); this.reduce = reduce; @@ -51,9 +52,9 @@ class ReducerThread implements Callable { } else { lastJobID = result.getJobID(); // apply reduce, keeping track of sum - if ( reduceTimer != null ) reduceTimer.restart(); + reduceTimer.restart(); sum = reduce.apply(result.getValue(), sum); - if ( reduceTimer != null ) reduceTimer.stop(); + reduceTimer.stop(); } } } catch (ExecutionException ex) { diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index b3365c13c..b3986e74e 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -42,7 +43,7 @@ public class InputProducerUnitTest extends BaseTest { final LinkedBlockingDeque.InputValue> readQueue = new LinkedBlockingDeque.InputValue>(queueSize); - final InputProducer ip = new InputProducer(elements.iterator(), null, readQueue); + final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); es.submit(ip); diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 47dcc1d5e..a0ab493c1 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -2,6 +2,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.apache.log4j.BasicConfigurator; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -86,7 +87,7 @@ public class NanoSchedulerUnitTest extends BaseTest { static NanoSchedulerBasicTest exampleTest = null; @DataProvider(name = "NanoSchedulerBasicTest") public Object[][] createNanoSchedulerBasicTest() { - for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) { + for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000, 10000000) ) { for ( final int nt : Arrays.asList(1, 2, 4) ) { for ( final int start : Arrays.asList(0) ) { for ( final int end : Arrays.asList(0, 1, 2, 11, 10000, 100000) ) { @@ -114,6 +115,7 @@ public class NanoSchedulerUnitTest extends BaseTest { } private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { + final SimpleTimer timer = new SimpleTimer().start(); final NanoScheduler nanoScheduler = new NanoScheduler(test.bufferSize, test.nThreads); @@ -129,6 +131,17 @@ public class NanoSchedulerUnitTest extends BaseTest { Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks); nanoScheduler.shutdown(); + + // TODO -- need to enable only in the case where there's serious time spend in + // TODO -- read /map / reduce, otherwise the "outside" timer doesn't add up + final double myTimeEstimate = timer.getElapsedTime(); + final double tolerance = 0.1; + if ( false && myTimeEstimate > 0.1 ) { + Assert.assertTrue(nanoScheduler.getTotalRuntime() > myTimeEstimate * tolerance, + "NanoScheduler said that the total runtime was " + nanoScheduler.getTotalRuntime() + + " but the overall test time was " + myTimeEstimate + ", beyond our tolerance factor of " + + tolerance); + } } @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME) diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java index 61d1330bc..08771e9ec 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -61,7 +62,7 @@ public class ReducerThreadUnitTest extends BaseTest { final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue); final ReducerThread thread - = new ReducerThread(reduce, null, 0, mapResultsQueue); + = new ReducerThread(reduce, new SimpleTimer(), 0, mapResultsQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); final Future value = es.submit(thread);