From 27e7e17dc766c3dbbd477a6cf04163eb8a41b4ea Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 13 Apr 2012 09:23:20 -0400 Subject: [PATCH] New way to handle exceptions in multi-threaded GATK -- HMS no longer tries to grab and throw all exceptions. Exceptions are just thrown directly now. -- Proper error handling is handled by functions in HMS, which are used by ShardTraverser and TreeReducer -- Better printing of stack traces in WalkerTest --- .../executive/HierarchicalMicroScheduler.java | 82 +++++++++++-------- .../sting/gatk/executive/ShardTraverser.java | 28 +++---- .../sting/gatk/executive/TreeReducer.java | 33 ++------ .../org/broadinstitute/sting/WalkerTest.java | 3 +- 4 files changed, 67 insertions(+), 79 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 1cea14a9d..b821b98e6 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -18,10 +18,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; +import java.util.concurrent.*; /** * A microscheduler that schedules shards according to a tree-like structure. @@ -44,11 +41,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar private final Queue reduceTasks = new LinkedList(); - /** - * An exception that's occurred in this traversal. If null, no exception has occurred. - */ - private RuntimeException error = null; - /** * Queue of incoming shards. */ @@ -99,11 +91,13 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar ReduceTree reduceTree = new ReduceTree(this); initializeWalker(walker); + // + // exception handling here is a bit complex. We used to catch and rethrow exceptions all over + // the place, but that just didn't work well. Now we have a specific execution exception (inner class) + // to use for multi-threading specific exceptions. All RuntimeExceptions that occur within the threads are rethrown + // up the stack as their underlying causes + // while (isShardTraversePending() || isTreeReducePending()) { - // Check for errors during execution. - if(hasTraversalErrorOccurred()) - throw getTraversalError(); - // Too many files sitting around taking up space? Merge them. if (isMergeLimitExceeded()) mergeExistingOutput(false); @@ -130,12 +124,8 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar result = reduceTree.getResult().get(); notifyTraversalDone(walker,result); } - catch (ReviewedStingException ex) { - throw ex; - } - catch (Exception ex) { - throw new ReviewedStingException("Unable to retrieve result", ex); - } + catch( InterruptedException ex ) { handleException(ex); } + catch( ExecutionException ex ) { handleException(ex); } // do final cleanup operations outputTracker.close(); @@ -338,32 +328,41 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar } /** - * Detects whether an execution error has occurred. - * @return True if an error has occurred. False otherwise. + * Handle an exception that occurred in a worker thread as needed by this scheduler. + * + * The way to use this function in a worker is: + * + * try { doSomeWork(); + * catch ( InterruptedException ex ) { hms.handleException(ex); } + * catch ( ExecutionException ex ) { hms.handleException(ex); } + * + * @param ex the exception that occurred in the worker thread */ - private synchronized boolean hasTraversalErrorOccurred() { - return error != null; - } - - private synchronized RuntimeException getTraversalError() { - if(!hasTraversalErrorOccurred()) - throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists"); - return error; + protected final void handleException(InterruptedException ex) { + throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce interrupted", ex); } /** - * Allows other threads to notify of an error during traversal. + * Handle an exception that occurred in a worker thread as needed by this scheduler. + * + * The way to use this function in a worker is: + * + * try { doSomeWork(); + * catch ( InterruptedException ex ) { hms.handleException(ex); } + * catch ( ExecutionException ex ) { hms.handleException(ex); } + * + * @param ex the exception that occurred in the worker thread */ - protected synchronized void notifyOfTraversalError(Throwable error) { - // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. - if (error instanceof RuntimeException) - this.error = (RuntimeException)error; + protected final void handleException(ExecutionException ex) { + if ( ex.getCause() instanceof RuntimeException ) + // if the cause was a runtime exception that's what we want to send up the stack + throw (RuntimeException )ex.getCause(); else - this.error = new ReviewedStingException("An error occurred during the traversal.", error); - + throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce failed", ex); } + /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */ private class TreeReduceTask extends FutureTask { private TreeReducer treeReducer = null; @@ -382,6 +381,17 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar } } + /** + * A specific exception class for HMS-specific failures such as + * Interrupted or ExecutionFailures that aren't clearly the fault + * of the underlying walker code + */ + public static class ExecutionFailure extends ReviewedStingException { + public ExecutionFailure(final String s, final Throwable throwable) { + super(s, throwable); + } + } + /** * Used by the ShardTraverser to report time consumed traversing a given shard. * diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index badd39860..9920213a3 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -27,16 +27,15 @@ import java.util.concurrent.Callable; * Carries the walker over a given shard, in a callable interface. */ public class ShardTraverser implements Callable { - private HierarchicalMicroScheduler microScheduler; - private Walker walker; - private Shard shard; - private TraversalEngine traversalEngine; - private ThreadLocalOutputTracker outputTracker; + final private HierarchicalMicroScheduler microScheduler; + final private Walker walker; + final private Shard shard; + final private TraversalEngine traversalEngine; + final private ThreadLocalOutputTracker outputTracker; private OutputMergeTask outputMergeTask; /** our log, which we want to capture anything from this class */ - protected static Logger logger = Logger.getLogger(ShardTraverser.class); - + final protected static Logger logger = Logger.getLogger(ShardTraverser.class); /** * Is this traversal complete? @@ -58,11 +57,10 @@ public class ShardTraverser implements Callable { public Object call() { try { traversalEngine.startTimersIfNecessary(); - long startTime = System.currentTimeMillis(); + final long startTime = System.currentTimeMillis(); Object accumulator = walker.reduceInit(); - LocusWalker lWalker = (LocusWalker)walker; - WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(), + final WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(), microScheduler.getReadIterator(shard), shard.getGenomeLocs(), microScheduler.engine.getSampleDB().getSampleNames()); // todo: microScheduler.engine is protected - is it okay to user it here? @@ -76,18 +74,12 @@ public class ShardTraverser implements Callable { windowMaker.close(); outputMergeTask = outputTracker.closeStorage(); - long endTime = System.currentTimeMillis(); + final long endTime = System.currentTimeMillis(); microScheduler.reportShardTraverseTime(endTime-startTime); return accumulator; - } - catch(Throwable t) { - // Notify that an exception has occurred and rethrow it. - microScheduler.notifyOfTraversalError(t); - throw new ReviewedStingException("An error has occurred during traversal",t); - } - finally { + } finally { synchronized(this) { complete = true; notifyAll(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java b/public/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java index 6acaadd50..fc8a89c64 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java @@ -25,20 +25,11 @@ import java.util.concurrent.Future; * interface to force the reduce. */ public class TreeReducer implements Callable { - private HierarchicalMicroScheduler microScheduler; + final private HierarchicalMicroScheduler microScheduler; private TreeReducible walker; private Future lhs; private Future rhs; - /** - * Create a one-sided reduce. Result will be a simple pass-through of the result. - * @param microScheduler The parent hierarchical microscheduler for this reducer. - * @param lhs The one side of the reduce. - */ - public TreeReducer( HierarchicalMicroScheduler microScheduler, Future lhs ) { - this( microScheduler, lhs, null ); - } - /** * Create a full tree reduce. Combine this two results using an unspecified walker at some point in the future. * @param microScheduler The parent hierarchical microscheduler for this reducer. @@ -67,10 +58,7 @@ public class TreeReducer implements Callable { if( lhs == null ) throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) ); - if( rhs == null ) - return lhs.isDone(); - - return lhs.isDone() && rhs.isDone(); + return lhs.isDone() && (rhs == null || rhs.isDone()); } /** @@ -80,24 +68,21 @@ public class TreeReducer implements Callable { public Object call() { Object result = null; - long startTime = System.currentTimeMillis(); + final long startTime = System.currentTimeMillis(); try { if( lhs == null ) result = lhs.get(); + // todo -- what the hell is this above line? Shouldn't it be the two below? +// if( lhs == null ) +// throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) ); else result = walker.treeReduce( lhs.get(), rhs.get() ); } - catch( InterruptedException ex ) { - microScheduler.notifyOfTraversalError(ex); - throw new ReviewedStingException("Hierarchical reduce interrupted", ex); - } - catch( ExecutionException ex ) { - microScheduler.notifyOfTraversalError(ex); - throw new ReviewedStingException("Hierarchical reduce failed", ex); - } + catch( InterruptedException ex ) { microScheduler.handleException(ex); } + catch( ExecutionException ex ) { microScheduler.handleException(ex); } - long endTime = System.currentTimeMillis(); + final long endTime = System.currentTimeMillis(); // Constituent bits of this tree reduces are no longer required. Throw them away. this.lhs = null; diff --git a/public/java/test/org/broadinstitute/sting/WalkerTest.java b/public/java/test/org/broadinstitute/sting/WalkerTest.java index 6e8689e82..f477fedc9 100755 --- a/public/java/test/org/broadinstitute/sting/WalkerTest.java +++ b/public/java/test/org/broadinstitute/sting/WalkerTest.java @@ -314,7 +314,8 @@ public class WalkerTest extends BaseTest { // it's the type we expected System.out.println(String.format(" => %s PASSED", name)); } else { - e.printStackTrace(System.out); // must print to stdout to see the message + if ( e.getCause() != null ) + e.getCause().printStackTrace(System.out); // must print to stdout to see the message Assert.fail(String.format("Test %s expected exception %s but instead got %s with error message %s", name, expectedException, e.getClass(), e.getMessage())); }