NanoScheduler optimizations and simplification

-- The previous model was to enqueue individual map jobs (with a resolution of 1 map job per map call), to track the number of map calls submitted via a counter and a semaphore, and to use this information in each map job and reduce to control the number of map jobs, when reduce was complete, etc.  All hideously complex.
-- This new model is vastly simply.  The reducer basically knows nothing about the control mechanisms in the NanoScheduler.  It just supports multi-threaded reduce.  The NanoScheduler enqueues exactly nThread jobs to be run, which continually loop reading, mapping, and reducing until they run out of material to read, when they shut down.  The master thread of the NS just holds a CountDownLatch, initialized to nThreads, and when each thread exits it reduces the latch by 1.  The master thread gets the final reduce result when its free by the latch reaching 0.  It's all super super simple.
-- Because this model uses vastly fewer synchronization primitives within the NS itself, it's naturally much faster at getting things done, without any of the overhead obvious in profiles of BQSR -nct 2.
This commit is contained in:
Mark DePristo 2012-12-23 08:53:49 -05:00
parent aa3ee29929
commit 295455eee2
9 changed files with 287 additions and 204 deletions

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
import org.apache.log4j.Logger;

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
@ -45,11 +70,18 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
protected final static int UPDATE_PROGRESS_FREQ = 100;
/**
* Currently not used, but kept because it's conceptual reasonable to have a buffer
*/
final int bufferSize;
/**
* The number of threads we're using to execute the map jobs in this nano scheduler
*/
final int nThreads;
final ExecutorService masterExecutor;
final ExecutorService mapExecutor;
final Semaphore runningMapJobSlots;
final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker();
boolean shutdown = false;
@ -75,11 +107,9 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
if ( nThreads == 1 ) {
this.mapExecutor = this.masterExecutor = null;
runningMapJobSlots = null;
} else {
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d"));
this.mapExecutor = Executors.newFixedThreadPool(nThreads, new NamedThreadFactory("NS-map-thread-%d"));
runningMapJobSlots = new Semaphore(this.bufferSize);
}
}
@ -358,32 +388,23 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// Create the input producer and start it running
final InputProducer<InputType> inputProducer = new InputProducer<InputType>(inputReader);
// a priority queue that stores up to bufferSize elements
// produced by completed map jobs.
// create the MapResultsQueue to store results of map jobs.
final MapResultsQueue<MapType> mapResultQueue = new MapResultsQueue<MapType>();
final Reducer<MapType, ReduceType> reducer
= new Reducer<MapType, ReduceType>(reduce, errorTracker, initialValue);
// create the reducer we'll use for this nano scheduling run
final Reducer<MapType, ReduceType> reducer = new Reducer<MapType, ReduceType>(reduce, errorTracker, initialValue);
final CountDownLatch runningMapJobs = new CountDownLatch(nThreads);
try {
int nSubmittedJobs = 0;
while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
// acquire a slot to run a map job. Blocks if too many jobs are enqueued
runningMapJobSlots.acquire();
mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, map, reducer));
nSubmittedJobs++;
// create and submit the info needed by the read/map/reduce threads to do their work
for ( int i = 0; i < nThreads; i++ ) {
mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, runningMapJobs, map, reducer));
}
// mark the last job id we've submitted so we now the id to wait for
//logger.warn("setting jobs submitted to " + nSubmittedJobs);
reducer.setTotalJobCount(nSubmittedJobs);
// wait for all of the input and map threads to finish
return waitForCompletion(mapResultQueue, reducer);
return waitForCompletion(mapResultQueue, runningMapJobs, reducer);
} catch (Throwable ex) {
// logger.warn("Reduce job got exception " + ex);
errorTracker.notifyOfError(ex);
return initialValue;
}
@ -393,10 +414,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
* Wait until the input thread and all map threads have completed running, and return the final reduce result
*/
private ReduceType waitForCompletion(final MapResultsQueue<MapType> mapResultsQueue,
final CountDownLatch runningMapJobs,
final Reducer<MapType, ReduceType> reducer) throws InterruptedException {
// wait for all the map threads to finish by acquiring and then releasing all map job semaphores
runningMapJobSlots.acquire(bufferSize);
runningMapJobSlots.release(bufferSize);
// wait for all the map threads to finish by waiting on the runningMapJobs latch
runningMapJobs.await();
// do a final reduce here. This is critically important because the InputMapReduce jobs
// no longer block on reducing, so it's possible for all the threads to end with a few
@ -404,30 +425,11 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
// wait until we have a final reduce result
final ReduceType finalSum = reducer.waitForFinalReduce();
final ReduceType finalSum = reducer.getReduceResult();
// everything is finally shutdown, return the final reduce value
return finalSum;
}
/**
* Should we continue to submit jobs given the number of jobs already submitted and the
* number of read items in inputProducer?
*
* We continue to submit jobs while inputProducer hasn't reached EOF or the number
* of jobs we've enqueued isn't the number of read elements. This means that in
* some cases we submit more jobs than total read elements (cannot know because of
* multi-threading) so map jobs must handle the case where getNext() returns EOF.
*
* @param nJobsSubmitted
* @param inputProducer
* @return
*/
private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer<InputType> inputProducer) {
final int nReadItems = inputProducer.getNumInputValues();
return nReadItems == -1 || nJobsSubmitted < nReadItems;
}
}
private class ReadMapReduceJob implements Runnable {
@ -435,13 +437,16 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final MapResultsQueue<MapType> mapResultQueue;
final NSMapFunction<InputType, MapType> map;
final Reducer<MapType, ReduceType> reducer;
final CountDownLatch runningMapJobs;
private ReadMapReduceJob(final InputProducer<InputType> inputProducer,
final MapResultsQueue<MapType> mapResultQueue,
final CountDownLatch runningMapJobs,
final NSMapFunction<InputType, MapType> map,
final Reducer<MapType, ReduceType> reducer) {
this.inputProducer = inputProducer;
this.mapResultQueue = mapResultQueue;
this.runningMapJobs = runningMapJobs;
this.map = map;
this.reducer = reducer;
}
@ -449,39 +454,41 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
@Override
public void run() {
try {
// get the next item from the input producer
final InputProducer<InputType>.InputValue inputWrapper = inputProducer.next();
boolean done = false;
while ( ! done ) {
// get the next item from the input producer
final InputProducer<InputType>.InputValue inputWrapper = inputProducer.next();
// depending on inputWrapper, actually do some work or not, putting result input result object
final MapResult<MapType> result;
if ( ! inputWrapper.isEOFMarker() ) {
// just skip doing anything if we don't have work to do, which is possible
// because we don't necessarily know how much input there is when we queue
// up our jobs
final InputType input = inputWrapper.getValue();
// depending on inputWrapper, actually do some work or not, putting result input result object
final MapResult<MapType> result;
if ( ! inputWrapper.isEOFMarker() ) {
// just skip doing anything if we don't have work to do, which is possible
// because we don't necessarily know how much input there is when we queue
// up our jobs
final InputType input = inputWrapper.getValue();
// map
final MapType mapValue = map.apply(input);
// actually execute the map
final MapType mapValue = map.apply(input);
// enqueue the result into the mapResultQueue
result = new MapResult<MapType>(mapValue, inputWrapper.getId());
// enqueue the result into the mapResultQueue
result = new MapResult<MapType>(mapValue, inputWrapper.getId());
updateProgress(inputWrapper.getId(), input);
} else {
// if there's no input we push empty MapResults with jobIDs for synchronization with Reducer
result = new MapResult<MapType>(inputWrapper.getId());
mapResultQueue.put(result);
// reduce as much as possible, without blocking, if another thread is already doing reduces
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false);
updateProgress(inputWrapper.getId(), input);
} else {
done = true;
}
}
mapResultQueue.put(result);
// reduce as much as possible, without blocking, if another thread is already doing reduces
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false);
} catch (Throwable ex) {
errorTracker.notifyOfError(ex);
} finally {
// we finished a map job, release the job queue semaphore
runningMapJobSlots.release();
runningMapJobs.countDown();
}
}
}
}
}

View File

@ -1,39 +1,67 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Reducer supporting two-threaded reduce of the map/reduce.
* Reducer supporting multi-threaded reduce of the map/reduce.
*
* The first thread, using the reduceAsMuchAsPossible function, actually reduces the data
* as it arrives in the blockingQueue.
* reduceAsMuchAsPossible is the key function. Multiple threads can call into this, providing
* the map results queue, and this class accumulates the result of calling reduce
* on the maps objects. reduceAsMuchAsPossible isn't directly synchronized, but manages multi-threading
* directly with a lock. Threads can request either to block on the reduce call until it can be
* executed, or immediately exit if the lock isn't available. That allows multi-threaded users
* to avoid piling up waiting to reduce while one thread is reducing. They can instead immediately
* leave to go do something else productive
*
* The second thread, using the waitForFinalReduce, can block on this data structure
* until that all jobs have arrived and been reduced.
*
* The key function for communication here is setTotalJobCount(), which the thread that submits
* jobs that enqueue MapResults into the blocking queue must call ONCE to tell the
* Reducer the total number of jobs that have been submitted for map. When numOfSubmittedJobs
* have been processed, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed.
*
* This thread reads from mapResultsQueue until the poison EOF object arrives. At each
* stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the
* queue waits until the mapResultQueue has a value to take. Then, it gets and waits
* until the map result Future has a value.
* @author depristo
* @since 2012
*/
class Reducer<MapType, ReduceType> {
private final static Logger logger = Logger.getLogger(Reducer.class);
private final static int UNSET_NUM_SUBMITTED_JOBS = -2;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* The reduce function to execute
*/
private final NSReduceFunction<MapType, ReduceType> reduce;
/**
* Used to communicate errors to the outer master thread
*/
private final MultiThreadedErrorTracker errorTracker;
/**
* Lock used to protect the call reduceAsMuchAsPossible from race conditions
*/
private final Lock reduceLock = new ReentrantLock();
/**
@ -42,13 +70,6 @@ class Reducer<MapType, ReduceType> {
*/
ReduceType sum;
int numSubmittedJobs = UNSET_NUM_SUBMITTED_JOBS; // not yet set
/**
* A counter keeping track of the number of jobs we're reduced
*/
int numJobsReduced = 0;
/**
* Create a new Reducer that will apply the reduce function with initialSum value
* to values via reduceAsMuchAsPossible, timing the reduce function call costs with
@ -96,14 +117,10 @@ class Reducer<MapType, ReduceType> {
// apply reduce, keeping track of sum
sum = reduce.apply(result.getValue(), sum);
}
numJobsReduced++;
maybeReleaseLatch();
}
}
} catch (Exception ex) {
errorTracker.notifyOfError(ex);
countDownLatch.countDown();
} finally {
if ( haveLock ) // if we acquired the lock, unlock it
releaseReduceLock();
@ -138,64 +155,15 @@ class Reducer<MapType, ReduceType> {
}
/**
* release the latch if appropriate
* Get the current reduce result resulting from applying reduce(...) to all MapResult elements.
*
* Appropriate means we've seen the last job, or there's only a single job id
*/
private void maybeReleaseLatch() {
if ( numJobsReduced == numSubmittedJobs ) {
// either we've already seen the last one prevJobID == numSubmittedJobs or
// the last job ID is -1, meaning that no jobs were ever submitted
countDownLatch.countDown();
}
}
/**
* For testing only
*
* @return true if latch is released
*/
protected synchronized boolean latchIsReleased() {
return countDownLatch.getCount() == 0;
}
/**
* Key function: tell this class the total number of jobs will provide data in the mapResultsQueue
*
* The total job count when we free threads blocked on waitForFinalReduce. When we see numOfSubmittedJobs
* MapResults from the queue, those threads are released.
*
* Until this function is called, those thread will block forever. The numOfSubmittedJobs has a few constraints.
* First, it must be >= 0. 0 indicates that in fact no jobs will ever be submitted (i.e., there's no
* data coming) so the latch should be opened immediately. If it's >= 1, we will wait until
* we see numOfSubmittedJobs jobs before freeing them.
*
* Note that we throw an IllegalStateException if this function is called twice.
*
* @param numOfSubmittedJobs int >= 0 indicating the total number of MapResults that will
* enqueue results into our queue
*/
public synchronized void setTotalJobCount(final int numOfSubmittedJobs) {
if ( numOfSubmittedJobs < 0 )
throw new IllegalArgumentException("numOfSubmittedJobs must be >= 0, but saw " + numOfSubmittedJobs);
if ( numJobsReduced > numOfSubmittedJobs )
throw new IllegalArgumentException("numOfSubmittedJobs " + numOfSubmittedJobs + " < numJobsReduced " + numJobsReduced);
if ( this.numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS)
throw new IllegalStateException("setlastJobID called multiple times, but should only be called once");
this.numSubmittedJobs = numOfSubmittedJobs;
maybeReleaseLatch();
}
/**
* Block until the last job has submitted its MapResult to our queue, and we've reduced it, and
* return the reduce result resulting from applying reduce(...) to all MapResult elements.
* Note that this method cannot know if future reduce calls are coming in. So it simply gets
* the current reduce result. It is up to the caller to know whether the returned value is
* a partial result, or the full final value
*
* @return the total reduce result across all jobs
* @throws InterruptedException
*/
public ReduceType waitForFinalReduce() throws InterruptedException {
countDownLatch.await();
public ReduceType getReduceResult() {
return sum;
}
}

View File

@ -27,19 +27,17 @@ public class ReducerUnitTest extends BaseTest {
List<Object[]> tests = new ArrayList<Object[]>();
for ( final int groupSize : Arrays.asList(-1, 1, 5, 50, 500, 5000, 50000) ) {
for ( final boolean setJobIDAtStart : Arrays.asList(true, false) ) {
for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) {
if ( groupSize < nElements ) {
for ( final List<MapResult<Integer>> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) {
tests.add(new Object[]{ new ListOfJobs(jobs), setJobIDAtStart, groupSize });
}
for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) {
if ( groupSize < nElements ) {
for ( final List<MapResult<Integer>> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) {
tests.add(new Object[]{ new ListOfJobs(jobs), groupSize });
}
}
}
for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) {
if ( groupSize < nElements ) {
tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), setJobIDAtStart, groupSize });
}
for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) {
if ( groupSize < nElements ) {
tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), groupSize });
}
}
}
@ -77,11 +75,7 @@ public class ReducerUnitTest extends BaseTest {
}
@Test(enabled = true, dataProvider = "ReducerThreadTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME)
public void testReducerThread(final List<MapResult<Integer>> jobs, final boolean setJobIDAtStart, final int groupSize) throws Exception {
runTests(jobs, setJobIDAtStart, groupSize);
}
private void runTests( final List<MapResult<Integer>> allJobs, boolean setJobIDAtStart, int groupSize ) throws Exception {
public void testReducerThread(final List<MapResult<Integer>> allJobs, int groupSize) throws Exception {
if ( groupSize == -1 )
groupSize = allJobs.size();
@ -99,7 +93,6 @@ public class ReducerUnitTest extends BaseTest {
int nJobsSubmitted = 0;
int jobGroupCount = 0;
final int lastJobGroupCount = jobGroups.size() - 1;
setJobIDAtStart = setJobIDAtStart && groupSize == 1;
for ( final List<MapResult<Integer>> jobs : jobGroups ) {
//logger.warn("Processing job group " + jobGroupCount + " with " + jobs.size() + " jobs");
@ -114,48 +107,17 @@ public class ReducerUnitTest extends BaseTest {
nJobsSubmitted++;
}
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed at the start");
if ( jobGroupCount == 0 && setJobIDAtStart ) {
// only can do the setJobID if jobs cannot be submitted out of order
reducer.setTotalJobCount(allJobs.size()+1);
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything");
}
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
Assert.assertTrue(nReduced <= nJobsSubmitted, "Somehow reduced more jobs than submitted");
if ( setJobIDAtStart ) {
final boolean submittedLastJob = jobGroupCount == lastJobGroupCount;
Assert.assertEquals(reducer.latchIsReleased(), submittedLastJob,
"When last job is set, latch should only be released if the last job has been submitted");
} else {
Assert.assertEquals(reducer.latchIsReleased(), false, "When last job isn't set, latch should never be release");
}
jobGroupCount++;
}
if ( setJobIDAtStart )
Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set");
else {
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set");
reducer.setTotalJobCount(allJobs.size() + 1);
Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id ");
}
Assert.assertEquals(reduce.nRead, allJobs.size(), "number of read values not all of the values in the reducer queue");
es.shutdown();
es.awaitTermination(1, TimeUnit.HOURS);
}
@Test(enabled = true, expectedExceptions = IllegalStateException.class)
private void runSettingJobIDTwice() throws Exception {
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0);
reducer.setTotalJobCount(10);
reducer.setTotalJobCount(15);
}
@Test(timeOut = 1000, invocationCount = 100)
private void testNonBlockingReduce() throws Exception {
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0);
@ -242,12 +204,8 @@ public class ReducerUnitTest extends BaseTest {
@Override
public void run() {
try {
final int observedSum = reducer.waitForFinalReduce();
Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value");
} catch ( InterruptedException ex ) {
Assert.fail("Got interrupted");
}
final int observedSum = reducer.getReduceResult();
Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value");
}
}
}