diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java index d0ad51cb0..464ebfcd5 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index ee5c46642..0ccb2b8cc 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; import org.apache.log4j.Logger; diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java index 4544f376c..fd23b11d8 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java index cc5335051..1311126d0 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java index 8b12c62c4..785a7f4fd 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java index 879a33a1d..8191b16c9 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; /** diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index d3b8c8149..c3854eef2 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -1,3 +1,28 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; @@ -45,11 +70,18 @@ public class NanoScheduler { private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; protected final static int UPDATE_PROGRESS_FREQ = 100; + /** + * Currently not used, but kept because it's conceptual reasonable to have a buffer + */ final int bufferSize; + + /** + * The number of threads we're using to execute the map jobs in this nano scheduler + */ final int nThreads; + final ExecutorService masterExecutor; final ExecutorService mapExecutor; - final Semaphore runningMapJobSlots; final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker(); boolean shutdown = false; @@ -75,11 +107,9 @@ public class NanoScheduler { if ( nThreads == 1 ) { this.mapExecutor = this.masterExecutor = null; - runningMapJobSlots = null; } else { this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d")); this.mapExecutor = Executors.newFixedThreadPool(nThreads, new NamedThreadFactory("NS-map-thread-%d")); - runningMapJobSlots = new Semaphore(this.bufferSize); } } @@ -358,32 +388,23 @@ public class NanoScheduler { // Create the input producer and start it running final InputProducer inputProducer = new InputProducer(inputReader); - // a priority queue that stores up to bufferSize elements - // produced by completed map jobs. + // create the MapResultsQueue to store results of map jobs. final MapResultsQueue mapResultQueue = new MapResultsQueue(); - final Reducer reducer - = new Reducer(reduce, errorTracker, initialValue); + // create the reducer we'll use for this nano scheduling run + final Reducer reducer = new Reducer(reduce, errorTracker, initialValue); + + final CountDownLatch runningMapJobs = new CountDownLatch(nThreads); try { - int nSubmittedJobs = 0; - - while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { - // acquire a slot to run a map job. Blocks if too many jobs are enqueued - runningMapJobSlots.acquire(); - - mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, map, reducer)); - nSubmittedJobs++; + // create and submit the info needed by the read/map/reduce threads to do their work + for ( int i = 0; i < nThreads; i++ ) { + mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, runningMapJobs, map, reducer)); } - // mark the last job id we've submitted so we now the id to wait for - //logger.warn("setting jobs submitted to " + nSubmittedJobs); - reducer.setTotalJobCount(nSubmittedJobs); - // wait for all of the input and map threads to finish - return waitForCompletion(mapResultQueue, reducer); + return waitForCompletion(mapResultQueue, runningMapJobs, reducer); } catch (Throwable ex) { -// logger.warn("Reduce job got exception " + ex); errorTracker.notifyOfError(ex); return initialValue; } @@ -393,10 +414,10 @@ public class NanoScheduler { * Wait until the input thread and all map threads have completed running, and return the final reduce result */ private ReduceType waitForCompletion(final MapResultsQueue mapResultsQueue, + final CountDownLatch runningMapJobs, final Reducer reducer) throws InterruptedException { - // wait for all the map threads to finish by acquiring and then releasing all map job semaphores - runningMapJobSlots.acquire(bufferSize); - runningMapJobSlots.release(bufferSize); + // wait for all the map threads to finish by waiting on the runningMapJobs latch + runningMapJobs.await(); // do a final reduce here. This is critically important because the InputMapReduce jobs // no longer block on reducing, so it's possible for all the threads to end with a few @@ -404,30 +425,11 @@ public class NanoScheduler { reducer.reduceAsMuchAsPossible(mapResultsQueue, true); // wait until we have a final reduce result - final ReduceType finalSum = reducer.waitForFinalReduce(); - + final ReduceType finalSum = reducer.getReduceResult(); // everything is finally shutdown, return the final reduce value return finalSum; } - - /** - * Should we continue to submit jobs given the number of jobs already submitted and the - * number of read items in inputProducer? - * - * We continue to submit jobs while inputProducer hasn't reached EOF or the number - * of jobs we've enqueued isn't the number of read elements. This means that in - * some cases we submit more jobs than total read elements (cannot know because of - * multi-threading) so map jobs must handle the case where getNext() returns EOF. - * - * @param nJobsSubmitted - * @param inputProducer - * @return - */ - private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { - final int nReadItems = inputProducer.getNumInputValues(); - return nReadItems == -1 || nJobsSubmitted < nReadItems; - } } private class ReadMapReduceJob implements Runnable { @@ -435,13 +437,16 @@ public class NanoScheduler { final MapResultsQueue mapResultQueue; final NSMapFunction map; final Reducer reducer; + final CountDownLatch runningMapJobs; private ReadMapReduceJob(final InputProducer inputProducer, final MapResultsQueue mapResultQueue, + final CountDownLatch runningMapJobs, final NSMapFunction map, final Reducer reducer) { this.inputProducer = inputProducer; this.mapResultQueue = mapResultQueue; + this.runningMapJobs = runningMapJobs; this.map = map; this.reducer = reducer; } @@ -449,39 +454,41 @@ public class NanoScheduler { @Override public void run() { try { - // get the next item from the input producer - final InputProducer.InputValue inputWrapper = inputProducer.next(); + boolean done = false; + while ( ! done ) { + // get the next item from the input producer + final InputProducer.InputValue inputWrapper = inputProducer.next(); - // depending on inputWrapper, actually do some work or not, putting result input result object - final MapResult result; - if ( ! inputWrapper.isEOFMarker() ) { - // just skip doing anything if we don't have work to do, which is possible - // because we don't necessarily know how much input there is when we queue - // up our jobs - final InputType input = inputWrapper.getValue(); + // depending on inputWrapper, actually do some work or not, putting result input result object + final MapResult result; + if ( ! inputWrapper.isEOFMarker() ) { + // just skip doing anything if we don't have work to do, which is possible + // because we don't necessarily know how much input there is when we queue + // up our jobs + final InputType input = inputWrapper.getValue(); - // map - final MapType mapValue = map.apply(input); + // actually execute the map + final MapType mapValue = map.apply(input); - // enqueue the result into the mapResultQueue - result = new MapResult(mapValue, inputWrapper.getId()); + // enqueue the result into the mapResultQueue + result = new MapResult(mapValue, inputWrapper.getId()); - updateProgress(inputWrapper.getId(), input); - } else { - // if there's no input we push empty MapResults with jobIDs for synchronization with Reducer - result = new MapResult(inputWrapper.getId()); + mapResultQueue.put(result); + + // reduce as much as possible, without blocking, if another thread is already doing reduces + final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false); + + updateProgress(inputWrapper.getId(), input); + } else { + done = true; + } } - - mapResultQueue.put(result); - - // reduce as much as possible, without blocking, if another thread is already doing reduces - final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false); } catch (Throwable ex) { errorTracker.notifyOfError(ex); } finally { // we finished a map job, release the job queue semaphore - runningMapJobSlots.release(); + runningMapJobs.countDown(); } } } -} +} \ No newline at end of file diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index 25e8b1fe6..294065838 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -1,39 +1,67 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** - * Reducer supporting two-threaded reduce of the map/reduce. + * Reducer supporting multi-threaded reduce of the map/reduce. * - * The first thread, using the reduceAsMuchAsPossible function, actually reduces the data - * as it arrives in the blockingQueue. + * reduceAsMuchAsPossible is the key function. Multiple threads can call into this, providing + * the map results queue, and this class accumulates the result of calling reduce + * on the maps objects. reduceAsMuchAsPossible isn't directly synchronized, but manages multi-threading + * directly with a lock. Threads can request either to block on the reduce call until it can be + * executed, or immediately exit if the lock isn't available. That allows multi-threaded users + * to avoid piling up waiting to reduce while one thread is reducing. They can instead immediately + * leave to go do something else productive * - * The second thread, using the waitForFinalReduce, can block on this data structure - * until that all jobs have arrived and been reduced. - * - * The key function for communication here is setTotalJobCount(), which the thread that submits - * jobs that enqueue MapResults into the blocking queue must call ONCE to tell the - * Reducer the total number of jobs that have been submitted for map. When numOfSubmittedJobs - * have been processed, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed. - * - * This thread reads from mapResultsQueue until the poison EOF object arrives. At each - * stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the - * queue waits until the mapResultQueue has a value to take. Then, it gets and waits - * until the map result Future has a value. + * @author depristo + * @since 2012 */ class Reducer { private final static Logger logger = Logger.getLogger(Reducer.class); - private final static int UNSET_NUM_SUBMITTED_JOBS = -2; - private final CountDownLatch countDownLatch = new CountDownLatch(1); + /** + * The reduce function to execute + */ private final NSReduceFunction reduce; + + /** + * Used to communicate errors to the outer master thread + */ private final MultiThreadedErrorTracker errorTracker; + + /** + * Lock used to protect the call reduceAsMuchAsPossible from race conditions + */ private final Lock reduceLock = new ReentrantLock(); /** @@ -42,13 +70,6 @@ class Reducer { */ ReduceType sum; - int numSubmittedJobs = UNSET_NUM_SUBMITTED_JOBS; // not yet set - - /** - * A counter keeping track of the number of jobs we're reduced - */ - int numJobsReduced = 0; - /** * Create a new Reducer that will apply the reduce function with initialSum value * to values via reduceAsMuchAsPossible, timing the reduce function call costs with @@ -96,14 +117,10 @@ class Reducer { // apply reduce, keeping track of sum sum = reduce.apply(result.getValue(), sum); } - - numJobsReduced++; - maybeReleaseLatch(); } } } catch (Exception ex) { errorTracker.notifyOfError(ex); - countDownLatch.countDown(); } finally { if ( haveLock ) // if we acquired the lock, unlock it releaseReduceLock(); @@ -138,64 +155,15 @@ class Reducer { } /** - * release the latch if appropriate + * Get the current reduce result resulting from applying reduce(...) to all MapResult elements. * - * Appropriate means we've seen the last job, or there's only a single job id - */ - private void maybeReleaseLatch() { - if ( numJobsReduced == numSubmittedJobs ) { - // either we've already seen the last one prevJobID == numSubmittedJobs or - // the last job ID is -1, meaning that no jobs were ever submitted - countDownLatch.countDown(); - } - } - - /** - * For testing only - * - * @return true if latch is released - */ - protected synchronized boolean latchIsReleased() { - return countDownLatch.getCount() == 0; - } - - /** - * Key function: tell this class the total number of jobs will provide data in the mapResultsQueue - * - * The total job count when we free threads blocked on waitForFinalReduce. When we see numOfSubmittedJobs - * MapResults from the queue, those threads are released. - * - * Until this function is called, those thread will block forever. The numOfSubmittedJobs has a few constraints. - * First, it must be >= 0. 0 indicates that in fact no jobs will ever be submitted (i.e., there's no - * data coming) so the latch should be opened immediately. If it's >= 1, we will wait until - * we see numOfSubmittedJobs jobs before freeing them. - * - * Note that we throw an IllegalStateException if this function is called twice. - * - * @param numOfSubmittedJobs int >= 0 indicating the total number of MapResults that will - * enqueue results into our queue - */ - public synchronized void setTotalJobCount(final int numOfSubmittedJobs) { - if ( numOfSubmittedJobs < 0 ) - throw new IllegalArgumentException("numOfSubmittedJobs must be >= 0, but saw " + numOfSubmittedJobs); - if ( numJobsReduced > numOfSubmittedJobs ) - throw new IllegalArgumentException("numOfSubmittedJobs " + numOfSubmittedJobs + " < numJobsReduced " + numJobsReduced); - if ( this.numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS) - throw new IllegalStateException("setlastJobID called multiple times, but should only be called once"); - - this.numSubmittedJobs = numOfSubmittedJobs; - maybeReleaseLatch(); - } - - /** - * Block until the last job has submitted its MapResult to our queue, and we've reduced it, and - * return the reduce result resulting from applying reduce(...) to all MapResult elements. + * Note that this method cannot know if future reduce calls are coming in. So it simply gets + * the current reduce result. It is up to the caller to know whether the returned value is + * a partial result, or the full final value * * @return the total reduce result across all jobs - * @throws InterruptedException */ - public ReduceType waitForFinalReduce() throws InterruptedException { - countDownLatch.await(); + public ReduceType getReduceResult() { return sum; } } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java index 5db5acde0..4fd875c0e 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java @@ -27,19 +27,17 @@ public class ReducerUnitTest extends BaseTest { List tests = new ArrayList(); for ( final int groupSize : Arrays.asList(-1, 1, 5, 50, 500, 5000, 50000) ) { - for ( final boolean setJobIDAtStart : Arrays.asList(true, false) ) { - for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) { - if ( groupSize < nElements ) { - for ( final List> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) { - tests.add(new Object[]{ new ListOfJobs(jobs), setJobIDAtStart, groupSize }); - } + for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) { + if ( groupSize < nElements ) { + for ( final List> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) { + tests.add(new Object[]{ new ListOfJobs(jobs), groupSize }); } } + } - for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) { - if ( groupSize < nElements ) { - tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), setJobIDAtStart, groupSize }); - } + for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) { + if ( groupSize < nElements ) { + tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), groupSize }); } } } @@ -77,11 +75,7 @@ public class ReducerUnitTest extends BaseTest { } @Test(enabled = true, dataProvider = "ReducerThreadTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) - public void testReducerThread(final List> jobs, final boolean setJobIDAtStart, final int groupSize) throws Exception { - runTests(jobs, setJobIDAtStart, groupSize); - } - - private void runTests( final List> allJobs, boolean setJobIDAtStart, int groupSize ) throws Exception { + public void testReducerThread(final List> allJobs, int groupSize) throws Exception { if ( groupSize == -1 ) groupSize = allJobs.size(); @@ -99,7 +93,6 @@ public class ReducerUnitTest extends BaseTest { int nJobsSubmitted = 0; int jobGroupCount = 0; final int lastJobGroupCount = jobGroups.size() - 1; - setJobIDAtStart = setJobIDAtStart && groupSize == 1; for ( final List> jobs : jobGroups ) { //logger.warn("Processing job group " + jobGroupCount + " with " + jobs.size() + " jobs"); @@ -114,48 +107,17 @@ public class ReducerUnitTest extends BaseTest { nJobsSubmitted++; } - Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed at the start"); - - if ( jobGroupCount == 0 && setJobIDAtStart ) { - // only can do the setJobID if jobs cannot be submitted out of order - reducer.setTotalJobCount(allJobs.size()+1); - Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything"); - } - final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, true); Assert.assertTrue(nReduced <= nJobsSubmitted, "Somehow reduced more jobs than submitted"); - if ( setJobIDAtStart ) { - final boolean submittedLastJob = jobGroupCount == lastJobGroupCount; - Assert.assertEquals(reducer.latchIsReleased(), submittedLastJob, - "When last job is set, latch should only be released if the last job has been submitted"); - } else { - Assert.assertEquals(reducer.latchIsReleased(), false, "When last job isn't set, latch should never be release"); - } - jobGroupCount++; } - if ( setJobIDAtStart ) - Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set"); - else { - Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set"); - reducer.setTotalJobCount(allJobs.size() + 1); - Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id "); - } - Assert.assertEquals(reduce.nRead, allJobs.size(), "number of read values not all of the values in the reducer queue"); es.shutdown(); es.awaitTermination(1, TimeUnit.HOURS); } - @Test(enabled = true, expectedExceptions = IllegalStateException.class) - private void runSettingJobIDTwice() throws Exception { - final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0); - reducer.setTotalJobCount(10); - reducer.setTotalJobCount(15); - } - @Test(timeOut = 1000, invocationCount = 100) private void testNonBlockingReduce() throws Exception { final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0); @@ -242,12 +204,8 @@ public class ReducerUnitTest extends BaseTest { @Override public void run() { - try { - final int observedSum = reducer.waitForFinalReduce(); - Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value"); - } catch ( InterruptedException ex ) { - Assert.fail("Got interrupted"); - } + final int observedSum = reducer.getReduceResult(); + Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value"); } } } \ No newline at end of file