diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 9a9febb78..0d1c34ced 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -372,7 +372,7 @@ public class GenomeAnalysisEngine { else if(argCollection.numberOfIOThreads != null) numIOThreads = argCollection.numberOfIOThreads; - this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads, numCPUThreads, numIOThreads, argCollection.monitorThreads); + this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads, numCPUThreads, numIOThreads, ! argCollection.disableEfficiencyMonitor); } /** diff --git a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index 6a14373f3..72cb5e02f 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -283,9 +283,13 @@ public class GATKArgumentCollection { @Argument(fullName = "num_threads", shortName = "nt", doc = "How many threads should be allocated to running this analysis.", required = false) public Integer numberOfThreads = 1; - /** Should we monitor threading efficiency? . */ - @Argument(fullName = "monitorThreads", shortName = "mt", doc = "Should we monitor the threading efficiency when running in multi-threaded mode?", required = false) - public Boolean monitorThreads = false; + /** + * By default the GATK monitors its own efficiency, but this can have a itsy-bitsy tiny + * cost (< 0.1%) in runtime because of turning on the JavaBean. This argument allows you + * to disable the monitor + */ + @Argument(fullName = "disableThreadEfficiencyMonitor", shortName = "dtem", doc = "Disable GATK efficiency monitoring", required = false) + public Boolean disableEfficiencyMonitor = false; /** * The following two arguments (num_cpu_threads, num_io_threads are TEMPORARY since Queue cannot currently support arbitrary tagged data types. diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 0abd75b65..b755cdd77 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -39,7 +39,6 @@ import org.broadinstitute.sting.gatk.traversals.*; import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; -import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import javax.management.JMException; @@ -107,11 +106,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { if(walker instanceof ReadWalker) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s is a read walker. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads())); - return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.shouldMonitorThreads()); + return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); } else { if(threadAllocation.getNumCPUThreads() > 1) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); - return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.shouldMonitorThreads()); + return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.monitorThreadEfficiency()); } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java b/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java index 07a45c0f9..caae55ac5 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java +++ b/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java @@ -43,7 +43,7 @@ public class ThreadAllocation { /** * Should we monitor thread efficiency? */ - private final boolean monitorThreads; + private final boolean monitorEfficiency; public int getNumCPUThreads() { return numCPUThreads; @@ -53,8 +53,8 @@ public class ThreadAllocation { return numIOThreads; } - public boolean shouldMonitorThreads() { - return monitorThreads; + public boolean monitorThreadEfficiency() { + return monitorEfficiency; } /** @@ -71,7 +71,7 @@ public class ThreadAllocation { * @param numCPUThreads Total number of threads allocated to the traversal. * @param numIOThreads Total number of threads allocated exclusively to IO. */ - public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads, final boolean monitorThreads) { + public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads, final boolean monitorEfficiency) { // If no allocation information is present, allocate all threads to CPU if(numCPUThreads == null && numIOThreads == null) { this.numCPUThreads = totalThreads; @@ -98,6 +98,6 @@ public class ThreadAllocation { this.numIOThreads = numIOThreads; } - this.monitorThreads = monitorThreads; + this.monitorEfficiency = monitorEfficiency; } } diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactory.java b/public/java/src/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactory.java index 51af08681..b30198608 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactory.java +++ b/public/java/src/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactory.java @@ -99,9 +99,9 @@ public class EfficiencyMonitoringThreadFactory extends ThreadEfficiencyMonitor i } @Ensures({ - "activeThreads.size() < old(activeThreads.size())", + "activeThreads.size() <= old(activeThreads.size())", "! activeThreads.contains(thread)", - "countDownLatch.getCount() < old(countDownLatch.getCount())" + "countDownLatch.getCount() <= old(countDownLatch.getCount())" }) @Override public synchronized void threadIsDone(final Thread thread) { @@ -111,13 +111,12 @@ public class EfficiencyMonitoringThreadFactory extends ThreadEfficiencyMonitor i super.threadIsDone(thread); - // remove the thread from the list of active activeThreads - if ( ! activeThreads.remove(thread) ) - throw new IllegalStateException("Thread " + thread + " not in list of active activeThreads"); - - // one less thread is live for those blocking on all activeThreads to be complete - countDownLatch.countDown(); - if ( DEBUG ) logger.warn(" -> Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName()); + // remove the thread from the list of active activeThreads, if it's in there, and decrement the countdown latch + if ( activeThreads.remove(thread) ) { + // one less thread is live for those blocking on all activeThreads to be complete + countDownLatch.countDown(); + if ( DEBUG ) logger.warn(" -> Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName()); + } } /** diff --git a/public/java/test/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactoryUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactoryUnitTest.java similarity index 94% rename from public/java/test/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactoryUnitTest.java rename to public/java/test/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactoryUnitTest.java index 0b655873d..35dc9754c 100755 --- a/public/java/test/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactoryUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactoryUnitTest.java @@ -39,7 +39,7 @@ import java.util.concurrent.*; /** * Tests for the state monitoring thread factory. */ -public class StateMonitoringThreadFactoryUnitTest extends BaseTest { +public class EfficiencyMonitoringThreadFactoryUnitTest extends BaseTest { // the duration of the tests -- 100 ms is tolerable given the number of tests we are doing private final static long THREAD_TARGET_DURATION_IN_MILLISECOND = 1000; final static Object GLOBAL_LOCK = new Object(); @@ -61,8 +61,8 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest { public int getNStates() { return statesForThreads.size(); } - public double maxStateFraction(final EfficiencyMonitoringThreadFactory.State state) { return fraction(state) + TOLERANCE; } - public double minStateFraction(final EfficiencyMonitoringThreadFactory.State state) { return fraction(state) - TOLERANCE; } + public double maxStatePercent(final EfficiencyMonitoringThreadFactory.State state) { return 100*(fraction(state) + TOLERANCE); } + public double minStatePercent(final EfficiencyMonitoringThreadFactory.State state) { return 100*(fraction(state) - TOLERANCE); } private double fraction(final EfficiencyMonitoringThreadFactory.State state) { return Collections.frequency(statesForThreads, state) / (1.0 * statesForThreads.size()); @@ -159,8 +159,8 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest { Assert.assertTrue(totalTime <= maxTime, "Factory results not properly accumulated: totalTime = " + totalTime + " > maxTime = " + maxTime); for (final EfficiencyMonitoringThreadFactory.State state : EfficiencyMonitoringThreadFactory.State.values() ) { - final double min = test.minStateFraction(state); - final double max = test.maxStateFraction(state); + final double min = test.minStatePercent(state); + final double max = test.maxStatePercent(state); final double obs = factory.getStatePercent(state); // logger.warn(" Checking " + state // + " min " + String.format("%.2f", min)