From e1293f0ef27f33cb5c32ff2ec61c1a6b9bf831f4 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 22 Aug 2012 11:31:14 -0400 Subject: [PATCH] GSA-507: Thread monitoring refactored so it can work without a thread factory -- Old version StateMonitoringThreadFactory refactored into base class ThreadEfficiencyMonitor and subclass EfficiencyMonitoringThreadFactory. -- Base class is used by LinearMicroScheduler to monitor performance of GATK in single threaded mode -- MicroScheduler now handles management of the efficiency monitor. Includes master thread in monitor, meaning that reduce is now included for both schedulers --- .../executive/HierarchicalMicroScheduler.java | 19 +- .../gatk/executive/LinearMicroScheduler.java | 13 +- .../sting/gatk/executive/MicroScheduler.java | 33 +- .../EfficiencyMonitoringThreadFactory.java | 159 +++++++++ .../StateMonitoringThreadFactory.java | 321 ------------------ .../threading/ThreadEfficiencyMonitor.java | 206 +++++++++++ .../StateMonitoringThreadFactoryUnitTest.java | 36 +- 7 files changed, 431 insertions(+), 356 deletions(-) create mode 100644 public/java/src/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactory.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactory.java create mode 100644 public/java/src/org/broadinstitute/sting/utils/threading/ThreadEfficiencyMonitor.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 017eeb55a..70cdaab22 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -11,7 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.threading.StateMonitoringThreadFactory; +import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.util.Collection; @@ -73,9 +73,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** What is the total time spent merging output? */ private long totalOutputMergeTime = 0; - /** may be null */ - final StateMonitoringThreadFactory monitoringThreadFactory; - /** * Create a new hierarchical microscheduler to process the given reads and reference. * @@ -94,10 +91,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar super(engine, walker, reads, reference, rods); if ( monitorThreadPerformance ) { - this.monitoringThreadFactory = new StateMonitoringThreadFactory(nThreadsToUse); + final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse); + setThreadEfficiencyMonitor(monitoringThreadFactory); this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory); } else { - this.monitoringThreadFactory = null; this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); } } @@ -157,19 +154,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar // do final cleanup operations outputTracker.close(); cleanup(); - printThreadingEfficiency(); + executionIsDone(); return result; } - /** - * Print out the threading efficiency of this HMS, if state monitoring is enabled - */ - private void printThreadingEfficiency() { - if ( monitoringThreadFactory != null ) - monitoringThreadFactory.printUsageInformation(logger); - } - /** * Run the initialize method of the walker. Ensure that any calls * to the output stream will bypass thread local storage and write diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index b35abb775..7a6902fff 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -13,6 +13,7 @@ import org.broadinstitute.sting.gatk.io.OutputTracker; import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.SampleUtils; +import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import java.util.Collection; @@ -33,8 +34,17 @@ public class LinearMicroScheduler extends MicroScheduler { * @param reference Reference for driving the traversal. * @param rods Reference-ordered data. */ - protected LinearMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods ) { + protected LinearMicroScheduler(final GenomeAnalysisEngine engine, + final Walker walker, + final SAMDataSource reads, + final IndexedFastaSequenceFile reference, + final Collection rods, + final boolean monitorThreadPerformance ) { super(engine, walker, reads, reference, rods); + + if ( monitorThreadPerformance ) + setThreadEfficiencyMonitor(new ThreadEfficiencyMonitor()); + } /** @@ -88,6 +98,7 @@ public class LinearMicroScheduler extends MicroScheduler { outputTracker.close(); cleanup(); + executionIsDone(); return accumulator; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index c845bbce0..0abd75b65 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -39,6 +39,8 @@ import org.broadinstitute.sting.gatk.traversals.*; import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory; +import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import javax.management.JMException; import javax.management.MBeanServer; @@ -79,6 +81,13 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { private final MBeanServer mBeanServer; private final ObjectName mBeanName; + /** + * Threading efficiency monitor for tracking the resource utilization of the GATK + * + * may be null + */ + ThreadEfficiencyMonitor threadEfficiencyMonitor = null; + /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. @@ -102,7 +111,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } else { if(threadAllocation.getNumCPUThreads() > 1) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); - return new LinearMicroScheduler(engine, walker, reads, reference, rods); + return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.shouldMonitorThreads()); } } @@ -150,6 +159,16 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } } + + /** + * Inform this Microscheduler to use the efficiency monitor used to create threads in subclasses + * + * @param threadEfficiencyMonitor + */ + public void setThreadEfficiencyMonitor(final ThreadEfficiencyMonitor threadEfficiencyMonitor) { + this.threadEfficiencyMonitor = threadEfficiencyMonitor; + } + /** * Walks a walker over the given list of intervals. * @@ -183,6 +202,18 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { traversalEngine.printOnTraversalDone(); } + /** + * Must be called by subclasses when execute is done + */ + protected void executionIsDone() { + // Print out the threading efficiency of this HMS, if state monitoring is enabled + if ( threadEfficiencyMonitor != null ) { + // include the master thread information + threadEfficiencyMonitor.threadIsDone(Thread.currentThread()); + threadEfficiencyMonitor.printUsageInformation(logger); + } + } + /** * Gets the engine that created this microscheduler. * @return The engine owning this microscheduler. diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactory.java b/public/java/src/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactory.java new file mode 100644 index 000000000..51af08681 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/threading/EfficiencyMonitoringThreadFactory.java @@ -0,0 +1,159 @@ +/* + * The MIT License + * + * Copyright (c) 2009 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package org.broadinstitute.sting.utils.threading; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Invariant; +import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.apache.log4j.Priority; +import org.broadinstitute.sting.utils.AutoFormattingTime; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * Creates threads that automatically monitor their efficiency via the parent ThreadEfficiencyMonitor + * + * User: depristo + * Date: 8/14/12 + * Time: 8:47 AM + */ +@Invariant({ + "activeThreads.size() <= nThreadsToCreate", + "countDownLatch.getCount() <= nThreadsToCreate", + "nThreadsCreated <= nThreadsToCreate" +}) +public class EfficiencyMonitoringThreadFactory extends ThreadEfficiencyMonitor implements ThreadFactory { + final int nThreadsToCreate; + final List activeThreads; + + int nThreadsCreated = 0; + + /** + * Counts down the number of active activeThreads whose runtime info hasn't been incorporated into + * times. Counts down from nThreadsToCreate to 0, at which point any code waiting + * on the final times is freed to run. + */ + final CountDownLatch countDownLatch; + + /** + * Create a new factory generating threads whose runtime and contention + * behavior is tracked in this factory. + * + * @param nThreadsToCreate the number of threads we will create in the factory before it's considered complete + */ + public EfficiencyMonitoringThreadFactory(final int nThreadsToCreate) { + super(); + if ( nThreadsToCreate <= 0 ) throw new IllegalArgumentException("nThreadsToCreate <= 0: " + nThreadsToCreate); + + this.nThreadsToCreate = nThreadsToCreate; + activeThreads = new ArrayList(nThreadsToCreate); + countDownLatch = new CountDownLatch(nThreadsToCreate); + } + + /** + * How many threads have been created by this factory so far? + * @return + */ + @Ensures("result >= 0") + public int getNThreadsCreated() { + return nThreadsCreated; + } + + /** + * Only useful for testing, so that we can wait for all of the threads in the factory to complete running + * + * @throws InterruptedException + */ + protected void waitForAllThreadsToComplete() throws InterruptedException { + countDownLatch.await(); + } + + @Ensures({ + "activeThreads.size() < old(activeThreads.size())", + "! activeThreads.contains(thread)", + "countDownLatch.getCount() < old(countDownLatch.getCount())" + }) + @Override + public synchronized void threadIsDone(final Thread thread) { + nThreadsAnalyzed++; + + if ( DEBUG ) logger.warn(" Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName()); + + super.threadIsDone(thread); + + // remove the thread from the list of active activeThreads + if ( ! activeThreads.remove(thread) ) + throw new IllegalStateException("Thread " + thread + " not in list of active activeThreads"); + + // one less thread is live for those blocking on all activeThreads to be complete + countDownLatch.countDown(); + if ( DEBUG ) logger.warn(" -> Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName()); + } + + /** + * Create a new thread from this factory + * + * @param runnable + * @return + */ + @Override + @Ensures({ + "activeThreads.size() > old(activeThreads.size())", + "activeThreads.contains(result)", + "nThreadsCreated == old(nThreadsCreated) + 1" + }) + public synchronized Thread newThread(final Runnable runnable) { + if ( activeThreads.size() >= nThreadsToCreate) + throw new IllegalStateException("Attempting to create more activeThreads than allowed by constructor argument nThreadsToCreate " + nThreadsToCreate); + + nThreadsCreated++; + final Thread myThread = new TrackingThread(runnable); + activeThreads.add(myThread); + return myThread; + } + + /** + * A wrapper around Thread that tracks the runtime of the thread and calls threadIsDone() when complete + */ + private class TrackingThread extends Thread { + private TrackingThread(Runnable runnable) { + super(runnable); + } + + @Override + public void run() { + super.run(); + threadIsDone(this); + } + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactory.java b/public/java/src/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactory.java deleted file mode 100644 index a62501f08..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactory.java +++ /dev/null @@ -1,321 +0,0 @@ -/* - * The MIT License - * - * Copyright (c) 2009 The Broad Institute - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -package org.broadinstitute.sting.utils.threading; - -import com.google.java.contract.Ensures; -import com.google.java.contract.Invariant; -import com.google.java.contract.Requires; -import org.apache.log4j.Logger; -import org.apache.log4j.Priority; -import org.broadinstitute.sting.utils.AutoFormattingTime; - -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.util.ArrayList; -import java.util.EnumMap; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * Create activeThreads, collecting statistics about their running state over time - * - * Uses a ThreadMXBean to capture info via ThreadInfo - * - * User: depristo - * Date: 8/14/12 - * Time: 8:47 AM - */ -@Invariant({ - "activeThreads.size() <= nThreadsToCreate", - "countDownLatch.getCount() <= nThreadsToCreate", - "nThreadsCreated <= nThreadsToCreate" -}) -public class StateMonitoringThreadFactory implements ThreadFactory { - protected static final boolean DEBUG = true; - private static Logger logger = Logger.getLogger(StateMonitoringThreadFactory.class); - - public enum State { - BLOCKING("blocking on synchronized data structure"), - WAITING("waiting on some other thread"), - USER_CPU("doing productive CPU work"), - WAITING_FOR_IO("waiting for I/O"); - - private final String userFriendlyName; - - private State(String userFriendlyName) { - this.userFriendlyName = userFriendlyName; - } - - public String getUserFriendlyName() { - return userFriendlyName; - } - } - - // todo -- it would be nice to not have to specify upfront the number of threads. - // todo -- can we dynamically increment countDownLatch? It seems not... - final int nThreadsToCreate; - final List activeThreads; - final EnumMap times = new EnumMap(State.class); - - int nThreadsCreated = 0; - - /** - * The bean used to get the thread info about blocked and waiting times - */ - final ThreadMXBean bean; - - /** - * Counts down the number of active activeThreads whose runtime info hasn't been incorporated into - * times. Counts down from nThreadsToCreate to 0, at which point any code waiting - * on the final times is freed to run. - */ - final CountDownLatch countDownLatch; - - /** - * Create a new factory generating threads whose runtime and contention - * behavior is tracked in this factory. - * - * @param nThreadsToCreate the number of threads we will create in the factory before it's considered complete - * // TODO -- remove argument when we figure out how to implement this capability - */ - public StateMonitoringThreadFactory(final int nThreadsToCreate) { - if ( nThreadsToCreate <= 0 ) throw new IllegalArgumentException("nThreadsToCreate <= 0: " + nThreadsToCreate); - - this.nThreadsToCreate = nThreadsToCreate; - activeThreads = new ArrayList(nThreadsToCreate); - - // initialize times to 0 - for ( final State state : State.values() ) - times.put(state, 0l); - - // get the bean, and start tracking - bean = ManagementFactory.getThreadMXBean(); - if ( bean.isThreadContentionMonitoringSupported() ) - bean.setThreadContentionMonitoringEnabled(true); - else - logger.warn("Thread contention monitoring not supported, we cannot track GATK multi-threaded efficiency"); - //bean.setThreadCpuTimeEnabled(true); - - if ( bean.isThreadCpuTimeSupported() ) - bean.setThreadCpuTimeEnabled(true); - else - logger.warn("Thread CPU monitoring not supported, we cannot track GATK multi-threaded efficiency"); - - countDownLatch = new CountDownLatch(nThreadsToCreate); - } - - /** - * Get the time spent in state across all threads created by this factory - * - * @param state to get information about - * @return the time in milliseconds - */ - @Ensures({"result >= 0"}) - public synchronized long getStateTime(final State state) { - return times.get(state); - } - - /** - * Get the total time spent in all states across all threads created by this factory - * - * @return the time in milliseconds - */ - @Ensures({"result >= 0"}) - public synchronized long getTotalTime() { - long total = 0; - for ( final long time : times.values() ) - total += time; - return total; - } - - /** - * Get the fraction of time spent in state across all threads created by this factory - * - * @return the fraction (0.0-1.0) of time spent in state over all state times of all threads - */ - @Ensures({"result >= 0.0", "result <= 1.0"}) - public synchronized double getStateFraction(final State state) { - return getStateTime(state) / (1.0 * Math.max(getTotalTime(), 1)); - } - - /** - * How many threads have been created by this factory so far? - * @return - */ - @Ensures("result >= 0") - public int getNThreadsCreated() { - return nThreadsCreated; - } - - /** - * Only useful for testing, so that we can wait for all of the threads in the factory to complete running - * - * @throws InterruptedException - */ - protected void waitForAllThreadsToComplete() throws InterruptedException { - countDownLatch.await(); - } - - @Override - public synchronized String toString() { - final StringBuilder b = new StringBuilder(); - - b.append("total ").append(getTotalTime()).append(" "); - for ( final State state : State.values() ) { - b.append(state).append(" ").append(getStateTime(state)).append(" "); - } - - return b.toString(); - } - - /** - * Print usage information about threads from this factory to logger - * with the INFO priority - * - * @param logger - */ - public synchronized void printUsageInformation(final Logger logger) { - printUsageInformation(logger, Priority.INFO); - } - - /** - * Print usage information about threads from this factory to logger - * with the provided priority - * - * @param logger - */ - public synchronized void printUsageInformation(final Logger logger, final Priority priority) { - logger.log(priority, "Number of activeThreads used: " + getNThreadsCreated()); - logger.log(priority, "Total runtime " + new AutoFormattingTime(TimeUnit.MILLISECONDS.toSeconds(getTotalTime()))); - for ( final State state : State.values() ) { - logger.log(priority, String.format(" Fraction of time spent %s is %.2f (%s)", - state.getUserFriendlyName(), - getStateFraction(state), - new AutoFormattingTime(getStateTime(state) / 1000.0))); - } - logger.log(priority, String.format("CPU efficiency : %.2f%% of time spent doing productive work", - getStateFraction(State.USER_CPU) * 100)); - logger.log(priority, String.format("I/O inefficiency: %.2f%% of time spent waiting on I/O", - getStateFraction(State.WAITING_FOR_IO) * 100)); - } - - /** - * Create a new thread from this factory - * - * @param runnable - * @return - */ - @Override - @Ensures({ - "activeThreads.size() > old(activeThreads.size())", - "activeThreads.contains(result)", - "nThreadsCreated == old(nThreadsCreated) + 1" - }) - public synchronized Thread newThread(final Runnable runnable) { - if ( activeThreads.size() >= nThreadsToCreate) - throw new IllegalStateException("Attempting to create more activeThreads than allowed by constructor argument nThreadsToCreate " + nThreadsToCreate); - - nThreadsCreated++; - final Thread myThread = new TrackingThread(runnable); - activeThreads.add(myThread); - return myThread; - } - - /** - * Update the information about completed thread that ran for runtime in milliseconds - * - * This method updates all of the key timing and tracking information in the factory so that - * thread can be retired. After this call the factory shouldn't have a pointer to the thread any longer - * - * @param thread the thread whose information we are updating - */ - @Ensures({ - "activeThreads.size() < old(activeThreads.size())", - "! activeThreads.contains(thread)", - "getTotalTime() >= old(getTotalTime())", - "countDownLatch.getCount() < old(countDownLatch.getCount())" - }) - private synchronized void threadIsDone(final Thread thread) { - if ( DEBUG ) logger.warn(" Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName()); - if ( DEBUG ) logger.warn("UpdateThreadInfo called"); - - final long threadID = thread.getId(); - final ThreadInfo info = bean.getThreadInfo(thread.getId()); - final long totalTimeNano = bean.getThreadCpuTime(threadID); - final long userTimeNano = bean.getThreadUserTime(threadID); - final long systemTimeNano = totalTimeNano - userTimeNano; - final long userTimeInMilliseconds = nanoToMilli(userTimeNano); - final long systemTimeInMilliseconds = nanoToMilli(systemTimeNano); - - if ( info != null ) { - if ( DEBUG ) logger.warn("Updating thread with user runtime " + userTimeInMilliseconds + " and system runtime " + systemTimeInMilliseconds + " of which blocked " + info.getBlockedTime() + " and waiting " + info.getWaitedTime()); - incTimes(State.BLOCKING, info.getBlockedTime()); - incTimes(State.WAITING, info.getWaitedTime()); - incTimes(State.USER_CPU, userTimeInMilliseconds); - incTimes(State.WAITING_FOR_IO, systemTimeInMilliseconds); - } - - // remove the thread from the list of active activeThreads - if ( ! activeThreads.remove(thread) ) - throw new IllegalStateException("Thread " + thread + " not in list of active activeThreads"); - - // one less thread is live for those blocking on all activeThreads to be complete - countDownLatch.countDown(); - if ( DEBUG ) logger.warn(" -> Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName()); - } - - /** - * Helper function that increments the times counter by by for state - * - * @param state - * @param by - */ - @Requires({"state != null", "by >= 0"}) - @Ensures("getTotalTime() == old(getTotalTime()) + by") - private synchronized void incTimes(final State state, final long by) { - times.put(state, times.get(state) + by); - } - - private static long nanoToMilli(final long timeInNano) { - return TimeUnit.NANOSECONDS.toMillis(timeInNano); - } - - /** - * A wrapper around Thread that tracks the runtime of the thread and calls threadIsDone() when complete - */ - private class TrackingThread extends Thread { - private TrackingThread(Runnable runnable) { - super(runnable); - } - - @Override - public void run() { - super.run(); - threadIsDone(this); - } - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/ThreadEfficiencyMonitor.java b/public/java/src/org/broadinstitute/sting/utils/threading/ThreadEfficiencyMonitor.java new file mode 100644 index 000000000..ef836a06d --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/threading/ThreadEfficiencyMonitor.java @@ -0,0 +1,206 @@ +package org.broadinstitute.sting.utils.threading; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Invariant; +import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.apache.log4j.Priority; +import org.broadinstitute.sting.utils.AutoFormattingTime; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.EnumMap; +import java.util.concurrent.TimeUnit; + +/** + * Uses an MXBean to monitor thread efficiency + * + * Once the monitor is created, calls to threadIsDone() can be used to add information + * about the efficiency of the provided thread to this monitor. + * + * Provides simple print() for displaying efficiency information to a logger + * + * User: depristo + * Date: 8/22/12 + * Time: 10:48 AM + */ +@Invariant({"nThreadsAnalyzed >= 0"}) +public class ThreadEfficiencyMonitor { + protected static final boolean DEBUG = false; + protected static Logger logger = Logger.getLogger(EfficiencyMonitoringThreadFactory.class); + final EnumMap times = new EnumMap(State.class); + + /** + * The number of threads we've included in our efficiency monitoring + */ + int nThreadsAnalyzed = 0; + + /** + * The bean used to get the thread info about blocked and waiting times + */ + final ThreadMXBean bean; + + public ThreadEfficiencyMonitor() { + bean = ManagementFactory.getThreadMXBean(); + + // get the bean, and start tracking + if ( bean.isThreadContentionMonitoringSupported() ) + bean.setThreadContentionMonitoringEnabled(true); + else + logger.warn("Thread contention monitoring not supported, we cannot track GATK multi-threaded efficiency"); + //bean.setThreadCpuTimeEnabled(true); + + if ( bean.isThreadCpuTimeSupported() ) + bean.setThreadCpuTimeEnabled(true); + else + logger.warn("Thread CPU monitoring not supported, we cannot track GATK multi-threaded efficiency"); + + // initialize times to 0 + for ( final State state : State.values() ) + times.put(state, 0l); + } + + private static long nanoToMilli(final long timeInNano) { + return TimeUnit.NANOSECONDS.toMillis(timeInNano); + } + + /** + * Get the time spent in state across all threads created by this factory + * + * @param state to get information about + * @return the time in milliseconds + */ + @Ensures({"result >= 0"}) + public synchronized long getStateTime(final State state) { + return times.get(state); + } + + /** + * Get the total time spent in all states across all threads created by this factory + * + * @return the time in milliseconds + */ + @Ensures({"result >= 0"}) + public synchronized long getTotalTime() { + long total = 0; + for ( final long time : times.values() ) + total += time; + return total; + } + + /** + * Get the fraction of time spent in state across all threads created by this factory + * + * @return the percentage (0.0-100.0) of time spent in state over all state times of all threads + */ + @Ensures({"result >= 0.0", "result <= 100.0"}) + public synchronized double getStatePercent(final State state) { + return (100.0 * getStateTime(state)) / Math.max(getTotalTime(), 1); + } + + public int getnThreadsAnalyzed() { + return nThreadsAnalyzed; + } + + @Override + public synchronized String toString() { + final StringBuilder b = new StringBuilder(); + + b.append("total ").append(getTotalTime()).append(" "); + for ( final State state : State.values() ) { + b.append(state).append(" ").append(getStateTime(state)).append(" "); + } + + return b.toString(); + } + + /** + * Print usage information about threads from this factory to logger + * with the INFO priority + * + * @param logger + */ + public synchronized void printUsageInformation(final Logger logger) { + printUsageInformation(logger, Priority.INFO); + } + + /** + * Print usage information about threads from this factory to logger + * with the provided priority + * + * @param logger + */ + public synchronized void printUsageInformation(final Logger logger, final Priority priority) { + logger.debug("Number of threads monitored: " + getnThreadsAnalyzed()); + logger.debug("Total runtime " + new AutoFormattingTime(TimeUnit.MILLISECONDS.toSeconds(getTotalTime()))); + for ( final State state : State.values() ) { + logger.debug(String.format("\tPercent of time spent %s is %.2f", state.getUserFriendlyName(), getStatePercent(state))); + } + logger.log(priority, String.format("CPU efficiency : %6.2f%% of time spent %s", getStatePercent(State.USER_CPU), State.USER_CPU.getUserFriendlyName())); + logger.log(priority, String.format("Walker inefficiency : %6.2f%% of time spent %s", getStatePercent(State.BLOCKING), State.BLOCKING.getUserFriendlyName())); + logger.log(priority, String.format("I/O inefficiency : %6.2f%% of time spent %s", getStatePercent(State.WAITING_FOR_IO), State.WAITING_FOR_IO.getUserFriendlyName())); + } + + /** + * Update the information about completed thread that ran for runtime in milliseconds + * + * This method updates all of the key timing and tracking information in the factory so that + * thread can be retired. After this call the factory shouldn't have a pointer to the thread any longer + * + * @param thread the thread whose information we are updating + */ + @Ensures({ + "getTotalTime() >= old(getTotalTime())" + }) + public synchronized void threadIsDone(final Thread thread) { + nThreadsAnalyzed++; + + if ( DEBUG ) logger.warn("UpdateThreadInfo called"); + + final long threadID = thread.getId(); + final ThreadInfo info = bean.getThreadInfo(thread.getId()); + final long totalTimeNano = bean.getThreadCpuTime(threadID); + final long userTimeNano = bean.getThreadUserTime(threadID); + final long systemTimeNano = totalTimeNano - userTimeNano; + final long userTimeInMilliseconds = nanoToMilli(userTimeNano); + final long systemTimeInMilliseconds = nanoToMilli(systemTimeNano); + + if ( info != null ) { + if ( DEBUG ) logger.warn("Updating thread with user runtime " + userTimeInMilliseconds + " and system runtime " + systemTimeInMilliseconds + " of which blocked " + info.getBlockedTime() + " and waiting " + info.getWaitedTime()); + incTimes(State.BLOCKING, info.getBlockedTime()); + incTimes(State.WAITING, info.getWaitedTime()); + incTimes(State.USER_CPU, userTimeInMilliseconds); + incTimes(State.WAITING_FOR_IO, systemTimeInMilliseconds); + } + } + + /** + * Helper function that increments the times counter by by for state + * + * @param state + * @param by + */ + @Requires({"state != null", "by >= 0"}) + @Ensures("getTotalTime() == old(getTotalTime()) + by") + private synchronized void incTimes(final State state, final long by) { + times.put(state, times.get(state) + by); + } + + public enum State { + BLOCKING("blocking on synchronized data structures"), + WAITING("waiting on some other thread"), + USER_CPU("doing productive CPU work"), + WAITING_FOR_IO("waiting for I/O"); + + private final String userFriendlyName; + + private State(String userFriendlyName) { + this.userFriendlyName = userFriendlyName; + } + + public String getUserFriendlyName() { + return userFriendlyName; + } + } +} diff --git a/public/java/test/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactoryUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactoryUnitTest.java index b41070a14..0b655873d 100755 --- a/public/java/test/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactoryUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/threading/StateMonitoringThreadFactoryUnitTest.java @@ -47,24 +47,24 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest { private class StateTest extends TestDataProvider { private final double TOLERANCE = 0.1; // willing to tolerate a 10% error - final List statesForThreads; + final List statesForThreads; - public StateTest(final List statesForThreads) { + public StateTest(final List statesForThreads) { super(StateTest.class); this.statesForThreads = statesForThreads; setName("StateTest " + Utils.join(",", statesForThreads)); } - public List getStatesForThreads() { + public List getStatesForThreads() { return statesForThreads; } public int getNStates() { return statesForThreads.size(); } - public double maxStateFraction(final StateMonitoringThreadFactory.State state) { return fraction(state) + TOLERANCE; } - public double minStateFraction(final StateMonitoringThreadFactory.State state) { return fraction(state) - TOLERANCE; } + public double maxStateFraction(final EfficiencyMonitoringThreadFactory.State state) { return fraction(state) + TOLERANCE; } + public double minStateFraction(final EfficiencyMonitoringThreadFactory.State state) { return fraction(state) - TOLERANCE; } - private double fraction(final StateMonitoringThreadFactory.State state) { + private double fraction(final EfficiencyMonitoringThreadFactory.State state) { return Collections.frequency(statesForThreads, state) / (1.0 * statesForThreads.size()); } } @@ -74,9 +74,9 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest { * requested for input argument */ private static class StateTestThread implements Callable { - private final StateMonitoringThreadFactory.State stateToImplement; + private final EfficiencyMonitoringThreadFactory.State stateToImplement; - private StateTestThread(final StateMonitoringThreadFactory.State stateToImplement) { + private StateTestThread(final EfficiencyMonitoringThreadFactory.State stateToImplement) { this.stateToImplement = stateToImplement; } @@ -95,10 +95,10 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest { Thread.currentThread().sleep(THREAD_TARGET_DURATION_IN_MILLISECOND); return 0.0; case BLOCKING: - if ( StateMonitoringThreadFactory.DEBUG ) logger.warn("Blocking..."); + if ( EfficiencyMonitoringThreadFactory.DEBUG ) logger.warn("Blocking..."); synchronized (GLOBAL_LOCK) { // the GLOBAL_LOCK must be held by the unit test itself for this to properly block - if ( StateMonitoringThreadFactory.DEBUG ) logger.warn(" ... done blocking"); + if ( EfficiencyMonitoringThreadFactory.DEBUG ) logger.warn(" ... done blocking"); } return 0.0; case WAITING_FOR_IO: @@ -114,10 +114,10 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest { @DataProvider(name = "StateTest") public Object[][] createStateTest() { for ( final int nThreads : Arrays.asList(3) ) { - //final List allStates = Arrays.asList(StateMonitoringThreadFactory.State.WAITING_FOR_IO); - final List allStates = Arrays.asList(StateMonitoringThreadFactory.State.USER_CPU, StateMonitoringThreadFactory.State.WAITING, StateMonitoringThreadFactory.State.BLOCKING); - //final List allStates = Arrays.asList(StateMonitoringThreadFactory.State.values()); - for (final List states : Utils.makePermutations(allStates, nThreads, true) ) { + //final List allStates = Arrays.asList(EfficiencyMonitoringThreadFactory.State.WAITING_FOR_IO); + final List allStates = Arrays.asList(EfficiencyMonitoringThreadFactory.State.USER_CPU, EfficiencyMonitoringThreadFactory.State.WAITING, EfficiencyMonitoringThreadFactory.State.BLOCKING); + //final List allStates = Arrays.asList(EfficiencyMonitoringThreadFactory.State.values()); + for (final List states : Utils.makePermutations(allStates, nThreads, true) ) { //if ( Collections.frequency(states, Thread.State.BLOCKED) > 0) new StateTest(states); } @@ -129,13 +129,13 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest { @Test(enabled = true, dataProvider = "StateTest") public void testStateTest(final StateTest test) throws InterruptedException { // allows us to test blocking - final StateMonitoringThreadFactory factory = new StateMonitoringThreadFactory(test.getNStates()); + final EfficiencyMonitoringThreadFactory factory = new EfficiencyMonitoringThreadFactory(test.getNStates()); final ExecutorService threadPool = Executors.newFixedThreadPool(test.getNStates(), factory); logger.warn("Running " + test); synchronized (GLOBAL_LOCK) { //logger.warn(" Have lock"); - for ( final StateMonitoringThreadFactory.State threadToRunState : test.getStatesForThreads() ) + for ( final EfficiencyMonitoringThreadFactory.State threadToRunState : test.getStatesForThreads() ) threadPool.submit(new StateTestThread(threadToRunState)); // lock has to be here for the whole running of the activeThreads but end before the sleep so the blocked activeThreads @@ -158,10 +158,10 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest { Assert.assertTrue(totalTime >= minTime, "Factory results not properly accumulated: totalTime = " + totalTime + " < minTime = " + minTime); Assert.assertTrue(totalTime <= maxTime, "Factory results not properly accumulated: totalTime = " + totalTime + " > maxTime = " + maxTime); - for (final StateMonitoringThreadFactory.State state : StateMonitoringThreadFactory.State.values() ) { + for (final EfficiencyMonitoringThreadFactory.State state : EfficiencyMonitoringThreadFactory.State.values() ) { final double min = test.minStateFraction(state); final double max = test.maxStateFraction(state); - final double obs = factory.getStateFraction(state); + final double obs = factory.getStatePercent(state); // logger.warn(" Checking " + state // + " min " + String.format("%.2f", min) // + " max " + String.format("%.2f", max)