diff --git a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index ac506fa4a..6d3746d3e 100755 --- a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -176,6 +176,10 @@ public class GATKArgumentCollection { @Argument(fullName = "read_group_black_list", shortName="rgbl", doc="Filters out read with read groups matching : or a .txt file containing the filter strings one per line.", required = false) public List readGroupBlackList = null; + @Element(required = false) + @Argument(fullName = "enable_threaded_debugging",shortName="etd", doc="Enable debugging of threaded apps by applying exception catching in the threaded version of the GATK.", required = false) + public boolean enableThreadedDebugging = false; + /** * marshal the data out to a object * @@ -333,6 +337,9 @@ public class GATKArgumentCollection { if (other.intervalMerging != this.intervalMerging) { return false; } + if (enableThreadedDebugging != other.enableThreadedDebugging) { + return false; + } if ((other.RODToInterval == null && RODToInterval != null) || (other.RODToInterval != null && !other.RODToInterval.equals(RODToInterval))) { return false;
diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ExceptionAwareThreadPool.java b/java/src/org/broadinstitute/sting/gatk/executive/ExceptionAwareThreadPool.java
new file mode 100644
index 000000000..7c8c2c308
--- /dev/null
+++ b/java/src/org/broadinstitute/sting/gatk/executive/ExceptionAwareThreadPool.java
@@ -0,0 +1,83 @@
+package org.broadinstitute.sting.gatk.executive;
+
+import org.apache.log4j.Logger;
+import org.broadinstitute.sting.utils.StingException;
+import org.broadinstitute.sting.utils.Utils;
+
+import java.util.concurrent.*;
+
+/**
+ * an override of the ThreadPoolExecutor that brings the whole process down as soon as one of its tasks fails.
+ *
+ * When a task is handed to a plain ThreadPoolExecutor through submit(), anything it throws is stored in the
+ * returned Future and stays invisible until somebody calls get().  This pool inspects every task right after
+ * it finishes, and on a failure or cancellation logs the cause and exits.  It is a debugging aid only.
+ */
+public class ExceptionAwareThreadPool extends ThreadPoolExecutor {
+    /**
+     * our log, which we want to capture anything from this class
+     */
+    private static final Logger logger = Logger.getLogger(ExceptionAwareThreadPool.class);
+
+    /**
+     * create a pool with a fixed number of threads and a LinkedBlockingQueue work queue, warning the user on the way.
+     * @param numberOfThreads the number of threads, used as both the core and the maximum pool size
+     */
+    public ExceptionAwareThreadPool(int numberOfThreads) {
+        super(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+        Utils.warnUser("Using the etd (enable threaded debugging) mode and the ExceptionAwareThreadPool is dangerous to " +
+                "yourself and others. DONT USE IT IN A PRODUCTION SETTING, OR AT ALL!!!!");
+    }
+
+    /**
+     * attempt to determine the fate of a runnable object, and shut everything down if it failed
+     * @param r the runnable (in our case a Future object)
+     * @param t the throwable that ended the task; null when the task completed or was wrapped in a Future
+     */
+    @Override
+    public void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+
+        // a task handed over through execute() is not wrapped in a Future, so its failure shows up here directly
+        if (t != null) {
+            caughtException(t);
+            return;
+        }
+
+        // from http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6459119. For a submitted task the throwable in the
+        // method parameters is null even when the task failed: the exception was captured by the Future, and we
+        // have to go get the cause from there.
+        if (r instanceof Future) {
+            try {
+                // the task has already finished, so get() does not wait; we only care whether it throws
+                ((Future<?>) r).get();
+                logger.debug("Thread completed successfully");
+            } catch (InterruptedException ie) {
+                // put the interrupt flag back before bailing out
+                Thread.currentThread().interrupt();
+                caughtException(ie);
+            } catch (ExecutionException ee) {
+                // report the task's own exception; fall back to the wrapper if no cause was recorded
+                caughtException(ee.getCause() != null ? ee.getCause() : ee);
+            } catch (CancellationException ce) {
+                caughtException(ce);
+            }
+        }
+    }
+
+    /**
+     * ungracefully shutdown the GATK
+     * @param e the throwable object, which caused us to fail
+     */
+    public void caughtException(Throwable e) {
+        // shutdown all the threads, not waiting to finish
+        this.shutdownNow();
+
+        // cite the reason we crashed out; handing the throwable to the logger keeps the trace next to the message
+        logger.fatal("Thread pool caught an exception from a thread: ", e);
+
+        // bail in the ugliest way possible
+        System.exit(1);
+    }
+
+}
diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 4ac840d29..5134f0eeb 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -81,7 +81,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar */ protected HierarchicalMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, int nThreadsToUse ) { super(engine, walker, reads, reference, rods); - this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + + if (GenomeAnalysisEngine.instance.getArguments().enableThreadedDebugging) + this.threadPool = new ExceptionAwareThreadPool(nThreadsToUse); + else + this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); try { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java
b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index 1330f6d3c..4baaa33ca 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -1,5 +1,6 @@ package org.broadinstitute.sting.gatk.executive; +import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.shards.Shard; @@ -33,6 +34,10 @@ public class ShardTraverser implements Callable { private ThreadLocalOutputTracker outputTracker; private OutputMergeTask outputMergeTask; + /** our log, which we want to capture anything from this class */ + protected static Logger logger = Logger.getLogger(ShardTraverser.class); + + /** * Is this traversal complete? */ @@ -55,14 +60,16 @@ public class ShardTraverser implements Callable { Object accumulator = walker.reduceInit(); WindowMaker windowMaker = new WindowMaker(microScheduler.getReadIterator(shard),shard.getGenomeLocs()); + ShardDataProvider dataProvider = null; try { for(WindowMaker.WindowMakerIterator iterator: windowMaker) { - ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),iterator.getLocus(),iterator,microScheduler.reference,microScheduler.rods); + dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),iterator.getLocus(),iterator,microScheduler.reference,microScheduler.rods); accumulator = traversalEngine.traverse( walker, dataProvider, accumulator ); dataProvider.close(); } } finally { + if (dataProvider != null) dataProvider.close(); windowMaker.close(); outputMergeTask = outputTracker.closeStorage(); diff --git a/java/src/org/broadinstitute/sting/gatk/refdata/tracks/builders/TribbleRMDTrackBuilder.java b/java/src/org/broadinstitute/sting/gatk/refdata/tracks/builders/TribbleRMDTrackBuilder.java index 
8dd4bfab7..f9a45eba5 100644 --- a/java/src/org/broadinstitute/sting/gatk/refdata/tracks/builders/TribbleRMDTrackBuilder.java +++ b/java/src/org/broadinstitute/sting/gatk/refdata/tracks/builders/TribbleRMDTrackBuilder.java @@ -158,7 +158,7 @@ public class TribbleRMDTrackBuilder extends PluginManager implemen * @return a linear index for the specified type * @throws IOException if we cannot write the index file */ - public static Index loadIndex(File inputFile, FeatureCodec codec, boolean onDisk) throws IOException { + public synchronized static Index loadIndex(File inputFile, FeatureCodec codec, boolean onDisk) throws IOException { // our return index LinearIndex returnIndex = null; diff --git a/java/src/org/broadinstitute/sting/utils/file/FSLock.java b/java/src/org/broadinstitute/sting/utils/file/FSLock.java index df5003340..e5be9624c 100644 --- a/java/src/org/broadinstitute/sting/utils/file/FSLock.java +++ b/java/src/org/broadinstitute/sting/utils/file/FSLock.java @@ -35,7 +35,7 @@ public class FSLock { */ public FSLock(File baseLocation) { lockFile = new File(baseLocation.getAbsoluteFile() + lockString); - lockFile.deleteOnExit(); + if (lockFile != null) lockFile.deleteOnExit(); } /**