adding a command line option, -etd (enable threaded debugging), that uses a custom thread pool class to catch exceptions thrown inside of a thread.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3450 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
0e3c76ab84
commit
cded9ec985
|
|
@ -176,6 +176,10 @@ public class GATKArgumentCollection {
|
||||||
@Argument(fullName = "read_group_black_list", shortName="rgbl", doc="Filters out read with read groups matching <TAG>:<SUBSTRING> or a .txt file containing the filter strings one per line.", required = false)
|
@Argument(fullName = "read_group_black_list", shortName="rgbl", doc="Filters out read with read groups matching <TAG>:<SUBSTRING> or a .txt file containing the filter strings one per line.", required = false)
|
||||||
public List<String> readGroupBlackList = null;
|
public List<String> readGroupBlackList = null;
|
||||||
|
|
||||||
|
@Element(required = false)
|
||||||
|
@Argument(fullName = "enable_threaded_debugging",shortName="etd", doc="Enable debugging of threaded apps by applying exception catching in the threaded version of the GATK.", required = false)
|
||||||
|
public boolean enableThreadedDebugging = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* marshal the data out to a object
|
* marshal the data out to a object
|
||||||
*
|
*
|
||||||
|
|
@ -333,6 +337,9 @@ public class GATKArgumentCollection {
|
||||||
if (other.intervalMerging != this.intervalMerging) {
|
if (other.intervalMerging != this.intervalMerging) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (enableThreadedDebugging != other.enableThreadedDebugging) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if ((other.RODToInterval == null && RODToInterval != null) ||
|
if ((other.RODToInterval == null && RODToInterval != null) ||
|
||||||
(other.RODToInterval != null && !other.RODToInterval.equals(RODToInterval))) {
|
(other.RODToInterval != null && !other.RODToInterval.equals(RODToInterval))) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,65 @@
|
||||||
|
package org.broadinstitute.sting.gatk.executive;
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.broadinstitute.sting.utils.StingException;
|
||||||
|
import org.broadinstitute.sting.utils.Utils;
|
||||||
|
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* an override of the ThreadedPoolExecutor, that throws the exception when an exception is seen in a thread.
|
||||||
|
*/
|
||||||
|
public class ExceptionAwareThreadPool extends ThreadPoolExecutor {
|
||||||
|
/**
|
||||||
|
* our log, which we want to capture anything from this class
|
||||||
|
*/
|
||||||
|
private static Logger logger = Logger.getLogger(ExceptionAwareThreadPool.class);
|
||||||
|
|
||||||
|
public ExceptionAwareThreadPool(int numberOfThreads) {
|
||||||
|
super(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
|
||||||
|
Utils.warnUser("Using the etd (enable threaded debugging) mode and the ExceptionAwareThreadPool is dangerous to " +
|
||||||
|
"yourself and others. DONT USE IT IN A PRODUCTION SETTING, OR AT ALL!!!!");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* attempt to determine the fate of a runnable object
|
||||||
|
* @param r the runnable (in our case a Future object)
|
||||||
|
* @param t any throwables from the thread.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void afterExecute(Runnable r, Throwable t) {
|
||||||
|
super.afterExecute(r, t);
|
||||||
|
// from http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6459119. The throwable in the method parameters
|
||||||
|
// is not actually the thrown exception from the underlying thread, we have to go get the cause.
|
||||||
|
if (r instanceof Future<?>) {
|
||||||
|
try {
|
||||||
|
Object result = ((Future<?>) r).get();
|
||||||
|
// once we have the result, we know everything went fine
|
||||||
|
logger.debug("Thread completed successfully");
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
caughtException(ie);
|
||||||
|
} catch (ExecutionException ee) {
|
||||||
|
caughtException(ee.getCause());
|
||||||
|
} catch (CancellationException ce) {
|
||||||
|
caughtException(ce);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ungracefully shutdown the GATK
|
||||||
|
* @param e the throwable object, which caused us to fail
|
||||||
|
*/
|
||||||
|
public void caughtException(Throwable e) {
|
||||||
|
// shutdown all the threads, not waiting to finish
|
||||||
|
this.shutdownNow();
|
||||||
|
|
||||||
|
// cite the reason we crashed out
|
||||||
|
logger.fatal("Thread pool caught an exception from a thread: ");
|
||||||
|
e.printStackTrace();
|
||||||
|
|
||||||
|
// bail in the ugliest way possible
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -81,7 +81,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
*/
|
*/
|
||||||
protected HierarchicalMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, int nThreadsToUse ) {
|
protected HierarchicalMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, int nThreadsToUse ) {
|
||||||
super(engine, walker, reads, reference, rods);
|
super(engine, walker, reads, reference, rods);
|
||||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
|
||||||
|
if (GenomeAnalysisEngine.instance.getArguments().enableThreadedDebugging)
|
||||||
|
this.threadPool = new ExceptionAwareThreadPool(nThreadsToUse);
|
||||||
|
else
|
||||||
|
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
package org.broadinstitute.sting.gatk.executive;
|
package org.broadinstitute.sting.gatk.executive;
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
|
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
|
||||||
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
|
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
|
||||||
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
|
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
|
||||||
|
|
@ -33,6 +34,10 @@ public class ShardTraverser implements Callable {
|
||||||
private ThreadLocalOutputTracker outputTracker;
|
private ThreadLocalOutputTracker outputTracker;
|
||||||
private OutputMergeTask outputMergeTask;
|
private OutputMergeTask outputMergeTask;
|
||||||
|
|
||||||
|
/** our log, which we want to capture anything from this class */
|
||||||
|
protected static Logger logger = Logger.getLogger(ShardTraverser.class);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is this traversal complete?
|
* Is this traversal complete?
|
||||||
*/
|
*/
|
||||||
|
|
@ -55,14 +60,16 @@ public class ShardTraverser implements Callable {
|
||||||
|
|
||||||
Object accumulator = walker.reduceInit();
|
Object accumulator = walker.reduceInit();
|
||||||
WindowMaker windowMaker = new WindowMaker(microScheduler.getReadIterator(shard),shard.getGenomeLocs());
|
WindowMaker windowMaker = new WindowMaker(microScheduler.getReadIterator(shard),shard.getGenomeLocs());
|
||||||
|
ShardDataProvider dataProvider = null;
|
||||||
try {
|
try {
|
||||||
for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
|
for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
|
||||||
ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),iterator.getLocus(),iterator,microScheduler.reference,microScheduler.rods);
|
dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),iterator.getLocus(),iterator,microScheduler.reference,microScheduler.rods);
|
||||||
accumulator = traversalEngine.traverse( walker, dataProvider, accumulator );
|
accumulator = traversalEngine.traverse( walker, dataProvider, accumulator );
|
||||||
dataProvider.close();
|
dataProvider.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
if (dataProvider != null) dataProvider.close();
|
||||||
windowMaker.close();
|
windowMaker.close();
|
||||||
outputMergeTask = outputTracker.closeStorage();
|
outputMergeTask = outputTracker.closeStorage();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -158,7 +158,7 @@ public class TribbleRMDTrackBuilder extends PluginManager<FeatureCodec> implemen
|
||||||
* @return a linear index for the specified type
|
* @return a linear index for the specified type
|
||||||
* @throws IOException if we cannot write the index file
|
* @throws IOException if we cannot write the index file
|
||||||
*/
|
*/
|
||||||
public static Index loadIndex(File inputFile, FeatureCodec codec, boolean onDisk) throws IOException {
|
public synchronized static Index loadIndex(File inputFile, FeatureCodec codec, boolean onDisk) throws IOException {
|
||||||
|
|
||||||
// our return index
|
// our return index
|
||||||
LinearIndex returnIndex = null;
|
LinearIndex returnIndex = null;
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ public class FSLock {
|
||||||
*/
|
*/
|
||||||
public FSLock(File baseLocation) {
|
public FSLock(File baseLocation) {
|
||||||
lockFile = new File(baseLocation.getAbsoluteFile() + lockString);
|
lockFile = new File(baseLocation.getAbsoluteFile() + lockString);
|
||||||
lockFile.deleteOnExit();
|
if (lockFile != null) lockFile.deleteOnExit();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue