From 8c418a15dae95184cea70c43d723e93536c2056c Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 25 Jul 2012 23:13:12 -0400 Subject: [PATCH] Sorting out HMS error handling (fingers crossed) -- Check if a traversal error occurred in the last shard -- Catch ExecutionException from the TreeReducer and throw as our HMS execption -- ShardTraverser just throws the exception as formatted by the HMS, rather than wrapping it as a RuntimeException itself -- EngineFeaturesIntegrationTests now uses public exampleFASTA (faster), and does 1000x iterations (slower) --- .../executive/HierarchicalMicroScheduler.java | 30 +++++++++++-------- .../sting/gatk/executive/ShardTraverser.java | 3 +- .../org/broadinstitute/sting/BaseTest.java | 2 ++ .../gatk/EngineFeaturesIntegrationTest.java | 4 +-- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 3dc36dda2..70b1be0e1 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -17,10 +17,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; +import java.util.concurrent.*; /** * A microscheduler that schedules shards according to a tree-like structure. @@ -118,6 +115,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar queueNextShardTraverse(walker, reduceTree); } + if(hasTraversalErrorOccurred()) + throw getTraversalError(); + threadPool.shutdown(); // Merge any lingering output files. If these files aren't ready, @@ -128,11 +128,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar try { result = reduceTree.getResult().get(); notifyTraversalDone(walker,result); - } - catch (ReviewedStingException ex) { + } catch (ReviewedStingException ex) { throw ex; - } - catch (Exception ex) { + } catch ( ExecutionException ex ) { + // the thread died and we are failing to get the result, rethrow it as a runtime exception + throw toRuntimeException(ex.getCause()); + } catch (Exception ex) { throw new ReviewedStingException("Unable to retrieve result", ex); } @@ -353,13 +354,18 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Allows other threads to notify of an error during traversal. */ - protected synchronized void notifyOfTraversalError(Throwable error) { + protected synchronized RuntimeException notifyOfTraversalError(Throwable error) { + // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. + this.error = toRuntimeException(error); + return this.error; + } + + private final RuntimeException toRuntimeException(final Throwable error) { // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. if (error instanceof RuntimeException) - this.error = (RuntimeException)error; + return (RuntimeException)error; else - this.error = new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error); - + return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error); } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index 5ec52cdb8..aefa9c12d 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -81,8 +81,7 @@ public class ShardTraverser implements Callable { return accumulator; } catch(Throwable t) { // Notify that an exception has occurred and rethrow it. - microScheduler.notifyOfTraversalError(t); - throw new RuntimeException(t); + throw microScheduler.notifyOfTraversalError(t); } finally { synchronized(this) { complete = true; diff --git a/public/java/test/org/broadinstitute/sting/BaseTest.java b/public/java/test/org/broadinstitute/sting/BaseTest.java index 86b7e60ff..af4891856 100755 --- a/public/java/test/org/broadinstitute/sting/BaseTest.java +++ b/public/java/test/org/broadinstitute/sting/BaseTest.java @@ -95,6 +95,8 @@ public abstract class BaseTest { public static final String keysDataLocation = validationDataLocation + "keys/"; public static final String gatkKeyFile = CryptUtils.GATK_USER_KEY_DIRECTORY + "gsamembers_broadinstitute.org.key"; + public static final String exampleFASTA = publicTestDir + "exampleFASTA.fasta"; + /** before the class starts up */ static { // setup a basic log configuration diff --git a/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java b/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java index f1acfda20..5c4db08bd 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java @@ -90,7 +90,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest { super(EngineErrorHandlingTestProvider.class); this.expectedException = exceptedException; this.multiThreaded = multiThreaded; - this.iterationsToTest = multiThreaded ? 50 : 1; + this.iterationsToTest = multiThreaded ? 1000 : 1; setName(String.format("Engine error handling: expected %s, is-multithreaded %b", exceptedException, multiThreaded)); } } @@ -112,7 +112,7 @@ public class EngineFeaturesIntegrationTest extends WalkerTest { @Test(dataProvider = "EngineErrorHandlingTestProvider") public void testEngineErrorHandlingTestProvider(final EngineErrorHandlingTestProvider cfg) { for ( int i = 0; i < cfg.iterationsToTest; i++ ) { - final String root = "-T ErrorThrowing -R " + b37KGReference; + final String root = "-T ErrorThrowing -R " + exampleFASTA; final String args = root + (cfg.multiThreaded ? " -nt 2" : "") + " -E " + cfg.expectedException.getSimpleName(); WalkerTestSpec spec = new WalkerTestSpec(args, 0, cfg.expectedException); executeTest(cfg.toString(), spec);