GSA-507: Thread monitoring refactored so it can work without a thread factory
-- Old version StateMonitoringThreadFactory refactored into base class ThreadEfficiencyMonitor and subclass EfficiencyMonitoringThreadFactory. -- Base class is used by LinearMicroScheduler to monitor performance of GATK in single threaded mode -- MicroScheduler now handles management of the efficiency monitor. Includes master thread in monitor, meaning that reduce is now included for both schedulers
This commit is contained in:
parent
f876c51277
commit
e1293f0ef2
|
|
@ -11,7 +11,7 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
|
||||||
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
||||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||||
import org.broadinstitute.sting.utils.threading.StateMonitoringThreadFactory;
|
import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory;
|
||||||
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
@ -73,9 +73,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
/** What is the total time spent merging output? */
|
/** What is the total time spent merging output? */
|
||||||
private long totalOutputMergeTime = 0;
|
private long totalOutputMergeTime = 0;
|
||||||
|
|
||||||
/** may be null */
|
|
||||||
final StateMonitoringThreadFactory monitoringThreadFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new hierarchical microscheduler to process the given reads and reference.
|
* Create a new hierarchical microscheduler to process the given reads and reference.
|
||||||
*
|
*
|
||||||
|
|
@ -94,10 +91,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
super(engine, walker, reads, reference, rods);
|
super(engine, walker, reads, reference, rods);
|
||||||
|
|
||||||
if ( monitorThreadPerformance ) {
|
if ( monitorThreadPerformance ) {
|
||||||
this.monitoringThreadFactory = new StateMonitoringThreadFactory(nThreadsToUse);
|
final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse);
|
||||||
|
setThreadEfficiencyMonitor(monitoringThreadFactory);
|
||||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory);
|
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory);
|
||||||
} else {
|
} else {
|
||||||
this.monitoringThreadFactory = null;
|
|
||||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -157,19 +154,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
// do final cleanup operations
|
// do final cleanup operations
|
||||||
outputTracker.close();
|
outputTracker.close();
|
||||||
cleanup();
|
cleanup();
|
||||||
printThreadingEfficiency();
|
executionIsDone();
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Print out the threading efficiency of this HMS, if state monitoring is enabled
|
|
||||||
*/
|
|
||||||
private void printThreadingEfficiency() {
|
|
||||||
if ( monitoringThreadFactory != null )
|
|
||||||
monitoringThreadFactory.printUsageInformation(logger);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run the initialize method of the walker. Ensure that any calls
|
* Run the initialize method of the walker. Ensure that any calls
|
||||||
* to the output stream will bypass thread local storage and write
|
* to the output stream will bypass thread local storage and write
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import org.broadinstitute.sting.gatk.io.OutputTracker;
|
||||||
import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions;
|
import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions;
|
||||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||||
import org.broadinstitute.sting.utils.SampleUtils;
|
import org.broadinstitute.sting.utils.SampleUtils;
|
||||||
|
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
|
|
@ -33,8 +34,17 @@ public class LinearMicroScheduler extends MicroScheduler {
|
||||||
* @param reference Reference for driving the traversal.
|
* @param reference Reference for driving the traversal.
|
||||||
* @param rods Reference-ordered data.
|
* @param rods Reference-ordered data.
|
||||||
*/
|
*/
|
||||||
protected LinearMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods ) {
|
protected LinearMicroScheduler(final GenomeAnalysisEngine engine,
|
||||||
|
final Walker walker,
|
||||||
|
final SAMDataSource reads,
|
||||||
|
final IndexedFastaSequenceFile reference,
|
||||||
|
final Collection<ReferenceOrderedDataSource> rods,
|
||||||
|
final boolean monitorThreadPerformance ) {
|
||||||
super(engine, walker, reads, reference, rods);
|
super(engine, walker, reads, reference, rods);
|
||||||
|
|
||||||
|
if ( monitorThreadPerformance )
|
||||||
|
setThreadEfficiencyMonitor(new ThreadEfficiencyMonitor());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -88,6 +98,7 @@ public class LinearMicroScheduler extends MicroScheduler {
|
||||||
|
|
||||||
outputTracker.close();
|
outputTracker.close();
|
||||||
cleanup();
|
cleanup();
|
||||||
|
executionIsDone();
|
||||||
|
|
||||||
return accumulator;
|
return accumulator;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,8 @@ import org.broadinstitute.sting.gatk.traversals.*;
|
||||||
import org.broadinstitute.sting.gatk.walkers.*;
|
import org.broadinstitute.sting.gatk.walkers.*;
|
||||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||||
|
import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory;
|
||||||
|
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
|
||||||
|
|
||||||
import javax.management.JMException;
|
import javax.management.JMException;
|
||||||
import javax.management.MBeanServer;
|
import javax.management.MBeanServer;
|
||||||
|
|
@ -79,6 +81,13 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
private final MBeanServer mBeanServer;
|
private final MBeanServer mBeanServer;
|
||||||
private final ObjectName mBeanName;
|
private final ObjectName mBeanName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Threading efficiency monitor for tracking the resource utilization of the GATK
|
||||||
|
*
|
||||||
|
* may be null
|
||||||
|
*/
|
||||||
|
ThreadEfficiencyMonitor threadEfficiencyMonitor = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
|
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
|
||||||
* selected walker.
|
* selected walker.
|
||||||
|
|
@ -102,7 +111,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
} else {
|
} else {
|
||||||
if(threadAllocation.getNumCPUThreads() > 1)
|
if(threadAllocation.getNumCPUThreads() > 1)
|
||||||
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
|
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
|
||||||
return new LinearMicroScheduler(engine, walker, reads, reference, rods);
|
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.shouldMonitorThreads());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -150,6 +159,16 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inform this Microscheduler to use the efficiency monitor used to create threads in subclasses
|
||||||
|
*
|
||||||
|
* @param threadEfficiencyMonitor
|
||||||
|
*/
|
||||||
|
public void setThreadEfficiencyMonitor(final ThreadEfficiencyMonitor threadEfficiencyMonitor) {
|
||||||
|
this.threadEfficiencyMonitor = threadEfficiencyMonitor;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Walks a walker over the given list of intervals.
|
* Walks a walker over the given list of intervals.
|
||||||
*
|
*
|
||||||
|
|
@ -183,6 +202,18 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
traversalEngine.printOnTraversalDone();
|
traversalEngine.printOnTraversalDone();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Must be called by subclasses when execute is done
|
||||||
|
*/
|
||||||
|
protected void executionIsDone() {
|
||||||
|
// Print out the threading efficiency of this HMS, if state monitoring is enabled
|
||||||
|
if ( threadEfficiencyMonitor != null ) {
|
||||||
|
// include the master thread information
|
||||||
|
threadEfficiencyMonitor.threadIsDone(Thread.currentThread());
|
||||||
|
threadEfficiencyMonitor.printUsageInformation(logger);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the engine that created this microscheduler.
|
* Gets the engine that created this microscheduler.
|
||||||
* @return The engine owning this microscheduler.
|
* @return The engine owning this microscheduler.
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,159 @@
|
||||||
|
/*
|
||||||
|
* The MIT License
|
||||||
|
*
|
||||||
|
* Copyright (c) 2009 The Broad Institute
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
* of this software and associated documentation files (the "Software"), to deal
|
||||||
|
* in the Software without restriction, including without limitation the rights
|
||||||
|
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the Software is
|
||||||
|
* furnished to do so, subject to the following conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be included in
|
||||||
|
* all copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||||
|
* THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
package org.broadinstitute.sting.utils.threading;
|
||||||
|
|
||||||
|
import com.google.java.contract.Ensures;
|
||||||
|
import com.google.java.contract.Invariant;
|
||||||
|
import com.google.java.contract.Requires;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.log4j.Priority;
|
||||||
|
import org.broadinstitute.sting.utils.AutoFormattingTime;
|
||||||
|
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.lang.management.ThreadInfo;
|
||||||
|
import java.lang.management.ThreadMXBean;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.EnumMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates threads that automatically monitor their efficiency via the parent ThreadEfficiencyMonitor
|
||||||
|
*
|
||||||
|
* User: depristo
|
||||||
|
* Date: 8/14/12
|
||||||
|
* Time: 8:47 AM
|
||||||
|
*/
|
||||||
|
@Invariant({
|
||||||
|
"activeThreads.size() <= nThreadsToCreate",
|
||||||
|
"countDownLatch.getCount() <= nThreadsToCreate",
|
||||||
|
"nThreadsCreated <= nThreadsToCreate"
|
||||||
|
})
|
||||||
|
public class EfficiencyMonitoringThreadFactory extends ThreadEfficiencyMonitor implements ThreadFactory {
|
||||||
|
final int nThreadsToCreate;
|
||||||
|
final List<Thread> activeThreads;
|
||||||
|
|
||||||
|
int nThreadsCreated = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Counts down the number of active activeThreads whose runtime info hasn't been incorporated into
|
||||||
|
* times. Counts down from nThreadsToCreate to 0, at which point any code waiting
|
||||||
|
* on the final times is freed to run.
|
||||||
|
*/
|
||||||
|
final CountDownLatch countDownLatch;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new factory generating threads whose runtime and contention
|
||||||
|
* behavior is tracked in this factory.
|
||||||
|
*
|
||||||
|
* @param nThreadsToCreate the number of threads we will create in the factory before it's considered complete
|
||||||
|
*/
|
||||||
|
public EfficiencyMonitoringThreadFactory(final int nThreadsToCreate) {
|
||||||
|
super();
|
||||||
|
if ( nThreadsToCreate <= 0 ) throw new IllegalArgumentException("nThreadsToCreate <= 0: " + nThreadsToCreate);
|
||||||
|
|
||||||
|
this.nThreadsToCreate = nThreadsToCreate;
|
||||||
|
activeThreads = new ArrayList<Thread>(nThreadsToCreate);
|
||||||
|
countDownLatch = new CountDownLatch(nThreadsToCreate);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many threads have been created by this factory so far?
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Ensures("result >= 0")
|
||||||
|
public int getNThreadsCreated() {
|
||||||
|
return nThreadsCreated;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only useful for testing, so that we can wait for all of the threads in the factory to complete running
|
||||||
|
*
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
protected void waitForAllThreadsToComplete() throws InterruptedException {
|
||||||
|
countDownLatch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ensures({
|
||||||
|
"activeThreads.size() < old(activeThreads.size())",
|
||||||
|
"! activeThreads.contains(thread)",
|
||||||
|
"countDownLatch.getCount() < old(countDownLatch.getCount())"
|
||||||
|
})
|
||||||
|
@Override
|
||||||
|
public synchronized void threadIsDone(final Thread thread) {
|
||||||
|
nThreadsAnalyzed++;
|
||||||
|
|
||||||
|
if ( DEBUG ) logger.warn(" Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName());
|
||||||
|
|
||||||
|
super.threadIsDone(thread);
|
||||||
|
|
||||||
|
// remove the thread from the list of active activeThreads
|
||||||
|
if ( ! activeThreads.remove(thread) )
|
||||||
|
throw new IllegalStateException("Thread " + thread + " not in list of active activeThreads");
|
||||||
|
|
||||||
|
// one less thread is live for those blocking on all activeThreads to be complete
|
||||||
|
countDownLatch.countDown();
|
||||||
|
if ( DEBUG ) logger.warn(" -> Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new thread from this factory
|
||||||
|
*
|
||||||
|
* @param runnable
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
@Ensures({
|
||||||
|
"activeThreads.size() > old(activeThreads.size())",
|
||||||
|
"activeThreads.contains(result)",
|
||||||
|
"nThreadsCreated == old(nThreadsCreated) + 1"
|
||||||
|
})
|
||||||
|
public synchronized Thread newThread(final Runnable runnable) {
|
||||||
|
if ( activeThreads.size() >= nThreadsToCreate)
|
||||||
|
throw new IllegalStateException("Attempting to create more activeThreads than allowed by constructor argument nThreadsToCreate " + nThreadsToCreate);
|
||||||
|
|
||||||
|
nThreadsCreated++;
|
||||||
|
final Thread myThread = new TrackingThread(runnable);
|
||||||
|
activeThreads.add(myThread);
|
||||||
|
return myThread;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A wrapper around Thread that tracks the runtime of the thread and calls threadIsDone() when complete
|
||||||
|
*/
|
||||||
|
private class TrackingThread extends Thread {
|
||||||
|
private TrackingThread(Runnable runnable) {
|
||||||
|
super(runnable);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
super.run();
|
||||||
|
threadIsDone(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,321 +0,0 @@
|
||||||
/*
|
|
||||||
* The MIT License
|
|
||||||
*
|
|
||||||
* Copyright (c) 2009 The Broad Institute
|
|
||||||
*
|
|
||||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
* of this software and associated documentation files (the "Software"), to deal
|
|
||||||
* in the Software without restriction, including without limitation the rights
|
|
||||||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
* copies of the Software, and to permit persons to whom the Software is
|
|
||||||
* furnished to do so, subject to the following conditions:
|
|
||||||
*
|
|
||||||
* The above copyright notice and this permission notice shall be included in
|
|
||||||
* all copies or substantial portions of the Software.
|
|
||||||
*
|
|
||||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
||||||
* THE SOFTWARE.
|
|
||||||
*/
|
|
||||||
package org.broadinstitute.sting.utils.threading;
|
|
||||||
|
|
||||||
import com.google.java.contract.Ensures;
|
|
||||||
import com.google.java.contract.Invariant;
|
|
||||||
import com.google.java.contract.Requires;
|
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
import org.apache.log4j.Priority;
|
|
||||||
import org.broadinstitute.sting.utils.AutoFormattingTime;
|
|
||||||
|
|
||||||
import java.lang.management.ManagementFactory;
|
|
||||||
import java.lang.management.ThreadInfo;
|
|
||||||
import java.lang.management.ThreadMXBean;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.EnumMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create activeThreads, collecting statistics about their running state over time
|
|
||||||
*
|
|
||||||
* Uses a ThreadMXBean to capture info via ThreadInfo
|
|
||||||
*
|
|
||||||
* User: depristo
|
|
||||||
* Date: 8/14/12
|
|
||||||
* Time: 8:47 AM
|
|
||||||
*/
|
|
||||||
@Invariant({
|
|
||||||
"activeThreads.size() <= nThreadsToCreate",
|
|
||||||
"countDownLatch.getCount() <= nThreadsToCreate",
|
|
||||||
"nThreadsCreated <= nThreadsToCreate"
|
|
||||||
})
|
|
||||||
public class StateMonitoringThreadFactory implements ThreadFactory {
|
|
||||||
protected static final boolean DEBUG = true;
|
|
||||||
private static Logger logger = Logger.getLogger(StateMonitoringThreadFactory.class);
|
|
||||||
|
|
||||||
public enum State {
|
|
||||||
BLOCKING("blocking on synchronized data structure"),
|
|
||||||
WAITING("waiting on some other thread"),
|
|
||||||
USER_CPU("doing productive CPU work"),
|
|
||||||
WAITING_FOR_IO("waiting for I/O");
|
|
||||||
|
|
||||||
private final String userFriendlyName;
|
|
||||||
|
|
||||||
private State(String userFriendlyName) {
|
|
||||||
this.userFriendlyName = userFriendlyName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getUserFriendlyName() {
|
|
||||||
return userFriendlyName;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo -- it would be nice to not have to specify upfront the number of threads.
|
|
||||||
// todo -- can we dynamically increment countDownLatch? It seems not...
|
|
||||||
final int nThreadsToCreate;
|
|
||||||
final List<Thread> activeThreads;
|
|
||||||
final EnumMap<State, Long> times = new EnumMap<State, Long>(State.class);
|
|
||||||
|
|
||||||
int nThreadsCreated = 0;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The bean used to get the thread info about blocked and waiting times
|
|
||||||
*/
|
|
||||||
final ThreadMXBean bean;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Counts down the number of active activeThreads whose runtime info hasn't been incorporated into
|
|
||||||
* times. Counts down from nThreadsToCreate to 0, at which point any code waiting
|
|
||||||
* on the final times is freed to run.
|
|
||||||
*/
|
|
||||||
final CountDownLatch countDownLatch;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new factory generating threads whose runtime and contention
|
|
||||||
* behavior is tracked in this factory.
|
|
||||||
*
|
|
||||||
* @param nThreadsToCreate the number of threads we will create in the factory before it's considered complete
|
|
||||||
* // TODO -- remove argument when we figure out how to implement this capability
|
|
||||||
*/
|
|
||||||
public StateMonitoringThreadFactory(final int nThreadsToCreate) {
|
|
||||||
if ( nThreadsToCreate <= 0 ) throw new IllegalArgumentException("nThreadsToCreate <= 0: " + nThreadsToCreate);
|
|
||||||
|
|
||||||
this.nThreadsToCreate = nThreadsToCreate;
|
|
||||||
activeThreads = new ArrayList<Thread>(nThreadsToCreate);
|
|
||||||
|
|
||||||
// initialize times to 0
|
|
||||||
for ( final State state : State.values() )
|
|
||||||
times.put(state, 0l);
|
|
||||||
|
|
||||||
// get the bean, and start tracking
|
|
||||||
bean = ManagementFactory.getThreadMXBean();
|
|
||||||
if ( bean.isThreadContentionMonitoringSupported() )
|
|
||||||
bean.setThreadContentionMonitoringEnabled(true);
|
|
||||||
else
|
|
||||||
logger.warn("Thread contention monitoring not supported, we cannot track GATK multi-threaded efficiency");
|
|
||||||
//bean.setThreadCpuTimeEnabled(true);
|
|
||||||
|
|
||||||
if ( bean.isThreadCpuTimeSupported() )
|
|
||||||
bean.setThreadCpuTimeEnabled(true);
|
|
||||||
else
|
|
||||||
logger.warn("Thread CPU monitoring not supported, we cannot track GATK multi-threaded efficiency");
|
|
||||||
|
|
||||||
countDownLatch = new CountDownLatch(nThreadsToCreate);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the time spent in state across all threads created by this factory
|
|
||||||
*
|
|
||||||
* @param state to get information about
|
|
||||||
* @return the time in milliseconds
|
|
||||||
*/
|
|
||||||
@Ensures({"result >= 0"})
|
|
||||||
public synchronized long getStateTime(final State state) {
|
|
||||||
return times.get(state);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the total time spent in all states across all threads created by this factory
|
|
||||||
*
|
|
||||||
* @return the time in milliseconds
|
|
||||||
*/
|
|
||||||
@Ensures({"result >= 0"})
|
|
||||||
public synchronized long getTotalTime() {
|
|
||||||
long total = 0;
|
|
||||||
for ( final long time : times.values() )
|
|
||||||
total += time;
|
|
||||||
return total;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the fraction of time spent in state across all threads created by this factory
|
|
||||||
*
|
|
||||||
* @return the fraction (0.0-1.0) of time spent in state over all state times of all threads
|
|
||||||
*/
|
|
||||||
@Ensures({"result >= 0.0", "result <= 1.0"})
|
|
||||||
public synchronized double getStateFraction(final State state) {
|
|
||||||
return getStateTime(state) / (1.0 * Math.max(getTotalTime(), 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How many threads have been created by this factory so far?
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
@Ensures("result >= 0")
|
|
||||||
public int getNThreadsCreated() {
|
|
||||||
return nThreadsCreated;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Only useful for testing, so that we can wait for all of the threads in the factory to complete running
|
|
||||||
*
|
|
||||||
* @throws InterruptedException
|
|
||||||
*/
|
|
||||||
protected void waitForAllThreadsToComplete() throws InterruptedException {
|
|
||||||
countDownLatch.await();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized String toString() {
|
|
||||||
final StringBuilder b = new StringBuilder();
|
|
||||||
|
|
||||||
b.append("total ").append(getTotalTime()).append(" ");
|
|
||||||
for ( final State state : State.values() ) {
|
|
||||||
b.append(state).append(" ").append(getStateTime(state)).append(" ");
|
|
||||||
}
|
|
||||||
|
|
||||||
return b.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Print usage information about threads from this factory to logger
|
|
||||||
* with the INFO priority
|
|
||||||
*
|
|
||||||
* @param logger
|
|
||||||
*/
|
|
||||||
public synchronized void printUsageInformation(final Logger logger) {
|
|
||||||
printUsageInformation(logger, Priority.INFO);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Print usage information about threads from this factory to logger
|
|
||||||
* with the provided priority
|
|
||||||
*
|
|
||||||
* @param logger
|
|
||||||
*/
|
|
||||||
public synchronized void printUsageInformation(final Logger logger, final Priority priority) {
|
|
||||||
logger.log(priority, "Number of activeThreads used: " + getNThreadsCreated());
|
|
||||||
logger.log(priority, "Total runtime " + new AutoFormattingTime(TimeUnit.MILLISECONDS.toSeconds(getTotalTime())));
|
|
||||||
for ( final State state : State.values() ) {
|
|
||||||
logger.log(priority, String.format(" Fraction of time spent %s is %.2f (%s)",
|
|
||||||
state.getUserFriendlyName(),
|
|
||||||
getStateFraction(state),
|
|
||||||
new AutoFormattingTime(getStateTime(state) / 1000.0)));
|
|
||||||
}
|
|
||||||
logger.log(priority, String.format("CPU efficiency : %.2f%% of time spent doing productive work",
|
|
||||||
getStateFraction(State.USER_CPU) * 100));
|
|
||||||
logger.log(priority, String.format("I/O inefficiency: %.2f%% of time spent waiting on I/O",
|
|
||||||
getStateFraction(State.WAITING_FOR_IO) * 100));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new thread from this factory
|
|
||||||
*
|
|
||||||
* @param runnable
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
@Ensures({
|
|
||||||
"activeThreads.size() > old(activeThreads.size())",
|
|
||||||
"activeThreads.contains(result)",
|
|
||||||
"nThreadsCreated == old(nThreadsCreated) + 1"
|
|
||||||
})
|
|
||||||
public synchronized Thread newThread(final Runnable runnable) {
|
|
||||||
if ( activeThreads.size() >= nThreadsToCreate)
|
|
||||||
throw new IllegalStateException("Attempting to create more activeThreads than allowed by constructor argument nThreadsToCreate " + nThreadsToCreate);
|
|
||||||
|
|
||||||
nThreadsCreated++;
|
|
||||||
final Thread myThread = new TrackingThread(runnable);
|
|
||||||
activeThreads.add(myThread);
|
|
||||||
return myThread;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Update the information about completed thread that ran for runtime in milliseconds
|
|
||||||
*
|
|
||||||
* This method updates all of the key timing and tracking information in the factory so that
|
|
||||||
* thread can be retired. After this call the factory shouldn't have a pointer to the thread any longer
|
|
||||||
*
|
|
||||||
* @param thread the thread whose information we are updating
|
|
||||||
*/
|
|
||||||
@Ensures({
|
|
||||||
"activeThreads.size() < old(activeThreads.size())",
|
|
||||||
"! activeThreads.contains(thread)",
|
|
||||||
"getTotalTime() >= old(getTotalTime())",
|
|
||||||
"countDownLatch.getCount() < old(countDownLatch.getCount())"
|
|
||||||
})
|
|
||||||
private synchronized void threadIsDone(final Thread thread) {
|
|
||||||
if ( DEBUG ) logger.warn(" Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName());
|
|
||||||
if ( DEBUG ) logger.warn("UpdateThreadInfo called");
|
|
||||||
|
|
||||||
final long threadID = thread.getId();
|
|
||||||
final ThreadInfo info = bean.getThreadInfo(thread.getId());
|
|
||||||
final long totalTimeNano = bean.getThreadCpuTime(threadID);
|
|
||||||
final long userTimeNano = bean.getThreadUserTime(threadID);
|
|
||||||
final long systemTimeNano = totalTimeNano - userTimeNano;
|
|
||||||
final long userTimeInMilliseconds = nanoToMilli(userTimeNano);
|
|
||||||
final long systemTimeInMilliseconds = nanoToMilli(systemTimeNano);
|
|
||||||
|
|
||||||
if ( info != null ) {
|
|
||||||
if ( DEBUG ) logger.warn("Updating thread with user runtime " + userTimeInMilliseconds + " and system runtime " + systemTimeInMilliseconds + " of which blocked " + info.getBlockedTime() + " and waiting " + info.getWaitedTime());
|
|
||||||
incTimes(State.BLOCKING, info.getBlockedTime());
|
|
||||||
incTimes(State.WAITING, info.getWaitedTime());
|
|
||||||
incTimes(State.USER_CPU, userTimeInMilliseconds);
|
|
||||||
incTimes(State.WAITING_FOR_IO, systemTimeInMilliseconds);
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove the thread from the list of active activeThreads
|
|
||||||
if ( ! activeThreads.remove(thread) )
|
|
||||||
throw new IllegalStateException("Thread " + thread + " not in list of active activeThreads");
|
|
||||||
|
|
||||||
// one less thread is live for those blocking on all activeThreads to be complete
|
|
||||||
countDownLatch.countDown();
|
|
||||||
if ( DEBUG ) logger.warn(" -> Countdown " + countDownLatch.getCount() + " in thread " + Thread.currentThread().getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function that increments the times counter by by for state
|
|
||||||
*
|
|
||||||
* @param state
|
|
||||||
* @param by
|
|
||||||
*/
|
|
||||||
@Requires({"state != null", "by >= 0"})
|
|
||||||
@Ensures("getTotalTime() == old(getTotalTime()) + by")
|
|
||||||
private synchronized void incTimes(final State state, final long by) {
|
|
||||||
times.put(state, times.get(state) + by);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static long nanoToMilli(final long timeInNano) {
|
|
||||||
return TimeUnit.NANOSECONDS.toMillis(timeInNano);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A wrapper around Thread that tracks the runtime of the thread and calls threadIsDone() when complete
|
|
||||||
*/
|
|
||||||
private class TrackingThread extends Thread {
|
|
||||||
private TrackingThread(Runnable runnable) {
|
|
||||||
super(runnable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
super.run();
|
|
||||||
threadIsDone(this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,206 @@
|
||||||
|
package org.broadinstitute.sting.utils.threading;
|
||||||
|
|
||||||
|
import com.google.java.contract.Ensures;
|
||||||
|
import com.google.java.contract.Invariant;
|
||||||
|
import com.google.java.contract.Requires;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.log4j.Priority;
|
||||||
|
import org.broadinstitute.sting.utils.AutoFormattingTime;
|
||||||
|
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.lang.management.ThreadInfo;
|
||||||
|
import java.lang.management.ThreadMXBean;
|
||||||
|
import java.util.EnumMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uses an MXBean to monitor thread efficiency
|
||||||
|
*
|
||||||
|
* Once the monitor is created, calls to threadIsDone() can be used to add information
|
||||||
|
* about the efficiency of the provided thread to this monitor.
|
||||||
|
*
|
||||||
|
* Provides simple print() for displaying efficiency information to a logger
|
||||||
|
*
|
||||||
|
* User: depristo
|
||||||
|
* Date: 8/22/12
|
||||||
|
* Time: 10:48 AM
|
||||||
|
*/
|
||||||
|
@Invariant({"nThreadsAnalyzed >= 0"})
|
||||||
|
public class ThreadEfficiencyMonitor {
|
||||||
|
protected static final boolean DEBUG = false;
|
||||||
|
protected static Logger logger = Logger.getLogger(EfficiencyMonitoringThreadFactory.class);
|
||||||
|
final EnumMap<State, Long> times = new EnumMap<State, Long>(State.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of threads we've included in our efficiency monitoring
|
||||||
|
*/
|
||||||
|
int nThreadsAnalyzed = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The bean used to get the thread info about blocked and waiting times
|
||||||
|
*/
|
||||||
|
final ThreadMXBean bean;
|
||||||
|
|
||||||
|
public ThreadEfficiencyMonitor() {
|
||||||
|
bean = ManagementFactory.getThreadMXBean();
|
||||||
|
|
||||||
|
// get the bean, and start tracking
|
||||||
|
if ( bean.isThreadContentionMonitoringSupported() )
|
||||||
|
bean.setThreadContentionMonitoringEnabled(true);
|
||||||
|
else
|
||||||
|
logger.warn("Thread contention monitoring not supported, we cannot track GATK multi-threaded efficiency");
|
||||||
|
//bean.setThreadCpuTimeEnabled(true);
|
||||||
|
|
||||||
|
if ( bean.isThreadCpuTimeSupported() )
|
||||||
|
bean.setThreadCpuTimeEnabled(true);
|
||||||
|
else
|
||||||
|
logger.warn("Thread CPU monitoring not supported, we cannot track GATK multi-threaded efficiency");
|
||||||
|
|
||||||
|
// initialize times to 0
|
||||||
|
for ( final State state : State.values() )
|
||||||
|
times.put(state, 0l);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long nanoToMilli(final long timeInNano) {
|
||||||
|
return TimeUnit.NANOSECONDS.toMillis(timeInNano);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the time spent in state across all threads created by this factory
|
||||||
|
*
|
||||||
|
* @param state to get information about
|
||||||
|
* @return the time in milliseconds
|
||||||
|
*/
|
||||||
|
@Ensures({"result >= 0"})
|
||||||
|
public synchronized long getStateTime(final State state) {
|
||||||
|
return times.get(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the total time spent in all states across all threads created by this factory
|
||||||
|
*
|
||||||
|
* @return the time in milliseconds
|
||||||
|
*/
|
||||||
|
@Ensures({"result >= 0"})
|
||||||
|
public synchronized long getTotalTime() {
|
||||||
|
long total = 0;
|
||||||
|
for ( final long time : times.values() )
|
||||||
|
total += time;
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the fraction of time spent in state across all threads created by this factory
|
||||||
|
*
|
||||||
|
* @return the percentage (0.0-100.0) of time spent in state over all state times of all threads
|
||||||
|
*/
|
||||||
|
@Ensures({"result >= 0.0", "result <= 100.0"})
|
||||||
|
public synchronized double getStatePercent(final State state) {
|
||||||
|
return (100.0 * getStateTime(state)) / Math.max(getTotalTime(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getnThreadsAnalyzed() {
|
||||||
|
return nThreadsAnalyzed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized String toString() {
|
||||||
|
final StringBuilder b = new StringBuilder();
|
||||||
|
|
||||||
|
b.append("total ").append(getTotalTime()).append(" ");
|
||||||
|
for ( final State state : State.values() ) {
|
||||||
|
b.append(state).append(" ").append(getStateTime(state)).append(" ");
|
||||||
|
}
|
||||||
|
|
||||||
|
return b.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Print usage information about threads from this factory to logger
|
||||||
|
* with the INFO priority
|
||||||
|
*
|
||||||
|
* @param logger
|
||||||
|
*/
|
||||||
|
public synchronized void printUsageInformation(final Logger logger) {
|
||||||
|
printUsageInformation(logger, Priority.INFO);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Print usage information about threads from this factory to logger
|
||||||
|
* with the provided priority
|
||||||
|
*
|
||||||
|
* @param logger
|
||||||
|
*/
|
||||||
|
public synchronized void printUsageInformation(final Logger logger, final Priority priority) {
|
||||||
|
logger.debug("Number of threads monitored: " + getnThreadsAnalyzed());
|
||||||
|
logger.debug("Total runtime " + new AutoFormattingTime(TimeUnit.MILLISECONDS.toSeconds(getTotalTime())));
|
||||||
|
for ( final State state : State.values() ) {
|
||||||
|
logger.debug(String.format("\tPercent of time spent %s is %.2f", state.getUserFriendlyName(), getStatePercent(state)));
|
||||||
|
}
|
||||||
|
logger.log(priority, String.format("CPU efficiency : %6.2f%% of time spent %s", getStatePercent(State.USER_CPU), State.USER_CPU.getUserFriendlyName()));
|
||||||
|
logger.log(priority, String.format("Walker inefficiency : %6.2f%% of time spent %s", getStatePercent(State.BLOCKING), State.BLOCKING.getUserFriendlyName()));
|
||||||
|
logger.log(priority, String.format("I/O inefficiency : %6.2f%% of time spent %s", getStatePercent(State.WAITING_FOR_IO), State.WAITING_FOR_IO.getUserFriendlyName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the information about completed thread that ran for runtime in milliseconds
|
||||||
|
*
|
||||||
|
* This method updates all of the key timing and tracking information in the factory so that
|
||||||
|
* thread can be retired. After this call the factory shouldn't have a pointer to the thread any longer
|
||||||
|
*
|
||||||
|
* @param thread the thread whose information we are updating
|
||||||
|
*/
|
||||||
|
@Ensures({
|
||||||
|
"getTotalTime() >= old(getTotalTime())"
|
||||||
|
})
|
||||||
|
public synchronized void threadIsDone(final Thread thread) {
|
||||||
|
nThreadsAnalyzed++;
|
||||||
|
|
||||||
|
if ( DEBUG ) logger.warn("UpdateThreadInfo called");
|
||||||
|
|
||||||
|
final long threadID = thread.getId();
|
||||||
|
final ThreadInfo info = bean.getThreadInfo(thread.getId());
|
||||||
|
final long totalTimeNano = bean.getThreadCpuTime(threadID);
|
||||||
|
final long userTimeNano = bean.getThreadUserTime(threadID);
|
||||||
|
final long systemTimeNano = totalTimeNano - userTimeNano;
|
||||||
|
final long userTimeInMilliseconds = nanoToMilli(userTimeNano);
|
||||||
|
final long systemTimeInMilliseconds = nanoToMilli(systemTimeNano);
|
||||||
|
|
||||||
|
if ( info != null ) {
|
||||||
|
if ( DEBUG ) logger.warn("Updating thread with user runtime " + userTimeInMilliseconds + " and system runtime " + systemTimeInMilliseconds + " of which blocked " + info.getBlockedTime() + " and waiting " + info.getWaitedTime());
|
||||||
|
incTimes(State.BLOCKING, info.getBlockedTime());
|
||||||
|
incTimes(State.WAITING, info.getWaitedTime());
|
||||||
|
incTimes(State.USER_CPU, userTimeInMilliseconds);
|
||||||
|
incTimes(State.WAITING_FOR_IO, systemTimeInMilliseconds);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper function that increments the times counter by by for state
|
||||||
|
*
|
||||||
|
* @param state
|
||||||
|
* @param by
|
||||||
|
*/
|
||||||
|
@Requires({"state != null", "by >= 0"})
|
||||||
|
@Ensures("getTotalTime() == old(getTotalTime()) + by")
|
||||||
|
private synchronized void incTimes(final State state, final long by) {
|
||||||
|
times.put(state, times.get(state) + by);
|
||||||
|
}
|
||||||
|
|
||||||
|
public enum State {
|
||||||
|
BLOCKING("blocking on synchronized data structures"),
|
||||||
|
WAITING("waiting on some other thread"),
|
||||||
|
USER_CPU("doing productive CPU work"),
|
||||||
|
WAITING_FOR_IO("waiting for I/O");
|
||||||
|
|
||||||
|
private final String userFriendlyName;
|
||||||
|
|
||||||
|
private State(String userFriendlyName) {
|
||||||
|
this.userFriendlyName = userFriendlyName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getUserFriendlyName() {
|
||||||
|
return userFriendlyName;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -47,24 +47,24 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest {
|
||||||
private class StateTest extends TestDataProvider {
|
private class StateTest extends TestDataProvider {
|
||||||
private final double TOLERANCE = 0.1; // willing to tolerate a 10% error
|
private final double TOLERANCE = 0.1; // willing to tolerate a 10% error
|
||||||
|
|
||||||
final List<StateMonitoringThreadFactory.State> statesForThreads;
|
final List<EfficiencyMonitoringThreadFactory.State> statesForThreads;
|
||||||
|
|
||||||
public StateTest(final List<StateMonitoringThreadFactory.State> statesForThreads) {
|
public StateTest(final List<EfficiencyMonitoringThreadFactory.State> statesForThreads) {
|
||||||
super(StateTest.class);
|
super(StateTest.class);
|
||||||
this.statesForThreads = statesForThreads;
|
this.statesForThreads = statesForThreads;
|
||||||
setName("StateTest " + Utils.join(",", statesForThreads));
|
setName("StateTest " + Utils.join(",", statesForThreads));
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<StateMonitoringThreadFactory.State> getStatesForThreads() {
|
public List<EfficiencyMonitoringThreadFactory.State> getStatesForThreads() {
|
||||||
return statesForThreads;
|
return statesForThreads;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getNStates() { return statesForThreads.size(); }
|
public int getNStates() { return statesForThreads.size(); }
|
||||||
|
|
||||||
public double maxStateFraction(final StateMonitoringThreadFactory.State state) { return fraction(state) + TOLERANCE; }
|
public double maxStateFraction(final EfficiencyMonitoringThreadFactory.State state) { return fraction(state) + TOLERANCE; }
|
||||||
public double minStateFraction(final StateMonitoringThreadFactory.State state) { return fraction(state) - TOLERANCE; }
|
public double minStateFraction(final EfficiencyMonitoringThreadFactory.State state) { return fraction(state) - TOLERANCE; }
|
||||||
|
|
||||||
private double fraction(final StateMonitoringThreadFactory.State state) {
|
private double fraction(final EfficiencyMonitoringThreadFactory.State state) {
|
||||||
return Collections.frequency(statesForThreads, state) / (1.0 * statesForThreads.size());
|
return Collections.frequency(statesForThreads, state) / (1.0 * statesForThreads.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -74,9 +74,9 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest {
|
||||||
* requested for input argument
|
* requested for input argument
|
||||||
*/
|
*/
|
||||||
private static class StateTestThread implements Callable<Double> {
|
private static class StateTestThread implements Callable<Double> {
|
||||||
private final StateMonitoringThreadFactory.State stateToImplement;
|
private final EfficiencyMonitoringThreadFactory.State stateToImplement;
|
||||||
|
|
||||||
private StateTestThread(final StateMonitoringThreadFactory.State stateToImplement) {
|
private StateTestThread(final EfficiencyMonitoringThreadFactory.State stateToImplement) {
|
||||||
this.stateToImplement = stateToImplement;
|
this.stateToImplement = stateToImplement;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -95,10 +95,10 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest {
|
||||||
Thread.currentThread().sleep(THREAD_TARGET_DURATION_IN_MILLISECOND);
|
Thread.currentThread().sleep(THREAD_TARGET_DURATION_IN_MILLISECOND);
|
||||||
return 0.0;
|
return 0.0;
|
||||||
case BLOCKING:
|
case BLOCKING:
|
||||||
if ( StateMonitoringThreadFactory.DEBUG ) logger.warn("Blocking...");
|
if ( EfficiencyMonitoringThreadFactory.DEBUG ) logger.warn("Blocking...");
|
||||||
synchronized (GLOBAL_LOCK) {
|
synchronized (GLOBAL_LOCK) {
|
||||||
// the GLOBAL_LOCK must be held by the unit test itself for this to properly block
|
// the GLOBAL_LOCK must be held by the unit test itself for this to properly block
|
||||||
if ( StateMonitoringThreadFactory.DEBUG ) logger.warn(" ... done blocking");
|
if ( EfficiencyMonitoringThreadFactory.DEBUG ) logger.warn(" ... done blocking");
|
||||||
}
|
}
|
||||||
return 0.0;
|
return 0.0;
|
||||||
case WAITING_FOR_IO:
|
case WAITING_FOR_IO:
|
||||||
|
|
@ -114,10 +114,10 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest {
|
||||||
@DataProvider(name = "StateTest")
|
@DataProvider(name = "StateTest")
|
||||||
public Object[][] createStateTest() {
|
public Object[][] createStateTest() {
|
||||||
for ( final int nThreads : Arrays.asList(3) ) {
|
for ( final int nThreads : Arrays.asList(3) ) {
|
||||||
//final List<StateMonitoringThreadFactory.State> allStates = Arrays.asList(StateMonitoringThreadFactory.State.WAITING_FOR_IO);
|
//final List<EfficiencyMonitoringThreadFactory.State> allStates = Arrays.asList(EfficiencyMonitoringThreadFactory.State.WAITING_FOR_IO);
|
||||||
final List<StateMonitoringThreadFactory.State> allStates = Arrays.asList(StateMonitoringThreadFactory.State.USER_CPU, StateMonitoringThreadFactory.State.WAITING, StateMonitoringThreadFactory.State.BLOCKING);
|
final List<EfficiencyMonitoringThreadFactory.State> allStates = Arrays.asList(EfficiencyMonitoringThreadFactory.State.USER_CPU, EfficiencyMonitoringThreadFactory.State.WAITING, EfficiencyMonitoringThreadFactory.State.BLOCKING);
|
||||||
//final List<StateMonitoringThreadFactory.State> allStates = Arrays.asList(StateMonitoringThreadFactory.State.values());
|
//final List<EfficiencyMonitoringThreadFactory.State> allStates = Arrays.asList(EfficiencyMonitoringThreadFactory.State.values());
|
||||||
for (final List<StateMonitoringThreadFactory.State> states : Utils.makePermutations(allStates, nThreads, true) ) {
|
for (final List<EfficiencyMonitoringThreadFactory.State> states : Utils.makePermutations(allStates, nThreads, true) ) {
|
||||||
//if ( Collections.frequency(states, Thread.State.BLOCKED) > 0)
|
//if ( Collections.frequency(states, Thread.State.BLOCKED) > 0)
|
||||||
new StateTest(states);
|
new StateTest(states);
|
||||||
}
|
}
|
||||||
|
|
@ -129,13 +129,13 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest {
|
||||||
@Test(enabled = true, dataProvider = "StateTest")
|
@Test(enabled = true, dataProvider = "StateTest")
|
||||||
public void testStateTest(final StateTest test) throws InterruptedException {
|
public void testStateTest(final StateTest test) throws InterruptedException {
|
||||||
// allows us to test blocking
|
// allows us to test blocking
|
||||||
final StateMonitoringThreadFactory factory = new StateMonitoringThreadFactory(test.getNStates());
|
final EfficiencyMonitoringThreadFactory factory = new EfficiencyMonitoringThreadFactory(test.getNStates());
|
||||||
final ExecutorService threadPool = Executors.newFixedThreadPool(test.getNStates(), factory);
|
final ExecutorService threadPool = Executors.newFixedThreadPool(test.getNStates(), factory);
|
||||||
|
|
||||||
logger.warn("Running " + test);
|
logger.warn("Running " + test);
|
||||||
synchronized (GLOBAL_LOCK) {
|
synchronized (GLOBAL_LOCK) {
|
||||||
//logger.warn(" Have lock");
|
//logger.warn(" Have lock");
|
||||||
for ( final StateMonitoringThreadFactory.State threadToRunState : test.getStatesForThreads() )
|
for ( final EfficiencyMonitoringThreadFactory.State threadToRunState : test.getStatesForThreads() )
|
||||||
threadPool.submit(new StateTestThread(threadToRunState));
|
threadPool.submit(new StateTestThread(threadToRunState));
|
||||||
|
|
||||||
// lock has to be here for the whole running of the activeThreads but end before the sleep so the blocked activeThreads
|
// lock has to be here for the whole running of the activeThreads but end before the sleep so the blocked activeThreads
|
||||||
|
|
@ -158,10 +158,10 @@ public class StateMonitoringThreadFactoryUnitTest extends BaseTest {
|
||||||
Assert.assertTrue(totalTime >= minTime, "Factory results not properly accumulated: totalTime = " + totalTime + " < minTime = " + minTime);
|
Assert.assertTrue(totalTime >= minTime, "Factory results not properly accumulated: totalTime = " + totalTime + " < minTime = " + minTime);
|
||||||
Assert.assertTrue(totalTime <= maxTime, "Factory results not properly accumulated: totalTime = " + totalTime + " > maxTime = " + maxTime);
|
Assert.assertTrue(totalTime <= maxTime, "Factory results not properly accumulated: totalTime = " + totalTime + " > maxTime = " + maxTime);
|
||||||
|
|
||||||
for (final StateMonitoringThreadFactory.State state : StateMonitoringThreadFactory.State.values() ) {
|
for (final EfficiencyMonitoringThreadFactory.State state : EfficiencyMonitoringThreadFactory.State.values() ) {
|
||||||
final double min = test.minStateFraction(state);
|
final double min = test.minStateFraction(state);
|
||||||
final double max = test.maxStateFraction(state);
|
final double max = test.maxStateFraction(state);
|
||||||
final double obs = factory.getStateFraction(state);
|
final double obs = factory.getStatePercent(state);
|
||||||
// logger.warn(" Checking " + state
|
// logger.warn(" Checking " + state
|
||||||
// + " min " + String.format("%.2f", min)
|
// + " min " + String.format("%.2f", min)
|
||||||
// + " max " + String.format("%.2f", max)
|
// + " max " + String.format("%.2f", max)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue