Cleanup GATK efficiency monitor classes

-- Invert logic in GATKArgumentCollection to disable monitoring, not enable.  That means monitoring is on by default
-- Fix testing error in unit tests
-- Rename variables in ThreadAllocation to be clearer
This commit is contained in:
Mark DePristo 2012-08-22 16:45:53 -04:00
parent 1d47d2b573
commit 63af0cbcba
6 changed files with 28 additions and 26 deletions

View File

@ -372,7 +372,7 @@ public class GenomeAnalysisEngine {
else if(argCollection.numberOfIOThreads != null) else if(argCollection.numberOfIOThreads != null)
numIOThreads = argCollection.numberOfIOThreads; numIOThreads = argCollection.numberOfIOThreads;
this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads, numCPUThreads, numIOThreads, argCollection.monitorThreads); this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads, numCPUThreads, numIOThreads, ! argCollection.disableEfficiencyMonitor);
} }
/** /**

View File

@ -283,9 +283,13 @@ public class GATKArgumentCollection {
@Argument(fullName = "num_threads", shortName = "nt", doc = "How many threads should be allocated to running this analysis.", required = false) @Argument(fullName = "num_threads", shortName = "nt", doc = "How many threads should be allocated to running this analysis.", required = false)
public Integer numberOfThreads = 1; public Integer numberOfThreads = 1;
/** Should we monitor threading efficiency? . */ /**
@Argument(fullName = "monitorThreads", shortName = "mt", doc = "Should we monitor the threading efficiency when running in multi-threaded mode?", required = false) * By default the GATK monitors its own efficiency, but this can have a itsy-bitsy tiny
public Boolean monitorThreads = false; * cost (< 0.1%) in runtime because of turning on the JavaBean. This argument allows you
* to disable the monitor
*/
@Argument(fullName = "disableThreadEfficiencyMonitor", shortName = "dtem", doc = "Disable GATK efficiency monitoring", required = false)
public Boolean disableEfficiencyMonitor = false;
/** /**
* The following two arguments (num_cpu_threads, num_io_threads are TEMPORARY since Queue cannot currently support arbitrary tagged data types. * The following two arguments (num_cpu_threads, num_io_threads are TEMPORARY since Queue cannot currently support arbitrary tagged data types.

View File

@ -39,7 +39,6 @@ import org.broadinstitute.sting.gatk.traversals.*;
import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.gatk.walkers.*;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory;
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
import javax.management.JMException; import javax.management.JMException;
@ -107,11 +106,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
if(walker instanceof ReadWalker) if(walker instanceof ReadWalker)
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s is a read walker. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); throw new UserException.BadArgumentValue("nt", String.format("The analysis %s is a read walker. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads())); logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads()));
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.shouldMonitorThreads()); return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
} else { } else {
if(threadAllocation.getNumCPUThreads() > 1) if(threadAllocation.getNumCPUThreads() > 1)
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.shouldMonitorThreads()); return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.monitorThreadEfficiency());
} }
} }

View File

@ -43,7 +43,7 @@ public class ThreadAllocation {
/** /**
* Should we monitor thread efficiency? * Should we monitor thread efficiency?
*/ */
private final boolean monitorThreads; private final boolean monitorEfficiency;
public int getNumCPUThreads() { public int getNumCPUThreads() {
return numCPUThreads; return numCPUThreads;
@ -53,8 +53,8 @@ public class ThreadAllocation {
return numIOThreads; return numIOThreads;
} }
public boolean shouldMonitorThreads() { public boolean monitorThreadEfficiency() {
return monitorThreads; return monitorEfficiency;
} }
/** /**
@ -71,7 +71,7 @@ public class ThreadAllocation {
* @param numCPUThreads Total number of threads allocated to the traversal. * @param numCPUThreads Total number of threads allocated to the traversal.
* @param numIOThreads Total number of threads allocated exclusively to IO. * @param numIOThreads Total number of threads allocated exclusively to IO.
*/ */
public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads, final boolean monitorThreads) { public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads, final boolean monitorEfficiency) {
// If no allocation information is present, allocate all threads to CPU // If no allocation information is present, allocate all threads to CPU
if(numCPUThreads == null && numIOThreads == null) { if(numCPUThreads == null && numIOThreads == null) {
this.numCPUThreads = totalThreads; this.numCPUThreads = totalThreads;
@ -98,6 +98,6 @@ public class ThreadAllocation {
this.numIOThreads = numIOThreads; this.numIOThreads = numIOThreads;
} }
this.monitorThreads = monitorThreads; this.monitorEfficiency = monitorEfficiency;
} }
} }

View File

@ -99,9 +99,9 @@ public class EfficiencyMonitoringThreadFactory extends ThreadEfficiencyMonitor i
} }
@Ensures({ @Ensures({
"activeThreads.size() < old(activeThreads.size())", "activeThreads.size() <= old(activeThreads.size())",
"! activeThreads.contains(thread)", "! activeThreads.contains(thread)",
"countDownLatch.getCount() < old(countDownLatch.getCount())" "countDownLatch.getCount() <= old(countDownLatch.getCount())"
}) })
@Override @Override
public synchronized void threadIsDone(final Thread thread) { public synchronized void threadIsDone(final Thread thread) {
@ -111,13 +111,12 @@ public class EfficiencyMonitoringThreadFactory extends ThreadEfficiencyMonitor i
super.threadIsDone(thread); super.threadIsDone(thread);
// remove the thread from the list of active activeThreads // remove the thread from the list of active activeThreads, if it's in there, and decrement the countdown latch
if ( ! activeThreads.remove(thread) ) if ( activeThreads.remove(thread) ) {
throw new IllegalStateException("Thread " + thread + " not in list of active activeThreads"); // one less thread is live for those blocking on all activeThreads to be complete
countDownLatch.countDown();
// one less thread is live for those blocking on all activeThreads to be complete if ( DEBUG ) logger.warn(" -> Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName());
countDownLatch.countDown(); }
if ( DEBUG ) logger.warn(" -> Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName());
} }
/** /**

View File

@ -39,7 +39,7 @@ import java.util.concurrent.*;
/** /**
* Tests for the state monitoring thread factory. * Tests for the state monitoring thread factory.
*/ */
public class StateMonitoringThreadFactoryUnitTest extends BaseTest { public class EfficiencyMonitoringThreadFactoryUnitTest extends BaseTest {
// the duration of the tests -- 100 ms is tolerable given the number of tests we are doing // the duration of the tests -- 100 ms is tolerable given the number of tests we are doing
private final static long THREAD_TARGET_DURATION_IN_MILLISECOND = 1000; private final static long THREAD_TARGET_DURATION_IN_MILLISECOND = 1000;
final static Object GLOBAL_LOCK = new Object(); final static Object GLOBAL_LOCK = new Object();
@ -61,8 +61,8 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest {
public int getNStates() { return statesForThreads.size(); } public int getNStates() { return statesForThreads.size(); }
public double maxStateFraction(final EfficiencyMonitoringThreadFactory.State state) { return fraction(state) + TOLERANCE; } public double maxStatePercent(final EfficiencyMonitoringThreadFactory.State state) { return 100*(fraction(state) + TOLERANCE); }
public double minStateFraction(final EfficiencyMonitoringThreadFactory.State state) { return fraction(state) - TOLERANCE; } public double minStatePercent(final EfficiencyMonitoringThreadFactory.State state) { return 100*(fraction(state) - TOLERANCE); }
private double fraction(final EfficiencyMonitoringThreadFactory.State state) { private double fraction(final EfficiencyMonitoringThreadFactory.State state) {
return Collections.frequency(statesForThreads, state) / (1.0 * statesForThreads.size()); return Collections.frequency(statesForThreads, state) / (1.0 * statesForThreads.size());
@ -159,8 +159,8 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest {
Assert.assertTrue(totalTime <= maxTime, "Factory results not properly accumulated: totalTime = " + totalTime + " > maxTime = " + maxTime); Assert.assertTrue(totalTime <= maxTime, "Factory results not properly accumulated: totalTime = " + totalTime + " > maxTime = " + maxTime);
for (final EfficiencyMonitoringThreadFactory.State state : EfficiencyMonitoringThreadFactory.State.values() ) { for (final EfficiencyMonitoringThreadFactory.State state : EfficiencyMonitoringThreadFactory.State.values() ) {
final double min = test.minStateFraction(state); final double min = test.minStatePercent(state);
final double max = test.maxStateFraction(state); final double max = test.maxStatePercent(state);
final double obs = factory.getStatePercent(state); final double obs = factory.getStatePercent(state);
// logger.warn(" Checking " + state // logger.warn(" Checking " + state
// + " min " + String.format("%.2f", min) // + " min " + String.format("%.2f", min)