From e0c07f5567efdab5c8c9fe64e9f2d2fb03a4f90a Mon Sep 17 00:00:00 2001 From: Eric Banks Date: Wed, 25 Jul 2012 12:37:59 -0400 Subject: [PATCH 1/3] Reverting old commits that made error handling better because ultimately they made things worse. --- .../executive/HierarchicalMicroScheduler.java | 86 ++++++++----------- .../sting/gatk/executive/ShardTraverser.java | 4 +- .../sting/gatk/executive/TreeReducer.java | 10 ++- .../gatk/walkers/bqsr/BaseRecalibrator.java | 4 +- 4 files changed, 49 insertions(+), 55 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 5f20ac7af..1cea14a9d 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -11,13 +11,17 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.StingException; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; /** * A microscheduler that schedules shards according to a tree-like structure. @@ -40,6 +44,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar private final Queue reduceTasks = new LinkedList(); + /** + * An exception that's occurred in this traversal. If null, no exception has occurred. + */ + private RuntimeException error = null; + /** * Queue of incoming shards. */ @@ -90,13 +99,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar ReduceTree reduceTree = new ReduceTree(this); initializeWalker(walker); - // - // exception handling here is a bit complex. We used to catch and rethrow exceptions all over - // the place, but that just didn't work well. Now we have a specific execution exception (inner class) - // to use for multi-threading specific exceptions. All RuntimeExceptions that occur within the threads are rethrown - // up the stack as their underlying causes - // while (isShardTraversePending() || isTreeReducePending()) { + // Check for errors during execution. + if(hasTraversalErrorOccurred()) + throw getTraversalError(); + // Too many files sitting around taking up space? Merge them. if (isMergeLimitExceeded()) mergeExistingOutput(false); @@ -123,8 +130,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar result = reduceTree.getResult().get(); notifyTraversalDone(walker,result); } - catch( InterruptedException ex ) { handleException(ex); } - catch( ExecutionException ex ) { handleException(ex); } + catch (ReviewedStingException ex) { + throw ex; + } + catch (Exception ex) { + throw new ReviewedStingException("Unable to retrieve result", ex); + } // do final cleanup operations outputTracker.close(); @@ -255,8 +266,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar // Specifically catch Tribble I/O exceptions and rethrow them as Reviewed. We don't expect // any issues here because we created the Tribble output file mere moments ago and expect it to // be completely valid. - final String reason = ex.getMessage(); - throw new ReviewedStingException("Unable to merge temporary Tribble output file" + (reason == null ? "." : (" (" + reason + ").")), ex); + throw new ReviewedStingException("Unable to merge temporary Tribble output file.",ex); } } } @@ -328,39 +338,30 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar } /** - * Handle an exception that occurred in a worker thread as needed by this scheduler. - * - * The way to use this function in a worker is: - * - * try { doSomeWork(); - * catch ( InterruptedException ex ) { hms.handleException(ex); } - * catch ( ExecutionException ex ) { hms.handleException(ex); } - * - * @param ex the exception that occurred in the worker thread + * Detects whether an execution error has occurred. + * @return True if an error has occurred. False otherwise. */ - protected final void handleException(InterruptedException ex) { - throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce interrupted", ex); + private synchronized boolean hasTraversalErrorOccurred() { + return error != null; + } + + private synchronized RuntimeException getTraversalError() { + if(!hasTraversalErrorOccurred()) + throw new ReviewedStingException("User has attempted to retrieve a traversal error when none exists"); + return error; } /** - * Handle an exception that occurred in a worker thread as needed by this scheduler. - * - * The way to use this function in a worker is: - * - * try { doSomeWork(); - * catch ( InterruptedException ex ) { hms.handleException(ex); } - * catch ( ExecutionException ex ) { hms.handleException(ex); } - * - * @param ex the exception that occurred in the worker thread + * Allows other threads to notify of an error during traversal. */ - protected final void handleException(ExecutionException ex) { - if ( ex.getCause() instanceof RuntimeException ) - // if the cause was a runtime exception that's what we want to send up the stack - throw (RuntimeException )ex.getCause(); + protected synchronized void notifyOfTraversalError(Throwable error) { + // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. + if (error instanceof RuntimeException) + this.error = (RuntimeException)error; else - throw new HierarchicalMicroScheduler.ExecutionFailure("Hierarchical reduce failed", ex); - } + this.error = new ReviewedStingException("An error occurred during the traversal.", error); + } /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */ @@ -381,17 +382,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar } } - /** - * A specific exception class for HMS-specific failures such as - * Interrupted or ExecutionFailures that aren't clearly the fault - * of the underlying walker code - */ - public static class ExecutionFailure extends ReviewedStingException { - public ExecutionFailure(final String s, final Throwable throwable) { - super(s, throwable); - } - } - /** * Used by the ShardTraverser to report time consumed traversing a given shard. * diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index bb0344848..5ec52cdb8 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -80,8 +80,8 @@ public class ShardTraverser implements Callable { return accumulator; } catch(Throwable t) { - // Notify that an exception has occurred - microScheduler.handleException(new ExecutionException(t)); + // Notify that an exception has occurred and rethrow it. + microScheduler.notifyOfTraversalError(t); throw new RuntimeException(t); } finally { synchronized(this) { diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java b/public/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java index fc8a89c64..632638f64 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java @@ -79,8 +79,14 @@ public class TreeReducer implements Callable { else result = walker.treeReduce( lhs.get(), rhs.get() ); } - catch( InterruptedException ex ) { microScheduler.handleException(ex); } - catch( ExecutionException ex ) { microScheduler.handleException(ex); } + catch( InterruptedException ex ) { + microScheduler.notifyOfTraversalError(ex); + throw new ReviewedStingException("Hierarchical reduce interrupted", ex); + } + catch( ExecutionException ex ) { + microScheduler.notifyOfTraversalError(ex); + throw new ReviewedStingException("Hierarchical reduce failed", ex); + } final long endTime = System.currentTimeMillis(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java index 0b1f2c478..9f5429fb9 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java @@ -58,13 +58,11 @@ import java.util.ArrayList; * of poor base quality. This walker generates tables based on various user-specified covariates (such as read group, * reported quality score, cycle, and dinucleotide). Since there is a large amount of data one can then calculate an empirical * probability of error given the particular covariates seen at this site, where p(error) = num mismatches / num observations. - * The output file is a CSV list of (the several covariate values, num observations, num mismatches, empirical quality score). + * The output file is a table (of the several covariate values, num observations, num mismatches, empirical quality score). *

* Note: ReadGroupCovariate and QualityScoreCovariate are required covariates and will be added for the user regardless of whether or not they were specified. * *

- * See the GATK wiki for a tutorial and example recalibration accuracy plots. - * http://www.broadinstitute.org/gsa/wiki/index.php/Base_quality_score_recalibration * *

Input

*

From a5721a8846034bd97be63c9149b124822e79b2b2 Mon Sep 17 00:00:00 2001 From: Eric Banks Date: Wed, 25 Jul 2012 13:38:07 -0400 Subject: [PATCH 2/3] Context covariate optimizations were not suited for multiple threads, so I removed them (since that ended up being much, much easier than trying to make the covariates thread local). Added -nt 2 layer to BQSR integration tests to confirm that it now works with multiple threads. --- .../walkers/bqsr/BQSRIntegrationTest.java | 25 +++++++++++++------ .../executive/HierarchicalMicroScheduler.java | 1 - .../gatk/walkers/bqsr/BaseRecalibrator.java | 8 +++--- .../gatk/walkers/bqsr/ContextCovariate.java | 25 ++++++++----------- 4 files changed, 32 insertions(+), 27 deletions(-) diff --git a/protected/java/test/org/broadinstitute/sting/gatk/walkers/bqsr/BQSRIntegrationTest.java b/protected/java/test/org/broadinstitute/sting/gatk/walkers/bqsr/BQSRIntegrationTest.java index 1d506b88c..e33c8c6f0 100644 --- a/protected/java/test/org/broadinstitute/sting/gatk/walkers/bqsr/BQSRIntegrationTest.java +++ b/protected/java/test/org/broadinstitute/sting/gatk/walkers/bqsr/BQSRIntegrationTest.java @@ -28,6 +28,17 @@ public class BQSRIntegrationTest extends WalkerTest { this.md5 = md5; } + public String getCommandLine() { + return " -T BaseRecalibrator" + + " -R " + reference + + " -I " + bam + + " -L " + interval + + args + + " --no_plots" + + " -knownSites " + (reference.equals(b36KGReference) ? b36dbSNP129 : hg18dbSNP132) + + " -o %s"; + } + @Override public String toString() { return String.format("BQSR(bam='%s', args='%s')", bam, args); @@ -59,16 +70,14 @@ public class BQSRIntegrationTest extends WalkerTest { @Test(dataProvider = "BQSRTest") public void testBQSR(BQSRTest params) { WalkerTestSpec spec = new WalkerTestSpec( - " -T BaseRecalibrator" + - " -R " + params.reference + - " -I " + params.bam + - " -L " + params.interval + - params.args + - " --no_plots" + - " -knownSites " + (params.reference.equals(b36KGReference) ? b36dbSNP129 : hg18dbSNP132) + - " -o %s", + params.getCommandLine(), Arrays.asList(params.md5)); executeTest("testBQSR-"+params.args, spec).getFirst(); + + WalkerTestSpec specNT2 = new WalkerTestSpec( + params.getCommandLine() + " -nt 2", + Arrays.asList(params.md5)); + executeTest("testBQSR-nt2-"+params.args, specNT2).getFirst(); } @Test diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 1cea14a9d..1fe2b840f 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -11,7 +11,6 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.StingException; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.util.Collection; diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java index 9f5429fb9..4cba67909 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java @@ -111,16 +111,16 @@ public class BaseRecalibrator extends LocusWalker implements TreeRed private QuantizationInfo quantizationInfo; // an object that keeps track of the information necessary for quality score quantization private RecalibrationTables recalibrationTables; - + private Covariate[] requestedCovariates; // list to hold the all the covariate objects that were requested (required + standard + experimental) private RecalibrationEngine recalibrationEngine; private int minimumQToUse; - protected static final String SKIP_RECORD_ATTRIBUTE = "SKIP"; // used to label reads that should be skipped. - protected static final String SEEN_ATTRIBUTE = "SEEN"; // used to label reads as processed. - protected static final String COVARS_ATTRIBUTE = "COVARS"; // used to store covariates array as a temporary attribute inside GATKSAMRecord.\ + protected static final String SKIP_RECORD_ATTRIBUTE = "SKIP"; // used to label reads that should be skipped. + protected static final String SEEN_ATTRIBUTE = "SEEN"; // used to label reads as processed. + protected static final String COVARS_ATTRIBUTE = "COVARS"; // used to store covariates array as a temporary attribute inside GATKSAMRecord.\ private static final String NO_DBSNP_EXCEPTION = "This calculation is critically dependent on being able to skip over known variant sites. Please provide a VCF file containing known sites of genetic variation."; diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/ContextCovariate.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/ContextCovariate.java index 8bc58063d..5fe8809fb 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/ContextCovariate.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/ContextCovariate.java @@ -51,10 +51,6 @@ public class ContextCovariate implements StandardCovariate { private static final int LENGTH_BITS = 4; private static final int LENGTH_MASK = 15; - // temporary lists to use for creating context covariate keys - private final ArrayList mismatchKeys = new ArrayList(200); - private final ArrayList indelKeys = new ArrayList(200); - // the maximum context size (number of bases) permitted; we need to keep the leftmost base free so that values are // not negative and we reserve 4 more bits to represent the length of the context; it takes 2 bits to encode one base. static final private int MAX_DNA_CONTEXT = 13; @@ -91,10 +87,8 @@ public class ContextCovariate implements StandardCovariate { if (negativeStrand) bases = BaseUtils.simpleReverseComplement(bases); - mismatchKeys.clear(); - indelKeys.clear(); - contextWith(bases, mismatchesContextSize, mismatchKeys, mismatchesKeyMask); - contextWith(bases, indelsContextSize, indelKeys, indelsKeyMask); + final ArrayList mismatchKeys = contextWith(bases, mismatchesContextSize, mismatchesKeyMask); + final ArrayList indelKeys = contextWith(bases, indelsContextSize, indelsKeyMask); final int readLength = bases.length; for (int i = 0; i < readLength; i++) { @@ -139,17 +133,19 @@ public class ContextCovariate implements StandardCovariate { * * @param bases the bases in the read to build the context from * @param contextSize context size to use building the context - * @param keys list to store the keys * @param mask mask for pulling out just the context bits */ - private static void contextWith(final byte[] bases, final int contextSize, final ArrayList keys, final int mask) { + private static ArrayList contextWith(final byte[] bases, final int contextSize, final int mask) { + + final int readLength = bases.length; + final ArrayList keys = new ArrayList(readLength); // the first contextSize-1 bases will not have enough previous context - for (int i = 1; i < contextSize && i <= bases.length; i++) + for (int i = 1; i < contextSize && i <= readLength; i++) keys.add(-1); - if (bases.length < contextSize) - return; + if (readLength < contextSize) + return keys; final int newBaseOffset = 2 * (contextSize - 1) + LENGTH_BITS; @@ -171,7 +167,6 @@ public class ContextCovariate implements StandardCovariate { } } - final int readLength = bases.length; for (int currentIndex = contextSize; currentIndex < readLength; currentIndex++) { final int baseIndex = BaseUtils.simpleBaseToBaseIndex(bases[currentIndex]); if (baseIndex == -1) { // ignore non-ACGT bases @@ -191,6 +186,8 @@ public class ContextCovariate implements StandardCovariate { keys.add(-1); } } + + return keys; } public static int keyFromContext(final String dna) { From 357e0b35af1cd0c4be03f5f4161684cd440fa3f7 Mon Sep 17 00:00:00 2001 From: Eric Banks Date: Wed, 25 Jul 2012 14:11:03 -0400 Subject: [PATCH 3/3] Register GATK-full-only walkers and rethrow the missing walker error as a not supported in GATK lite error --- .../sting/gatk/GenomeAnalysisEngine.java | 9 ++++++++- .../sting/utils/classloader/GATKLiteUtils.java | 15 +++++++++++++++ .../sting/utils/exceptions/UserException.java | 2 +- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 8dfa89083..eab9fde79 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -281,7 +281,14 @@ public class GenomeAnalysisEngine { * @return An instance of the walker. */ public Walker getWalkerByName(String walkerName) { - return walkerManager.createByName(walkerName); + try { + return walkerManager.createByName(walkerName); + } catch ( UserException e ) { + if ( isGATKLite() && GATKLiteUtils.isAvailableOnlyInFullGATK(walkerName) ) { + e = new UserException.NotSupportedInGATKLite("the " + walkerName + " walker is available only in the full version of the GATK"); + } + throw e; + } } /** diff --git a/public/java/src/org/broadinstitute/sting/utils/classloader/GATKLiteUtils.java b/public/java/src/org/broadinstitute/sting/utils/classloader/GATKLiteUtils.java index db9b9d8b7..2ab7d0618 100755 --- a/public/java/src/org/broadinstitute/sting/utils/classloader/GATKLiteUtils.java +++ b/public/java/src/org/broadinstitute/sting/utils/classloader/GATKLiteUtils.java @@ -41,6 +41,21 @@ public class GATKLiteUtils { */ private GATKLiteUtils() { } + + private static Set fullVersionGATKWalkers = new HashSet(); + static { + fullVersionGATKWalkers.add("HaplotypeCaller"); + fullVersionGATKWalkers.add("ReduceReads"); + } + /** + * Utility method to check whether a given walker is only available in the full GATK release + * + * @param walkerName the walker class name (not the package) to check + */ + public static boolean isAvailableOnlyInFullGATK(final String walkerName) { + return fullVersionGATKWalkers.contains(walkerName); + } + /** * Utility method to determine whether this is the lite version of the GATK */ diff --git a/public/java/src/org/broadinstitute/sting/utils/exceptions/UserException.java b/public/java/src/org/broadinstitute/sting/utils/exceptions/UserException.java index f950cbfb3..9ce4e82a1 100755 --- a/public/java/src/org/broadinstitute/sting/utils/exceptions/UserException.java +++ b/public/java/src/org/broadinstitute/sting/utils/exceptions/UserException.java @@ -80,7 +80,7 @@ public class UserException extends ReviewedStingException { public static class NotSupportedInGATKLite extends UserException { public NotSupportedInGATKLite(String message) { - super(String.format("GATK Lite does support all of the features of the full version: %s", message)); + super(String.format("GATK Lite does not support all of the features of the full version: %s", message)); } }