diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index b014695da..d817877cc 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -48,7 +48,7 @@ public class NanoScheduler { final int bufferSize; final int nThreads; final ExecutorService inputExecutor; - final ExecutorService masterExecutor; + final ExecutorService errorWatchingExecutor; final ExecutorService mapExecutor; final Semaphore runningMapJobSlots; final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker(); @@ -85,14 +85,14 @@ public class NanoScheduler { this.nThreads = nThreads; if ( nThreads == 1 ) { - this.mapExecutor = this.inputExecutor = this.masterExecutor = null; + this.mapExecutor = this.inputExecutor = this.errorWatchingExecutor = null; runningMapJobSlots = null; } else { this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d")); runningMapJobSlots = new Semaphore(this.bufferSize); this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); - this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); + this.errorWatchingExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); } // start timing the time spent outside of the nanoScheduler @@ -131,7 +131,7 @@ public class NanoScheduler { if ( nThreads > 1 ) { shutdownExecutor("inputExecutor", inputExecutor); shutdownExecutor("mapExecutor", mapExecutor); - shutdownExecutor("masterExecutor", masterExecutor); + shutdownExecutor("errorWatchingExecutor", errorWatchingExecutor); } shutdown = true; @@ -313,148 +313,154 @@ public class NanoScheduler { final NSReduceFunction reduce) { debugPrint("Executing nanoScheduler"); 
- // start up the master job - final MasterJob masterJob = new MasterJob(inputReader, map, initialValue, reduce); - final Future reduceResult = masterExecutor.submit(masterJob); + final ErrorWatcherThread errorWatcher = new ErrorWatcherThread(); + errorWatchingExecutor.submit(errorWatcher); - while ( true ) { - // check that no errors occurred while we were waiting - handleErrors(); + // a blocking queue that limits the number of input datum to the requested buffer size + // note we need +1 because we continue to enqueue the lastObject + final BlockingQueue.InputValue> inputQueue + = new LinkedBlockingDeque.InputValue>(bufferSize+1); - try { - final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS); + // Create the input producer and start it running + final InputProducer inputProducer = + new InputProducer(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue); + inputExecutor.submit(inputProducer); - // in case an error occurred in the reduce - handleErrors(); + // a priority queue that stores up to bufferSize elements + // produced by completed map jobs. + final PriorityBlockingQueue> mapResultQueue = + new PriorityBlockingQueue>(); - // return our final reduce result - return result; - } catch (final TimeoutException ex ) { - // a normal case -- we just aren't done - } catch (final InterruptedException ex) { - errorTracker.notifyOfError(ex); - // will handle error in the next round of the for loop - } catch (final ExecutionException ex) { - errorTracker.notifyOfError(ex); - // will handle error in the next round of the for loop + final Reducer reducer + = new Reducer(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue); + + try { + int nSubmittedJobs = 0; + + while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { + // acquire a slot to run a map job. 
Blocks if too many jobs are enqueued + runningMapJobSlots.acquire(); + + mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer)); + nSubmittedJobs++; } + + // mark the last job id we've submitted so we know the id to wait for + //logger.warn("setting jobs submitted to " + nSubmittedJobs); + reducer.setTotalJobCount(nSubmittedJobs); + + // wait for all of the input and map threads to finish + return waitForCompletion(inputProducer, reducer, errorWatcher); + } catch (Exception ex) { + // occurs in general because the error watching thread shut us down + throw errorTracker.notifyOfError(ex); } } - private void handleErrors() { + /** + * Wait until the input thread and all map threads have completed running, and return the final reduce result + */ + private ReduceType waitForCompletion(final InputProducer inputProducer, + final Reducer reducer, + final ErrorWatcherThread errorWatcher) throws InterruptedException { + // wait until we have a final reduce result +// logger.warn("waiting for final reduce"); + final ReduceType finalSum = reducer.waitForFinalReduce(); + + // now wait for the input provider thread to terminate +// logger.warn("waiting on inputProducer"); + inputProducer.waitForDone(); + + // wait for all the map threads to finish by acquiring and then releasing all map job semaphores +// logger.warn("waiting on map"); + runningMapJobSlots.acquire(bufferSize); + runningMapJobSlots.release(bufferSize); + + // We are done with everything so shutdown the errorWatcher thread + errorWatcher.shutdown(); + + // everything is finally shutdown, return the final reduce value + return finalSum; + } + + /** + * Should we continue to submit jobs given the number of jobs already submitted and the + * number of read items in inputProducer? + * + * We continue to submit jobs while inputProducer hasn't reached EOF or the number + * of jobs we've enqueued isn't the number of read elements.
This means that in + * some cases we submit more jobs than total read elements (cannot know because of + * multi-threading) so map jobs must handle the case where getNext() returns EOF. + * + * @param nJobsSubmitted + * @param inputProducer + * @return + */ + private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { + final int nReadItems = inputProducer.getNumInputValues(); + return nReadItems == -1 || nJobsSubmitted < nReadItems; + } + + /** + * A thread that periodically wakes up and checks to see if an error has occurred, and if + * so shuts down the NanoScheduler (via shutdownNow()), sending an InterruptedException to + * the main thread, which throws the error in the errorTracker. + * + * The main thread should call shutdown() when it's ready to return itself, which will cause + * the run() method of this thread to abort in the next iteration. Uses a local latch to + * cause the thread calling shutdown to block until the run() method exits. + */ + private class ErrorWatcherThread implements Runnable { + boolean done = false; + final CountDownLatch latch = new CountDownLatch(1); + + private boolean isDone() { + return done; + } + + /** + * Shutdown this ErrorWatcher, blocking until the run() method of this thread exits + * + * @throws InterruptedException + */ + public void shutdown() throws InterruptedException { + this.done = true; + latch.await(); + } + + @Override + public void run() { + while ( ! isDone() ) { + try { + Thread.sleep(100); + handleErrorsIfOneOccurred(); + } catch (final InterruptedException ex) { + break; // just exit + } + } + + // free the latch so the shutdown thread starts up + latch.countDown(); + } + } + + /** + * If an error has occurred in the tracker, shut down the executors and + * throw the error, otherwise do nothing.
+ */ + private void handleErrorsIfOneOccurred() { if ( errorTracker.hasAnErrorOccurred() ) { - masterExecutor.shutdownNow(); mapExecutor.shutdownNow(); inputExecutor.shutdownNow(); + errorWatchingExecutor.shutdownNow(); errorTracker.throwErrorIfPending(); } } /** - * MasterJob has the task to enqueue Map jobs and wait for the final reduce - * - * It must be run in a separate thread in order to properly handle errors that may occur - * in the input, map, or reduce jobs without deadlocking. - * - * The result of this callable is the final reduce value for the input / map / reduce jobs + * Executes a single map job, reading the next element from the input inputQueue + * and after mapping runs reduce on as many elements as possible */ - private class MasterJob implements Callable { - final Iterator inputReader; - final NSMapFunction map; - final ReduceType initialValue; - final NSReduceFunction reduce; - - private MasterJob(Iterator inputReader, NSMapFunction map, ReduceType initialValue, NSReduceFunction reduce) { - this.inputReader = inputReader; - this.map = map; - this.initialValue = initialValue; - this.reduce = reduce; - } - - @Override - public ReduceType call() { - // a blocking queue that limits the number of input datum to the requested buffer size - // note we need +1 because we continue to enqueue the lastObject - final BlockingQueue.InputValue> inputQueue - = new LinkedBlockingDeque.InputValue>(bufferSize+1); - - // Create the input producer and start it running - final InputProducer inputProducer = - new InputProducer(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue); - inputExecutor.submit(inputProducer); - - // a priority queue that stores up to bufferSize elements - // produced by completed map jobs. 
- final PriorityBlockingQueue> mapResultQueue = - new PriorityBlockingQueue>(); - - final Reducer reducer - = new Reducer(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue); - - try { - int nSubmittedJobs = 0; - - while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { - // acquire a slot to run a map job. Blocks if too many jobs are enqueued - runningMapJobSlots.acquire(); - - mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer)); - nSubmittedJobs++; - } - - // mark the last job id we've submitted so we now the id to wait for - //logger.warn("setting jobs submitted to " + nSubmittedJobs); - reducer.setTotalJobCount(nSubmittedJobs); - - // wait for all of the input and map threads to finish - return waitForCompletion(inputProducer, reducer); - } catch (Exception ex) { - errorTracker.notifyOfError(ex); - return initialValue; - } - } - - /** - * Wait until the input thread and all map threads have completed running, and return the final reduce result - */ - private ReduceType waitForCompletion(final InputProducer inputProducer, - final Reducer reducer) throws InterruptedException { - // wait until we have a final reduce result -// logger.warn("waiting for final reduce"); - final ReduceType finalSum = reducer.waitForFinalReduce(); - - // now wait for the input provider thread to terminate -// logger.warn("waiting on inputProducer"); - inputProducer.waitForDone(); - - // wait for all the map threads to finish by acquiring and then releasing all map job semaphores -// logger.warn("waiting on map"); - runningMapJobSlots.acquire(bufferSize); - runningMapJobSlots.release(bufferSize); - - // everything is finally shutdown, return the final reduce value - return finalSum; - } - - /** - * Should we continue to submit jobs given the number of jobs already submitted and the - * number of read items in inputProducer? 
- * - * We continue to submit jobs while inputProducer hasn't reached EOF or the number - * of jobs we've enqueued isn't the number of read elements. This means that in - * some cases we submit more jobs than total read elements (cannot know because of - * multi-threading) so map jobs must handle the case where getNext() returns EOF. - * - * @param nJobsSubmitted - * @param inputProducer - * @return - */ - private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { - final int nReadItems = inputProducer.getNumInputValues(); - return nReadItems == -1 || nJobsSubmitted < nReadItems; - } - } - private class MapReduceJob implements Runnable { final BlockingQueue.InputValue> inputQueue; final PriorityBlockingQueue> mapResultQueue; diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index f267999e3..97a45940f 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -24,7 +24,7 @@ import java.util.List; */ public class NanoSchedulerUnitTest extends BaseTest { private final static boolean debug = false; - public static final int NANO_SCHEDULE_MAX_RUNTIME = 60000; + public static final int NANO_SCHEDULE_MAX_RUNTIME = 10000; private static class Map2x implements NSMapFunction { @Override public Integer apply(Integer input) { return input * 2; } @@ -228,12 +228,12 @@ public class NanoSchedulerUnitTest extends BaseTest { nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce()); } - @Test(expectedExceptions = NullPointerException.class, timeOut = 10000) + @Test(expectedExceptions = NullPointerException.class, timeOut = 10000, invocationCount = 50) public void testInputErrorIsThrown_NPE() throws 
InterruptedException { executeTestErrorThrowingInput(new NullPointerException()); } - @Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000) + @Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000, invocationCount = 50) public void testInputErrorIsThrown_RSE() throws InterruptedException { executeTestErrorThrowingInput(new ReviewedStingException("test")); }