NanoScheduler reducer optimizations
-- reduceAsMuchAsPossible no longer blocks threads via synchronization, but instead uses an explicit lock to manage access. If the lock is already held (because some thread is doing reduce) then the thread attempting to reduce immediately exits the call and continues doing productive work. They removes one major source of blocking contention in the NanoScheduler
This commit is contained in:
parent
161487b4a4
commit
bf81db40f7
|
|
@ -381,7 +381,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
reducer.setTotalJobCount(nSubmittedJobs);
|
||||
|
||||
// wait for all of the input and map threads to finish
|
||||
return waitForCompletion(inputProducer, reducer);
|
||||
return waitForCompletion(mapResultQueue, reducer);
|
||||
} catch (Throwable ex) {
|
||||
// logger.warn("Reduce job got exception " + ex);
|
||||
errorTracker.notifyOfError(ex);
|
||||
|
|
@ -392,17 +392,21 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
/**
|
||||
* Wait until the input thread and all map threads have completed running, and return the final reduce result
|
||||
*/
|
||||
private ReduceType waitForCompletion(final InputProducer<InputType> inputProducer,
|
||||
private ReduceType waitForCompletion(final MapResultsQueue<MapType> mapResultsQueue,
|
||||
final Reducer<MapType, ReduceType> reducer) throws InterruptedException {
|
||||
// wait until we have a final reduce result
|
||||
// logger.warn("waiting for final reduce");
|
||||
final ReduceType finalSum = reducer.waitForFinalReduce();
|
||||
|
||||
// wait for all the map threads to finish by acquiring and then releasing all map job semaphores
|
||||
// logger.warn("waiting on map");
|
||||
runningMapJobSlots.acquire(bufferSize);
|
||||
runningMapJobSlots.release(bufferSize);
|
||||
|
||||
// do a final reduce here. This is critically important because the InputMapReduce jobs
|
||||
// no longer block on reducing, so it's possible for all the threads to end with a few
|
||||
// reduce jobs on the queue still to do. This call ensures that we reduce everything
|
||||
reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
|
||||
|
||||
// wait until we have a final reduce result
|
||||
final ReduceType finalSum = reducer.waitForFinalReduce();
|
||||
|
||||
|
||||
// everything is finally shutdown, return the final reduce value
|
||||
return finalSum;
|
||||
}
|
||||
|
|
@ -470,7 +474,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
|
||||
mapResultQueue.put(result);
|
||||
|
||||
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue);
|
||||
// reduce as much as possible, without blocking, if another thread is already doing reduces
|
||||
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false);
|
||||
} catch (Throwable ex) {
|
||||
errorTracker.notifyOfError(ex);
|
||||
} finally {
|
||||
|
|
|
|||
|
|
@ -1,12 +1,12 @@
|
|||
package org.broadinstitute.sting.utils.nanoScheduler;
|
||||
|
||||
import com.google.java.contract.Ensures;
|
||||
import com.google.java.contract.Requires;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* Reducer supporting two-threaded reduce of the map/reduce.
|
||||
|
|
@ -31,9 +31,10 @@ class Reducer<MapType, ReduceType> {
|
|||
private final static Logger logger = Logger.getLogger(Reducer.class);
|
||||
private final static int UNSET_NUM_SUBMITTED_JOBS = -2;
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final NSReduceFunction<MapType, ReduceType> reduce;
|
||||
final MultiThreadedErrorTracker errorTracker;
|
||||
private final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
private final NSReduceFunction<MapType, ReduceType> reduce;
|
||||
private final MultiThreadedErrorTracker errorTracker;
|
||||
private final Lock reduceLock = new ReentrantLock();
|
||||
|
||||
/**
|
||||
* The sum of the reduce function applied to all MapResults. After this Reducer
|
||||
|
|
@ -79,39 +80,63 @@ class Reducer<MapType, ReduceType> {
|
|||
* @throws InterruptedException
|
||||
*/
|
||||
@Ensures("result >= 0")
|
||||
public synchronized int reduceAsMuchAsPossible(final MapResultsQueue<MapType> mapResultQueue) {
|
||||
// TODO -- have conditional lock acquistion. If the lock can be acquired, actually do some useful
|
||||
// TODO -- work, but if it cannot just continue on your way. No sense in having all our map
|
||||
// TODO -- threads block here just to fall through. The only question is if, with that locking scheme,
|
||||
// TODO -- it's possible to leave some values in the map queue, as the thread owning the lock is
|
||||
// TODO -- exiting and the only remaining thread to actually complete the reduce falls through.
|
||||
// TODO -- all we really need to do is add a final call to reduceAsMuchAsPossible when exiting the nano scheduler
|
||||
// TODO -- to make sure we've cleaned everything up
|
||||
public int reduceAsMuchAsPossible(final MapResultsQueue<MapType> mapResultQueue, final boolean waitForLock) {
|
||||
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
|
||||
int nReducesNow = 0;
|
||||
|
||||
final boolean haveLock = acquireReduceLock(waitForLock);
|
||||
try {
|
||||
while ( mapResultQueue.nextValueIsAvailable() ) {
|
||||
final MapResult<MapType> result = mapResultQueue.take();
|
||||
if ( haveLock ) {
|
||||
while ( mapResultQueue.nextValueIsAvailable() ) {
|
||||
final MapResult<MapType> result = mapResultQueue.take();
|
||||
|
||||
if ( ! result.isEOFMarker() ) {
|
||||
nReducesNow++;
|
||||
if ( ! result.isEOFMarker() ) {
|
||||
nReducesNow++;
|
||||
|
||||
// apply reduce, keeping track of sum
|
||||
sum = reduce.apply(result.getValue(), sum);
|
||||
// apply reduce, keeping track of sum
|
||||
sum = reduce.apply(result.getValue(), sum);
|
||||
}
|
||||
|
||||
numJobsReduced++;
|
||||
maybeReleaseLatch();
|
||||
}
|
||||
|
||||
numJobsReduced++;
|
||||
maybeReleaseLatch();
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
errorTracker.notifyOfError(ex);
|
||||
countDownLatch.countDown();
|
||||
} finally {
|
||||
if ( haveLock ) // if we acquired the lock, unlock it
|
||||
releaseReduceLock();
|
||||
}
|
||||
|
||||
return nReducesNow;
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire the reduce lock, either returning immediately if not possible or blocking until the lock is available
|
||||
*
|
||||
* @param blockUntilAvailable if true, we will block until the lock is available, otherwise we return immediately
|
||||
* without acquiring the lock
|
||||
* @return true if the lock has been acquired, false otherwise
|
||||
*/
|
||||
protected boolean acquireReduceLock(final boolean blockUntilAvailable) {
|
||||
if ( blockUntilAvailable ) {
|
||||
reduceLock.lock();
|
||||
return true;
|
||||
} else {
|
||||
return reduceLock.tryLock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Free the reduce lock.
|
||||
*
|
||||
* Assumes that the invoking thread actually previously acquired the lock (it's a problem if not).
|
||||
*/
|
||||
protected void releaseReduceLock() {
|
||||
reduceLock.unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* release the latch if appropriate
|
||||
*
|
||||
|
|
|
|||
|
|
@ -11,10 +11,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* UnitTests for Reducer
|
||||
|
|
@ -125,7 +122,7 @@ public class ReducerUnitTest extends BaseTest {
|
|||
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything");
|
||||
}
|
||||
|
||||
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue);
|
||||
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
|
||||
Assert.assertTrue(nReduced <= nJobsSubmitted, "Somehow reduced more jobs than submitted");
|
||||
|
||||
if ( setJobIDAtStart ) {
|
||||
|
|
@ -152,16 +149,71 @@ public class ReducerUnitTest extends BaseTest {
|
|||
es.awaitTermination(1, TimeUnit.HOURS);
|
||||
}
|
||||
|
||||
@Test(expectedExceptions = IllegalStateException.class)
|
||||
@Test(enabled = true, expectedExceptions = IllegalStateException.class)
|
||||
private void runSettingJobIDTwice() throws Exception {
|
||||
final PriorityBlockingQueue<MapResult<Integer>> mapResultsQueue = new PriorityBlockingQueue<MapResult<Integer>>();
|
||||
|
||||
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0);
|
||||
|
||||
reducer.setTotalJobCount(10);
|
||||
reducer.setTotalJobCount(15);
|
||||
}
|
||||
|
||||
@Test(timeOut = 1000, invocationCount = 100)
|
||||
private void testNonBlockingReduce() throws Exception {
|
||||
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0);
|
||||
final MapResultsQueue<Integer> mapResultsQueue = new MapResultsQueue<Integer>();
|
||||
mapResultsQueue.put(new MapResult<Integer>(0, 0));
|
||||
mapResultsQueue.put(new MapResult<Integer>(1, 1));
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final ExecutorService es = Executors.newSingleThreadExecutor();
|
||||
|
||||
es.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
reducer.acquireReduceLock(true);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
latch.await();
|
||||
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, false);
|
||||
Assert.assertEquals(nReduced, 0, "The reducer lock was already held but we did some work");
|
||||
es.shutdown();
|
||||
es.awaitTermination(1, TimeUnit.HOURS);
|
||||
}
|
||||
|
||||
@Test(timeOut = 10000, invocationCount = 100)
|
||||
private void testBlockingReduce() throws Exception {
|
||||
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0);
|
||||
final MapResultsQueue<Integer> mapResultsQueue = new MapResultsQueue<Integer>();
|
||||
mapResultsQueue.put(new MapResult<Integer>(0, 0));
|
||||
mapResultsQueue.put(new MapResult<Integer>(1, 1));
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final ExecutorService es = Executors.newSingleThreadExecutor();
|
||||
|
||||
es.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
reducer.acquireReduceLock(true);
|
||||
latch.countDown();
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch ( InterruptedException e ) {
|
||||
;
|
||||
} finally {
|
||||
reducer.releaseReduceLock();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
latch.await();
|
||||
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
|
||||
Assert.assertEquals(nReduced, 2, "The reducer should have blocked until the lock was freed and reduced 2 values");
|
||||
es.shutdown();
|
||||
es.awaitTermination(1, TimeUnit.HOURS);
|
||||
}
|
||||
|
||||
|
||||
public class ReduceSumTest implements NSReduceFunction<Integer, Integer> {
|
||||
int nRead = 0;
|
||||
int lastValue = -1;
|
||||
|
|
|
|||
Loading…
Reference in New Issue