diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index e76cde43a..9a9febb78 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -372,7 +372,7 @@ public class GenomeAnalysisEngine { else if(argCollection.numberOfIOThreads != null) numIOThreads = argCollection.numberOfIOThreads; - this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads,numCPUThreads,numIOThreads); + this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads, numCPUThreads, numIOThreads, argCollection.monitorThreads); } /** diff --git a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index f66e229bc..6a14373f3 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -283,6 +283,10 @@ public class GATKArgumentCollection { @Argument(fullName = "num_threads", shortName = "nt", doc = "How many threads should be allocated to running this analysis.", required = false) public Integer numberOfThreads = 1; + /** Should we monitor threading efficiency? . */ + @Argument(fullName = "monitorThreads", shortName = "mt", doc = "Should we monitor the threading efficiency when running in multi-threaded mode?", required = false) + public Boolean monitorThreads = false; + /** * The following two arguments (num_cpu_threads, num_io_threads are TEMPORARY since Queue cannot currently support arbitrary tagged data types. * TODO: Kill this when I can do a tagged integer in Queue. diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 70b1be0e1..017eeb55a 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -11,6 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.threading.StateMonitoringThreadFactory; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.util.Collection; @@ -72,6 +73,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** What is the total time spent merging output? */ private long totalOutputMergeTime = 0; + /** may be null */ + final StateMonitoringThreadFactory monitoringThreadFactory; + /** * Create a new hierarchical microscheduler to process the given reads and reference. * @@ -80,9 +84,22 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * @param reference Reference for driving the traversal. * @param nThreadsToUse maximum number of threads to use to do the work */ - protected HierarchicalMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, int nThreadsToUse ) { + protected HierarchicalMicroScheduler(final GenomeAnalysisEngine engine, + final Walker walker, + final SAMDataSource reads, + final IndexedFastaSequenceFile reference, + final Collection rods, + final int nThreadsToUse, + final boolean monitorThreadPerformance ) { super(engine, walker, reads, reference, rods); - this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + + if ( monitorThreadPerformance ) { + this.monitoringThreadFactory = new StateMonitoringThreadFactory(nThreadsToUse); + this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory); + } else { + this.monitoringThreadFactory = null; + this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + } } public Object execute( Walker walker, Iterable shardStrategy ) { @@ -140,10 +157,19 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar // do final cleanup operations outputTracker.close(); cleanup(); + printThreadingEfficiency(); return result; } + /** + * Print out the threading efficiency of this HMS, if state monitoring is enabled + */ + private void printThreadingEfficiency() { + if ( monitoringThreadFactory != null ) + monitoringThreadFactory.printUsageInformation(logger); + } + /** * Run the initialize method of the walker. Ensure that any calls * to the output stream will bypass thread local storage and write diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 95e39b7c6..c845bbce0 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -98,7 +98,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { if(walker instanceof ReadWalker) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s is a read walker. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads())); - return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads()); + return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.shouldMonitorThreads()); } else { if(threadAllocation.getNumCPUThreads() > 1) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); diff --git a/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java b/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java index 0c81af07b..07a45c0f9 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java +++ b/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java @@ -40,6 +40,11 @@ public class ThreadAllocation { */ private final int numIOThreads; + /** + * Should we monitor thread efficiency? + */ + private final boolean monitorThreads; + public int getNumCPUThreads() { return numCPUThreads; } @@ -48,11 +53,15 @@ public class ThreadAllocation { return numIOThreads; } + public boolean shouldMonitorThreads() { + return monitorThreads; + } + /** * Construct the default thread allocation. */ public ThreadAllocation() { - this(1,null,null); + this(1, null, null, false); } /** @@ -62,7 +71,7 @@ public class ThreadAllocation { * @param numCPUThreads Total number of threads allocated to the traversal. * @param numIOThreads Total number of threads allocated exclusively to IO. */ - public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads) { + public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads, final boolean monitorThreads) { // If no allocation information is present, allocate all threads to CPU if(numCPUThreads == null && numIOThreads == null) { this.numCPUThreads = totalThreads; @@ -88,6 +97,7 @@ public class ThreadAllocation { this.numCPUThreads = numCPUThreads; this.numIOThreads = numIOThreads; } - } + this.monitorThreads = monitorThreads; + } }