diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index d8325f83e..d3b8c8149 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -381,7 +381,7 @@ public class NanoScheduler { reducer.setTotalJobCount(nSubmittedJobs); // wait for all of the input and map threads to finish - return waitForCompletion(inputProducer, reducer); + return waitForCompletion(mapResultQueue, reducer); } catch (Throwable ex) { // logger.warn("Reduce job got exception " + ex); errorTracker.notifyOfError(ex); @@ -392,17 +392,21 @@ public class NanoScheduler { /** * Wait until the input thread and all map threads have completed running, and return the final reduce result */ - private ReduceType waitForCompletion(final InputProducer inputProducer, + private ReduceType waitForCompletion(final MapResultsQueue mapResultsQueue, final Reducer reducer) throws InterruptedException { - // wait until we have a final reduce result -// logger.warn("waiting for final reduce"); - final ReduceType finalSum = reducer.waitForFinalReduce(); - // wait for all the map threads to finish by acquiring and then releasing all map job semaphores -// logger.warn("waiting on map"); runningMapJobSlots.acquire(bufferSize); runningMapJobSlots.release(bufferSize); + // do a final reduce here. This is critically important because the InputMapReduce jobs + // no longer block on reducing, so it's possible for all the threads to end with a few + // reduce jobs on the queue still to do. This call ensures that we reduce everything + reducer.reduceAsMuchAsPossible(mapResultsQueue, true); + + // wait until we have a final reduce result + final ReduceType finalSum = reducer.waitForFinalReduce(); + + // everything is finally shutdown, return the final reduce value return finalSum; } @@ -470,7 +474,8 @@ public class NanoScheduler { mapResultQueue.put(result); - final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue); + // reduce as much as possible, without blocking, if another thread is already doing reduces + final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false); } catch (Throwable ex) { errorTracker.notifyOfError(ex); } finally { diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index a3d3f9056..25e8b1fe6 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -1,12 +1,12 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; -import com.google.java.contract.Requires; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Reducer supporting two-threaded reduce of the map/reduce. @@ -31,9 +31,10 @@ class Reducer { private final static Logger logger = Logger.getLogger(Reducer.class); private final static int UNSET_NUM_SUBMITTED_JOBS = -2; - final CountDownLatch countDownLatch = new CountDownLatch(1); - final NSReduceFunction reduce; - final MultiThreadedErrorTracker errorTracker; + private final CountDownLatch countDownLatch = new CountDownLatch(1); + private final NSReduceFunction reduce; + private final MultiThreadedErrorTracker errorTracker; + private final Lock reduceLock = new ReentrantLock(); /** * The sum of the reduce function applied to all MapResults. After this Reducer @@ -79,39 +80,63 @@ class Reducer { * @throws InterruptedException */ @Ensures("result >= 0") - public synchronized int reduceAsMuchAsPossible(final MapResultsQueue mapResultQueue) { - // TODO -- have conditional lock acquistion. If the lock can be acquired, actually do some useful - // TODO -- work, but if it cannot just continue on your way. No sense in having all our map - // TODO -- threads block here just to fall through. The only question is if, with that locking scheme, - // TODO -- it's possible to leave some values in the map queue, as the thread owning the lock is - // TODO -- exiting and the only remaining thread to actually complete the reduce falls through. - // TODO -- all we really need to do is add a final call to reduceAsMuchAsPossible when exiting the nano scheduler - // TODO -- to make sure we've cleaned everything up + public int reduceAsMuchAsPossible(final MapResultsQueue mapResultQueue, final boolean waitForLock) { if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); int nReducesNow = 0; + final boolean haveLock = acquireReduceLock(waitForLock); try { - while ( mapResultQueue.nextValueIsAvailable() ) { - final MapResult result = mapResultQueue.take(); + if ( haveLock ) { + while ( mapResultQueue.nextValueIsAvailable() ) { + final MapResult result = mapResultQueue.take(); - if ( ! result.isEOFMarker() ) { - nReducesNow++; + if ( ! result.isEOFMarker() ) { + nReducesNow++; - // apply reduce, keeping track of sum - sum = reduce.apply(result.getValue(), sum); + // apply reduce, keeping track of sum + sum = reduce.apply(result.getValue(), sum); + } + + numJobsReduced++; + maybeReleaseLatch(); } - - numJobsReduced++; - maybeReleaseLatch(); } } catch (Exception ex) { errorTracker.notifyOfError(ex); countDownLatch.countDown(); + } finally { + if ( haveLock ) // if we acquired the lock, unlock it + releaseReduceLock(); } return nReducesNow; } + /** + * Acquire the reduce lock, either returning immediately if not possible or blocking until the lock is available + * + * @param blockUntilAvailable if true, we will block until the lock is available, otherwise we return immediately + * without acquiring the lock + * @return true if the lock has been acquired, false otherwise + */ + protected boolean acquireReduceLock(final boolean blockUntilAvailable) { + if ( blockUntilAvailable ) { + reduceLock.lock(); + return true; + } else { + return reduceLock.tryLock(); + } + } + + /** + * Free the reduce lock. + * + * Assumes that the invoking thread actually previously acquired the lock (it's a problem if not). + */ + protected void releaseReduceLock() { + reduceLock.unlock(); + } + /** * release the latch if appropriate * diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java index 6ea73f3cd..5db5acde0 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java @@ -11,10 +11,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * UnitTests for Reducer @@ -125,7 +122,7 @@ public class ReducerUnitTest extends BaseTest { Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything"); } - final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue); + final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, true); Assert.assertTrue(nReduced <= nJobsSubmitted, "Somehow reduced more jobs than submitted"); if ( setJobIDAtStart ) { @@ -152,16 +149,71 @@ public class ReducerUnitTest extends BaseTest { es.awaitTermination(1, TimeUnit.HOURS); } - @Test(expectedExceptions = IllegalStateException.class) + @Test(enabled = true, expectedExceptions = IllegalStateException.class) private void runSettingJobIDTwice() throws Exception { - final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>(); - final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0); - reducer.setTotalJobCount(10); reducer.setTotalJobCount(15); } + @Test(timeOut = 1000, invocationCount = 100) + private void testNonBlockingReduce() throws Exception { + final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0); + final MapResultsQueue mapResultsQueue = new MapResultsQueue(); + mapResultsQueue.put(new MapResult(0, 0)); + mapResultsQueue.put(new MapResult(1, 1)); + + final CountDownLatch latch = new CountDownLatch(1); + final ExecutorService es = Executors.newSingleThreadExecutor(); + + es.submit(new Runnable() { + @Override + public void run() { + reducer.acquireReduceLock(true); + latch.countDown(); + } + }); + + latch.await(); + final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, false); + Assert.assertEquals(nReduced, 0, "The reducer lock was already held but we did some work"); + es.shutdown(); + es.awaitTermination(1, TimeUnit.HOURS); + } + + @Test(timeOut = 10000, invocationCount = 100) + private void testBlockingReduce() throws Exception { + final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0); + final MapResultsQueue mapResultsQueue = new MapResultsQueue(); + mapResultsQueue.put(new MapResult(0, 0)); + mapResultsQueue.put(new MapResult(1, 1)); + + final CountDownLatch latch = new CountDownLatch(1); + final ExecutorService es = Executors.newSingleThreadExecutor(); + + es.submit(new Runnable() { + @Override + public void run() { + reducer.acquireReduceLock(true); + latch.countDown(); + try { + Thread.sleep(100); + } catch ( InterruptedException e ) { + ; + } finally { + reducer.releaseReduceLock(); + } + } + }); + + latch.await(); + final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, true); + Assert.assertEquals(nReduced, 2, "The reducer should have blocked until the lock was freed and reduced 2 values"); + es.shutdown(); + es.awaitTermination(1, TimeUnit.HOURS); + } + + public class ReduceSumTest implements NSReduceFunction { int nRead = 0; int lastValue = -1;