diff --git a/build.xml b/build.xml
index 0d1deba29..7e7415f08 100644
--- a/build.xml
+++ b/build.xml
@@ -1179,7 +1179,7 @@
-
+
diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java
index fc2546173..8071fe5dc 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java
@@ -63,7 +63,6 @@ import org.broadinstitute.sting.utils.recalibration.BaseRecalibration;
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
import java.io.File;
-import java.io.OutputStream;
import java.util.*;
/**
@@ -410,7 +409,7 @@ public class GenomeAnalysisEngine {
this.threadAllocation = new ThreadAllocation(argCollection.numberOfDataThreads,
argCollection.numberOfCPUThreadsPerDataThread,
argCollection.numberOfIOThreads,
- ! argCollection.disableEfficiencyMonitor);
+ argCollection.monitorThreadEfficiency);
}
public int getTotalNumberOfThreads() {
diff --git a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java
index 44817379a..c8887b8b2 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java
@@ -307,12 +307,12 @@ public class GATKArgumentCollection {
public int numberOfIOThreads = 0;
/**
- * By default the GATK monitors its own efficiency, but this can have a itsy-bitsy tiny
- * cost (< 0.1%) in runtime because of turning on the JavaBean. This argument allows you
- * to disable the monitor
+ * Enable GATK to monitor its own threading efficiency, at a itsy-bitsy tiny
+ * cost (< 0.1%) in runtime because of turning on the JavaBean. This is largely for
+ * debugging purposes.
*/
- @Argument(fullName = "disableThreadEfficiencyMonitor", shortName = "dtem", doc = "Disable GATK efficiency monitoring", required = false)
- public Boolean disableEfficiencyMonitor = false;
+ @Argument(fullName = "monitorThreadEfficiency", shortName = "mte", doc = "Enable GATK threading efficiency monitoring", required = false)
+ public Boolean monitorThreadEfficiency = false;
@Argument(fullName = "num_bam_file_handles", shortName = "bfh", doc="The total number of BAM file handles to keep open simultaneously", required=false)
public Integer numberOfBAMFileHandles = null;
diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java
index 0ddced502..01c4315f2 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java
@@ -11,7 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker;
-import org.broadinstitute.sting.utils.TraversalErrorManager;
+import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory;
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
@@ -46,7 +46,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/**
* An exception that's occurred in this traversal. If null, no exception has occurred.
*/
- final TraversalErrorManager errorTracker = new TraversalErrorManager();
+ final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker();
/**
* Queue of incoming shards.
@@ -351,7 +351,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
* Allows other threads to notify of an error during traversal.
*/
protected synchronized RuntimeException notifyOfTraversalError(Throwable error) {
- return errorTracker.notifyOfTraversalError(error);
+ return errorTracker.notifyOfError(error);
}
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
diff --git a/public/java/src/org/broadinstitute/sting/utils/MultiThreadedErrorTracker.java b/public/java/src/org/broadinstitute/sting/utils/MultiThreadedErrorTracker.java
new file mode 100644
index 000000000..98900031a
--- /dev/null
+++ b/public/java/src/org/broadinstitute/sting/utils/MultiThreadedErrorTracker.java
@@ -0,0 +1,80 @@
+package org.broadinstitute.sting.utils;
+
+import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
+
+/**
+ * A utility to track exceptions that occur across threads.
+ *
+ * Uses a notify mechanism so that multiple threads can tell the tracker that an
+ * error has occurred, and a master thread can monitor this object for an error
+ * occurring and take appropriate action. Only maintains the first
+ * error to reach the tracker.
+ *
+ * Refactored from HierarchicalMicroScheduler
+ *
+ * User: depristo
+ * Date: 9/19/12
+ * Time: 11:20 AM
+ */
+public class MultiThreadedErrorTracker {
+ /**
+ * An exception that's occurred. If null, no exception has occurred.
+ */
+ private RuntimeException error = null;
+
+ /**
+ * Convenience function to check, and throw, an error is one is pending
+ */
+ public synchronized void throwErrorIfPending() {
+ if (hasAnErrorOccurred())
+ throw getError();
+ }
+
+ /**
+ * Detects whether an execution error has occurred.
+ * @return True if an error has occurred. False otherwise.
+ */
+ public synchronized boolean hasAnErrorOccurred() {
+ return error != null;
+ }
+
+ /**
+ * Retrieve the error that has occurred.
+ *
+ * @throws ReviewedStingException if no error has occurred.
+ * @return
+ */
+ public synchronized RuntimeException getError() {
+ if(!hasAnErrorOccurred())
+ throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists");
+ return error;
+ }
+
+ /**
+ * Notify this error tracker that an error has occurs. Only updates the tracked
+ * error if it is currently null (i.e., no error has been already reported). So
+ * calling this successively with multiple errors only keeps the first, which is the
+ * right thing to do as the initial failure is usually the meaningful one, but
+ * generates a cascade of failures as other subsystems fail.
+ */
+ public synchronized RuntimeException notifyOfError(Throwable error) {
+ if ( this.error == null )
+ this.error = toRuntimeException(error);
+
+ return this.error;
+ }
+
+ /**
+ * Convert error to a Runtime exception, or keep as is if it already is one
+ *
+ * @param error the error that has occurred
+ * @return the potentially converted error
+ */
+ private RuntimeException toRuntimeException(final Throwable error) {
+ // If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
+ if (error instanceof RuntimeException)
+ return (RuntimeException)error;
+ else
+ return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error);
+ }
+}
diff --git a/public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java b/public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java
deleted file mode 100644
index dd57950e0..000000000
--- a/public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.broadinstitute.sting.utils;
-
-import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
-
-/**
- * Created with IntelliJ IDEA.
- * User: depristo
- * Date: 9/19/12
- * Time: 11:20 AM
- * To change this template use File | Settings | File Templates.
- */
-public class TraversalErrorManager {
- /**
- * An exception that's occurred in this traversal. If null, no exception has occurred.
- */
- private RuntimeException error = null;
-
- public synchronized void throwErrorIfPending() {
- if (hasTraversalErrorOccurred())
- throw getTraversalError();
- }
-
- /**
- * Detects whether an execution error has occurred.
- * @return True if an error has occurred. False otherwise.
- */
- public synchronized boolean hasTraversalErrorOccurred() {
- return error != null;
- }
-
- public synchronized RuntimeException getTraversalError() {
- if(!hasTraversalErrorOccurred())
- throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists");
- return error;
- }
-
- /**
- * Allows other threads to notify of an error during traversal.
- */
- public synchronized RuntimeException notifyOfTraversalError(Throwable error) {
- // If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
- this.error = toRuntimeException(error);
- return this.error;
- }
-
- private RuntimeException toRuntimeException(final Throwable error) {
- // If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
- if (error instanceof RuntimeException)
- return (RuntimeException)error;
- else
- return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error);
- }
-}
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java
index 0e337631c..bd99a9266 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java
@@ -1,8 +1,8 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import org.apache.log4j.Logger;
+import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer;
-import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
@@ -29,6 +29,8 @@ class InputProducer implements Runnable {
*/
final BlockingQueue outputQueue;
+ final MultiThreadedErrorTracker errorTracker;
+
/**
* Have we read the last value from inputReader?
*
@@ -48,13 +50,16 @@ class InputProducer implements Runnable {
final CountDownLatch latch = new CountDownLatch(1);
public InputProducer(final Iterator inputReader,
+ final MultiThreadedErrorTracker errorTracker,
final SimpleTimer inputTimer,
final BlockingQueue outputQueue) {
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
+ if ( errorTracker == null ) throw new IllegalArgumentException("errorTracker cannot be null");
if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null");
if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null");
this.inputReader = inputReader;
+ this.errorTracker = errorTracker;
this.inputTimer = inputTimer;
this.outputQueue = outputQueue;
}
@@ -129,8 +134,7 @@ class InputProducer implements Runnable {
latch.countDown();
} catch (Exception ex) {
- logger.warn("Got exception " + ex);
- throw new ReviewedStingException("got execution exception", ex);
+ errorTracker.notifyOfError(ex);
}
}
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java
index 31ce04074..b014695da 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java
@@ -3,7 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
-import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
+import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.threading.NamedThreadFactory;
import java.util.Iterator;
@@ -48,8 +48,10 @@ public class NanoScheduler {
final int bufferSize;
final int nThreads;
final ExecutorService inputExecutor;
+ final ExecutorService masterExecutor;
final ExecutorService mapExecutor;
final Semaphore runningMapJobSlots;
+ final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker();
boolean shutdown = false;
boolean debug = false;
@@ -83,13 +85,14 @@ public class NanoScheduler {
this.nThreads = nThreads;
if ( nThreads == 1 ) {
- this.mapExecutor = this.inputExecutor = null;
+ this.mapExecutor = this.inputExecutor = this.masterExecutor = null;
runningMapJobSlots = null;
} else {
this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d"));
runningMapJobSlots = new Semaphore(this.bufferSize);
this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
+ this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
}
// start timing the time spent outside of the nanoScheduler
@@ -128,6 +131,7 @@ public class NanoScheduler {
if ( nThreads > 1 ) {
shutdownExecutor("inputExecutor", inputExecutor);
shutdownExecutor("mapExecutor", mapExecutor);
+ shutdownExecutor("masterExecutor", masterExecutor);
}
shutdown = true;
@@ -309,85 +313,146 @@ public class NanoScheduler {
final NSReduceFunction reduce) {
debugPrint("Executing nanoScheduler");
- // a blocking queue that limits the number of input datum to the requested buffer size
- // note we need +1 because we continue to enqueue the lastObject
- final BlockingQueue.InputValue> inputQueue
- = new LinkedBlockingDeque.InputValue>(bufferSize+1);
+ // start up the master job
+ final MasterJob masterJob = new MasterJob(inputReader, map, initialValue, reduce);
+ final Future reduceResult = masterExecutor.submit(masterJob);
- // Create the input producer and start it running
- final InputProducer inputProducer =
- new InputProducer(inputReader, myNSRuntimeProfile.inputTimer, inputQueue);
- inputExecutor.submit(inputProducer);
+ while ( true ) {
+ // check that no errors occurred while we were waiting
+ handleErrors();
- // a priority queue that stores up to bufferSize elements
- // produced by completed map jobs.
- final PriorityBlockingQueue> mapResultQueue =
- new PriorityBlockingQueue>();
+ try {
+ final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS);
- final Reducer reducer
- = new Reducer(reduce, myNSRuntimeProfile.reduceTimer, initialValue);
+ // in case an error occurred in the reduce
+ handleErrors();
- try {
- int nSubmittedJobs = 0;
-
- while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
- // acquire a slot to run a map job. Blocks if too many jobs are enqueued
- runningMapJobSlots.acquire();
-
- mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer));
- nSubmittedJobs++;
+ // return our final reduce result
+ return result;
+ } catch (final TimeoutException ex ) {
+ // a normal case -- we just aren't done
+ } catch (final InterruptedException ex) {
+ errorTracker.notifyOfError(ex);
+ // will handle error in the next round of the for loop
+ } catch (final ExecutionException ex) {
+ errorTracker.notifyOfError(ex);
+ // will handle error in the next round of the for loop
}
+ }
+ }
- // mark the last job id we've submitted so we now the id to wait for
- //logger.warn("setting jobs submitted to " + nSubmittedJobs);
- reducer.setTotalJobCount(nSubmittedJobs);
-
- // wait for all of the input and map threads to finish
- return waitForCompletion(inputProducer, reducer);
- } catch (Exception ex) {
- logger.warn("Got exception " + ex);
- throw new ReviewedStingException("got execution exception", ex);
+ private void handleErrors() {
+ if ( errorTracker.hasAnErrorOccurred() ) {
+ masterExecutor.shutdownNow();
+ mapExecutor.shutdownNow();
+ inputExecutor.shutdownNow();
+ errorTracker.throwErrorIfPending();
}
}
/**
- * Wait until the input thread and all map threads have completed running, and return the final reduce result
+ * MasterJob has the task to enqueue Map jobs and wait for the final reduce
+ *
+ * It must be run in a separate thread in order to properly handle errors that may occur
+ * in the input, map, or reduce jobs without deadlocking.
+ *
+ * The result of this callable is the final reduce value for the input / map / reduce jobs
*/
- private ReduceType waitForCompletion(final InputProducer inputProducer,
- final Reducer reducer) throws InterruptedException {
- // wait until we have a final reduce result
+ private class MasterJob implements Callable {
+ final Iterator inputReader;
+ final NSMapFunction map;
+ final ReduceType initialValue;
+ final NSReduceFunction reduce;
+
+ private MasterJob(Iterator inputReader, NSMapFunction map, ReduceType initialValue, NSReduceFunction reduce) {
+ this.inputReader = inputReader;
+ this.map = map;
+ this.initialValue = initialValue;
+ this.reduce = reduce;
+ }
+
+ @Override
+ public ReduceType call() {
+ // a blocking queue that limits the number of input datum to the requested buffer size
+ // note we need +1 because we continue to enqueue the lastObject
+ final BlockingQueue.InputValue> inputQueue
+ = new LinkedBlockingDeque.InputValue>(bufferSize+1);
+
+ // Create the input producer and start it running
+ final InputProducer inputProducer =
+ new InputProducer(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue);
+ inputExecutor.submit(inputProducer);
+
+ // a priority queue that stores up to bufferSize elements
+ // produced by completed map jobs.
+ final PriorityBlockingQueue> mapResultQueue =
+ new PriorityBlockingQueue>();
+
+ final Reducer reducer
+ = new Reducer(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue);
+
+ try {
+ int nSubmittedJobs = 0;
+
+ while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
+ // acquire a slot to run a map job. Blocks if too many jobs are enqueued
+ runningMapJobSlots.acquire();
+
+ mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer));
+ nSubmittedJobs++;
+ }
+
+ // mark the last job id we've submitted so we now the id to wait for
+ //logger.warn("setting jobs submitted to " + nSubmittedJobs);
+ reducer.setTotalJobCount(nSubmittedJobs);
+
+ // wait for all of the input and map threads to finish
+ return waitForCompletion(inputProducer, reducer);
+ } catch (Exception ex) {
+ errorTracker.notifyOfError(ex);
+ return initialValue;
+ }
+ }
+
+ /**
+ * Wait until the input thread and all map threads have completed running, and return the final reduce result
+ */
+ private ReduceType waitForCompletion(final InputProducer inputProducer,
+ final Reducer reducer) throws InterruptedException {
+ // wait until we have a final reduce result
// logger.warn("waiting for final reduce");
- final ReduceType finalSum = reducer.waitForFinalReduce();
+ final ReduceType finalSum = reducer.waitForFinalReduce();
- // now wait for the input provider thread to terminate
+ // now wait for the input provider thread to terminate
// logger.warn("waiting on inputProducer");
- inputProducer.waitForDone();
+ inputProducer.waitForDone();
- // wait for all the map threads to finish by acquiring and then releasing all map job semaphores
+ // wait for all the map threads to finish by acquiring and then releasing all map job semaphores
// logger.warn("waiting on map");
- runningMapJobSlots.acquire(this.bufferSize);
- runningMapJobSlots.release(this.bufferSize);
+ runningMapJobSlots.acquire(bufferSize);
+ runningMapJobSlots.release(bufferSize);
- // everything is finally shutdown, return the final reduce value
- return finalSum;
- }
+ // everything is finally shutdown, return the final reduce value
+ return finalSum;
+ }
- /**
- * Should we continue to submit jobs given the number of jobs already submitted and the
- * number of read items in inputProducer?
- *
- * We continue to submit jobs while inputProducer hasn't reached EOF or the number
- * of jobs we've enqueued isn't the number of read elements. This means that in
- * some cases we submit more jobs than total read elements (cannot know because of
- * multi-threading) so map jobs must handle the case where getNext() returns EOF.
- *
- * @param nJobsSubmitted
- * @param inputProducer
- * @return
- */
- private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) {
- final int nReadItems = inputProducer.getNumInputValues();
- return nReadItems == -1 || nJobsSubmitted < nReadItems;
+ /**
+ * Should we continue to submit jobs given the number of jobs already submitted and the
+ * number of read items in inputProducer?
+ *
+ * We continue to submit jobs while inputProducer hasn't reached EOF or the number
+ * of jobs we've enqueued isn't the number of read elements. This means that in
+ * some cases we submit more jobs than total read elements (cannot know because of
+ * multi-threading) so map jobs must handle the case where getNext() returns EOF.
+ *
+ * @param nJobsSubmitted
+ * @param inputProducer
+ * @return
+ */
+ private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) {
+ final int nReadItems = inputProducer.getNumInputValues();
+ return nReadItems == -1 || nJobsSubmitted < nReadItems;
+ }
}
private class MapReduceJob implements Runnable {
@@ -444,8 +509,7 @@ public class NanoScheduler {
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue);
} catch (Exception ex) {
- logger.warn("Got exception " + ex);
- throw new ReviewedStingException("got execution exception", ex);
+ errorTracker.notifyOfError(ex);
} finally {
// we finished a map job, release the job queue semaphore
runningMapJobSlots.release();
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java
index 428ab37fd..92c1018eb 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java
@@ -3,6 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
+import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer;
import java.util.concurrent.CountDownLatch;
@@ -34,6 +35,7 @@ class Reducer {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final NSReduceFunction reduce;
final SimpleTimer reduceTimer;
+ final MultiThreadedErrorTracker errorTracker;
/**
* The sum of the reduce function applied to all MapResults. After this Reducer
@@ -63,11 +65,14 @@ class Reducer {
* @param initialSum the initial reduce sum
*/
public Reducer(final NSReduceFunction reduce,
+ final MultiThreadedErrorTracker errorTracker,
final SimpleTimer reduceTimer,
final ReduceType initialSum) {
+ if ( errorTracker == null ) throw new IllegalArgumentException("Error tracker cannot be null");
if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null");
if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null");
+ this.errorTracker = errorTracker;
this.reduce = reduce;
this.reduceTimer = reduceTimer;
this.sum = initialSum;
@@ -105,31 +110,34 @@ class Reducer {
* @throws InterruptedException
*/
@Ensures("result >= 0")
- public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue> mapResultQueue) throws InterruptedException {
+ public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue> mapResultQueue) {
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
int nReducesNow = 0;
// if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
+ try {
+ while ( reduceNextValueInQueue(mapResultQueue) ) {
+ final MapResult result = mapResultQueue.take();
+ prevJobID = result.getJobID();
- while ( reduceNextValueInQueue(mapResultQueue) ) {
- final MapResult result = mapResultQueue.take();
- prevJobID = result.getJobID();
+ if ( ! result.isEOFMarker() ) {
+ nReducesNow++;
- if ( ! result.isEOFMarker() ) {
- nReducesNow++;
+ // apply reduce, keeping track of sum
+ reduceTimer.restart();
+ sum = reduce.apply(result.getValue(), sum);
+ reduceTimer.stop();
- // apply reduce, keeping track of sum
- reduceTimer.restart();
- sum = reduce.apply(result.getValue(), sum);
- reduceTimer.stop();
+ }
+ numJobsReduced++;
+ maybeReleaseLatch();
}
-
- numJobsReduced++;
- maybeReleaseLatch();
+ } catch (Exception ex) {
+ errorTracker.notifyOfError(ex);
+ countDownLatch.countDown();
}
-
// if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
diff --git a/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java b/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java
index d07bd104d..9483e4757 100644
--- a/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java
+++ b/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java
@@ -91,7 +91,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
super(EngineErrorHandlingTestProvider.class);
this.expectedException = exceptedException;
this.args = args;
- this.iterationsToTest = args.equals("") ? 1 : 1; // TODO -- update to 1000
+ this.iterationsToTest = args.equals("") ? 1 : 10;
setName(String.format("Engine error handling: expected %s with args %s", exceptedException, args));
}
}
@@ -103,7 +103,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
continue; // cannot reliably throw errors in TREE_REDUCE
final String failArg = " -fail " + failMethod.name();
- for ( final String args : Arrays.asList("", " -nt 2") ) { // , " -nct 2") ) {
+ for ( final String args : Arrays.asList("", " -nt 2", " -nct 2") ) {
new EngineErrorHandlingTestProvider(NullPointerException.class, failArg + args);
new EngineErrorHandlingTestProvider(UserException.class, failArg + args);
new EngineErrorHandlingTestProvider(ReviewedStingException.class, failArg + args);
@@ -116,7 +116,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
//
// Loop over errors to throw, make sure they are the errors we get back from the engine, regardless of NT type
//
- @Test(dataProvider = "EngineErrorHandlingTestProvider", timeOut = 60 * 1000 )
+ @Test(enabled = true, dataProvider = "EngineErrorHandlingTestProvider", timeOut = 60 * 1000 )
public void testEngineErrorHandlingTestProvider(final EngineErrorHandlingTestProvider cfg) {
for ( int i = 0; i < cfg.iterationsToTest; i++ ) {
final String root = "-T ErrorThrowing -R " + exampleFASTA;
diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java
index 5f54303a9..6c59f1585 100644
--- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java
+++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java
@@ -1,6 +1,7 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.BaseTest;
+import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
@@ -45,7 +46,7 @@ public class InputProducerUnitTest extends BaseTest {
final LinkedBlockingDeque.InputValue> readQueue =
new LinkedBlockingDeque.InputValue>(queueSize);
- final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue);
+ final InputProducer ip = new InputProducer(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue);
final ExecutorService es = Executors.newSingleThreadExecutor();
@@ -93,7 +94,7 @@ public class InputProducerUnitTest extends BaseTest {
final LinkedBlockingDeque.InputValue> readQueue =
new LinkedBlockingDeque.InputValue>();
- final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue);
+ final InputProducer ip = new InputProducer(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue);
final ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(ip);
diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java
index dc8674d88..f267999e3 100644
--- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java
+++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java
@@ -5,6 +5,7 @@ import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.testng.Assert;
+import org.testng.annotations.BeforeSuite;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -116,6 +117,12 @@ public class NanoSchedulerUnitTest extends BaseTest {
}
static NanoSchedulerBasicTest exampleTest = null;
+
+ @BeforeSuite
+ public void setUp() throws Exception {
+ exampleTest = new NanoSchedulerBasicTest(10, 2, 1, 10, false);
+ }
+
@DataProvider(name = "NanoSchedulerBasicTest")
public Object[][] createNanoSchedulerBasicTest() {
// for ( final int bufferSize : Arrays.asList(1, 10) ) {
@@ -134,7 +141,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
for ( final int end : Arrays.asList(0, 1, 2, 11, 100, 10000, 100000) ) {
for ( final boolean addDelays : Arrays.asList(true, false) ) {
if ( end < 1000 )
- exampleTest = new NanoSchedulerBasicTest(bufferSize, nt, start, end, addDelays);
+ new NanoSchedulerBasicTest(bufferSize, nt, start, end, addDelays);
}
}
}
@@ -221,12 +228,12 @@ public class NanoSchedulerUnitTest extends BaseTest {
nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce());
}
- @Test(expectedExceptions = NullPointerException.class, timeOut = 1000)
+ @Test(expectedExceptions = NullPointerException.class, timeOut = 10000)
public void testInputErrorIsThrown_NPE() throws InterruptedException {
executeTestErrorThrowingInput(new NullPointerException());
}
- @Test(expectedExceptions = NullPointerException.class, timeOut = 1000)
+ @Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000)
public void testInputErrorIsThrown_RSE() throws InterruptedException {
executeTestErrorThrowingInput(new ReviewedStingException("test"));
}
diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java
index 2732d67d3..39133d1ed 100644
--- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java
+++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java
@@ -1,6 +1,7 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.BaseTest;
+import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.Utils;
import org.testng.Assert;
@@ -92,7 +93,7 @@ public class ReducerUnitTest extends BaseTest {
final List>> jobGroups = Utils.groupList(allJobs, groupSize);
final ReduceSumTest reduce = new ReduceSumTest();
- final Reducer reducer = new Reducer(reduce, new SimpleTimer(), 0);
+ final Reducer reducer = new Reducer(reduce, new MultiThreadedErrorTracker(), new SimpleTimer(), 0);
final TestWaitingForFinalReduce waitingThread = new TestWaitingForFinalReduce(reducer, expectedSum(allJobs));
final ExecutorService es = Executors.newSingleThreadExecutor();
@@ -154,7 +155,7 @@ public class ReducerUnitTest extends BaseTest {
private void runSettingJobIDTwice() throws Exception {
final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>();
- final Reducer reducer = new Reducer(new ReduceSumTest(), new SimpleTimer(), 0);
+ final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), new SimpleTimer(), 0);
reducer.setTotalJobCount(10);
reducer.setTotalJobCount(15);