From 295455eee248fc1932e49b7e11f1f84ff743216f Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sun, 23 Dec 2012 08:53:49 -0500 Subject: [PATCH] NanoScheduler optimizations and simplification -- The previous model was to enqueue individual map jobs (with a resolution of 1 map job per map call), to track the number of map calls submitted via a counter and a semaphore, and to use this information in each map job and reduce to control the number of map jobs, when reduce was complete, etc. All hideously complex. -- This new model is vastly simply. The reducer basically knows nothing about the control mechanisms in the NanoScheduler. It just supports multi-threaded reduce. The NanoScheduler enqueues exactly nThread jobs to be run, which continually loop reading, mapping, and reducing until they run out of material to read, when they shut down. The master thread of the NS just holds a CountDownLatch, initialized to nThreads, and when each thread exits it reduces the latch by 1. The master thread gets the final reduce result when its free by the latch reaching 0. It's all super super simple. -- Because this model uses vastly fewer synchronization primitives within the NS itself, it's naturally much faster at getting things done, without any of the overhead obvious in profiles of BQSR -nct 2. --- .../utils/nanoScheduler/EOFMarkedValue.java | 25 +++ .../utils/nanoScheduler/InputProducer.java | 25 +++ .../sting/utils/nanoScheduler/MapResult.java | 25 +++ .../utils/nanoScheduler/NSMapFunction.java | 25 +++ .../nanoScheduler/NSProgressFunction.java | 25 +++ .../utils/nanoScheduler/NSReduceFunction.java | 25 +++ .../utils/nanoScheduler/NanoScheduler.java | 143 +++++++++--------- .../sting/utils/nanoScheduler/Reducer.java | 134 +++++++--------- .../utils/nanoScheduler/ReducerUnitTest.java | 64 ++------ 9 files changed, 287 insertions(+), 204 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java index d0ad51cb0..464ebfcd5 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index ee5c46642..0ccb2b8cc 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; import org.apache.log4j.Logger; diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java index 4544f376c..fd23b11d8 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java index cc5335051..1311126d0 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java index 8b12c62c4..785a7f4fd 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java index 879a33a1d..8191b16c9 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index d3b8c8149..c3854eef2 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; @@ -45,11 +70,18 @@ public class NanoScheduler { private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; protected final static int UPDATE_PROGRESS_FREQ = 100; + /** + * Currently not used, but kept because it's conceptual reasonable to have a buffer + */ final int bufferSize; + + /** + * The number of threads we're using to execute the map jobs in this nano scheduler + */ final int nThreads; + final ExecutorService masterExecutor; final ExecutorService mapExecutor; - final Semaphore runningMapJobSlots; final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker(); boolean shutdown = false; @@ -75,11 +107,9 @@ public class NanoScheduler { if ( nThreads == 1 ) { this.mapExecutor = this.masterExecutor = null; - runningMapJobSlots = null; } else { this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d")); this.mapExecutor = Executors.newFixedThreadPool(nThreads, new NamedThreadFactory("NS-map-thread-%d")); - runningMapJobSlots = new Semaphore(this.bufferSize); } } @@ -358,32 +388,23 @@ public class NanoScheduler { // Create the input producer and start it running final InputProducer inputProducer = new InputProducer(inputReader); - // a priority queue that stores up to bufferSize elements - // produced by completed map jobs. + // create the MapResultsQueue to store results of map jobs. final MapResultsQueue mapResultQueue = new MapResultsQueue(); - final Reducer reducer - = new Reducer(reduce, errorTracker, initialValue); + // create the reducer we'll use for this nano scheduling run + final Reducer reducer = new Reducer(reduce, errorTracker, initialValue); + + final CountDownLatch runningMapJobs = new CountDownLatch(nThreads); try { - int nSubmittedJobs = 0; - - while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { - // acquire a slot to run a map job. Blocks if too many jobs are enqueued - runningMapJobSlots.acquire(); - - mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, map, reducer)); - nSubmittedJobs++; + // create and submit the info needed by the read/map/reduce threads to do their work + for ( int i = 0; i < nThreads; i++ ) { + mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, runningMapJobs, map, reducer)); } - // mark the last job id we've submitted so we now the id to wait for - //logger.warn("setting jobs submitted to " + nSubmittedJobs); - reducer.setTotalJobCount(nSubmittedJobs); - // wait for all of the input and map threads to finish - return waitForCompletion(mapResultQueue, reducer); + return waitForCompletion(mapResultQueue, runningMapJobs, reducer); } catch (Throwable ex) { -// logger.warn("Reduce job got exception " + ex); errorTracker.notifyOfError(ex); return initialValue; } @@ -393,10 +414,10 @@ public class NanoScheduler { * Wait until the input thread and all map threads have completed running, and return the final reduce result */ private ReduceType waitForCompletion(final MapResultsQueue mapResultsQueue, + final CountDownLatch runningMapJobs, final Reducer reducer) throws InterruptedException { - // wait for all the map threads to finish by acquiring and then releasing all map job semaphores - runningMapJobSlots.acquire(bufferSize); - runningMapJobSlots.release(bufferSize); + // wait for all the map threads to finish by waiting on the runningMapJobs latch + runningMapJobs.await(); // do a final reduce here. This is critically important because the InputMapReduce jobs // no longer block on reducing, so it's possible for all the threads to end with a few @@ -404,30 +425,11 @@ public class NanoScheduler { reducer.reduceAsMuchAsPossible(mapResultsQueue, true); // wait until we have a final reduce result - final ReduceType finalSum = reducer.waitForFinalReduce(); - + final ReduceType finalSum = reducer.getReduceResult(); // everything is finally shutdown, return the final reduce value return finalSum; } - - /** - * Should we continue to submit jobs given the number of jobs already submitted and the - * number of read items in inputProducer? - * - * We continue to submit jobs while inputProducer hasn't reached EOF or the number - * of jobs we've enqueued isn't the number of read elements. This means that in - * some cases we submit more jobs than total read elements (cannot know because of - * multi-threading) so map jobs must handle the case where getNext() returns EOF. - * - * @param nJobsSubmitted - * @param inputProducer - * @return - */ - private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { - final int nReadItems = inputProducer.getNumInputValues(); - return nReadItems == -1 || nJobsSubmitted < nReadItems; - } } private class ReadMapReduceJob implements Runnable { @@ -435,13 +437,16 @@ public class NanoScheduler { final MapResultsQueue mapResultQueue; final NSMapFunction map; final Reducer reducer; + final CountDownLatch runningMapJobs; private ReadMapReduceJob(final InputProducer inputProducer, final MapResultsQueue mapResultQueue, + final CountDownLatch runningMapJobs, final NSMapFunction map, final Reducer reducer) { this.inputProducer = inputProducer; this.mapResultQueue = mapResultQueue; + this.runningMapJobs = runningMapJobs; this.map = map; this.reducer = reducer; } @@ -449,39 +454,41 @@ public class NanoScheduler { @Override public void run() { try { - // get the next item from the input producer - final InputProducer.InputValue inputWrapper = inputProducer.next(); + boolean done = false; + while ( ! done ) { + // get the next item from the input producer + final InputProducer.InputValue inputWrapper = inputProducer.next(); - // depending on inputWrapper, actually do some work or not, putting result input result object - final MapResult result; - if ( ! inputWrapper.isEOFMarker() ) { - // just skip doing anything if we don't have work to do, which is possible - // because we don't necessarily know how much input there is when we queue - // up our jobs - final InputType input = inputWrapper.getValue(); + // depending on inputWrapper, actually do some work or not, putting result input result object + final MapResult result; + if ( ! inputWrapper.isEOFMarker() ) { + // just skip doing anything if we don't have work to do, which is possible + // because we don't necessarily know how much input there is when we queue + // up our jobs + final InputType input = inputWrapper.getValue(); - // map - final MapType mapValue = map.apply(input); + // actually execute the map + final MapType mapValue = map.apply(input); - // enqueue the result into the mapResultQueue - result = new MapResult(mapValue, inputWrapper.getId()); + // enqueue the result into the mapResultQueue + result = new MapResult(mapValue, inputWrapper.getId()); - updateProgress(inputWrapper.getId(), input); - } else { - // if there's no input we push empty MapResults with jobIDs for synchronization with Reducer - result = new MapResult(inputWrapper.getId()); + mapResultQueue.put(result); + + // reduce as much as possible, without blocking, if another thread is already doing reduces + final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false); + + updateProgress(inputWrapper.getId(), input); + } else { + done = true; + } } - - mapResultQueue.put(result); - - // reduce as much as possible, without blocking, if another thread is already doing reduces - final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false); } catch (Throwable ex) { errorTracker.notifyOfError(ex); } finally { // we finished a map job, release the job queue semaphore - runningMapJobSlots.release(); + runningMapJobs.countDown(); } } } -} +} \ No newline at end of file diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index 25e8b1fe6..294065838 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -1,39 +1,67 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** - * Reducer supporting two-threaded reduce of the map/reduce. + * Reducer supporting multi-threaded reduce of the map/reduce. * - * The first thread, using the reduceAsMuchAsPossible function, actually reduces the data - * as it arrives in the blockingQueue. + * reduceAsMuchAsPossible is the key function. Multiple threads can call into this, providing + * the map results queue, and this class accumulates the result of calling reduce + * on the maps objects. reduceAsMuchAsPossible isn't directly synchronized, but manages multi-threading + * directly with a lock. Threads can request either to block on the reduce call until it can be + * executed, or immediately exit if the lock isn't available. That allows multi-threaded users + * to avoid piling up waiting to reduce while one thread is reducing. They can instead immediately + * leave to go do something else productive * - * The second thread, using the waitForFinalReduce, can block on this data structure - * until that all jobs have arrived and been reduced. - * - * The key function for communication here is setTotalJobCount(), which the thread that submits - * jobs that enqueue MapResults into the blocking queue must call ONCE to tell the - * Reducer the total number of jobs that have been submitted for map. When numOfSubmittedJobs - * have been processed, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed. - * - * This thread reads from mapResultsQueue until the poison EOF object arrives. At each - * stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the - * queue waits until the mapResultQueue has a value to take. Then, it gets and waits - * until the map result Future has a value. + * @author depristo + * @since 2012 */ class Reducer { private final static Logger logger = Logger.getLogger(Reducer.class); - private final static int UNSET_NUM_SUBMITTED_JOBS = -2; - private final CountDownLatch countDownLatch = new CountDownLatch(1); + /** + * The reduce function to execute + */ private final NSReduceFunction reduce; + + /** + * Used to communicate errors to the outer master thread + */ private final MultiThreadedErrorTracker errorTracker; + + /** + * Lock used to protect the call reduceAsMuchAsPossible from race conditions + */ private final Lock reduceLock = new ReentrantLock(); /** @@ -42,13 +70,6 @@ class Reducer { */ ReduceType sum; - int numSubmittedJobs = UNSET_NUM_SUBMITTED_JOBS; // not yet set - - /** - * A counter keeping track of the number of jobs we're reduced - */ - int numJobsReduced = 0; - /** * Create a new Reducer that will apply the reduce function with initialSum value * to values via reduceAsMuchAsPossible, timing the reduce function call costs with @@ -96,14 +117,10 @@ class Reducer { // apply reduce, keeping track of sum sum = reduce.apply(result.getValue(), sum); } - - numJobsReduced++; - maybeReleaseLatch(); } } } catch (Exception ex) { errorTracker.notifyOfError(ex); - countDownLatch.countDown(); } finally { if ( haveLock ) // if we acquired the lock, unlock it releaseReduceLock(); @@ -138,64 +155,15 @@ class Reducer { } /** - * release the latch if appropriate + * Get the current reduce result resulting from applying reduce(...) to all MapResult elements. * - * Appropriate means we've seen the last job, or there's only a single job id - */ - private void maybeReleaseLatch() { - if ( numJobsReduced == numSubmittedJobs ) { - // either we've already seen the last one prevJobID == numSubmittedJobs or - // the last job ID is -1, meaning that no jobs were ever submitted - countDownLatch.countDown(); - } - } - - /** - * For testing only - * - * @return true if latch is released - */ - protected synchronized boolean latchIsReleased() { - return countDownLatch.getCount() == 0; - } - - /** - * Key function: tell this class the total number of jobs will provide data in the mapResultsQueue - * - * The total job count when we free threads blocked on waitForFinalReduce. When we see numOfSubmittedJobs - * MapResults from the queue, those threads are released. - * - * Until this function is called, those thread will block forever. The numOfSubmittedJobs has a few constraints. - * First, it must be >= 0. 0 indicates that in fact no jobs will ever be submitted (i.e., there's no - * data coming) so the latch should be opened immediately. If it's >= 1, we will wait until - * we see numOfSubmittedJobs jobs before freeing them. - * - * Note that we throw an IllegalStateException if this function is called twice. - * - * @param numOfSubmittedJobs int >= 0 indicating the total number of MapResults that will - * enqueue results into our queue - */ - public synchronized void setTotalJobCount(final int numOfSubmittedJobs) { - if ( numOfSubmittedJobs < 0 ) - throw new IllegalArgumentException("numOfSubmittedJobs must be >= 0, but saw " + numOfSubmittedJobs); - if ( numJobsReduced > numOfSubmittedJobs ) - throw new IllegalArgumentException("numOfSubmittedJobs " + numOfSubmittedJobs + " < numJobsReduced " + numJobsReduced); - if ( this.numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS) - throw new IllegalStateException("setlastJobID called multiple times, but should only be called once"); - - this.numSubmittedJobs = numOfSubmittedJobs; - maybeReleaseLatch(); - } - - /** - * Block until the last job has submitted its MapResult to our queue, and we've reduced it, and - * return the reduce result resulting from applying reduce(...) to all MapResult elements. + * Note that this method cannot know if future reduce calls are coming in. So it simply gets + * the current reduce result. It is up to the caller to know whether the returned value is + * a partial result, or the full final value * * @return the total reduce result across all jobs - * @throws InterruptedException */ - public ReduceType waitForFinalReduce() throws InterruptedException { - countDownLatch.await(); + public ReduceType getReduceResult() { return sum; } } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java index 5db5acde0..4fd875c0e 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java @@ -27,19 +27,17 @@ public class ReducerUnitTest extends BaseTest { List tests = new ArrayList(); for ( final int groupSize : Arrays.asList(-1, 1, 5, 50, 500, 5000, 50000) ) { - for ( final boolean setJobIDAtStart : Arrays.asList(true, false) ) { - for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) { - if ( groupSize < nElements ) { - for ( final List> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) { - tests.add(new Object[]{ new ListOfJobs(jobs), setJobIDAtStart, groupSize }); - } + for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) { + if ( groupSize < nElements ) { + for ( final List> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) { + tests.add(new Object[]{ new ListOfJobs(jobs), groupSize }); } } + } - for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) { - if ( groupSize < nElements ) { - tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), setJobIDAtStart, groupSize }); - } + for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) { + if ( groupSize < nElements ) { + tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), groupSize }); } } } @@ -77,11 +75,7 @@ public class ReducerUnitTest extends BaseTest { } @Test(enabled = true, dataProvider = "ReducerThreadTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) - public void testReducerThread(final List> jobs, final boolean setJobIDAtStart, final int groupSize) throws Exception { - runTests(jobs, setJobIDAtStart, groupSize); - } - - private void runTests( final List> allJobs, boolean setJobIDAtStart, int groupSize ) throws Exception { + public void testReducerThread(final List> allJobs, int groupSize) throws Exception { if ( groupSize == -1 ) groupSize = allJobs.size(); @@ -99,7 +93,6 @@ public class ReducerUnitTest extends BaseTest { int nJobsSubmitted = 0; int jobGroupCount = 0; final int lastJobGroupCount = jobGroups.size() - 1; - setJobIDAtStart = setJobIDAtStart && groupSize == 1; for ( final List> jobs : jobGroups ) { //logger.warn("Processing job group " + jobGroupCount + " with " + jobs.size() + " jobs"); @@ -114,48 +107,17 @@ public class ReducerUnitTest extends BaseTest { nJobsSubmitted++; } - Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed at the start"); - - if ( jobGroupCount == 0 && setJobIDAtStart ) { - // only can do the setJobID if jobs cannot be submitted out of order - reducer.setTotalJobCount(allJobs.size()+1); - Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything"); - } - final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, true); Assert.assertTrue(nReduced <= nJobsSubmitted, "Somehow reduced more jobs than submitted"); - if ( setJobIDAtStart ) { - final boolean submittedLastJob = jobGroupCount == lastJobGroupCount; - Assert.assertEquals(reducer.latchIsReleased(), submittedLastJob, - "When last job is set, latch should only be released if the last job has been submitted"); - } else { - Assert.assertEquals(reducer.latchIsReleased(), false, "When last job isn't set, latch should never be release"); - } - jobGroupCount++; } - if ( setJobIDAtStart ) - Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set"); - else { - Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set"); - reducer.setTotalJobCount(allJobs.size() + 1); - Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id "); - } - Assert.assertEquals(reduce.nRead, allJobs.size(), "number of read values not all of the values in the reducer queue"); es.shutdown(); es.awaitTermination(1, TimeUnit.HOURS); } - @Test(enabled = true, expectedExceptions = IllegalStateException.class) - private void runSettingJobIDTwice() throws Exception { - final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0); - reducer.setTotalJobCount(10); - reducer.setTotalJobCount(15); - } - @Test(timeOut = 1000, invocationCount = 100) private void testNonBlockingReduce() throws Exception { final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0); @@ -242,12 +204,8 @@ public class ReducerUnitTest extends BaseTest { @Override public void run() { - try { - final int observedSum = reducer.waitForFinalReduce(); - Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value"); - } catch ( InterruptedException ex ) { - Assert.fail("Got interrupted"); - } + final int observedSum = reducer.getReduceResult(); + Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value"); } } } \ No newline at end of file