From 7796ba7601689ea864934853e4509d2abd4dae9a Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 20 Dec 2012 12:34:05 -0500 Subject: [PATCH] Minor optimizations for NanoScheduler -- Reducer.maybeReleaseLatch is no longer synchronized -- NanoScheduler only prints progress every 100 or so map calls --- .../utils/nanoScheduler/NanoScheduler.java | 18 +++++++++++++----- .../sting/utils/nanoScheduler/Reducer.java | 2 +- .../nanoScheduler/NanoSchedulerUnitTest.java | 2 +- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 6aabc2c99..d8325f83e 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -43,7 +43,7 @@ import java.util.concurrent.*; public class NanoScheduler { private final static Logger logger = Logger.getLogger(NanoScheduler.class); private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; - private final static boolean LOG_MAP_TIMES = false; + protected final static int UPDATE_PROGRESS_FREQ = 100; final int bufferSize; final int nThreads; @@ -243,8 +243,7 @@ public class NanoScheduler { // map final MapType mapValue = map.apply(input); - if ( progressFunction != null ) - progressFunction.progress(input); + updateProgress(i++, input); // reduce sum = reduce.apply(mapValue, sum); @@ -254,6 +253,16 @@ public class NanoScheduler { return sum; } + /** + * Maybe update the progress meter (maybe because we don't want to do so so often that it costs cpu time) + * @param counter increasing counter to use to cut down on updates + * @param input the input we're currently at + */ + private void updateProgress(final int counter, final InputType input) { + if ( progressFunction != null && counter % UPDATE_PROGRESS_FREQ == 0 ) + progressFunction.progress(input); + } + /** * Efficient parallel version of Map/Reduce * @@ -453,8 +462,7 @@ public class NanoScheduler { // enqueue the result into the mapResultQueue result = new MapResult(mapValue, inputWrapper.getId()); - if ( progressFunction != null ) - progressFunction.progress(input); + updateProgress(inputWrapper.getId(), input); } else { // if there's no input we push empty MapResults with jobIDs for synchronization with Reducer result = new MapResult(inputWrapper.getId()); diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index 66927d073..a3d3f9056 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -117,7 +117,7 @@ class Reducer { * * Appropriate means we've seen the last job, or there's only a single job id */ - private synchronized void maybeReleaseLatch() { + private void maybeReleaseLatch() { if ( numJobsReduced == numSubmittedJobs ) { // either we've already seen the last one prevJobID == numSubmittedJobs or // the last job ID is -1, meaning that no jobs were ever submitted diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 61e8ec0a1..52cd904db 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -101,7 +101,7 @@ public class NanoSchedulerUnitTest extends BaseTest { public int nExpectedCallbacks() { int nElements = Math.max(end - start, 0); - return nElements / bufferSize; + return nElements / bufferSize / NanoScheduler.UPDATE_PROGRESS_FREQ; } public Map2x makeMap() { return addDelays ? new Map2xWithDelays() : new Map2x(); }