Reverting old commits that made error handling better because ultimately they made things worse.
This commit is contained in:
parent
66b409921a
commit
e0c07f5567
|
|
@ -11,13 +11,17 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
|
|||
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
import org.broadinstitute.sting.utils.exceptions.StingException;
|
||||
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
|
||||
/**
|
||||
* A microscheduler that schedules shards according to a tree-like structure.
|
||||
|
|
@ -40,6 +44,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
|
||||
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
||||
|
||||
/**
|
||||
* An exception that's occurred in this traversal. If null, no exception has occurred.
|
||||
*/
|
||||
private RuntimeException error = null;
|
||||
|
||||
/**
|
||||
* Queue of incoming shards.
|
||||
*/
|
||||
|
|
@ -90,13 +99,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
ReduceTree reduceTree = new ReduceTree(this);
|
||||
initializeWalker(walker);
|
||||
|
||||
//
|
||||
// exception handling here is a bit complex. We used to catch and rethrow exceptions all over
|
||||
// the place, but that just didn't work well. Now we have a specific execution exception (inner class)
|
||||
// to use for multi-threading specific exceptions. All RuntimeExceptions that occur within the threads are rethrown
|
||||
// up the stack as their underlying causes
|
||||
//
|
||||
while (isShardTraversePending() || isTreeReducePending()) {
|
||||
// Check for errors during execution.
|
||||
if(hasTraversalErrorOccurred())
|
||||
throw getTraversalError();
|
||||
|
||||
// Too many files sitting around taking up space? Merge them.
|
||||
if (isMergeLimitExceeded())
|
||||
mergeExistingOutput(false);
|
||||
|
|
@ -123,8 +130,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
result = reduceTree.getResult().get();
|
||||
notifyTraversalDone(walker,result);
|
||||
}
|
||||
catch( InterruptedException ex ) { handleException(ex); }
|
||||
catch( ExecutionException ex ) { handleException(ex); }
|
||||
catch (ReviewedStingException ex) {
|
||||
throw ex;
|
||||
}
|
||||
catch (Exception ex) {
|
||||
throw new ReviewedStingException("Unable to retrieve result", ex);
|
||||
}
|
||||
|
||||
// do final cleanup operations
|
||||
outputTracker.close();
|
||||
|
|
@ -255,8 +266,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
// Specifically catch Tribble I/O exceptions and rethrow them as Reviewed. We don't expect
|
||||
// any issues here because we created the Tribble output file mere moments ago and expect it to
|
||||
// be completely valid.
|
||||
final String reason = ex.getMessage();
|
||||
throw new ReviewedStingException("Unable to merge temporary Tribble output file" + (reason == null ? "." : (" (" + reason + ").")), ex);
|
||||
throw new ReviewedStingException("Unable to merge temporary Tribble output file.",ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -328,39 +338,30 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
}
|
||||
|
||||
/**
|
||||
* Handle an exception that occurred in a worker thread as needed by this scheduler.
|
||||
*
|
||||
* The way to use this function in a worker is:
|
||||
*
|
||||
* try { doSomeWork();
|
||||
* catch ( InterruptedException ex ) { hms.handleException(ex); }
|
||||
* catch ( ExecutionException ex ) { hms.handleException(ex); }
|
||||
*
|
||||
* @param ex the exception that occurred in the worker thread
|
||||
* Detects whether an execution error has occurred.
|
||||
* @return True if an error has occurred. False otherwise.
|
||||
*/
|
||||
protected final void handleException(InterruptedException ex) {
|
||||
throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce interrupted", ex);
|
||||
private synchronized boolean hasTraversalErrorOccurred() {
|
||||
return error != null;
|
||||
}
|
||||
|
||||
private synchronized RuntimeException getTraversalError() {
|
||||
if(!hasTraversalErrorOccurred())
|
||||
throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists");
|
||||
return error;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle an exception that occurred in a worker thread as needed by this scheduler.
|
||||
*
|
||||
* The way to use this function in a worker is:
|
||||
*
|
||||
* try { doSomeWork();
|
||||
* catch ( InterruptedException ex ) { hms.handleException(ex); }
|
||||
* catch ( ExecutionException ex ) { hms.handleException(ex); }
|
||||
*
|
||||
* @param ex the exception that occurred in the worker thread
|
||||
* Allows other threads to notify of an error during traversal.
|
||||
*/
|
||||
protected final void handleException(ExecutionException ex) {
|
||||
if ( ex.getCause() instanceof RuntimeException )
|
||||
// if the cause was a runtime exception that's what we want to send up the stack
|
||||
throw (RuntimeException )ex.getCause();
|
||||
protected synchronized void notifyOfTraversalError(Throwable error) {
|
||||
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
|
||||
if (error instanceof RuntimeException)
|
||||
this.error = (RuntimeException)error;
|
||||
else
|
||||
throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce failed", ex);
|
||||
}
|
||||
this.error = new ReviewedStingException("An error occurred during the traversal.", error);
|
||||
|
||||
}
|
||||
|
||||
|
||||
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
|
||||
|
|
@ -381,17 +382,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A specific exception class for HMS-specific failures such as
|
||||
* Interrupted or ExecutionFailures that aren't clearly the fault
|
||||
* of the underlying walker code
|
||||
*/
|
||||
public static class ExecutionFailure extends ReviewedStingException {
|
||||
public ExecutionFailure(final String s, final Throwable throwable) {
|
||||
super(s, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used by the ShardTraverser to report time consumed traversing a given shard.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -80,8 +80,8 @@ public class ShardTraverser implements Callable {
|
|||
|
||||
return accumulator;
|
||||
} catch(Throwable t) {
|
||||
// Notify that an exception has occurred
|
||||
microScheduler.handleException(new ExecutionException(t));
|
||||
// Notify that an exception has occurred and rethrow it.
|
||||
microScheduler.notifyOfTraversalError(t);
|
||||
throw new RuntimeException(t);
|
||||
} finally {
|
||||
synchronized(this) {
|
||||
|
|
|
|||
|
|
@ -79,8 +79,14 @@ public class TreeReducer implements Callable {
|
|||
else
|
||||
result = walker.treeReduce( lhs.get(), rhs.get() );
|
||||
}
|
||||
catch( InterruptedException ex ) { microScheduler.handleException(ex); }
|
||||
catch( ExecutionException ex ) { microScheduler.handleException(ex); }
|
||||
catch( InterruptedException ex ) {
|
||||
microScheduler.notifyOfTraversalError(ex);
|
||||
throw new ReviewedStingException("Hierarchical reduce interrupted", ex);
|
||||
}
|
||||
catch( ExecutionException ex ) {
|
||||
microScheduler.notifyOfTraversalError(ex);
|
||||
throw new ReviewedStingException("Hierarchical reduce failed", ex);
|
||||
}
|
||||
|
||||
final long endTime = System.currentTimeMillis();
|
||||
|
||||
|
|
|
|||
|
|
@ -58,13 +58,11 @@ import java.util.ArrayList;
|
|||
* of poor base quality. This walker generates tables based on various user-specified covariates (such as read group,
|
||||
* reported quality score, cycle, and dinucleotide). Since there is a large amount of data one can then calculate an empirical
|
||||
* probability of error given the particular covariates seen at this site, where p(error) = num mismatches / num observations.
|
||||
* The output file is a CSV list of (the several covariate values, num observations, num mismatches, empirical quality score).
|
||||
* The output file is a table (of the several covariate values, num observations, num mismatches, empirical quality score).
|
||||
* <p>
|
||||
* Note: ReadGroupCovariate and QualityScoreCovariate are required covariates and will be added for the user regardless of whether or not they were specified.
|
||||
*
|
||||
* <p>
|
||||
* See the GATK wiki for a tutorial and example recalibration accuracy plots.
|
||||
* http://www.broadinstitute.org/gsa/wiki/index.php/Base_quality_score_recalibration
|
||||
*
|
||||
* <h2>Input</h2>
|
||||
* <p>
|
||||
|
|
|
|||
Loading…
Reference in New Issue