From 18060f237b21b53e2442526c4b06ea955cd8baac Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 22 Aug 2012 09:10:09 -0400 Subject: [PATCH] Add thread efficiency monitoring to GATK HMS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit -- See https://jira.broadinstitute.org/browse/GSA-502 -- New command line argument -mt enables thread monitoring -- If enabled, HMS uses StateMonitoringThreadFactory to create monitored threads, and prints out an efficiency report when HMS exits, telling the user information like: for BQSR – known to be inefficient locking INFO 17:10:33,195 StateMonitoringThreadFactory - Number of activeThreads used: 8 INFO 17:10:33,196 StateMonitoringThreadFactory - Total runtime 90.3 m INFO 17:10:33,196 StateMonitoringThreadFactory - Fraction of time spent blocked is 0.72 ( 64.8 m) INFO 17:10:33,197 StateMonitoringThreadFactory - Fraction of time spent running is 0.26 ( 23.7 m) INFO 17:10:33,197 StateMonitoringThreadFactory - Fraction of time spent waiting is 0.02 ( 112.8 s) INFO 17:10:33,197 StateMonitoringThreadFactory - Efficiency of multi-threading: 26.19% of time spent doing productive work for CountLoci INFO 17:06:12,777 StateMonitoringThreadFactory - Number of activeThreads used: 8 INFO 17:06:12,777 StateMonitoringThreadFactory - Total runtime 43.5 m INFO 17:06:12,778 StateMonitoringThreadFactory - Fraction of time spent blocked is 0.00 ( 4.2 s) INFO 17:06:12,778 StateMonitoringThreadFactory - Fraction of time spent running is 1.00 ( 43.3 m) INFO 17:06:12,779 StateMonitoringThreadFactory - Fraction of time spent waiting is 0.00 ( 6.0 s) INFO 17:06:12,779 StateMonitoringThreadFactory - Efficiency of multi-threading: 99.61% of time spent doing productive work --- .../sting/gatk/GenomeAnalysisEngine.java | 2 +- .../arguments/GATKArgumentCollection.java | 4 +++ .../executive/HierarchicalMicroScheduler.java | 30 +++++++++++++++++-- .../sting/gatk/executive/MicroScheduler.java | 2 +- .../resourcemanagement/ThreadAllocation.java | 16 ++++++++-- 5 files changed, 47 insertions(+), 7 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index e76cde43a..9a9febb78 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -372,7 +372,7 @@ public class GenomeAnalysisEngine { else if(argCollection.numberOfIOThreads != null) numIOThreads = argCollection.numberOfIOThreads; - this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads,numCPUThreads,numIOThreads); + this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads, numCPUThreads, numIOThreads, argCollection.monitorThreads); } /** diff --git a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index f66e229bc..6a14373f3 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -283,6 +283,10 @@ public class GATKArgumentCollection { @Argument(fullName = "num_threads", shortName = "nt", doc = "How many threads should be allocated to running this analysis.", required = false) public Integer numberOfThreads = 1; + /** Should we monitor threading efficiency? . */ + @Argument(fullName = "monitorThreads", shortName = "mt", doc = "Should we monitor the threading efficiency when running in multi-threaded mode?", required = false) + public Boolean monitorThreads = false; + /** * The following two arguments (num_cpu_threads, num_io_threads are TEMPORARY since Queue cannot currently support arbitrary tagged data types. * TODO: Kill this when I can do a tagged integer in Queue. diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 70b1be0e1..017eeb55a 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -11,6 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.threading.StateMonitoringThreadFactory; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.util.Collection; @@ -72,6 +73,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** What is the total time spent merging output? */ private long totalOutputMergeTime = 0; + /** may be null */ + final StateMonitoringThreadFactory monitoringThreadFactory; + /** * Create a new hierarchical microscheduler to process the given reads and reference. * @@ -80,9 +84,22 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * @param reference Reference for driving the traversal. * @param nThreadsToUse maximum number of threads to use to do the work */ - protected HierarchicalMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, int nThreadsToUse ) { + protected HierarchicalMicroScheduler(final GenomeAnalysisEngine engine, + final Walker walker, + final SAMDataSource reads, + final IndexedFastaSequenceFile reference, + final Collection rods, + final int nThreadsToUse, + final boolean monitorThreadPerformance ) { super(engine, walker, reads, reference, rods); - this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + + if ( monitorThreadPerformance ) { + this.monitoringThreadFactory = new StateMonitoringThreadFactory(nThreadsToUse); + this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory); + } else { + this.monitoringThreadFactory = null; + this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + } } public Object execute( Walker walker, Iterable shardStrategy ) { @@ -140,10 +157,19 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar // do final cleanup operations outputTracker.close(); cleanup(); + printThreadingEfficiency(); return result; } + /** + * Print out the threading efficiency of this HMS, if state monitoring is enabled + */ + private void printThreadingEfficiency() { + if ( monitoringThreadFactory != null ) + monitoringThreadFactory.printUsageInformation(logger); + } + /** * Run the initialize method of the walker. Ensure that any calls * to the output stream will bypass thread local storage and write diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 95e39b7c6..c845bbce0 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -98,7 +98,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { if(walker instanceof ReadWalker) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s is a read walker. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads())); - return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads()); + return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.shouldMonitorThreads()); } else { if(threadAllocation.getNumCPUThreads() > 1) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); diff --git a/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java b/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java index 0c81af07b..07a45c0f9 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java +++ b/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java @@ -40,6 +40,11 @@ public class ThreadAllocation { */ private final int numIOThreads; + /** + * Should we monitor thread efficiency? + */ + private final boolean monitorThreads; + public int getNumCPUThreads() { return numCPUThreads; } @@ -48,11 +53,15 @@ public class ThreadAllocation { return numIOThreads; } + public boolean shouldMonitorThreads() { + return monitorThreads; + } + /** * Construct the default thread allocation. */ public ThreadAllocation() { - this(1,null,null); + this(1, null, null, false); } /** @@ -62,7 +71,7 @@ public class ThreadAllocation { * @param numCPUThreads Total number of threads allocated to the traversal. * @param numIOThreads Total number of threads allocated exclusively to IO. */ - public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads) { + public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads, final boolean monitorThreads) { // If no allocation information is present, allocate all threads to CPU if(numCPUThreads == null && numIOThreads == null) { this.numCPUThreads = totalThreads; @@ -88,6 +97,7 @@ public class ThreadAllocation { this.numCPUThreads = numCPUThreads; this.numIOThreads = numIOThreads; } - } + this.monitorThreads = monitorThreads; + } }