diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index d817877cc..b014695da 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -48,7 +48,7 @@ public class NanoScheduler { final int bufferSize; final int nThreads; final ExecutorService inputExecutor; - final ExecutorService errorWatchingExecutor; + final ExecutorService masterExecutor; final ExecutorService mapExecutor; final Semaphore runningMapJobSlots; final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker(); @@ -85,14 +85,14 @@ public class NanoScheduler { this.nThreads = nThreads; if ( nThreads == 1 ) { - this.mapExecutor = this.inputExecutor = this.errorWatchingExecutor = null; + this.mapExecutor = this.inputExecutor = this.masterExecutor = null; runningMapJobSlots = null; } else { this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d")); runningMapJobSlots = new Semaphore(this.bufferSize); this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); - this.errorWatchingExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); + this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); } // start timing the time spent outside of the nanoScheduler @@ -131,7 +131,7 @@ public class NanoScheduler { if ( nThreads > 1 ) { shutdownExecutor("inputExecutor", inputExecutor); shutdownExecutor("mapExecutor", mapExecutor); - shutdownExecutor("errorWatchingExecutor", errorWatchingExecutor); + shutdownExecutor("masterExecutor", masterExecutor); } shutdown = true; @@ -313,154 +313,148 @@ public class NanoScheduler { final NSReduceFunction reduce) { debugPrint("Executing nanoScheduler"); 
- final ErrorWatcherThread errorWatcher = new ErrorWatcherThread(); - errorWatchingExecutor.submit(errorWatcher); + // start up the master job + final MasterJob masterJob = new MasterJob(inputReader, map, initialValue, reduce); + final Future reduceResult = masterExecutor.submit(masterJob); - // a blocking queue that limits the number of input datum to the requested buffer size - // note we need +1 because we continue to enqueue the lastObject - final BlockingQueue.InputValue> inputQueue - = new LinkedBlockingDeque.InputValue>(bufferSize+1); + while ( true ) { + // check that no errors occurred while we were waiting + handleErrors(); - // Create the input producer and start it running - final InputProducer inputProducer = - new InputProducer(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue); - inputExecutor.submit(inputProducer); + try { + final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS); - // a priority queue that stores up to bufferSize elements - // produced by completed map jobs. - final PriorityBlockingQueue> mapResultQueue = - new PriorityBlockingQueue>(); + // in case an error occurred in the reduce + handleErrors(); - final Reducer reducer - = new Reducer(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue); - - try { - int nSubmittedJobs = 0; - - while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { - // acquire a slot to run a map job. 
Blocks if too many jobs are enqueued - runningMapJobSlots.acquire(); - - mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer)); - nSubmittedJobs++; + // return our final reduce result + return result; + } catch (final TimeoutException ex ) { + // a normal case -- we just aren't done + } catch (final InterruptedException ex) { + errorTracker.notifyOfError(ex); + // will handle error in the next round of the while loop + } catch (final ExecutionException ex) { + errorTracker.notifyOfError(ex); + // will handle error in the next round of the while loop } - - // mark the last job id we've submitted so we now the id to wait for - //logger.warn("setting jobs submitted to " + nSubmittedJobs); - reducer.setTotalJobCount(nSubmittedJobs); - - // wait for all of the input and map threads to finish - return waitForCompletion(inputProducer, reducer, errorWatcher); - } catch (Exception ex) { - // occurs in general because the error watching thread shut us down - throw errorTracker.notifyOfError(ex); } } - /** - * Wait until the input thread and all map threads have completed running, and return the final reduce result - */ - private ReduceType waitForCompletion(final InputProducer inputProducer, - final Reducer reducer, - final ErrorWatcherThread errorWatcher) throws InterruptedException { - // wait until we have a final reduce result -// logger.warn("waiting for final reduce"); - final ReduceType finalSum = reducer.waitForFinalReduce(); - - // now wait for the input provider thread to terminate -// logger.warn("waiting on inputProducer"); - inputProducer.waitForDone(); - - // wait for all the map threads to finish by acquiring and then releasing all map job semaphores -// logger.warn("waiting on map"); - runningMapJobSlots.acquire(bufferSize); - runningMapJobSlots.release(bufferSize); - - // We are done with everything so shutdown the errorWatcher thread - errorWatcher.shutdown(); - - // everything is finally shutdown, return the final reduce value - return 
finalSum; - } - - /** - * Should we continue to submit jobs given the number of jobs already submitted and the - * number of read items in inputProducer? - * - * We continue to submit jobs while inputProducer hasn't reached EOF or the number - * of jobs we've enqueued isn't the number of read elements. This means that in - * some cases we submit more jobs than total read elements (cannot know because of - * multi-threading) so map jobs must handle the case where getNext() returns EOF. - * - * @param nJobsSubmitted - * @param inputProducer - * @return - */ - private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { - final int nReadItems = inputProducer.getNumInputValues(); - return nReadItems == -1 || nJobsSubmitted < nReadItems; - } - - /** - * A thread that periodically wakes up and checks to see if an error has occurred, and if - * so shuts down the NanoScheduler (via shutdownNow()), sending an InterruptedException to - * the main thread, which throws the error in the errorTracker. - * - * The main thread should call shutdown() when its ready to return itself, which will cause - * the run() method of this thread to abort in the next iteration. Uses a local latch to - * cause the thread calling shutdown to block until the run() method exits. - */ - private class ErrorWatcherThread implements Runnable { - boolean done = false; - final CountDownLatch latch = new CountDownLatch(1); - - private boolean isDone() { - return done; - } - - /** - * Shutdown this ErrorWatcher, blocking until the run() method of this thread exits - * - * @throws InterruptedException - */ - public void shutdown() throws InterruptedException { - this.done = true; - latch.await(); - } - - @Override - public void run() { - while ( ! 
isDone() ) { - try { - Thread.sleep(100); - handleErrorsIfOneOccurred(); - } catch (final InterruptedException ex) { - break; // just exit - } - } - - // free the latch so the shutdown thread starts up - latch.countDown(); - } - } - - /** - * If an error has occurred in the tracker, shut down the executors and - * throw the occur, otherwise do nothing. - */ - private void handleErrorsIfOneOccurred() { + private void handleErrors() { if ( errorTracker.hasAnErrorOccurred() ) { + masterExecutor.shutdownNow(); mapExecutor.shutdownNow(); inputExecutor.shutdownNow(); - errorWatchingExecutor.shutdownNow(); errorTracker.throwErrorIfPending(); } } /** - * Executes a single map job, reading the next element from the input inputQueue - * and after mapping runs reduce on as many elements as possible + * MasterJob has the task to enqueue Map jobs and wait for the final reduce + * + * It must be run in a separate thread in order to properly handle errors that may occur + * in the input, map, or reduce jobs without deadlocking. 
+ * + * The result of this callable is the final reduce value for the input / map / reduce jobs */ + private class MasterJob implements Callable { + final Iterator inputReader; + final NSMapFunction map; + final ReduceType initialValue; + final NSReduceFunction reduce; + + private MasterJob(Iterator inputReader, NSMapFunction map, ReduceType initialValue, NSReduceFunction reduce) { + this.inputReader = inputReader; + this.map = map; + this.initialValue = initialValue; + this.reduce = reduce; + } + + @Override + public ReduceType call() { + // a blocking queue that limits the number of input datum to the requested buffer size + // note we need +1 because we continue to enqueue the lastObject + final BlockingQueue.InputValue> inputQueue + = new LinkedBlockingDeque.InputValue>(bufferSize+1); + + // Create the input producer and start it running + final InputProducer inputProducer = + new InputProducer(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue); + inputExecutor.submit(inputProducer); + + // a priority queue that stores up to bufferSize elements + // produced by completed map jobs. + final PriorityBlockingQueue> mapResultQueue = + new PriorityBlockingQueue>(); + + final Reducer reducer + = new Reducer(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue); + + try { + int nSubmittedJobs = 0; + + while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { + // acquire a slot to run a map job. 
Blocks if too many jobs are enqueued + runningMapJobSlots.acquire(); + + mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer)); + nSubmittedJobs++; + } + + // mark the last job id we've submitted so we know the id to wait for + //logger.warn("setting jobs submitted to " + nSubmittedJobs); + reducer.setTotalJobCount(nSubmittedJobs); + + // wait for all of the input and map threads to finish + return waitForCompletion(inputProducer, reducer); + } catch (Exception ex) { + errorTracker.notifyOfError(ex); + return initialValue; + } + } + + /** + * Wait until the input thread and all map threads have completed running, and return the final reduce result + */ + private ReduceType waitForCompletion(final InputProducer inputProducer, + final Reducer reducer) throws InterruptedException { + // wait until we have a final reduce result +// logger.warn("waiting for final reduce"); + final ReduceType finalSum = reducer.waitForFinalReduce(); + + // now wait for the input provider thread to terminate +// logger.warn("waiting on inputProducer"); + inputProducer.waitForDone(); + + // wait for all the map threads to finish by acquiring and then releasing all map job semaphores +// logger.warn("waiting on map"); + runningMapJobSlots.acquire(bufferSize); + runningMapJobSlots.release(bufferSize); + + // everything is finally shutdown, return the final reduce value + return finalSum; + } + + /** + * Should we continue to submit jobs given the number of jobs already submitted and the + * number of read items in inputProducer? + * + * We continue to submit jobs while inputProducer hasn't reached EOF or the number + * of jobs we've enqueued isn't the number of read elements. This means that in + * some cases we submit more jobs than total read elements (cannot know because of + * multi-threading) so map jobs must handle the case where getNext() returns EOF. 
+ * + * @param nJobsSubmitted + * @param inputProducer + * @return + */ + private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { + final int nReadItems = inputProducer.getNumInputValues(); + return nReadItems == -1 || nJobsSubmitted < nReadItems; + } + } + private class MapReduceJob implements Runnable { final BlockingQueue.InputValue> inputQueue; final PriorityBlockingQueue> mapResultQueue; diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 97a45940f..f267999e3 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -24,7 +24,7 @@ import java.util.List; */ public class NanoSchedulerUnitTest extends BaseTest { private final static boolean debug = false; - public static final int NANO_SCHEDULE_MAX_RUNTIME = 10000; + public static final int NANO_SCHEDULE_MAX_RUNTIME = 60000; private static class Map2x implements NSMapFunction { @Override public Integer apply(Integer input) { return input * 2; } @@ -228,12 +228,12 @@ public class NanoSchedulerUnitTest extends BaseTest { nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce()); } - @Test(expectedExceptions = NullPointerException.class, timeOut = 10000, invocationCount = 50) + @Test(expectedExceptions = NullPointerException.class, timeOut = 10000) public void testInputErrorIsThrown_NPE() throws InterruptedException { executeTestErrorThrowingInput(new NullPointerException()); } - @Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000, invocationCount = 50) + @Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000) public void testInputErrorIsThrown_RSE() throws InterruptedException { 
executeTestErrorThrowingInput(new ReviewedStingException("test")); }