New way to handle exceptions in multi-threaded GATK

-- HMS no longer tries to grab and throw all exceptions.  Exceptions are just thrown directly now.
-- Proper error handling is handled by functions in HMS, which are used by ShardTraverser and TreeReducer
-- Better printing of stack traces in WalkerTest
This commit is contained in:
Mark DePristo 2012-04-13 09:23:20 -04:00
parent e85e9a8cf5
commit 27e7e17dc7
4 changed files with 67 additions and 79 deletions

View File

@ -18,10 +18,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.*;
/**
* A microscheduler that schedules shards according to a tree-like structure.
@ -44,11 +41,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
/**
* An exception that's occurred in this traversal. If null, no exception has occurred.
*/
private RuntimeException error = null;
/**
* Queue of incoming shards.
*/
@ -99,11 +91,13 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
ReduceTree reduceTree = new ReduceTree(this);
initializeWalker(walker);
//
// exception handling here is a bit complex. We used to catch and rethrow exceptions all over
// the place, but that just didn't work well. Now we have a specific execution exception (inner class)
// to use for multi-threading specific exceptions. All RuntimeExceptions that occur within the threads are rethrown
// up the stack as their underlying causes
//
while (isShardTraversePending() || isTreeReducePending()) {
// Check for errors during execution.
if(hasTraversalErrorOccurred())
throw getTraversalError();
// Too many files sitting around taking up space? Merge them.
if (isMergeLimitExceeded())
mergeExistingOutput(false);
@ -130,12 +124,8 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
result = reduceTree.getResult().get();
notifyTraversalDone(walker,result);
}
catch (ReviewedStingException ex) {
throw ex;
}
catch (Exception ex) {
throw new ReviewedStingException("Unable to retrieve result", ex);
}
catch( InterruptedException ex ) { handleException(ex); }
catch( ExecutionException ex ) { handleException(ex); }
// do final cleanup operations
outputTracker.close();
@ -338,32 +328,41 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
}
/**
* Detects whether an execution error has occurred.
* @return True if an error has occurred. False otherwise.
* Handle an exception that occurred in a worker thread as needed by this scheduler.
*
* The way to use this function in a worker is:
*
* try { doSomeWork();
* catch ( InterruptedException ex ) { hms.handleException(ex); }
* catch ( ExecutionException ex ) { hms.handleException(ex); }
*
* @param ex the exception that occurred in the worker thread
*/
private synchronized boolean hasTraversalErrorOccurred() {
return error != null;
}
private synchronized RuntimeException getTraversalError() {
if(!hasTraversalErrorOccurred())
throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists");
return error;
protected final void handleException(InterruptedException ex) {
throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce interrupted", ex);
}
/**
* Allows other threads to notify of an error during traversal.
* Handle an exception that occurred in a worker thread as needed by this scheduler.
*
* The way to use this function in a worker is:
*
* try { doSomeWork();
* catch ( InterruptedException ex ) { hms.handleException(ex); }
* catch ( ExecutionException ex ) { hms.handleException(ex); }
*
* @param ex the exception that occurred in the worker thread
*/
protected synchronized void notifyOfTraversalError(Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
if (error instanceof RuntimeException)
this.error = (RuntimeException)error;
protected final void handleException(ExecutionException ex) {
if ( ex.getCause() instanceof RuntimeException )
// if the cause was a runtime exception that's what we want to send up the stack
throw (RuntimeException )ex.getCause();
else
this.error = new ReviewedStingException("An error occurred during the traversal.", error);
throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce failed", ex);
}
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
private class TreeReduceTask extends FutureTask {
private TreeReducer treeReducer = null;
@ -382,6 +381,17 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
}
}
/**
* A specific exception class for HMS-specific failures such as
* Interrupted or ExecutionFailures that aren't clearly the fault
* of the underlying walker code
*/
public static class ExecutionFailure extends ReviewedStingException {
public ExecutionFailure(final String s, final Throwable throwable) {
super(s, throwable);
}
}
/**
* Used by the ShardTraverser to report time consumed traversing a given shard.
*

View File

@ -27,16 +27,15 @@ import java.util.concurrent.Callable;
* Carries the walker over a given shard, in a callable interface.
*/
public class ShardTraverser implements Callable {
private HierarchicalMicroScheduler microScheduler;
private Walker walker;
private Shard shard;
private TraversalEngine traversalEngine;
private ThreadLocalOutputTracker outputTracker;
final private HierarchicalMicroScheduler microScheduler;
final private Walker walker;
final private Shard shard;
final private TraversalEngine traversalEngine;
final private ThreadLocalOutputTracker outputTracker;
private OutputMergeTask outputMergeTask;
/** our log, which we want to capture anything from this class */
protected static Logger logger = Logger.getLogger(ShardTraverser.class);
final protected static Logger logger = Logger.getLogger(ShardTraverser.class);
/**
* Is this traversal complete?
@ -58,11 +57,10 @@ public class ShardTraverser implements Callable {
public Object call() {
try {
traversalEngine.startTimersIfNecessary();
long startTime = System.currentTimeMillis();
final long startTime = System.currentTimeMillis();
Object accumulator = walker.reduceInit();
LocusWalker lWalker = (LocusWalker)walker;
WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(),
final WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(),
microScheduler.getReadIterator(shard),
shard.getGenomeLocs(),
microScheduler.engine.getSampleDB().getSampleNames()); // todo: microScheduler.engine is protected - is it okay to user it here?
@ -76,18 +74,12 @@ public class ShardTraverser implements Callable {
windowMaker.close();
outputMergeTask = outputTracker.closeStorage();
long endTime = System.currentTimeMillis();
final long endTime = System.currentTimeMillis();
microScheduler.reportShardTraverseTime(endTime-startTime);
return accumulator;
}
catch(Throwable t) {
// Notify that an exception has occurred and rethrow it.
microScheduler.notifyOfTraversalError(t);
throw new ReviewedStingException("An error has occurred during traversal",t);
}
finally {
} finally {
synchronized(this) {
complete = true;
notifyAll();

View File

@ -25,20 +25,11 @@ import java.util.concurrent.Future;
* interface to force the reduce.
*/
public class TreeReducer implements Callable {
private HierarchicalMicroScheduler microScheduler;
final private HierarchicalMicroScheduler microScheduler;
private TreeReducible walker;
private Future lhs;
private Future rhs;
/**
* Create a one-sided reduce. Result will be a simple pass-through of the result.
* @param microScheduler The parent hierarchical microscheduler for this reducer.
* @param lhs The one side of the reduce.
*/
public TreeReducer( HierarchicalMicroScheduler microScheduler, Future lhs ) {
this( microScheduler, lhs, null );
}
/**
* Create a full tree reduce. Combine this two results using an unspecified walker at some point in the future.
* @param microScheduler The parent hierarchical microscheduler for this reducer.
@ -67,10 +58,7 @@ public class TreeReducer implements Callable {
if( lhs == null )
throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) );
if( rhs == null )
return lhs.isDone();
return lhs.isDone() && rhs.isDone();
return lhs.isDone() && (rhs == null || rhs.isDone());
}
/**
@ -80,24 +68,21 @@ public class TreeReducer implements Callable {
public Object call() {
Object result = null;
long startTime = System.currentTimeMillis();
final long startTime = System.currentTimeMillis();
try {
if( lhs == null )
result = lhs.get();
// todo -- what the hell is this above line? Shouldn't it be the two below?
// if( lhs == null )
// throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) );
else
result = walker.treeReduce( lhs.get(), rhs.get() );
}
catch( InterruptedException ex ) {
microScheduler.notifyOfTraversalError(ex);
throw new ReviewedStingException("Hierarchical reduce interrupted", ex);
}
catch( ExecutionException ex ) {
microScheduler.notifyOfTraversalError(ex);
throw new ReviewedStingException("Hierarchical reduce failed", ex);
}
catch( InterruptedException ex ) { microScheduler.handleException(ex); }
catch( ExecutionException ex ) { microScheduler.handleException(ex); }
long endTime = System.currentTimeMillis();
final long endTime = System.currentTimeMillis();
// Constituent bits of this tree reduces are no longer required. Throw them away.
this.lhs = null;

View File

@ -314,7 +314,8 @@ public class WalkerTest extends BaseTest {
// it's the type we expected
System.out.println(String.format(" => %s PASSED", name));
} else {
e.printStackTrace(System.out); // must print to stdout to see the message
if ( e.getCause() != null )
e.getCause().printStackTrace(System.out); // must print to stdout to see the message
Assert.fail(String.format("Test %s expected exception %s but instead got %s with error message %s",
name, expectedException, e.getClass(), e.getMessage()));
}