Proper error handling in NanoScheduler

-- Renamed TraversalErrorManager to the more general MultiThreadedErrorTracker
-- ErrorTracker is now used throughout the NanoScheduler.  In order to properly handle errors, the work previously done by main thread (submit jobs, block on reduce) is now handled in a separate thread.  The main thread simply wakes up peroidically and checks whether the reduce result is available or if an error has occurred, and handles each appropriately.
-- EngineFeaturesIntegrationTest checks that -nt and -nct properly throw errors in Walkers
-- Added NanoSchedulerUnitTest for input errors
-- ThreadEfficiencyMonitoring is now disabled by default, and can be enabled with a GATK command line option.  This is because the monitoring doesn't differentiate between threads that are supposed to do work, and those that are supposed to wait, and therefore gives misleading results.
-- Build.xml no longer copies the unittest results verbosely
This commit is contained in:
Mark DePristo 2012-09-19 16:59:24 -04:00
parent 773af05980
commit 2267b722b2
13 changed files with 267 additions and 156 deletions

View File

@ -1179,7 +1179,7 @@
<!-- copy the report to our private_html directory for easy viewing in a broswer --> <!-- copy the report to our private_html directory for easy viewing in a broswer -->
<mkdir dir="${iwww.report.dir}/@{testtype}"/> <mkdir dir="${iwww.report.dir}/@{testtype}"/>
<copy todir="${iwww.report.dir}/@{testtype}" verbose="true"> <copy todir="${iwww.report.dir}/@{testtype}" verbose="false">
<fileset dir="@{outputdir}"/> <fileset dir="@{outputdir}"/>
</copy> </copy>

View File

@ -63,7 +63,6 @@ import org.broadinstitute.sting.utils.recalibration.BaseRecalibration;
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
import java.io.File; import java.io.File;
import java.io.OutputStream;
import java.util.*; import java.util.*;
/** /**
@ -410,7 +409,7 @@ public class GenomeAnalysisEngine {
this.threadAllocation = new ThreadAllocation(argCollection.numberOfDataThreads, this.threadAllocation = new ThreadAllocation(argCollection.numberOfDataThreads,
argCollection.numberOfCPUThreadsPerDataThread, argCollection.numberOfCPUThreadsPerDataThread,
argCollection.numberOfIOThreads, argCollection.numberOfIOThreads,
! argCollection.disableEfficiencyMonitor); argCollection.monitorThreadEfficiency);
} }
public int getTotalNumberOfThreads() { public int getTotalNumberOfThreads() {

View File

@ -307,12 +307,12 @@ public class GATKArgumentCollection {
public int numberOfIOThreads = 0; public int numberOfIOThreads = 0;
/** /**
* By default the GATK monitors its own efficiency, but this can have a itsy-bitsy tiny * Enable GATK to monitor its own threading efficiency, at a itsy-bitsy tiny
* cost (< 0.1%) in runtime because of turning on the JavaBean. This argument allows you * cost (< 0.1%) in runtime because of turning on the JavaBean. This is largely for
* to disable the monitor * debugging purposes.
*/ */
@Argument(fullName = "disableThreadEfficiencyMonitor", shortName = "dtem", doc = "Disable GATK efficiency monitoring", required = false) @Argument(fullName = "monitorThreadEfficiency", shortName = "mte", doc = "Enable GATK threading efficiency monitoring", required = false)
public Boolean disableEfficiencyMonitor = false; public Boolean monitorThreadEfficiency = false;
@Argument(fullName = "num_bam_file_handles", shortName = "bfh", doc="The total number of BAM file handles to keep open simultaneously", required=false) @Argument(fullName = "num_bam_file_handles", shortName = "bfh", doc="The total number of BAM file handles to keep open simultaneously", required=false)
public Integer numberOfBAMFileHandles = null; public Integer numberOfBAMFileHandles = null;

View File

@ -11,7 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.TraversalErrorManager; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory; import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory;
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
@ -46,7 +46,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/** /**
* An exception that's occurred in this traversal. If null, no exception has occurred. * An exception that's occurred in this traversal. If null, no exception has occurred.
*/ */
final TraversalErrorManager errorTracker = new TraversalErrorManager(); final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker();
/** /**
* Queue of incoming shards. * Queue of incoming shards.
@ -351,7 +351,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
* Allows other threads to notify of an error during traversal. * Allows other threads to notify of an error during traversal.
*/ */
protected synchronized RuntimeException notifyOfTraversalError(Throwable error) { protected synchronized RuntimeException notifyOfTraversalError(Throwable error) {
return errorTracker.notifyOfTraversalError(error); return errorTracker.notifyOfError(error);
} }
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */ /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */

View File

@ -0,0 +1,80 @@
package org.broadinstitute.sting.utils;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
/**
* A utility to track exceptions that occur across threads.
*
* Uses a notify mechanism so that multiple threads can tell the tracker that an
* error has occurred, and a master thread can monitor this object for an error
* occurring and take appropriate action. Only maintains the first
* error to reach the tracker.
*
* Refactored from HierarchicalMicroScheduler
*
* User: depristo
* Date: 9/19/12
* Time: 11:20 AM
*/
public class MultiThreadedErrorTracker {
/**
* An exception that's occurred. If null, no exception has occurred.
*/
private RuntimeException error = null;
/**
* Convenience function to check, and throw, an error is one is pending
*/
public synchronized void throwErrorIfPending() {
if (hasAnErrorOccurred())
throw getError();
}
/**
* Detects whether an execution error has occurred.
* @return True if an error has occurred. False otherwise.
*/
public synchronized boolean hasAnErrorOccurred() {
return error != null;
}
/**
* Retrieve the error that has occurred.
*
* @throws ReviewedStingException if no error has occurred.
* @return
*/
public synchronized RuntimeException getError() {
if(!hasAnErrorOccurred())
throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists");
return error;
}
/**
* Notify this error tracker that an error has occurs. Only updates the tracked
* error if it is currently null (i.e., no error has been already reported). So
* calling this successively with multiple errors only keeps the first, which is the
* right thing to do as the initial failure is usually the meaningful one, but
* generates a cascade of failures as other subsystems fail.
*/
public synchronized RuntimeException notifyOfError(Throwable error) {
if ( this.error == null )
this.error = toRuntimeException(error);
return this.error;
}
/**
* Convert error to a Runtime exception, or keep as is if it already is one
*
* @param error the error that has occurred
* @return the potentially converted error
*/
private RuntimeException toRuntimeException(final Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
if (error instanceof RuntimeException)
return (RuntimeException)error;
else
return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error);
}
}

View File

@ -1,53 +0,0 @@
package org.broadinstitute.sting.utils;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
/**
* Created with IntelliJ IDEA.
* User: depristo
* Date: 9/19/12
* Time: 11:20 AM
* To change this template use File | Settings | File Templates.
*/
public class TraversalErrorManager {
/**
* An exception that's occurred in this traversal. If null, no exception has occurred.
*/
private RuntimeException error = null;
public synchronized void throwErrorIfPending() {
if (hasTraversalErrorOccurred())
throw getTraversalError();
}
/**
* Detects whether an execution error has occurred.
* @return True if an error has occurred. False otherwise.
*/
public synchronized boolean hasTraversalErrorOccurred() {
return error != null;
}
public synchronized RuntimeException getTraversalError() {
if(!hasTraversalErrorOccurred())
throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists");
return error;
}
/**
* Allows other threads to notify of an error during traversal.
*/
public synchronized RuntimeException notifyOfTraversalError(Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
this.error = toRuntimeException(error);
return this.error;
}
private RuntimeException toRuntimeException(final Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
if (error instanceof RuntimeException)
return (RuntimeException)error;
else
return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error);
}
}

View File

@ -1,8 +1,8 @@
package org.broadinstitute.sting.utils.nanoScheduler; package org.broadinstitute.sting.utils.nanoScheduler;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -29,6 +29,8 @@ class InputProducer<InputType> implements Runnable {
*/ */
final BlockingQueue<InputValue> outputQueue; final BlockingQueue<InputValue> outputQueue;
final MultiThreadedErrorTracker errorTracker;
/** /**
* Have we read the last value from inputReader? * Have we read the last value from inputReader?
* *
@ -48,13 +50,16 @@ class InputProducer<InputType> implements Runnable {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
public InputProducer(final Iterator<InputType> inputReader, public InputProducer(final Iterator<InputType> inputReader,
final MultiThreadedErrorTracker errorTracker,
final SimpleTimer inputTimer, final SimpleTimer inputTimer,
final BlockingQueue<InputValue> outputQueue) { final BlockingQueue<InputValue> outputQueue) {
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
if ( errorTracker == null ) throw new IllegalArgumentException("errorTracker cannot be null");
if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null"); if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null");
if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null"); if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null");
this.inputReader = inputReader; this.inputReader = inputReader;
this.errorTracker = errorTracker;
this.inputTimer = inputTimer; this.inputTimer = inputTimer;
this.outputQueue = outputQueue; this.outputQueue = outputQueue;
} }
@ -129,8 +134,7 @@ class InputProducer<InputType> implements Runnable {
latch.countDown(); latch.countDown();
} catch (Exception ex) { } catch (Exception ex) {
logger.warn("Got exception " + ex); errorTracker.notifyOfError(ex);
throw new ReviewedStingException("got execution exception", ex);
} }
} }

View File

@ -3,7 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures; import com.google.java.contract.Ensures;
import com.google.java.contract.Requires; import com.google.java.contract.Requires;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.threading.NamedThreadFactory; import org.broadinstitute.sting.utils.threading.NamedThreadFactory;
import java.util.Iterator; import java.util.Iterator;
@ -48,8 +48,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final int bufferSize; final int bufferSize;
final int nThreads; final int nThreads;
final ExecutorService inputExecutor; final ExecutorService inputExecutor;
final ExecutorService masterExecutor;
final ExecutorService mapExecutor; final ExecutorService mapExecutor;
final Semaphore runningMapJobSlots; final Semaphore runningMapJobSlots;
final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker();
boolean shutdown = false; boolean shutdown = false;
boolean debug = false; boolean debug = false;
@ -83,13 +85,14 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
this.nThreads = nThreads; this.nThreads = nThreads;
if ( nThreads == 1 ) { if ( nThreads == 1 ) {
this.mapExecutor = this.inputExecutor = null; this.mapExecutor = this.inputExecutor = this.masterExecutor = null;
runningMapJobSlots = null; runningMapJobSlots = null;
} else { } else {
this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d")); this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d"));
runningMapJobSlots = new Semaphore(this.bufferSize); runningMapJobSlots = new Semaphore(this.bufferSize);
this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
} }
// start timing the time spent outside of the nanoScheduler // start timing the time spent outside of the nanoScheduler
@ -128,6 +131,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
if ( nThreads > 1 ) { if ( nThreads > 1 ) {
shutdownExecutor("inputExecutor", inputExecutor); shutdownExecutor("inputExecutor", inputExecutor);
shutdownExecutor("mapExecutor", mapExecutor); shutdownExecutor("mapExecutor", mapExecutor);
shutdownExecutor("masterExecutor", masterExecutor);
} }
shutdown = true; shutdown = true;
@ -309,6 +313,66 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final NSReduceFunction<MapType, ReduceType> reduce) { final NSReduceFunction<MapType, ReduceType> reduce) {
debugPrint("Executing nanoScheduler"); debugPrint("Executing nanoScheduler");
// start up the master job
final MasterJob masterJob = new MasterJob(inputReader, map, initialValue, reduce);
final Future<ReduceType> reduceResult = masterExecutor.submit(masterJob);
while ( true ) {
// check that no errors occurred while we were waiting
handleErrors();
try {
final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS);
// in case an error occurred in the reduce
handleErrors();
// return our final reduce result
return result;
} catch (final TimeoutException ex ) {
// a normal case -- we just aren't done
} catch (final InterruptedException ex) {
errorTracker.notifyOfError(ex);
// will handle error in the next round of the for loop
} catch (final ExecutionException ex) {
errorTracker.notifyOfError(ex);
// will handle error in the next round of the for loop
}
}
}
private void handleErrors() {
if ( errorTracker.hasAnErrorOccurred() ) {
masterExecutor.shutdownNow();
mapExecutor.shutdownNow();
inputExecutor.shutdownNow();
errorTracker.throwErrorIfPending();
}
}
/**
* MasterJob has the task to enqueue Map jobs and wait for the final reduce
*
* It must be run in a separate thread in order to properly handle errors that may occur
* in the input, map, or reduce jobs without deadlocking.
*
* The result of this callable is the final reduce value for the input / map / reduce jobs
*/
private class MasterJob implements Callable<ReduceType> {
final Iterator<InputType> inputReader;
final NSMapFunction<InputType, MapType> map;
final ReduceType initialValue;
final NSReduceFunction<MapType, ReduceType> reduce;
private MasterJob(Iterator<InputType> inputReader, NSMapFunction<InputType, MapType> map, ReduceType initialValue, NSReduceFunction<MapType, ReduceType> reduce) {
this.inputReader = inputReader;
this.map = map;
this.initialValue = initialValue;
this.reduce = reduce;
}
@Override
public ReduceType call() {
// a blocking queue that limits the number of input datum to the requested buffer size // a blocking queue that limits the number of input datum to the requested buffer size
// note we need +1 because we continue to enqueue the lastObject // note we need +1 because we continue to enqueue the lastObject
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue
@ -316,7 +380,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// Create the input producer and start it running // Create the input producer and start it running
final InputProducer<InputType> inputProducer = final InputProducer<InputType> inputProducer =
new InputProducer<InputType>(inputReader, myNSRuntimeProfile.inputTimer, inputQueue); new InputProducer<InputType>(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue);
inputExecutor.submit(inputProducer); inputExecutor.submit(inputProducer);
// a priority queue that stores up to bufferSize elements // a priority queue that stores up to bufferSize elements
@ -325,7 +389,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
new PriorityBlockingQueue<MapResult<MapType>>(); new PriorityBlockingQueue<MapResult<MapType>>();
final Reducer<MapType, ReduceType> reducer final Reducer<MapType, ReduceType> reducer
= new Reducer<MapType, ReduceType>(reduce, myNSRuntimeProfile.reduceTimer, initialValue); = new Reducer<MapType, ReduceType>(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue);
try { try {
int nSubmittedJobs = 0; int nSubmittedJobs = 0;
@ -345,8 +409,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// wait for all of the input and map threads to finish // wait for all of the input and map threads to finish
return waitForCompletion(inputProducer, reducer); return waitForCompletion(inputProducer, reducer);
} catch (Exception ex) { } catch (Exception ex) {
logger.warn("Got exception " + ex); errorTracker.notifyOfError(ex);
throw new ReviewedStingException("got execution exception", ex); return initialValue;
} }
} }
@ -365,8 +429,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// wait for all the map threads to finish by acquiring and then releasing all map job semaphores // wait for all the map threads to finish by acquiring and then releasing all map job semaphores
// logger.warn("waiting on map"); // logger.warn("waiting on map");
runningMapJobSlots.acquire(this.bufferSize); runningMapJobSlots.acquire(bufferSize);
runningMapJobSlots.release(this.bufferSize); runningMapJobSlots.release(bufferSize);
// everything is finally shutdown, return the final reduce value // everything is finally shutdown, return the final reduce value
return finalSum; return finalSum;
@ -389,6 +453,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final int nReadItems = inputProducer.getNumInputValues(); final int nReadItems = inputProducer.getNumInputValues();
return nReadItems == -1 || nJobsSubmitted < nReadItems; return nReadItems == -1 || nJobsSubmitted < nReadItems;
} }
}
private class MapReduceJob implements Runnable { private class MapReduceJob implements Runnable {
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue; final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue;
@ -444,8 +509,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue); final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue);
} catch (Exception ex) { } catch (Exception ex) {
logger.warn("Got exception " + ex); errorTracker.notifyOfError(ex);
throw new ReviewedStingException("got execution exception", ex);
} finally { } finally {
// we finished a map job, release the job queue semaphore // we finished a map job, release the job queue semaphore
runningMapJobSlots.release(); runningMapJobSlots.release();

View File

@ -3,6 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures; import com.google.java.contract.Ensures;
import com.google.java.contract.Requires; import com.google.java.contract.Requires;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.SimpleTimer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -34,6 +35,7 @@ class Reducer<MapType, ReduceType> {
final CountDownLatch countDownLatch = new CountDownLatch(1); final CountDownLatch countDownLatch = new CountDownLatch(1);
final NSReduceFunction<MapType, ReduceType> reduce; final NSReduceFunction<MapType, ReduceType> reduce;
final SimpleTimer reduceTimer; final SimpleTimer reduceTimer;
final MultiThreadedErrorTracker errorTracker;
/** /**
* The sum of the reduce function applied to all MapResults. After this Reducer * The sum of the reduce function applied to all MapResults. After this Reducer
@ -63,11 +65,14 @@ class Reducer<MapType, ReduceType> {
* @param initialSum the initial reduce sum * @param initialSum the initial reduce sum
*/ */
public Reducer(final NSReduceFunction<MapType, ReduceType> reduce, public Reducer(final NSReduceFunction<MapType, ReduceType> reduce,
final MultiThreadedErrorTracker errorTracker,
final SimpleTimer reduceTimer, final SimpleTimer reduceTimer,
final ReduceType initialSum) { final ReduceType initialSum) {
if ( errorTracker == null ) throw new IllegalArgumentException("Error tracker cannot be null");
if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null"); if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null");
if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null"); if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null");
this.errorTracker = errorTracker;
this.reduce = reduce; this.reduce = reduce;
this.reduceTimer = reduceTimer; this.reduceTimer = reduceTimer;
this.sum = initialSum; this.sum = initialSum;
@ -105,13 +110,13 @@ class Reducer<MapType, ReduceType> {
* @throws InterruptedException * @throws InterruptedException
*/ */
@Ensures("result >= 0") @Ensures("result >= 0")
public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue) throws InterruptedException { public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue) {
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
int nReducesNow = 0; int nReducesNow = 0;
// if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS ) // if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size()); // logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
try {
while ( reduceNextValueInQueue(mapResultQueue) ) { while ( reduceNextValueInQueue(mapResultQueue) ) {
final MapResult<MapType> result = mapResultQueue.take(); final MapResult<MapType> result = mapResultQueue.take();
prevJobID = result.getJobID(); prevJobID = result.getJobID();
@ -129,7 +134,10 @@ class Reducer<MapType, ReduceType> {
numJobsReduced++; numJobsReduced++;
maybeReleaseLatch(); maybeReleaseLatch();
} }
} catch (Exception ex) {
errorTracker.notifyOfError(ex);
countDownLatch.countDown();
}
// if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS ) // if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size()); // logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());

View File

@ -91,7 +91,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
super(EngineErrorHandlingTestProvider.class); super(EngineErrorHandlingTestProvider.class);
this.expectedException = exceptedException; this.expectedException = exceptedException;
this.args = args; this.args = args;
this.iterationsToTest = args.equals("") ? 1 : 1; // TODO -- update to 1000 this.iterationsToTest = args.equals("") ? 1 : 10;
setName(String.format("Engine error handling: expected %s with args %s", exceptedException, args)); setName(String.format("Engine error handling: expected %s with args %s", exceptedException, args));
} }
} }
@ -103,7 +103,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
continue; // cannot reliably throw errors in TREE_REDUCE continue; // cannot reliably throw errors in TREE_REDUCE
final String failArg = " -fail " + failMethod.name(); final String failArg = " -fail " + failMethod.name();
for ( final String args : Arrays.asList("", " -nt 2") ) { // , " -nct 2") ) { for ( final String args : Arrays.asList("", " -nt 2", " -nct 2") ) {
new EngineErrorHandlingTestProvider(NullPointerException.class, failArg + args); new EngineErrorHandlingTestProvider(NullPointerException.class, failArg + args);
new EngineErrorHandlingTestProvider(UserException.class, failArg + args); new EngineErrorHandlingTestProvider(UserException.class, failArg + args);
new EngineErrorHandlingTestProvider(ReviewedStingException.class, failArg + args); new EngineErrorHandlingTestProvider(ReviewedStingException.class, failArg + args);
@ -116,7 +116,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
// //
// Loop over errors to throw, make sure they are the errors we get back from the engine, regardless of NT type // Loop over errors to throw, make sure they are the errors we get back from the engine, regardless of NT type
// //
@Test(dataProvider = "EngineErrorHandlingTestProvider", timeOut = 60 * 1000 ) @Test(enabled = true, dataProvider = "EngineErrorHandlingTestProvider", timeOut = 60 * 1000 )
public void testEngineErrorHandlingTestProvider(final EngineErrorHandlingTestProvider cfg) { public void testEngineErrorHandlingTestProvider(final EngineErrorHandlingTestProvider cfg) {
for ( int i = 0; i < cfg.iterationsToTest; i++ ) { for ( int i = 0; i < cfg.iterationsToTest; i++ ) {
final String root = "-T ErrorThrowing -R " + exampleFASTA; final String root = "-T ErrorThrowing -R " + exampleFASTA;

View File

@ -1,6 +1,7 @@
package org.broadinstitute.sting.utils.nanoScheduler; package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.SimpleTimer;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;
@ -45,7 +46,7 @@ public class InputProducerUnitTest extends BaseTest {
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue = final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize); new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize);
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new SimpleTimer(), readQueue); final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue);
final ExecutorService es = Executors.newSingleThreadExecutor(); final ExecutorService es = Executors.newSingleThreadExecutor();
@ -93,7 +94,7 @@ public class InputProducerUnitTest extends BaseTest {
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue = final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(); new LinkedBlockingDeque<InputProducer<Integer>.InputValue>();
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new SimpleTimer(), readQueue); final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue);
final ExecutorService es = Executors.newSingleThreadExecutor(); final ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(ip); es.submit(ip);

View File

@ -5,6 +5,7 @@ import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@ -116,6 +117,12 @@ public class NanoSchedulerUnitTest extends BaseTest {
} }
static NanoSchedulerBasicTest exampleTest = null; static NanoSchedulerBasicTest exampleTest = null;
@BeforeSuite
public void setUp() throws Exception {
exampleTest = new NanoSchedulerBasicTest(10, 2, 1, 10, false);
}
@DataProvider(name = "NanoSchedulerBasicTest") @DataProvider(name = "NanoSchedulerBasicTest")
public Object[][] createNanoSchedulerBasicTest() { public Object[][] createNanoSchedulerBasicTest() {
// for ( final int bufferSize : Arrays.asList(1, 10) ) { // for ( final int bufferSize : Arrays.asList(1, 10) ) {
@ -134,7 +141,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
for ( final int end : Arrays.asList(0, 1, 2, 11, 100, 10000, 100000) ) { for ( final int end : Arrays.asList(0, 1, 2, 11, 100, 10000, 100000) ) {
for ( final boolean addDelays : Arrays.asList(true, false) ) { for ( final boolean addDelays : Arrays.asList(true, false) ) {
if ( end < 1000 ) if ( end < 1000 )
exampleTest = new NanoSchedulerBasicTest(bufferSize, nt, start, end, addDelays); new NanoSchedulerBasicTest(bufferSize, nt, start, end, addDelays);
} }
} }
} }
@ -221,12 +228,12 @@ public class NanoSchedulerUnitTest extends BaseTest {
nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce()); nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce());
} }
@Test(expectedExceptions = NullPointerException.class, timeOut = 1000) @Test(expectedExceptions = NullPointerException.class, timeOut = 10000)
public void testInputErrorIsThrown_NPE() throws InterruptedException { public void testInputErrorIsThrown_NPE() throws InterruptedException {
executeTestErrorThrowingInput(new NullPointerException()); executeTestErrorThrowingInput(new NullPointerException());
} }
@Test(expectedExceptions = NullPointerException.class, timeOut = 1000) @Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000)
public void testInputErrorIsThrown_RSE() throws InterruptedException { public void testInputErrorIsThrown_RSE() throws InterruptedException {
executeTestErrorThrowingInput(new ReviewedStingException("test")); executeTestErrorThrowingInput(new ReviewedStingException("test"));
} }

View File

@ -1,6 +1,7 @@
package org.broadinstitute.sting.utils.nanoScheduler; package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.Utils; import org.broadinstitute.sting.utils.Utils;
import org.testng.Assert; import org.testng.Assert;
@ -92,7 +93,7 @@ public class ReducerUnitTest extends BaseTest {
final List<List<MapResult<Integer>>> jobGroups = Utils.groupList(allJobs, groupSize); final List<List<MapResult<Integer>>> jobGroups = Utils.groupList(allJobs, groupSize);
final ReduceSumTest reduce = new ReduceSumTest(); final ReduceSumTest reduce = new ReduceSumTest();
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(reduce, new SimpleTimer(), 0); final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(reduce, new MultiThreadedErrorTracker(), new SimpleTimer(), 0);
final TestWaitingForFinalReduce waitingThread = new TestWaitingForFinalReduce(reducer, expectedSum(allJobs)); final TestWaitingForFinalReduce waitingThread = new TestWaitingForFinalReduce(reducer, expectedSum(allJobs));
final ExecutorService es = Executors.newSingleThreadExecutor(); final ExecutorService es = Executors.newSingleThreadExecutor();
@ -154,7 +155,7 @@ public class ReducerUnitTest extends BaseTest {
private void runSettingJobIDTwice() throws Exception { private void runSettingJobIDTwice() throws Exception {
final PriorityBlockingQueue<MapResult<Integer>> mapResultsQueue = new PriorityBlockingQueue<MapResult<Integer>>(); final PriorityBlockingQueue<MapResult<Integer>> mapResultsQueue = new PriorityBlockingQueue<MapResult<Integer>>();
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new SimpleTimer(), 0); final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), new SimpleTimer(), 0);
reducer.setTotalJobCount(10); reducer.setTotalJobCount(10);
reducer.setTotalJobCount(15); reducer.setTotalJobCount(15);