More improvements to exception handling during multithreaded runs, based on
a bug reported by Ryan.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3855 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
83798225ac
commit
e9d243babb
|
|
@ -52,6 +52,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
private final Queue<Shard> traverseTasks = new LinkedList<Shard>();
|
private final Queue<Shard> traverseTasks = new LinkedList<Shard>();
|
||||||
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An exception that's occurred in this traversal. If null, no exception has occurred.
|
||||||
|
*/
|
||||||
|
private Throwable error = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Keep a queue of shard traversals, and constantly monitor it to see what output
|
* Keep a queue of shard traversals, and constantly monitor it to see what output
|
||||||
* merge tasks remain.
|
* merge tasks remain.
|
||||||
|
|
@ -126,6 +131,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
totalTraversals = traverseTasks.size();
|
totalTraversals = traverseTasks.size();
|
||||||
|
|
||||||
while (isShardTraversePending() || isTreeReducePending()) {
|
while (isShardTraversePending() || isTreeReducePending()) {
|
||||||
|
// Check for errors during execution.
|
||||||
|
if(hasTraversalErrorOccurred())
|
||||||
|
throw new StingException("An error has occurred during the traversal.",getTraversalError());
|
||||||
|
|
||||||
// Too many files sitting around taking up space? Merge them.
|
// Too many files sitting around taking up space? Merge them.
|
||||||
if (isMergeLimitExceeded())
|
if (isMergeLimitExceeded())
|
||||||
mergeExistingOutput(false);
|
mergeExistingOutput(false);
|
||||||
|
|
@ -343,6 +352,27 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
return reducer;
|
return reducer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Detects whether an execution error has occurred.
|
||||||
|
* @return True if an error has occurred. False otherwise.
|
||||||
|
*/
|
||||||
|
private synchronized boolean hasTraversalErrorOccurred() {
|
||||||
|
return error != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized Throwable getTraversalError() {
|
||||||
|
if(!hasTraversalErrorOccurred())
|
||||||
|
throw new StingException("User has attempted to retrieve a traversal error when none exists");
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allows other threads to notify of an error during traversal.
|
||||||
|
*/
|
||||||
|
protected synchronized void notifyOfTraversalError(Throwable error) {
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
|
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
|
||||||
private class TreeReduceTask extends FutureTask {
|
private class TreeReduceTask extends FutureTask {
|
||||||
|
|
|
||||||
|
|
@ -81,6 +81,11 @@ public class ShardTraverser implements Callable {
|
||||||
|
|
||||||
return accumulator;
|
return accumulator;
|
||||||
}
|
}
|
||||||
|
catch(Throwable t) {
|
||||||
|
// Notify that an exception has occurred and rethrow it.
|
||||||
|
microScheduler.notifyOfTraversalError(t);
|
||||||
|
throw new StingException("An error has occurred during traversal",t);
|
||||||
|
}
|
||||||
finally {
|
finally {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
complete = true;
|
complete = true;
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package org.broadinstitute.sting.gatk.executive;
|
package org.broadinstitute.sting.gatk.executive;
|
||||||
|
|
||||||
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
||||||
|
import org.broadinstitute.sting.utils.StingException;
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
@ -88,10 +89,12 @@ public class TreeReducer implements Callable {
|
||||||
result = walker.treeReduce( lhs.get(), rhs.get() );
|
result = walker.treeReduce( lhs.get(), rhs.get() );
|
||||||
}
|
}
|
||||||
catch( InterruptedException ex ) {
|
catch( InterruptedException ex ) {
|
||||||
throw new RuntimeException("Hierarchical reduce interrupted", ex);
|
microScheduler.notifyOfTraversalError(ex);
|
||||||
|
throw new StingException("Hierarchical reduce interrupted", ex);
|
||||||
}
|
}
|
||||||
catch( ExecutionException ex ) {
|
catch( ExecutionException ex ) {
|
||||||
throw new RuntimeException("Hierarchical reduce failed", ex);
|
microScheduler.notifyOfTraversalError(ex);
|
||||||
|
throw new StingException("Hierarchical reduce failed", ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
long endTime = System.currentTimeMillis();
|
long endTime = System.currentTimeMillis();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue