Add thread efficiency monitoring to GATK HMS

-- See https://jira.broadinstitute.org/browse/GSA-502
-- New command line argument -mt enables thread monitoring
-- If enabled, HMS uses StateMonitoringThreadFactory to create monitored threads, and prints out an efficiency report when HMS exits, telling the user information like:

for BQSR – known to be inefficient locking
INFO 17:10:33,195 StateMonitoringThreadFactory - Number of activeThreads used: 8
INFO 17:10:33,196 StateMonitoringThreadFactory - Total runtime 90.3 m
INFO 17:10:33,196 StateMonitoringThreadFactory - Fraction of time spent blocked is 0.72 ( 64.8 m)
INFO 17:10:33,197 StateMonitoringThreadFactory - Fraction of time spent running is 0.26 ( 23.7 m)
INFO 17:10:33,197 StateMonitoringThreadFactory - Fraction of time spent waiting is 0.02 ( 112.8 s)
INFO 17:10:33,197 StateMonitoringThreadFactory - Efficiency of multi-threading: 26.19% of time spent doing productive work

for CountLoci
INFO 17:06:12,777 StateMonitoringThreadFactory - Number of activeThreads used: 8
INFO 17:06:12,777 StateMonitoringThreadFactory - Total runtime 43.5 m
INFO 17:06:12,778 StateMonitoringThreadFactory - Fraction of time spent blocked is 0.00 ( 4.2 s)
INFO 17:06:12,778 StateMonitoringThreadFactory - Fraction of time spent running is 1.00 ( 43.3 m)
INFO 17:06:12,779 StateMonitoringThreadFactory - Fraction of time spent waiting is 0.00 ( 6.0 s)
INFO 17:06:12,779 StateMonitoringThreadFactory - Efficiency of multi-threading: 99.61% of time spent doing productive work
This commit is contained in:
Mark DePristo 2012-08-22 09:10:09 -04:00
parent 27842ba448
commit 18060f237b
5 changed files with 47 additions and 7 deletions

View File

@ -372,7 +372,7 @@ public class GenomeAnalysisEngine {
else if(argCollection.numberOfIOThreads != null)
numIOThreads = argCollection.numberOfIOThreads;
this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads,numCPUThreads,numIOThreads);
this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads, numCPUThreads, numIOThreads, argCollection.monitorThreads);
}
/**

View File

@ -283,6 +283,10 @@ public class GATKArgumentCollection {
@Argument(fullName = "num_threads", shortName = "nt", doc = "How many threads should be allocated to running this analysis.", required = false)
public Integer numberOfThreads = 1;
/** Should we monitor threading efficiency? . */
@Argument(fullName = "monitorThreads", shortName = "mt", doc = "Should we monitor the threading efficiency when running in multi-threaded mode?", required = false)
public Boolean monitorThreads = false;
/**
* The following two arguments (num_cpu_threads, num_io_threads are TEMPORARY since Queue cannot currently support arbitrary tagged data types.
* TODO: Kill this when I can do a tagged integer in Queue.

View File

@ -11,6 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.threading.StateMonitoringThreadFactory;
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
import java.util.Collection;
@ -72,6 +73,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/** What is the total time spent merging output? */
private long totalOutputMergeTime = 0;
/** may be null */
final StateMonitoringThreadFactory monitoringThreadFactory;
/**
* Create a new hierarchical microscheduler to process the given reads and reference.
*
@ -80,9 +84,22 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
* @param reference Reference for driving the traversal.
* @param nThreadsToUse maximum number of threads to use to do the work
*/
protected HierarchicalMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, int nThreadsToUse ) {
protected HierarchicalMicroScheduler(final GenomeAnalysisEngine engine,
final Walker walker,
final SAMDataSource reads,
final IndexedFastaSequenceFile reference,
final Collection<ReferenceOrderedDataSource> rods,
final int nThreadsToUse,
final boolean monitorThreadPerformance ) {
super(engine, walker, reads, reference, rods);
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
if ( monitorThreadPerformance ) {
this.monitoringThreadFactory = new StateMonitoringThreadFactory(nThreadsToUse);
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory);
} else {
this.monitoringThreadFactory = null;
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
}
}
public Object execute( Walker walker, Iterable<Shard> shardStrategy ) {
@ -140,10 +157,19 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
// do final cleanup operations
outputTracker.close();
cleanup();
printThreadingEfficiency();
return result;
}
/**
* Print out the threading efficiency of this HMS, if state monitoring is enabled
*/
private void printThreadingEfficiency() {
if ( monitoringThreadFactory != null )
monitoringThreadFactory.printUsageInformation(logger);
}
/**
* Run the initialize method of the walker. Ensure that any calls
* to the output stream will bypass thread local storage and write

View File

@ -98,7 +98,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
if(walker instanceof ReadWalker)
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s is a read walker. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads()));
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads());
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.shouldMonitorThreads());
} else {
if(threadAllocation.getNumCPUThreads() > 1)
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));

View File

@ -40,6 +40,11 @@ public class ThreadAllocation {
*/
private final int numIOThreads;
/**
* Should we monitor thread efficiency?
*/
private final boolean monitorThreads;
public int getNumCPUThreads() {
return numCPUThreads;
}
@ -48,11 +53,15 @@ public class ThreadAllocation {
return numIOThreads;
}
public boolean shouldMonitorThreads() {
return monitorThreads;
}
/**
* Construct the default thread allocation.
*/
public ThreadAllocation() {
this(1,null,null);
this(1, null, null, false);
}
/**
@ -62,7 +71,7 @@ public class ThreadAllocation {
* @param numCPUThreads Total number of threads allocated to the traversal.
* @param numIOThreads Total number of threads allocated exclusively to IO.
*/
public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads) {
public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads, final boolean monitorThreads) {
// If no allocation information is present, allocate all threads to CPU
if(numCPUThreads == null && numIOThreads == null) {
this.numCPUThreads = totalThreads;
@ -88,6 +97,7 @@ public class ThreadAllocation {
this.numCPUThreads = numCPUThreads;
this.numIOThreads = numIOThreads;
}
}
this.monitorThreads = monitorThreads;
}
}