diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 44f3730c6..543d2f6d9 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -52,6 +52,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar private final Queue traverseTasks = new LinkedList(); private final Queue reduceTasks = new LinkedList(); + /** + * An exception that's occurred in this traversal. If null, no exception has occurred. + */ + private Throwable error = null; + /** * Keep a queue of shard traversals, and constantly monitor it to see what output * merge tasks remain. @@ -126,6 +131,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar totalTraversals = traverseTasks.size(); while (isShardTraversePending() || isTreeReducePending()) { + // Check for errors during execution. + if(hasTraversalErrorOccurred()) + throw new StingException("An error has occurred during the traversal.",getTraversalError()); + // Too many files sitting around taking up space? Merge them. if (isMergeLimitExceeded()) mergeExistingOutput(false); @@ -343,6 +352,27 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar return reducer; } + /** + * Detects whether an execution error has occurred. + * @return True if an error has occurred. False otherwise. + */ + private synchronized boolean hasTraversalErrorOccurred() { + return error != null; + } + + private synchronized Throwable getTraversalError() { + if(!hasTraversalErrorOccurred()) + throw new StingException("User has attempted to retrieve a traversal error when none exists"); + return error; + } + + /** + * Allows other threads to notify of an error during traversal. + */ + protected synchronized void notifyOfTraversalError(Throwable error) { + this.error = error; + } + /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */ private class TreeReduceTask extends FutureTask { diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index 40d194341..cf110d1e2 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -81,6 +81,11 @@ public class ShardTraverser implements Callable { return accumulator; } + catch(Throwable t) { + // Notify that an exception has occurred and rethrow it. + microScheduler.notifyOfTraversalError(t); + throw new StingException("An error has occurred during traversal",t); + } finally { synchronized(this) { complete = true; diff --git a/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java b/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java index 4ea2ee447..5b49f762d 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.gatk.executive; import org.broadinstitute.sting.gatk.walkers.TreeReducible; +import org.broadinstitute.sting.utils.StingException; import java.util.concurrent.Callable; import java.util.concurrent.Future; @@ -88,10 +89,12 @@ public class TreeReducer implements Callable { result = walker.treeReduce( lhs.get(), rhs.get() ); } catch( InterruptedException ex ) { - throw new RuntimeException("Hierarchical reduce interrupted", ex); + microScheduler.notifyOfTraversalError(ex); + throw new StingException("Hierarchical reduce interrupted", ex); } catch( ExecutionException ex ) { - throw new RuntimeException("Hierarchical reduce failed", ex); + microScheduler.notifyOfTraversalError(ex); + throw new StingException("Hierarchical reduce failed", ex); } long endTime = System.currentTimeMillis();