Merged bug fix from Stable into Unstable

This commit is contained in:
Mark DePristo 2012-04-13 09:25:43 -04:00
commit 2aa2d9aec0
5 changed files with 83 additions and 92 deletions

View File

@ -18,10 +18,7 @@ import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
/** /**
* A microscheduler that schedules shards according to a tree-like structure. * A microscheduler that schedules shards according to a tree-like structure.
@ -44,11 +41,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>(); private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
/**
* An exception that's occurred in this traversal. If null, no exception has occurred.
*/
private RuntimeException error = null;
/** /**
* Queue of incoming shards. * Queue of incoming shards.
*/ */
@ -99,11 +91,13 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
ReduceTree reduceTree = new ReduceTree(this); ReduceTree reduceTree = new ReduceTree(this);
initializeWalker(walker); initializeWalker(walker);
//
// exception handling here is a bit complex. We used to catch and rethrow exceptions all over
// the place, but that just didn't work well. Now we have a specific execution exception (inner class)
// to use for multi-threading specific exceptions. All RuntimeExceptions that occur within the threads are rethrown
// up the stack as their underlying causes
//
while (isShardTraversePending() || isTreeReducePending()) { while (isShardTraversePending() || isTreeReducePending()) {
// Check for errors during execution.
if(hasTraversalErrorOccurred())
throw getTraversalError();
// Too many files sitting around taking up space? Merge them. // Too many files sitting around taking up space? Merge them.
if (isMergeLimitExceeded()) if (isMergeLimitExceeded())
mergeExistingOutput(false); mergeExistingOutput(false);
@ -130,12 +124,8 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
result = reduceTree.getResult().get(); result = reduceTree.getResult().get();
notifyTraversalDone(walker,result); notifyTraversalDone(walker,result);
} }
catch (ReviewedStingException ex) { catch( InterruptedException ex ) { handleException(ex); }
throw ex; catch( ExecutionException ex ) { handleException(ex); }
}
catch (Exception ex) {
throw new ReviewedStingException("Unable to retrieve result", ex);
}
// do final cleanup operations // do final cleanup operations
outputTracker.close(); outputTracker.close();
@ -338,32 +328,41 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
} }
/** /**
* Detects whether an execution error has occurred. * Handle an exception that occurred in a worker thread as needed by this scheduler.
* @return True if an error has occurred. False otherwise. *
* The way to use this function in a worker is:
*
* try { doSomeWork();
* catch ( InterruptedException ex ) { hms.handleException(ex); }
* catch ( ExecutionException ex ) { hms.handleException(ex); }
*
* @param ex the exception that occurred in the worker thread
*/ */
private synchronized boolean hasTraversalErrorOccurred() { protected final void handleException(InterruptedException ex) {
return error != null; throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce interrupted", ex);
}
private synchronized RuntimeException getTraversalError() {
if(!hasTraversalErrorOccurred())
throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists");
return error;
} }
/** /**
* Allows other threads to notify of an error during traversal. * Handle an exception that occurred in a worker thread as needed by this scheduler.
*
* The way to use this function in a worker is:
*
* try { doSomeWork();
* catch ( InterruptedException ex ) { hms.handleException(ex); }
* catch ( ExecutionException ex ) { hms.handleException(ex); }
*
* @param ex the exception that occurred in the worker thread
*/ */
protected synchronized void notifyOfTraversalError(Throwable error) { protected final void handleException(ExecutionException ex) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it. if ( ex.getCause() instanceof RuntimeException )
if (error instanceof RuntimeException) // if the cause was a runtime exception that's what we want to send up the stack
this.error = (RuntimeException)error; throw (RuntimeException )ex.getCause();
else else
this.error = new ReviewedStingException("An error occurred during the traversal.", error); throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce failed", ex);
} }
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */ /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
private class TreeReduceTask extends FutureTask { private class TreeReduceTask extends FutureTask {
private TreeReducer treeReducer = null; private TreeReducer treeReducer = null;
@ -382,6 +381,17 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
} }
} }
/**
* A specific exception class for HMS-specific failures such as
* Interrupted or ExecutionFailures that aren't clearly the fault
* of the underlying walker code
*/
public static class ExecutionFailure extends ReviewedStingException {
public ExecutionFailure(final String s, final Throwable throwable) {
super(s, throwable);
}
}
/** /**
* Used by the ShardTraverser to report time consumed traversing a given shard. * Used by the ShardTraverser to report time consumed traversing a given shard.
* *

View File

@ -27,16 +27,15 @@ import java.util.concurrent.Callable;
* Carries the walker over a given shard, in a callable interface. * Carries the walker over a given shard, in a callable interface.
*/ */
public class ShardTraverser implements Callable { public class ShardTraverser implements Callable {
private HierarchicalMicroScheduler microScheduler; final private HierarchicalMicroScheduler microScheduler;
private Walker walker; final private Walker walker;
private Shard shard; final private Shard shard;
private TraversalEngine traversalEngine; final private TraversalEngine traversalEngine;
private ThreadLocalOutputTracker outputTracker; final private ThreadLocalOutputTracker outputTracker;
private OutputMergeTask outputMergeTask; private OutputMergeTask outputMergeTask;
/** our log, which we want to capture anything from this class */ /** our log, which we want to capture anything from this class */
protected static Logger logger = Logger.getLogger(ShardTraverser.class); final protected static Logger logger = Logger.getLogger(ShardTraverser.class);
/** /**
* Is this traversal complete? * Is this traversal complete?
@ -58,11 +57,10 @@ public class ShardTraverser implements Callable {
public Object call() { public Object call() {
try { try {
traversalEngine.startTimersIfNecessary(); traversalEngine.startTimersIfNecessary();
long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
Object accumulator = walker.reduceInit(); Object accumulator = walker.reduceInit();
LocusWalker lWalker = (LocusWalker)walker; final WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(),
WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(),
microScheduler.getReadIterator(shard), microScheduler.getReadIterator(shard),
shard.getGenomeLocs(), shard.getGenomeLocs(),
microScheduler.engine.getSampleDB().getSampleNames()); // todo: microScheduler.engine is protected - is it okay to user it here? microScheduler.engine.getSampleDB().getSampleNames()); // todo: microScheduler.engine is protected - is it okay to user it here?
@ -76,18 +74,12 @@ public class ShardTraverser implements Callable {
windowMaker.close(); windowMaker.close();
outputMergeTask = outputTracker.closeStorage(); outputMergeTask = outputTracker.closeStorage();
long endTime = System.currentTimeMillis(); final long endTime = System.currentTimeMillis();
microScheduler.reportShardTraverseTime(endTime-startTime); microScheduler.reportShardTraverseTime(endTime-startTime);
return accumulator; return accumulator;
} } finally {
catch(Throwable t) {
// Notify that an exception has occurred and rethrow it.
microScheduler.notifyOfTraversalError(t);
throw new ReviewedStingException("An error has occurred during traversal",t);
}
finally {
synchronized(this) { synchronized(this) {
complete = true; complete = true;
notifyAll(); notifyAll();

View File

@ -25,20 +25,11 @@ import java.util.concurrent.Future;
* interface to force the reduce. * interface to force the reduce.
*/ */
public class TreeReducer implements Callable { public class TreeReducer implements Callable {
private HierarchicalMicroScheduler microScheduler; final private HierarchicalMicroScheduler microScheduler;
private TreeReducible walker; private TreeReducible walker;
private Future lhs; private Future lhs;
private Future rhs; private Future rhs;
/**
* Create a one-sided reduce. Result will be a simple pass-through of the result.
* @param microScheduler The parent hierarchical microscheduler for this reducer.
* @param lhs The one side of the reduce.
*/
public TreeReducer( HierarchicalMicroScheduler microScheduler, Future lhs ) {
this( microScheduler, lhs, null );
}
/** /**
* Create a full tree reduce. Combine this two results using an unspecified walker at some point in the future. * Create a full tree reduce. Combine this two results using an unspecified walker at some point in the future.
* @param microScheduler The parent hierarchical microscheduler for this reducer. * @param microScheduler The parent hierarchical microscheduler for this reducer.
@ -67,10 +58,7 @@ public class TreeReducer implements Callable {
if( lhs == null ) if( lhs == null )
throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) ); throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) );
if( rhs == null ) return lhs.isDone() && (rhs == null || rhs.isDone());
return lhs.isDone();
return lhs.isDone() && rhs.isDone();
} }
/** /**
@ -80,24 +68,21 @@ public class TreeReducer implements Callable {
public Object call() { public Object call() {
Object result = null; Object result = null;
long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
try { try {
if( lhs == null ) if( lhs == null )
result = lhs.get(); result = lhs.get();
// todo -- what the hell is this above line? Shouldn't it be the two below?
// if( lhs == null )
// throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) );
else else
result = walker.treeReduce( lhs.get(), rhs.get() ); result = walker.treeReduce( lhs.get(), rhs.get() );
} }
catch( InterruptedException ex ) { catch( InterruptedException ex ) { microScheduler.handleException(ex); }
microScheduler.notifyOfTraversalError(ex); catch( ExecutionException ex ) { microScheduler.handleException(ex); }
throw new ReviewedStingException("Hierarchical reduce interrupted", ex);
}
catch( ExecutionException ex ) {
microScheduler.notifyOfTraversalError(ex);
throw new ReviewedStingException("Hierarchical reduce failed", ex);
}
long endTime = System.currentTimeMillis(); final long endTime = System.currentTimeMillis();
// Constituent bits of this tree reduces are no longer required. Throw them away. // Constituent bits of this tree reduces are no longer required. Throw them away.
this.lhs = null; this.lhs = null;

View File

@ -26,18 +26,17 @@
package org.broadinstitute.sting; package org.broadinstitute.sting;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.broad.tribble.FeatureCodec;
import org.broad.tribble.Tribble; import org.broad.tribble.Tribble;
import org.broad.tribble.index.Index; import org.broad.tribble.index.Index;
import org.broad.tribble.index.IndexFactory; import org.broad.tribble.index.IndexFactory;
import org.broadinstitute.sting.gatk.phonehome.GATKRunReport;
import org.broadinstitute.sting.utils.codecs.vcf.VCFCodec;
import org.broadinstitute.sting.gatk.CommandLineExecutable; import org.broadinstitute.sting.gatk.CommandLineExecutable;
import org.broadinstitute.sting.gatk.CommandLineGATK; import org.broadinstitute.sting.gatk.CommandLineGATK;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.gatk.phonehome.GATKRunReport;
import org.broadinstitute.sting.utils.collections.Pair;
import org.broadinstitute.sting.utils.Utils; import org.broadinstitute.sting.utils.Utils;
import org.broadinstitute.sting.utils.codecs.vcf.VCFCodec;
import org.broadinstitute.sting.utils.collections.Pair;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.StingException; import org.broadinstitute.sting.utils.exceptions.StingException;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
@ -315,9 +314,10 @@ public class WalkerTest extends BaseTest {
// it's the type we expected // it's the type we expected
System.out.println(String.format(" => %s PASSED", name)); System.out.println(String.format(" => %s PASSED", name));
} else { } else {
e.printStackTrace(); if ( e.getCause() != null )
Assert.fail(String.format("Test %s expected exception %s but got %s instead", e.getCause().printStackTrace(System.out); // must print to stdout to see the message
name, expectedException, e.getClass())); Assert.fail(String.format("Test %s expected exception %s but instead got %s with error message %s",
name, expectedException, e.getClass(), e.getMessage()));
} }
} else { } else {
// we didn't expect an exception but we got one :-( // we didn't expect an exception but we got one :-(

View File

@ -86,13 +86,15 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
private class EngineErrorHandlingTestProvider extends TestDataProvider { private class EngineErrorHandlingTestProvider extends TestDataProvider {
Class expectedException; final Class expectedException;
boolean multiThreaded; final boolean multiThreaded;
final int iterationsToTest;
public EngineErrorHandlingTestProvider(Class exceptedException, final boolean multiThreaded) { public EngineErrorHandlingTestProvider(Class exceptedException, final boolean multiThreaded) {
super(EngineErrorHandlingTestProvider.class); super(EngineErrorHandlingTestProvider.class);
this.expectedException = exceptedException; this.expectedException = exceptedException;
this.multiThreaded = multiThreaded; this.multiThreaded = multiThreaded;
this.iterationsToTest = multiThreaded ? 10 : 1;
setName(String.format("Engine error handling: expected %s, is-multithreaded %b", exceptedException, multiThreaded)); setName(String.format("Engine error handling: expected %s, is-multithreaded %b", exceptedException, multiThreaded));
} }
} }
@ -113,9 +115,11 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
// //
@Test(dataProvider = "EngineErrorHandlingTestProvider") @Test(dataProvider = "EngineErrorHandlingTestProvider")
public void testEngineErrorHandlingTestProvider(EngineErrorHandlingTestProvider cfg) { public void testEngineErrorHandlingTestProvider(EngineErrorHandlingTestProvider cfg) {
for ( int i = 0; i < cfg.iterationsToTest; i++ ) {
final String root = "-T ErrorThrowing -R " + b37KGReference; final String root = "-T ErrorThrowing -R " + b37KGReference;
final String args = root + (cfg.multiThreaded ? " -nt 2" : "") + " -E " + cfg.expectedException.getSimpleName(); final String args = root + (cfg.multiThreaded ? " -nt 2" : "") + " -E " + cfg.expectedException.getSimpleName();
WalkerTestSpec spec = new WalkerTestSpec(args, 0, cfg.expectedException); WalkerTestSpec spec = new WalkerTestSpec(args, 0, cfg.expectedException);
executeTest(cfg.toString(), spec); executeTest(cfg.toString(), spec);
} }
} }
}