From 773af05980e8d15f4006fc4f135bcd1df18f86f8 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 19 Sep 2012 11:39:49 -0400 Subject: [PATCH] Intermediate commit for proper error handling in the NanoScheduler -- Refactored error handling from HMS into utils.TraversalErrorManager, which is now used by HMS and will be usable by NanoScheduler -- Generalized EngineFeaturesIntegrationTest to test map / reduce error throwing for nt 1, nt 2 and nct 2 (disabled) -- Added unit tests for failing input iterator in NanoScheduler (fails) -- Made ErrorThrowing NanoScheduable --- .../executive/HierarchicalMicroScheduler.java | 38 +++---------- .../sting/gatk/walkers/qc/ErrorThrowing.java | 15 ++++-- .../sting/utils/TraversalErrorManager.java | 53 +++++++++++++++++++ .../gatk/EngineFeaturesIntegrationTest.java | 29 ++++++---- .../nanoScheduler/NanoSchedulerUnitTest.java | 28 ++++++++++ 5 files changed, 115 insertions(+), 48 deletions(-) create mode 100644 public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 1bac72f3e..0ddced502 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -11,6 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.utils.TraversalErrorManager; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; @@ -45,7 +46,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * An exception that's occurred in this traversal. If null, no exception has occurred. */ - private RuntimeException error = null; + final TraversalErrorManager errorTracker = new TraversalErrorManager(); /** * Queue of incoming shards. @@ -112,8 +113,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar while (isShardTraversePending() || isTreeReducePending()) { // Check for errors during execution. - if(hasTraversalErrorOccurred()) - throw getTraversalError(); + errorTracker.throwErrorIfPending(); // Too many files sitting around taking up space? Merge them. if (isMergeLimitExceeded()) @@ -130,8 +130,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar queueNextShardTraverse(walker, reduceTree); } - if(hasTraversalErrorOccurred()) - throw getTraversalError(); + errorTracker.throwErrorIfPending(); threadPool.shutdown(); @@ -147,7 +146,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar throw ex; } catch ( ExecutionException ex ) { // the thread died and we are failing to get the result, rethrow it as a runtime exception - throw toRuntimeException(ex.getCause()); + throw notifyOfTraversalError(ex.getCause()); } catch (Exception ex) { throw new ReviewedStingException("Unable to retrieve result", ex); } @@ -348,38 +347,13 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar return reducer; } - /** - * Detects whether an execution error has occurred. - * @return True if an error has occurred. False otherwise. - */ - private synchronized boolean hasTraversalErrorOccurred() { - return error != null; - } - - private synchronized RuntimeException getTraversalError() { - if(!hasTraversalErrorOccurred()) - throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists"); - return error; - } - /** * Allows other threads to notify of an error during traversal. */ protected synchronized RuntimeException notifyOfTraversalError(Throwable error) { - // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. - this.error = toRuntimeException(error); - return this.error; + return errorTracker.notifyOfTraversalError(error); } - private RuntimeException toRuntimeException(final Throwable error) { - // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. - if (error instanceof RuntimeException) - return (RuntimeException)error; - else - return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error); - } - - /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */ private class TreeReduceTask extends FutureTask { final private TreeReducer treeReducer; diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/ErrorThrowing.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/ErrorThrowing.java index d3ee4e832..2039b7394 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/ErrorThrowing.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/ErrorThrowing.java @@ -31,7 +31,8 @@ import org.broadinstitute.sting.gatk.CommandLineGATK; import org.broadinstitute.sting.gatk.contexts.AlignmentContext; import org.broadinstitute.sting.gatk.contexts.ReferenceContext; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; -import org.broadinstitute.sting.gatk.walkers.RodWalker; +import org.broadinstitute.sting.gatk.walkers.NanoSchedulable; +import org.broadinstitute.sting.gatk.walkers.RefWalker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; @@ -42,7 +43,7 @@ import org.broadinstitute.sting.utils.help.DocumentedGATKFeature; */ @Hidden @DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} ) -public class ErrorThrowing extends RodWalker implements TreeReducible { +public class ErrorThrowing extends RefWalker implements TreeReducible, NanoSchedulable { @Input(fullName="exception", shortName = "E", doc="Java class of exception to throw", required=true) public String exceptionToThrow; @@ -60,8 +61,12 @@ public class ErrorThrowing extends RodWalker implements TreeRed // @Override public Integer map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) { + if ( ref == null ) // only throw exception when we are in proper map, not special map(null) call + return null; + if ( failMethod == FailMethod.MAP ) fail(); + return 0; } @@ -72,15 +77,15 @@ public class ErrorThrowing extends RodWalker implements TreeRed @Override public Integer reduce(Integer value, Integer sum) { - if ( failMethod == FailMethod.REDUCE ) + if ( value != null && failMethod == FailMethod.REDUCE ) fail(); - return value + sum; + return sum; } public Integer treeReduce(final Integer lhs, final Integer rhs) { if ( failMethod == FailMethod.TREE_REDUCE ) fail(); - return lhs + rhs; + return rhs; } private void fail() { diff --git a/public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java b/public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java new file mode 100644 index 000000000..dd57950e0 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/TraversalErrorManager.java @@ -0,0 +1,53 @@ +package org.broadinstitute.sting.utils; + +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; + +/** + * Created with IntelliJ IDEA. + * User: depristo + * Date: 9/19/12 + * Time: 11:20 AM + * To change this template use File | Settings | File Templates. + */ +public class TraversalErrorManager { + /** + * An exception that's occurred in this traversal. If null, no exception has occurred. + */ + private RuntimeException error = null; + + public synchronized void throwErrorIfPending() { + if (hasTraversalErrorOccurred()) + throw getTraversalError(); + } + + /** + * Detects whether an execution error has occurred. + * @return True if an error has occurred. False otherwise. + */ + public synchronized boolean hasTraversalErrorOccurred() { + return error != null; + } + + public synchronized RuntimeException getTraversalError() { + if(!hasTraversalErrorOccurred()) + throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists"); + return error; + } + + /** + * Allows other threads to notify of an error during traversal. + */ + public synchronized RuntimeException notifyOfTraversalError(Throwable error) { + // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. + this.error = toRuntimeException(error); + return this.error; + } + + private RuntimeException toRuntimeException(final Throwable error) { + // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. + if (error instanceof RuntimeException) + return (RuntimeException)error; + else + return new ReviewedStingException("An error occurred during the traversal. Message=" + error.getMessage(), error); + } +} diff --git a/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java b/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java index 5c4db08bd..d07bd104d 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/EngineFeaturesIntegrationTest.java @@ -25,6 +25,7 @@ package org.broadinstitute.sting.gatk; import org.broadinstitute.sting.WalkerTest; +import org.broadinstitute.sting.gatk.walkers.qc.ErrorThrowing; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; import org.testng.annotations.DataProvider; @@ -83,24 +84,30 @@ public class EngineFeaturesIntegrationTest extends WalkerTest { private class EngineErrorHandlingTestProvider extends TestDataProvider { final Class expectedException; - final boolean multiThreaded; + final String args; final int iterationsToTest; - public EngineErrorHandlingTestProvider(Class exceptedException, final boolean multiThreaded) { + public EngineErrorHandlingTestProvider(Class exceptedException, final String args) { super(EngineErrorHandlingTestProvider.class); this.expectedException = exceptedException; - this.multiThreaded = multiThreaded; - this.iterationsToTest = multiThreaded ? 1000 : 1; - setName(String.format("Engine error handling: expected %s, is-multithreaded %b", exceptedException, multiThreaded)); + this.args = args; + this.iterationsToTest = args.equals("") ? 1 : 1; // TODO -- update to 1000 + setName(String.format("Engine error handling: expected %s with args %s", exceptedException, args)); } } @DataProvider(name = "EngineErrorHandlingTestProvider") public Object[][] makeEngineErrorHandlingTestProvider() { - for ( final boolean multiThreaded : Arrays.asList(true, false)) { - new EngineErrorHandlingTestProvider(NullPointerException.class, multiThreaded); - new EngineErrorHandlingTestProvider(UserException.class, multiThreaded); - new EngineErrorHandlingTestProvider(ReviewedStingException.class, multiThreaded); + for ( final ErrorThrowing.FailMethod failMethod : ErrorThrowing.FailMethod.values() ) { + if ( failMethod == ErrorThrowing.FailMethod.TREE_REDUCE ) + continue; // cannot reliably throw errors in TREE_REDUCE + + final String failArg = " -fail " + failMethod.name(); + for ( final String args : Arrays.asList("", " -nt 2") ) { // , " -nct 2") ) { + new EngineErrorHandlingTestProvider(NullPointerException.class, failArg + args); + new EngineErrorHandlingTestProvider(UserException.class, failArg + args); + new EngineErrorHandlingTestProvider(ReviewedStingException.class, failArg + args); + } } return EngineErrorHandlingTestProvider.getTests(EngineErrorHandlingTestProvider.class); @@ -109,11 +116,11 @@ public class EngineFeaturesIntegrationTest extends WalkerTest { // // Loop over errors to throw, make sure they are the errors we get back from the engine, regardless of NT type // - @Test(dataProvider = "EngineErrorHandlingTestProvider") + @Test(dataProvider = "EngineErrorHandlingTestProvider", timeOut = 60 * 1000 ) public void testEngineErrorHandlingTestProvider(final EngineErrorHandlingTestProvider cfg) { for ( int i = 0; i < cfg.iterationsToTest; i++ ) { final String root = "-T ErrorThrowing -R " + exampleFASTA; - final String args = root + (cfg.multiThreaded ? " -nt 2" : "") + " -E " + cfg.expectedException.getSimpleName(); + final String args = root + cfg.args + " -E " + cfg.expectedException.getSimpleName(); WalkerTestSpec spec = new WalkerTestSpec(args, 0, cfg.expectedException); executeTest(cfg.toString(), spec); } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index d9fe4ddd6..dc8674d88 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -3,6 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.apache.log4j.BasicConfigurator; import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.utils.SimpleTimer; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -220,6 +221,33 @@ public class NanoSchedulerUnitTest extends BaseTest { nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce()); } + @Test(expectedExceptions = NullPointerException.class, timeOut = 1000) + public void testInputErrorIsThrown_NPE() throws InterruptedException { + executeTestErrorThrowingInput(new NullPointerException()); + } + + @Test(expectedExceptions = NullPointerException.class, timeOut = 1000) + public void testInputErrorIsThrown_RSE() throws InterruptedException { + executeTestErrorThrowingInput(new ReviewedStingException("test")); + } + + private void executeTestErrorThrowingInput(final RuntimeException ex) { + final NanoScheduler nanoScheduler = new NanoScheduler(1, 2); + nanoScheduler.execute(new ErrorThrowingIterator(ex), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce()); + } + + private static class ErrorThrowingIterator implements Iterator { + final RuntimeException ex; + + private ErrorThrowingIterator(RuntimeException ex) { + this.ex = ex; + } + + @Override public boolean hasNext() { throw ex; } + @Override public Integer next() { throw ex; } + @Override public void remove() { throw ex; } + } + public static void main(String [ ] args) { org.apache.log4j.Logger logger = org.apache.log4j.Logger.getRootLogger(); BasicConfigurator.configure();