Intermediate commit for proper error handling in the NanoScheduler

-- Refactored error handling from HMS into utils.TraversalErrorManager, which is now used by HMS and will be usable by NanoScheduler
-- Generalized EngineFeaturesIntegrationTest to test map / reduce error throwing for nt 1, nt 2 and nct 2 (disabled)
-- Added unit tests for failing input iterator in NanoScheduler (fails)
-- Made ErrorThrowing NanoScheduable
This commit is contained in:
Mark DePristo 2012-09-19 11:39:49 -04:00
parent eb24dc920a
commit 773af05980
5 changed files with 115 additions and 48 deletions

View File

@ -11,6 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.TraversalErrorManager;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory;
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
@ -45,7 +46,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/**
* An exception that's occurred in this traversal. If null, no exception has occurred.
*/
private RuntimeException error = null;
final TraversalErrorManager errorTracker = new TraversalErrorManager();
/**
* Queue of incoming shards.
@ -112,8 +113,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
while (isShardTraversePending() || isTreeReducePending()) {
// Check for errors during execution.
if(hasTraversalErrorOccurred())
throw getTraversalError();
errorTracker.throwErrorIfPending();
// Too many files sitting around taking up space? Merge them.
if (isMergeLimitExceeded())
@ -130,8 +130,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
queueNextShardTraverse(walker, reduceTree);
}
if(hasTraversalErrorOccurred())
throw getTraversalError();
errorTracker.throwErrorIfPending();
threadPool.shutdown();
@ -147,7 +146,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
throw ex;
} catch ( ExecutionException ex ) {
// the thread died and we are failing to get the result, rethrow it as a runtime exception
throw toRuntimeException(ex.getCause());
throw notifyOfTraversalError(ex.getCause());
} catch (Exception ex) {
throw new ReviewedStingException("Unable to retrieve result", ex);
}
@ -348,38 +347,13 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
return reducer;
}
/**
* Detects whether an execution error has occurred.
* @return True if an error has occurred. False otherwise.
*/
private synchronized boolean hasTraversalErrorOccurred() {
return error != null;
}
private synchronized RuntimeException getTraversalError() {
if(!hasTraversalErrorOccurred())
throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists");
return error;
}
/**
* Allows other threads to notify of an error during traversal.
*/
protected synchronized RuntimeException notifyOfTraversalError(Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
this.error = toRuntimeException(error);
return this.error;
return errorTracker.notifyOfTraversalError(error);
}
private RuntimeException toRuntimeException(final Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
if (error instanceof RuntimeException)
return (RuntimeException)error;
else
return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error);
}
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
private class TreeReduceTask extends FutureTask {
final private TreeReducer treeReducer;

View File

@ -31,7 +31,8 @@ import org.broadinstitute.sting.gatk.CommandLineGATK;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.RodWalker;
import org.broadinstitute.sting.gatk.walkers.NanoSchedulable;
import org.broadinstitute.sting.gatk.walkers.RefWalker;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
@ -42,7 +43,7 @@ import org.broadinstitute.sting.utils.help.DocumentedGATKFeature;
*/
@Hidden
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
public class ErrorThrowing extends RodWalker<Integer,Integer> implements TreeReducible<Integer> {
public class ErrorThrowing extends RefWalker<Integer,Integer> implements TreeReducible<Integer>, NanoSchedulable {
@Input(fullName="exception", shortName = "E", doc="Java class of exception to throw", required=true)
public String exceptionToThrow;
@ -60,8 +61,12 @@ public class ErrorThrowing extends RodWalker<Integer,Integer> implements TreeRed
//
@Override
public Integer map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) {
if ( ref == null ) // only throw exception when we are in proper map, not special map(null) call
return null;
if ( failMethod == FailMethod.MAP )
fail();
return 0;
}
@ -72,15 +77,15 @@ public class ErrorThrowing extends RodWalker<Integer,Integer> implements TreeRed
@Override
public Integer reduce(Integer value, Integer sum) {
if ( failMethod == FailMethod.REDUCE )
if ( value != null && failMethod == FailMethod.REDUCE )
fail();
return value + sum;
return sum;
}
public Integer treeReduce(final Integer lhs, final Integer rhs) {
if ( failMethod == FailMethod.TREE_REDUCE )
fail();
return lhs + rhs;
return rhs;
}
private void fail() {

View File

@ -0,0 +1,53 @@
package org.broadinstitute.sting.utils;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
/**
* Created with IntelliJ IDEA.
* User: depristo
* Date: 9/19/12
* Time: 11:20 AM
* To change this template use File | Settings | File Templates.
*/
public class TraversalErrorManager {
/**
* An exception that's occurred in this traversal. If null, no exception has occurred.
*/
private RuntimeException error = null;
public synchronized void throwErrorIfPending() {
if (hasTraversalErrorOccurred())
throw getTraversalError();
}
/**
* Detects whether an execution error has occurred.
* @return True if an error has occurred. False otherwise.
*/
public synchronized boolean hasTraversalErrorOccurred() {
return error != null;
}
public synchronized RuntimeException getTraversalError() {
if(!hasTraversalErrorOccurred())
throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists");
return error;
}
/**
* Allows other threads to notify of an error during traversal.
*/
public synchronized RuntimeException notifyOfTraversalError(Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
this.error = toRuntimeException(error);
return this.error;
}
private RuntimeException toRuntimeException(final Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
if (error instanceof RuntimeException)
return (RuntimeException)error;
else
return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error);
}
}

View File

@ -25,6 +25,7 @@
package org.broadinstitute.sting.gatk;
import org.broadinstitute.sting.WalkerTest;
import org.broadinstitute.sting.gatk.walkers.qc.ErrorThrowing;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.testng.annotations.DataProvider;
@ -83,24 +84,30 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
private class EngineErrorHandlingTestProvider extends TestDataProvider {
final Class expectedException;
final boolean multiThreaded;
final String args;
final int iterationsToTest;
public EngineErrorHandlingTestProvider(Class exceptedException, final boolean multiThreaded) {
public EngineErrorHandlingTestProvider(Class exceptedException, final String args) {
super(EngineErrorHandlingTestProvider.class);
this.expectedException = exceptedException;
this.multiThreaded = multiThreaded;
this.iterationsToTest = multiThreaded ? 1000 : 1;
setName(String.format("Engine error handling: expected %s, is-multithreaded %b", exceptedException, multiThreaded));
this.args = args;
this.iterationsToTest = args.equals("") ? 1 : 1; // TODO -- update to 1000
setName(String.format("Engine error handling: expected %s with args %s", exceptedException, args));
}
}
@DataProvider(name = "EngineErrorHandlingTestProvider")
public Object[][] makeEngineErrorHandlingTestProvider() {
for ( final boolean multiThreaded : Arrays.asList(true, false)) {
new EngineErrorHandlingTestProvider(NullPointerException.class, multiThreaded);
new EngineErrorHandlingTestProvider(UserException.class, multiThreaded);
new EngineErrorHandlingTestProvider(ReviewedStingException.class, multiThreaded);
for ( final ErrorThrowing.FailMethod failMethod : ErrorThrowing.FailMethod.values() ) {
if ( failMethod == ErrorThrowing.FailMethod.TREE_REDUCE )
continue; // cannot reliably throw errors in TREE_REDUCE
final String failArg = " -fail " + failMethod.name();
for ( final String args : Arrays.asList("", " -nt 2") ) { // , " -nct 2") ) {
new EngineErrorHandlingTestProvider(NullPointerException.class, failArg + args);
new EngineErrorHandlingTestProvider(UserException.class, failArg + args);
new EngineErrorHandlingTestProvider(ReviewedStingException.class, failArg + args);
}
}
return EngineErrorHandlingTestProvider.getTests(EngineErrorHandlingTestProvider.class);
@ -109,11 +116,11 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
//
// Loop over errors to throw, make sure they are the errors we get back from the engine, regardless of NT type
//
@Test(dataProvider = "EngineErrorHandlingTestProvider")
@Test(dataProvider = "EngineErrorHandlingTestProvider", timeOut = 60 * 1000 )
public void testEngineErrorHandlingTestProvider(final EngineErrorHandlingTestProvider cfg) {
for ( int i = 0; i < cfg.iterationsToTest; i++ ) {
final String root = "-T ErrorThrowing -R " + exampleFASTA;
final String args = root + (cfg.multiThreaded ? " -nt 2" : "") + " -E " + cfg.expectedException.getSimpleName();
final String args = root + cfg.args + " -E " + cfg.expectedException.getSimpleName();
WalkerTestSpec spec = new WalkerTestSpec(args, 0, cfg.expectedException);
executeTest(cfg.toString(), spec);
}

View File

@ -3,6 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import org.apache.log4j.BasicConfigurator;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@ -220,6 +221,33 @@ public class NanoSchedulerUnitTest extends BaseTest {
nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce());
}
@Test(expectedExceptions = NullPointerException.class, timeOut = 1000)
public void testInputErrorIsThrown_NPE() throws InterruptedException {
executeTestErrorThrowingInput(new NullPointerException());
}
@Test(expectedExceptions = NullPointerException.class, timeOut = 1000)
public void testInputErrorIsThrown_RSE() throws InterruptedException {
executeTestErrorThrowingInput(new ReviewedStingException("test"));
}
private void executeTestErrorThrowingInput(final RuntimeException ex) {
final NanoScheduler<Integer, Integer, Integer> nanoScheduler = new NanoScheduler<Integer, Integer, Integer>(1, 2);
nanoScheduler.execute(new ErrorThrowingIterator(ex), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce());
}
private static class ErrorThrowingIterator implements Iterator<Integer> {
final RuntimeException ex;
private ErrorThrowingIterator(RuntimeException ex) {
this.ex = ex;
}
@Override public boolean hasNext() { throw ex; }
@Override public Integer next() { throw ex; }
@Override public void remove() { throw ex; }
}
public static void main(String [ ] args) {
org.apache.log4j.Logger logger = org.apache.log4j.Logger.getRootLogger();
BasicConfigurator.configure();