From 2267b722b2337d8f4cd41a35e854df03c2dc8963 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 19 Sep 2012 16:59:24 -0400 Subject: [PATCH] Proper error handling in NanoScheduler -- Renamed TraversalErrorManager to the more general MultiThreadedErrorTracker -- ErrorTracker is now used throughout the NanoScheduler. In order to properly handle errors, the work previously done by the main thread (submit jobs, block on reduce) is now handled in a separate thread. The main thread simply wakes up periodically and checks whether the reduce result is available or if an error has occurred, and handles each appropriately. -- EngineFeaturesIntegrationTest checks that -nt and -nct properly throw errors in Walkers -- Added NanoSchedulerUnitTest for input errors -- ThreadEfficiencyMonitoring is now disabled by default, and can be enabled with a GATK command line option. This is because the monitoring doesn't differentiate between threads that are supposed to do work, and those that are supposed to wait, and therefore gives misleading results. 
-- Build.xml no longer copies the unittest results verbosely --- build.xml | 2 +- .../sting/gatk/GenomeAnalysisEngine.java | 3 +- .../arguments/GATKArgumentCollection.java | 10 +- .../executive/HierarchicalMicroScheduler.java | 6 +- .../utils/MultiThreadedErrorTracker.java | 80 ++++++++ .../sting/utils/TraversalErrorManager.java | 53 ----- .../utils/nanoScheduler/InputProducer.java | 10 +- .../utils/nanoScheduler/NanoScheduler.java | 194 ++++++++++++------ .../sting/utils/nanoScheduler/Reducer.java | 36 ++-- .../gatk/EngineFeaturesIntegrationTest.java | 6 +- .../nanoScheduler/InputProducerUnitTest.java | 5 +- .../nanoScheduler/NanoSchedulerUnitTest.java | 13 +- .../utils/nanoScheduler/ReducerUnitTest.java | 5 +- 13 files changed, 267 insertions(+), 156 deletions(-) create mode 100644 public/java/src/org/broadinstitute/sting/utils/MultiThreadedErrorTracker.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java diff --git a/build.xml b/build.xml index 0d1deba29..7e7415f08 100644 --- a/build.xml +++ b/build.xml @@ -1179,7 +1179,7 @@ - + diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index fc2546173..8071fe5dc 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -63,7 +63,6 @@ import org.broadinstitute.sting.utils.recalibration.BaseRecalibration; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import java.io.File; -import java.io.OutputStream; import java.util.*; /** @@ -410,7 +409,7 @@ public class GenomeAnalysisEngine { this.threadAllocation = new ThreadAllocation(argCollection.numberOfDataThreads, argCollection.numberOfCPUThreadsPerDataThread, argCollection.numberOfIOThreads, - ! 
argCollection.disableEfficiencyMonitor); + argCollection.monitorThreadEfficiency); } public int getTotalNumberOfThreads() { diff --git a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index 44817379a..c8887b8b2 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -307,12 +307,12 @@ public class GATKArgumentCollection { public int numberOfIOThreads = 0; /** - * By default the GATK monitors its own efficiency, but this can have a itsy-bitsy tiny - * cost (< 0.1%) in runtime because of turning on the JavaBean. This argument allows you - * to disable the monitor + * Enable GATK to monitor its own threading efficiency, at a itsy-bitsy tiny + * cost (< 0.1%) in runtime because of turning on the JavaBean. This is largely for + * debugging purposes. 
*/ - @Argument(fullName = "disableThreadEfficiencyMonitor", shortName = "dtem", doc = "Disable GATK efficiency monitoring", required = false) - public Boolean disableEfficiencyMonitor = false; + @Argument(fullName = "monitorThreadEfficiency", shortName = "mte", doc = "Enable GATK threading efficiency monitoring", required = false) + public Boolean monitorThreadEfficiency = false; @Argument(fullName = "num_bam_file_handles", shortName = "bfh", doc="The total number of BAM file handles to keep open simultaneously", required=false) public Integer numberOfBAMFileHandles = null; diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 0ddced502..01c4315f2 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -11,7 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; -import org.broadinstitute.sting.utils.TraversalErrorManager; +import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; @@ -46,7 +46,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * An exception that's occurred in this traversal. If null, no exception has occurred. */ - final TraversalErrorManager errorTracker = new TraversalErrorManager(); + final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker(); /** * Queue of incoming shards. 
@@ -351,7 +351,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * Allows other threads to notify of an error during traversal. */ protected synchronized RuntimeException notifyOfTraversalError(Throwable error) { - return errorTracker.notifyOfTraversalError(error); + return errorTracker.notifyOfError(error); } /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */ diff --git a/public/java/src/org/broadinstitute/sting/utils/MultiThreadedErrorTracker.java b/public/java/src/org/broadinstitute/sting/utils/MultiThreadedErrorTracker.java new file mode 100644 index 000000000..98900031a --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/MultiThreadedErrorTracker.java @@ -0,0 +1,80 @@ +package org.broadinstitute.sting.utils; + +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; + +/** + * A utility to track exceptions that occur across threads. + * + * Uses a notify mechanism so that multiple threads can tell the tracker that an + * error has occurred, and a master thread can monitor this object for an error + * occurring and take appropriate action. Only maintains the first + * error to reach the tracker. + * + * Refactored from HierarchicalMicroScheduler + * + * User: depristo + * Date: 9/19/12 + * Time: 11:20 AM + */ +public class MultiThreadedErrorTracker { + /** + * An exception that's occurred. If null, no exception has occurred. + */ + private RuntimeException error = null; + + /** + * Convenience function to check, and throw, an error is one is pending + */ + public synchronized void throwErrorIfPending() { + if (hasAnErrorOccurred()) + throw getError(); + } + + /** + * Detects whether an execution error has occurred. + * @return True if an error has occurred. False otherwise. + */ + public synchronized boolean hasAnErrorOccurred() { + return error != null; + } + + /** + * Retrieve the error that has occurred. 
+ * + * @throws ReviewedStingException if no error has occurred. + * @return + */ + public synchronized RuntimeException getError() { + if(!hasAnErrorOccurred()) + throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists"); + return error; + } + + /** + * Notify this error tracker that an error has occurred. Only updates the tracked + * error if it is currently null (i.e., no error has been already reported). So + * calling this successively with multiple errors only keeps the first, which is the + * right thing to do as the initial failure is usually the meaningful one, but + * generates a cascade of failures as other subsystems fail. + */ + public synchronized RuntimeException notifyOfError(Throwable error) { + if ( this.error == null ) + this.error = toRuntimeException(error); + + return this.error; + } + + /** + * Convert error to a Runtime exception, or keep as is if it already is one + * + * @param error the error that has occurred + * @return the potentially converted error + */ + private RuntimeException toRuntimeException(final Throwable error) { + // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. + if (error instanceof RuntimeException) + return (RuntimeException)error; + else + return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error); + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java b/public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java deleted file mode 100644 index dd57950e0..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.broadinstitute.sting.utils; - -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; - -/** - * Created with IntelliJ IDEA. 
- * User: depristo - * Date: 9/19/12 - * Time: 11:20 AM - * To change this template use File | Settings | File Templates. - */ -public class TraversalErrorManager { - /** - * An exception that's occurred in this traversal. If null, no exception has occurred. - */ - private RuntimeException error = null; - - public synchronized void throwErrorIfPending() { - if (hasTraversalErrorOccurred()) - throw getTraversalError(); - } - - /** - * Detects whether an execution error has occurred. - * @return True if an error has occurred. False otherwise. - */ - public synchronized boolean hasTraversalErrorOccurred() { - return error != null; - } - - public synchronized RuntimeException getTraversalError() { - if(!hasTraversalErrorOccurred()) - throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists"); - return error; - } - - /** - * Allows other threads to notify of an error during traversal. - */ - public synchronized RuntimeException notifyOfTraversalError(Throwable error) { - // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. - this.error = toRuntimeException(error); - return this.error; - } - - private RuntimeException toRuntimeException(final Throwable error) { - // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. - if (error instanceof RuntimeException) - return (RuntimeException)error; - else - return new ReviewedStingException("An error occurred during the traversal. 
Message=" + error.getMessage(), error); - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index 0e337631c..bd99a9266 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -1,8 +1,8 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; import org.broadinstitute.sting.utils.SimpleTimer; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.Iterator; import java.util.concurrent.BlockingQueue; @@ -29,6 +29,8 @@ class InputProducer implements Runnable { */ final BlockingQueue outputQueue; + final MultiThreadedErrorTracker errorTracker; + /** * Have we read the last value from inputReader? * @@ -48,13 +50,16 @@ class InputProducer implements Runnable { final CountDownLatch latch = new CountDownLatch(1); public InputProducer(final Iterator inputReader, + final MultiThreadedErrorTracker errorTracker, final SimpleTimer inputTimer, final BlockingQueue outputQueue) { if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); + if ( errorTracker == null ) throw new IllegalArgumentException("errorTracker cannot be null"); if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null"); if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null"); this.inputReader = inputReader; + this.errorTracker = errorTracker; this.inputTimer = inputTimer; this.outputQueue = outputQueue; } @@ -129,8 +134,7 @@ class InputProducer implements Runnable { latch.countDown(); } catch (Exception ex) { - logger.warn("Got exception " + ex); - throw new ReviewedStingException("got execution exception", ex); + 
errorTracker.notifyOfError(ex); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 31ce04074..b014695da 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -3,7 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import com.google.java.contract.Requires; import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; import org.broadinstitute.sting.utils.threading.NamedThreadFactory; import java.util.Iterator; @@ -48,8 +48,10 @@ public class NanoScheduler { final int bufferSize; final int nThreads; final ExecutorService inputExecutor; + final ExecutorService masterExecutor; final ExecutorService mapExecutor; final Semaphore runningMapJobSlots; + final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker(); boolean shutdown = false; boolean debug = false; @@ -83,13 +85,14 @@ public class NanoScheduler { this.nThreads = nThreads; if ( nThreads == 1 ) { - this.mapExecutor = this.inputExecutor = null; + this.mapExecutor = this.inputExecutor = this.masterExecutor = null; runningMapJobSlots = null; } else { this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d")); runningMapJobSlots = new Semaphore(this.bufferSize); this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); + this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); } // start timing the time spent outside of the nanoScheduler @@ -128,6 +131,7 @@ public class NanoScheduler { if ( nThreads > 1 ) { shutdownExecutor("inputExecutor", 
inputExecutor); shutdownExecutor("mapExecutor", mapExecutor); + shutdownExecutor("masterExecutor", masterExecutor); } shutdown = true; @@ -309,85 +313,146 @@ public class NanoScheduler { final NSReduceFunction reduce) { debugPrint("Executing nanoScheduler"); - // a blocking queue that limits the number of input datum to the requested buffer size - // note we need +1 because we continue to enqueue the lastObject - final BlockingQueue.InputValue> inputQueue - = new LinkedBlockingDeque.InputValue>(bufferSize+1); + // start up the master job + final MasterJob masterJob = new MasterJob(inputReader, map, initialValue, reduce); + final Future reduceResult = masterExecutor.submit(masterJob); - // Create the input producer and start it running - final InputProducer inputProducer = - new InputProducer(inputReader, myNSRuntimeProfile.inputTimer, inputQueue); - inputExecutor.submit(inputProducer); + while ( true ) { + // check that no errors occurred while we were waiting + handleErrors(); - // a priority queue that stores up to bufferSize elements - // produced by completed map jobs. - final PriorityBlockingQueue> mapResultQueue = - new PriorityBlockingQueue>(); + try { + final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS); - final Reducer reducer - = new Reducer(reduce, myNSRuntimeProfile.reduceTimer, initialValue); + // in case an error occurred in the reduce + handleErrors(); - try { - int nSubmittedJobs = 0; - - while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { - // acquire a slot to run a map job. 
Blocks if too many jobs are enqueued - runningMapJobSlots.acquire(); - - mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer)); - nSubmittedJobs++; + // return our final reduce result + return result; + } catch (final TimeoutException ex ) { + // a normal case -- we just aren't done + } catch (final InterruptedException ex) { + errorTracker.notifyOfError(ex); + // will handle error in the next round of the for loop + } catch (final ExecutionException ex) { + errorTracker.notifyOfError(ex); + // will handle error in the next round of the for loop } + } + } - // mark the last job id we've submitted so we now the id to wait for - //logger.warn("setting jobs submitted to " + nSubmittedJobs); - reducer.setTotalJobCount(nSubmittedJobs); - - // wait for all of the input and map threads to finish - return waitForCompletion(inputProducer, reducer); - } catch (Exception ex) { - logger.warn("Got exception " + ex); - throw new ReviewedStingException("got execution exception", ex); + private void handleErrors() { + if ( errorTracker.hasAnErrorOccurred() ) { + masterExecutor.shutdownNow(); + mapExecutor.shutdownNow(); + inputExecutor.shutdownNow(); + errorTracker.throwErrorIfPending(); } } /** - * Wait until the input thread and all map threads have completed running, and return the final reduce result + * MasterJob has the task to enqueue Map jobs and wait for the final reduce + * + * It must be run in a separate thread in order to properly handle errors that may occur + * in the input, map, or reduce jobs without deadlocking. 
+ * + * The result of this callable is the final reduce value for the input / map / reduce jobs */ - private ReduceType waitForCompletion(final InputProducer inputProducer, - final Reducer reducer) throws InterruptedException { - // wait until we have a final reduce result + private class MasterJob implements Callable { + final Iterator inputReader; + final NSMapFunction map; + final ReduceType initialValue; + final NSReduceFunction reduce; + + private MasterJob(Iterator inputReader, NSMapFunction map, ReduceType initialValue, NSReduceFunction reduce) { + this.inputReader = inputReader; + this.map = map; + this.initialValue = initialValue; + this.reduce = reduce; + } + + @Override + public ReduceType call() { + // a blocking queue that limits the number of input datum to the requested buffer size + // note we need +1 because we continue to enqueue the lastObject + final BlockingQueue.InputValue> inputQueue + = new LinkedBlockingDeque.InputValue>(bufferSize+1); + + // Create the input producer and start it running + final InputProducer inputProducer = + new InputProducer(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue); + inputExecutor.submit(inputProducer); + + // a priority queue that stores up to bufferSize elements + // produced by completed map jobs. + final PriorityBlockingQueue> mapResultQueue = + new PriorityBlockingQueue>(); + + final Reducer reducer + = new Reducer(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue); + + try { + int nSubmittedJobs = 0; + + while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { + // acquire a slot to run a map job. 
Blocks if too many jobs are enqueued + runningMapJobSlots.acquire(); + + mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer)); + nSubmittedJobs++; + } + + // mark the last job id we've submitted so we now the id to wait for + //logger.warn("setting jobs submitted to " + nSubmittedJobs); + reducer.setTotalJobCount(nSubmittedJobs); + + // wait for all of the input and map threads to finish + return waitForCompletion(inputProducer, reducer); + } catch (Exception ex) { + errorTracker.notifyOfError(ex); + return initialValue; + } + } + + /** + * Wait until the input thread and all map threads have completed running, and return the final reduce result + */ + private ReduceType waitForCompletion(final InputProducer inputProducer, + final Reducer reducer) throws InterruptedException { + // wait until we have a final reduce result // logger.warn("waiting for final reduce"); - final ReduceType finalSum = reducer.waitForFinalReduce(); + final ReduceType finalSum = reducer.waitForFinalReduce(); - // now wait for the input provider thread to terminate + // now wait for the input provider thread to terminate // logger.warn("waiting on inputProducer"); - inputProducer.waitForDone(); + inputProducer.waitForDone(); - // wait for all the map threads to finish by acquiring and then releasing all map job semaphores + // wait for all the map threads to finish by acquiring and then releasing all map job semaphores // logger.warn("waiting on map"); - runningMapJobSlots.acquire(this.bufferSize); - runningMapJobSlots.release(this.bufferSize); + runningMapJobSlots.acquire(bufferSize); + runningMapJobSlots.release(bufferSize); - // everything is finally shutdown, return the final reduce value - return finalSum; - } + // everything is finally shutdown, return the final reduce value + return finalSum; + } - /** - * Should we continue to submit jobs given the number of jobs already submitted and the - * number of read items in inputProducer? 
- * - * We continue to submit jobs while inputProducer hasn't reached EOF or the number - * of jobs we've enqueued isn't the number of read elements. This means that in - * some cases we submit more jobs than total read elements (cannot know because of - * multi-threading) so map jobs must handle the case where getNext() returns EOF. - * - * @param nJobsSubmitted - * @param inputProducer - * @return - */ - private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { - final int nReadItems = inputProducer.getNumInputValues(); - return nReadItems == -1 || nJobsSubmitted < nReadItems; + /** + * Should we continue to submit jobs given the number of jobs already submitted and the + * number of read items in inputProducer? + * + * We continue to submit jobs while inputProducer hasn't reached EOF or the number + * of jobs we've enqueued isn't the number of read elements. This means that in + * some cases we submit more jobs than total read elements (cannot know because of + * multi-threading) so map jobs must handle the case where getNext() returns EOF. 
+ * + * @param nJobsSubmitted + * @param inputProducer + * @return + */ + private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { + final int nReadItems = inputProducer.getNumInputValues(); + return nReadItems == -1 || nJobsSubmitted < nReadItems; + } } private class MapReduceJob implements Runnable { @@ -444,8 +509,7 @@ public class NanoScheduler { final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue); } catch (Exception ex) { - logger.warn("Got exception " + ex); - throw new ReviewedStingException("got execution exception", ex); + errorTracker.notifyOfError(ex); } finally { // we finished a map job, release the job queue semaphore runningMapJobSlots.release(); diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index 428ab37fd..92c1018eb 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -3,6 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import com.google.java.contract.Requires; import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; import org.broadinstitute.sting.utils.SimpleTimer; import java.util.concurrent.CountDownLatch; @@ -34,6 +35,7 @@ class Reducer { final CountDownLatch countDownLatch = new CountDownLatch(1); final NSReduceFunction reduce; final SimpleTimer reduceTimer; + final MultiThreadedErrorTracker errorTracker; /** * The sum of the reduce function applied to all MapResults. 
After this Reducer @@ -63,11 +65,14 @@ class Reducer { * @param initialSum the initial reduce sum */ public Reducer(final NSReduceFunction reduce, + final MultiThreadedErrorTracker errorTracker, final SimpleTimer reduceTimer, final ReduceType initialSum) { + if ( errorTracker == null ) throw new IllegalArgumentException("Error tracker cannot be null"); if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null"); if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null"); + this.errorTracker = errorTracker; this.reduce = reduce; this.reduceTimer = reduceTimer; this.sum = initialSum; @@ -105,31 +110,34 @@ class Reducer { * @throws InterruptedException */ @Ensures("result >= 0") - public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue> mapResultQueue) throws InterruptedException { + public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue> mapResultQueue) { if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); int nReducesNow = 0; // if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS ) // logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size()); + try { + while ( reduceNextValueInQueue(mapResultQueue) ) { + final MapResult result = mapResultQueue.take(); + prevJobID = result.getJobID(); - while ( reduceNextValueInQueue(mapResultQueue) ) { - final MapResult result = mapResultQueue.take(); - prevJobID = result.getJobID(); + if ( ! result.isEOFMarker() ) { + nReducesNow++; - if ( ! 
result.isEOFMarker() ) { - nReducesNow++; + // apply reduce, keeping track of sum + reduceTimer.restart(); + sum = reduce.apply(result.getValue(), sum); + reduceTimer.stop(); - // apply reduce, keeping track of sum - reduceTimer.restart(); - sum = reduce.apply(result.getValue(), sum); - reduceTimer.stop(); + } + numJobsReduced++; + maybeReleaseLatch(); } - - numJobsReduced++; - maybeReleaseLatch(); + } catch (Exception ex) { + errorTracker.notifyOfError(ex); + countDownLatch.countDown(); } - // if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS ) // logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size()); diff --git a/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java b/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java index d07bd104d..9483e4757 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java @@ -91,7 +91,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest { super(EngineErrorHandlingTestProvider.class); this.expectedException = exceptedException; this.args = args; - this.iterationsToTest = args.equals("") ? 1 : 1; // TODO -- update to 1000 + this.iterationsToTest = args.equals("") ? 
1 : 10; setName(String.format("Engine error handling: expected %s with args %s", exceptedException, args)); } } @@ -103,7 +103,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest { continue; // cannot reliably throw errors in TREE_REDUCE final String failArg = " -fail " + failMethod.name(); - for ( final String args : Arrays.asList("", " -nt 2") ) { // , " -nct 2") ) { + for ( final String args : Arrays.asList("", " -nt 2", " -nct 2") ) { new EngineErrorHandlingTestProvider(NullPointerException.class, failArg + args); new EngineErrorHandlingTestProvider(UserException.class, failArg + args); new EngineErrorHandlingTestProvider(ReviewedStingException.class, failArg + args); @@ -116,7 +116,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest { // // Loop over errors to throw, make sure they are the errors we get back from the engine, regardless of NT type // - @Test(dataProvider = "EngineErrorHandlingTestProvider", timeOut = 60 * 1000 ) + @Test(enabled = true, dataProvider = "EngineErrorHandlingTestProvider", timeOut = 60 * 1000 ) public void testEngineErrorHandlingTestProvider(final EngineErrorHandlingTestProvider cfg) { for ( int i = 0; i < cfg.iterationsToTest; i++ ) { final String root = "-T ErrorThrowing -R " + exampleFASTA; diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index 5f54303a9..6c59f1585 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ 
-45,7 +46,7 @@ public class InputProducerUnitTest extends BaseTest { final LinkedBlockingDeque.InputValue> readQueue = new LinkedBlockingDeque.InputValue>(queueSize); - final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); + final InputProducer ip = new InputProducer(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); @@ -93,7 +94,7 @@ public class InputProducerUnitTest extends BaseTest { final LinkedBlockingDeque.InputValue> readQueue = new LinkedBlockingDeque.InputValue>(); - final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); + final InputProducer ip = new InputProducer(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); es.submit(ip); diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index dc8674d88..f267999e3 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -5,6 +5,7 @@ import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.testng.Assert; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -116,6 +117,12 @@ public class NanoSchedulerUnitTest extends BaseTest { } static NanoSchedulerBasicTest exampleTest = null; + + @BeforeSuite + public void setUp() throws Exception { + exampleTest = new NanoSchedulerBasicTest(10, 2, 1, 10, false); + } + @DataProvider(name = "NanoSchedulerBasicTest") public Object[][] 
createNanoSchedulerBasicTest() { // for ( final int bufferSize : Arrays.asList(1, 10) ) { @@ -134,7 +141,7 @@ public class NanoSchedulerUnitTest extends BaseTest { for ( final int end : Arrays.asList(0, 1, 2, 11, 100, 10000, 100000) ) { for ( final boolean addDelays : Arrays.asList(true, false) ) { if ( end < 1000 ) - exampleTest = new NanoSchedulerBasicTest(bufferSize, nt, start, end, addDelays); + new NanoSchedulerBasicTest(bufferSize, nt, start, end, addDelays); } } } @@ -221,12 +228,12 @@ public class NanoSchedulerUnitTest extends BaseTest { nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce()); } - @Test(expectedExceptions = NullPointerException.class, timeOut = 1000) + @Test(expectedExceptions = NullPointerException.class, timeOut = 10000) public void testInputErrorIsThrown_NPE() throws InterruptedException { executeTestErrorThrowingInput(new NullPointerException()); } - @Test(expectedExceptions = NullPointerException.class, timeOut = 1000) + @Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000) public void testInputErrorIsThrown_RSE() throws InterruptedException { executeTestErrorThrowingInput(new ReviewedStingException("test")); } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java index 2732d67d3..39133d1ed 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.Utils; import org.testng.Assert; @@ -92,7 +93,7 @@ public class ReducerUnitTest extends 
BaseTest { final List>> jobGroups = Utils.groupList(allJobs, groupSize); final ReduceSumTest reduce = new ReduceSumTest(); - final Reducer reducer = new Reducer(reduce, new SimpleTimer(), 0); + final Reducer reducer = new Reducer(reduce, new MultiThreadedErrorTracker(), new SimpleTimer(), 0); final TestWaitingForFinalReduce waitingThread = new TestWaitingForFinalReduce(reducer, expectedSum(allJobs)); final ExecutorService es = Executors.newSingleThreadExecutor(); @@ -154,7 +155,7 @@ public class ReducerUnitTest extends BaseTest { private void runSettingJobIDTwice() throws Exception { final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>(); - final Reducer reducer = new Reducer(new ReduceSumTest(), new SimpleTimer(), 0); + final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), new SimpleTimer(), 0); reducer.setTotalJobCount(10); reducer.setTotalJobCount(15);