Merged bug fix from Stable into Unstable

This commit is contained in:
Mark DePristo 2012-07-25 23:14:15 -04:00
commit 1e8610b2c6
4 changed files with 23 additions and 16 deletions

View File

@ -17,10 +17,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.*;
/**
* A microscheduler that schedules shards according to a tree-like structure.
@ -118,6 +115,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
queueNextShardTraverse(walker, reduceTree);
}
if(hasTraversalErrorOccurred())
throw getTraversalError();
threadPool.shutdown();
// Merge any lingering output files. If these files aren't ready,
@ -128,11 +128,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
try {
result = reduceTree.getResult().get();
notifyTraversalDone(walker,result);
}
catch (ReviewedStingException ex) {
} catch (ReviewedStingException ex) {
throw ex;
}
catch (Exception ex) {
} catch ( ExecutionException ex ) {
// the thread died and we are failing to get the result, rethrow it as a runtime exception
throw toRuntimeException(ex.getCause());
} catch (Exception ex) {
throw new ReviewedStingException("Unable to retrieve result", ex);
}
@ -353,13 +354,18 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/**
* Allows other threads to notify of an error during traversal.
*/
protected synchronized void notifyOfTraversalError(Throwable error) {
protected synchronized RuntimeException notifyOfTraversalError(Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
this.error = toRuntimeException(error);
return this.error;
}
private final RuntimeException toRuntimeException(final Throwable error) {
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
if (error instanceof RuntimeException)
this.error = (RuntimeException)error;
return (RuntimeException)error;
else
this.error = new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error);
return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error);
}

View File

@ -81,8 +81,7 @@ public class ShardTraverser implements Callable {
return accumulator;
} catch(Throwable t) {
// Notify that an exception has occurred and rethrow it.
microScheduler.notifyOfTraversalError(t);
throw new RuntimeException(t);
throw microScheduler.notifyOfTraversalError(t);
} finally {
synchronized(this) {
complete = true;

View File

@ -95,6 +95,8 @@ public abstract class BaseTest {
public static final String keysDataLocation = validationDataLocation + "keys/";
public static final String gatkKeyFile = CryptUtils.GATK_USER_KEY_DIRECTORY + "gsamembers_broadinstitute.org.key";
public static final String exampleFASTA = publicTestDir + "exampleFASTA.fasta";
/** before the class starts up */
static {
// setup a basic log configuration

View File

@ -90,7 +90,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
super(EngineErrorHandlingTestProvider.class);
this.expectedException = exceptedException;
this.multiThreaded = multiThreaded;
this.iterationsToTest = multiThreaded ? 50 : 1;
this.iterationsToTest = multiThreaded ? 1000 : 1;
setName(String.format("Engine error handling: expected %s, is-multithreaded %b", exceptedException, multiThreaded));
}
}
@ -112,7 +112,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
@Test(dataProvider = "EngineErrorHandlingTestProvider")
public void testEngineErrorHandlingTestProvider(final EngineErrorHandlingTestProvider cfg) {
for ( int i = 0; i < cfg.iterationsToTest; i++ ) {
final String root = "-T ErrorThrowing -R " + b37KGReference;
final String root = "-T ErrorThrowing -R " + exampleFASTA;
final String args = root + (cfg.multiThreaded ? " -nt 2" : "") + " -E " + cfg.expectedException.getSimpleName();
WalkerTestSpec spec = new WalkerTestSpec(args, 0, cfg.expectedException);
executeTest(cfg.toString(), spec);