Fixes GSA-515 Nanoscheduler GSA-560 / Fix display of NanoScheduler and MonitoringEfficiency
-- Now prints out a single combined NanoScheduler runtime profile report across all nano schedulers in use. So now if you run with -nt 4 you'll get one combined NanoScheduler profiler across all 4 instances of the NanoScheduler within TraverseXNano.
This commit is contained in:
parent
64ee0a10fe
commit
e25e617d1a
|
|
@ -42,6 +42,7 @@ import org.broadinstitute.sting.gatk.walkers.*;
|
||||||
import org.broadinstitute.sting.utils.MathUtils;
|
import org.broadinstitute.sting.utils.MathUtils;
|
||||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||||
|
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
|
||||||
import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
|
import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
|
||||||
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
|
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
|
||||||
|
|
||||||
|
|
@ -315,6 +316,9 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
for ( final TraversalEngine te : allCreatedTraversalEngines)
|
for ( final TraversalEngine te : allCreatedTraversalEngines)
|
||||||
te.shutdown();
|
te.shutdown();
|
||||||
|
|
||||||
|
// horrible hack to print nano scheduling information across all nano schedulers, if any were used
|
||||||
|
NanoScheduler.printCombinedRuntimeProfile();
|
||||||
|
|
||||||
allCreatedTraversalEngines.clear();
|
allCreatedTraversalEngines.clear();
|
||||||
availableTraversalEngines.clear();
|
availableTraversalEngines.clear();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -145,4 +145,13 @@ public class SimpleTimer {
|
||||||
public synchronized long getElapsedTimeNano() {
|
public synchronized long getElapsedTimeNano() {
|
||||||
return running ? (currentTimeNano() - startTimeNano + elapsedTimeNano) : elapsedTimeNano;
|
return running ? (currentTimeNano() - startTimeNano + elapsedTimeNano) : elapsedTimeNano;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the elapsed time from toAdd to this elapsed time
|
||||||
|
*
|
||||||
|
* @param toAdd the timer whose elapsed time we want to add to this timer
|
||||||
|
*/
|
||||||
|
public synchronized void addElapsed(final SimpleTimer toAdd) {
|
||||||
|
elapsedTimeNano += toAdd.getElapsedTimeNano();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ class InputProducer<InputType> implements Runnable {
|
||||||
final SimpleTimer inputTimer,
|
final SimpleTimer inputTimer,
|
||||||
final BlockingQueue<InputValue> outputQueue) {
|
final BlockingQueue<InputValue> outputQueue) {
|
||||||
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
|
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
|
||||||
|
if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null");
|
||||||
if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null");
|
if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null");
|
||||||
|
|
||||||
this.inputReader = inputReader;
|
this.inputReader = inputReader;
|
||||||
|
|
@ -38,11 +39,16 @@ class InputProducer<InputType> implements Runnable {
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
while ( inputReader.hasNext() ) {
|
while ( true ) {
|
||||||
if ( inputTimer != null ) inputTimer.restart();
|
inputTimer.restart();
|
||||||
final InputType input = inputReader.next();
|
if ( ! inputReader.hasNext() ) {
|
||||||
if ( inputTimer != null ) inputTimer.stop();
|
inputTimer.stop();
|
||||||
outputQueue.put(new InputValue(input));
|
break;
|
||||||
|
} else {
|
||||||
|
final InputType input = inputReader.next();
|
||||||
|
inputTimer.stop();
|
||||||
|
outputQueue.put(new InputValue(input));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// add the EOF object so our consumer knows we are done in all inputs
|
// add the EOF object so our consumer knows we are done in all inputs
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,69 @@
|
||||||
|
package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
|
|
||||||
|
import com.google.java.contract.Ensures;
|
||||||
|
import com.google.java.contract.Requires;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.broadinstitute.sting.utils.AutoFormattingTime;
|
||||||
|
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holds runtime profile (input, read, map) times as tracked by NanoScheduler
|
||||||
|
*
|
||||||
|
* User: depristo
|
||||||
|
* Date: 9/10/12
|
||||||
|
* Time: 8:31 PM
|
||||||
|
*/
|
||||||
|
public class NSRuntimeProfile {
|
||||||
|
final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside");
|
||||||
|
final SimpleTimer inputTimer = new SimpleTimer("input");
|
||||||
|
final SimpleTimer mapTimer = new SimpleTimer("map");
|
||||||
|
final SimpleTimer reduceTimer = new SimpleTimer("reduce");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Combine the elapsed time information from other with this profile
|
||||||
|
*
|
||||||
|
* @param other a non-null profile
|
||||||
|
*/
|
||||||
|
public void combine(final NSRuntimeProfile other) {
|
||||||
|
outsideSchedulerTimer.addElapsed(other.outsideSchedulerTimer);
|
||||||
|
inputTimer.addElapsed(other.inputTimer);
|
||||||
|
mapTimer.addElapsed(other.mapTimer);
|
||||||
|
reduceTimer.addElapsed(other.reduceTimer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Print the runtime profiling to logger
|
||||||
|
*
|
||||||
|
* @param logger
|
||||||
|
*/
|
||||||
|
public void log(final Logger logger) {
|
||||||
|
log1(logger, "Input time", inputTimer);
|
||||||
|
log1(logger, "Map time", mapTimer);
|
||||||
|
log1(logger, "Reduce time", reduceTimer);
|
||||||
|
log1(logger, "Outside time", outsideSchedulerTimer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the total runtime for all functions of this nano scheduler
|
||||||
|
*/
|
||||||
|
@Ensures("result >= 0.0")
|
||||||
|
public double totalRuntimeInSeconds() {
|
||||||
|
return inputTimer.getElapsedTime()
|
||||||
|
+ mapTimer.getElapsedTime()
|
||||||
|
+ reduceTimer.getElapsedTime()
|
||||||
|
+ outsideSchedulerTimer.getElapsedTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Print to logger.info timing information from timer, with name label
|
||||||
|
*
|
||||||
|
* @param label the name of the timer to display. Should be human readable
|
||||||
|
* @param timer the timer whose elapsed time we will display
|
||||||
|
*/
|
||||||
|
@Requires({"label != null", "timer != null"})
|
||||||
|
private void log1(final Logger logger, final String label, final SimpleTimer timer) {
|
||||||
|
final double myTimeInSec = timer.getElapsedTime();
|
||||||
|
final double myTimePercent = myTimeInSec / totalRuntimeInSeconds() * 100;
|
||||||
|
logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,8 +3,6 @@ package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
import com.google.java.contract.Ensures;
|
import com.google.java.contract.Ensures;
|
||||||
import com.google.java.contract.Requires;
|
import com.google.java.contract.Requires;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.broadinstitute.sting.utils.AutoFormattingTime;
|
|
||||||
import org.broadinstitute.sting.utils.SimpleTimer;
|
|
||||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||||
import org.broadinstitute.sting.utils.threading.NamedThreadFactory;
|
import org.broadinstitute.sting.utils.threading.NamedThreadFactory;
|
||||||
|
|
||||||
|
|
@ -46,7 +44,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
private final static Logger logger = Logger.getLogger(NanoScheduler.class);
|
private final static Logger logger = Logger.getLogger(NanoScheduler.class);
|
||||||
private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
|
private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
|
||||||
private final static boolean LOG_MAP_TIMES = false;
|
private final static boolean LOG_MAP_TIMES = false;
|
||||||
private final static boolean TIME_CALLS = true;
|
|
||||||
|
|
||||||
private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100;
|
private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100;
|
||||||
|
|
||||||
|
|
@ -61,10 +58,15 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
boolean debug = false;
|
boolean debug = false;
|
||||||
private NSProgressFunction<InputType> progressFunction = null;
|
private NSProgressFunction<InputType> progressFunction = null;
|
||||||
|
|
||||||
final SimpleTimer outsideSchedulerTimer = TIME_CALLS ? new SimpleTimer("outside") : null;
|
/**
|
||||||
final SimpleTimer inputTimer = TIME_CALLS ? new SimpleTimer("input") : null;
|
* Tracks the combined runtime profiles across all created nano schedulers
|
||||||
final SimpleTimer mapTimer = TIME_CALLS ? new SimpleTimer("map") : null;
|
*/
|
||||||
final SimpleTimer reduceTimer = TIME_CALLS ? new SimpleTimer("reduce") : null;
|
final static private NSRuntimeProfile combinedNSRuntimeProfiler = new NSRuntimeProfile();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The profile specific to this nano scheduler
|
||||||
|
*/
|
||||||
|
final private NSRuntimeProfile myNSRuntimeProfile = new NSRuntimeProfile();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new nanoscheduler with the desire characteristics requested by the argument
|
* Create a new nanoscheduler with the desire characteristics requested by the argument
|
||||||
|
|
@ -92,7 +94,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// start timing the time spent outside of the nanoScheduler
|
// start timing the time spent outside of the nanoScheduler
|
||||||
outsideSchedulerTimer.start();
|
myNSRuntimeProfile.outsideSchedulerTimer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -119,21 +121,31 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
* After this call, execute cannot be invoked without throwing an error
|
* After this call, execute cannot be invoked without throwing an error
|
||||||
*/
|
*/
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
outsideSchedulerTimer.stop();
|
myNSRuntimeProfile.outsideSchedulerTimer.stop();
|
||||||
|
|
||||||
|
// add my timing information to the combined NS runtime profile
|
||||||
|
combinedNSRuntimeProfiler.combine(myNSRuntimeProfile);
|
||||||
|
|
||||||
if ( nThreads > 1 ) {
|
if ( nThreads > 1 ) {
|
||||||
shutdownExecutor("inputExecutor", inputExecutor);
|
shutdownExecutor("inputExecutor", inputExecutor);
|
||||||
shutdownExecutor("mapExecutor", mapExecutor);
|
shutdownExecutor("mapExecutor", mapExecutor);
|
||||||
shutdownExecutor("reduceExecutor", reduceExecutor);
|
shutdownExecutor("reduceExecutor", reduceExecutor);
|
||||||
}
|
}
|
||||||
shutdown = true;
|
|
||||||
|
|
||||||
if (TIME_CALLS) {
|
shutdown = true;
|
||||||
printTimerInfo("Input time", inputTimer);
|
}
|
||||||
printTimerInfo("Map time", mapTimer);
|
|
||||||
printTimerInfo("Reduce time", reduceTimer);
|
public void printRuntimeProfile() {
|
||||||
printTimerInfo("Outside time", outsideSchedulerTimer);
|
myNSRuntimeProfile.log(logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void printCombinedRuntimeProfile() {
|
||||||
|
if ( combinedNSRuntimeProfiler.totalRuntimeInSeconds() > 0.1 )
|
||||||
|
combinedNSRuntimeProfiler.log(logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected double getTotalRuntime() {
|
||||||
|
return myNSRuntimeProfile.totalRuntimeInSeconds();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -154,21 +166,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!");
|
throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Print to logger.info timing information from timer, with name label
|
|
||||||
*
|
|
||||||
* @param label the name of the timer to display. Should be human readable
|
|
||||||
* @param timer the timer whose elapsed time we will display
|
|
||||||
*/
|
|
||||||
@Requires({"label != null", "timer != null"})
|
|
||||||
private void printTimerInfo(final String label, final SimpleTimer timer) {
|
|
||||||
final double total = inputTimer.getElapsedTime() + mapTimer.getElapsedTime()
|
|
||||||
+ reduceTimer.getElapsedTime() + outsideSchedulerTimer.getElapsedTime();
|
|
||||||
final double myTimeInSec = timer.getElapsedTime();
|
|
||||||
final double myTimePercent = myTimeInSec / total * 100;
|
|
||||||
logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if this nanoScheduler is shutdown, or false if its still open for business
|
* @return true if this nanoScheduler is shutdown, or false if its still open for business
|
||||||
*/
|
*/
|
||||||
|
|
@ -246,7 +243,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
if ( map == null ) throw new IllegalArgumentException("map function cannot be null");
|
if ( map == null ) throw new IllegalArgumentException("map function cannot be null");
|
||||||
if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null");
|
if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null");
|
||||||
|
|
||||||
outsideSchedulerTimer.stop();
|
myNSRuntimeProfile.outsideSchedulerTimer.stop();
|
||||||
|
|
||||||
ReduceType result;
|
ReduceType result;
|
||||||
if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) {
|
if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) {
|
||||||
|
|
@ -255,7 +252,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
result = executeMultiThreaded(inputReader, map, initialValue, reduce);
|
result = executeMultiThreaded(inputReader, map, initialValue, reduce);
|
||||||
}
|
}
|
||||||
|
|
||||||
outsideSchedulerTimer.restart();
|
myNSRuntimeProfile.outsideSchedulerTimer.restart();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -272,28 +269,31 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
ReduceType sum = initialValue;
|
ReduceType sum = initialValue;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
|
||||||
// start timer to ensure that both hasNext and next are caught by the timer
|
while ( true ) {
|
||||||
if ( TIME_CALLS ) inputTimer.restart();
|
// start timer to ensure that both hasNext and next are caught by the timer
|
||||||
while ( inputReader.hasNext() ) {
|
myNSRuntimeProfile.inputTimer.restart();
|
||||||
final InputType input = inputReader.next();
|
if ( ! inputReader.hasNext() ) {
|
||||||
if ( TIME_CALLS ) inputTimer.stop();
|
myNSRuntimeProfile.inputTimer.stop();
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
final InputType input = inputReader.next();
|
||||||
|
myNSRuntimeProfile.inputTimer.stop();
|
||||||
|
|
||||||
// map
|
// map
|
||||||
if ( TIME_CALLS ) mapTimer.restart();
|
myNSRuntimeProfile.mapTimer.restart();
|
||||||
final long preMapTime = LOG_MAP_TIMES ? 0 : mapTimer.currentTimeNano();
|
final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano();
|
||||||
final MapType mapValue = map.apply(input);
|
final MapType mapValue = map.apply(input);
|
||||||
if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (mapTimer.currentTimeNano() - preMapTime));
|
if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime));
|
||||||
if ( TIME_CALLS ) mapTimer.stop();
|
myNSRuntimeProfile.mapTimer.stop();
|
||||||
|
|
||||||
if ( i++ % inputBufferSize == 0 && progressFunction != null )
|
if ( i++ % inputBufferSize == 0 && progressFunction != null )
|
||||||
progressFunction.progress(input);
|
progressFunction.progress(input);
|
||||||
|
|
||||||
// reduce
|
// reduce
|
||||||
if ( TIME_CALLS ) reduceTimer.restart();
|
myNSRuntimeProfile.reduceTimer.restart();
|
||||||
sum = reduce.apply(mapValue, sum);
|
sum = reduce.apply(mapValue, sum);
|
||||||
if ( TIME_CALLS ) reduceTimer.stop();
|
myNSRuntimeProfile.reduceTimer.stop();
|
||||||
|
}
|
||||||
if ( TIME_CALLS ) inputTimer.restart();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return sum;
|
return sum;
|
||||||
|
|
@ -321,11 +321,11 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
new LinkedBlockingDeque<Future<MapResult<MapType>>>(mapBufferSize);
|
new LinkedBlockingDeque<Future<MapResult<MapType>>>(mapBufferSize);
|
||||||
|
|
||||||
// Start running the input reader thread
|
// Start running the input reader thread
|
||||||
inputExecutor.submit(new InputProducer<InputType>(inputReader, inputTimer, inputQueue));
|
inputExecutor.submit(new InputProducer<InputType>(inputReader, myNSRuntimeProfile.inputTimer, inputQueue));
|
||||||
|
|
||||||
// Start running the reducer thread
|
// Start running the reducer thread
|
||||||
final ReducerThread<MapType, ReduceType> reducer
|
final ReducerThread<MapType, ReduceType> reducer
|
||||||
= new ReducerThread<MapType, ReduceType>(reduce, reduceTimer, initialValue, mapResultQueue);
|
= new ReducerThread<MapType, ReduceType>(reduce, myNSRuntimeProfile.reduceTimer, initialValue, mapResultQueue);
|
||||||
final Future<ReduceType> reduceResult = reduceExecutor.submit(reducer);
|
final Future<ReduceType> reduceResult = reduceExecutor.submit(reducer);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -382,10 +382,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MapResult<MapType> call() {
|
public MapResult<MapType> call() {
|
||||||
if ( TIME_CALLS ) mapTimer.restart();
|
|
||||||
if ( debug ) debugPrint("\t\tmap " + input);
|
if ( debug ) debugPrint("\t\tmap " + input);
|
||||||
|
myNSRuntimeProfile.mapTimer.restart();
|
||||||
final MapType result = map.apply(input);
|
final MapType result = map.apply(input);
|
||||||
if ( TIME_CALLS ) mapTimer.stop();
|
myNSRuntimeProfile.mapTimer.stop();
|
||||||
return new MapResult<MapType>(result, id);
|
return new MapResult<MapType>(result, id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ class ReducerThread<MapType, ReduceType> implements Callable<ReduceType> {
|
||||||
final ReduceType sum,
|
final ReduceType sum,
|
||||||
final BlockingQueue<Future<MapResult<MapType>>> mapResultQueue) {
|
final BlockingQueue<Future<MapResult<MapType>>> mapResultQueue) {
|
||||||
if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null");
|
if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null");
|
||||||
|
if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null");
|
||||||
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
|
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
|
||||||
|
|
||||||
this.reduce = reduce;
|
this.reduce = reduce;
|
||||||
|
|
@ -51,9 +52,9 @@ class ReducerThread<MapType, ReduceType> implements Callable<ReduceType> {
|
||||||
} else {
|
} else {
|
||||||
lastJobID = result.getJobID();
|
lastJobID = result.getJobID();
|
||||||
// apply reduce, keeping track of sum
|
// apply reduce, keeping track of sum
|
||||||
if ( reduceTimer != null ) reduceTimer.restart();
|
reduceTimer.restart();
|
||||||
sum = reduce.apply(result.getValue(), sum);
|
sum = reduce.apply(result.getValue(), sum);
|
||||||
if ( reduceTimer != null ) reduceTimer.stop();
|
reduceTimer.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (ExecutionException ex) {
|
} catch (ExecutionException ex) {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package org.broadinstitute.sting.utils.nanoScheduler;
|
package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
|
|
||||||
import org.broadinstitute.sting.BaseTest;
|
import org.broadinstitute.sting.BaseTest;
|
||||||
|
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||||
import org.testng.Assert;
|
import org.testng.Assert;
|
||||||
import org.testng.annotations.DataProvider;
|
import org.testng.annotations.DataProvider;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
@ -42,7 +43,7 @@ public class InputProducerUnitTest extends BaseTest {
|
||||||
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
|
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
|
||||||
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize);
|
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize);
|
||||||
|
|
||||||
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), null, readQueue);
|
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new SimpleTimer(), readQueue);
|
||||||
|
|
||||||
final ExecutorService es = Executors.newSingleThreadExecutor();
|
final ExecutorService es = Executors.newSingleThreadExecutor();
|
||||||
es.submit(ip);
|
es.submit(ip);
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
|
|
||||||
import org.apache.log4j.BasicConfigurator;
|
import org.apache.log4j.BasicConfigurator;
|
||||||
import org.broadinstitute.sting.BaseTest;
|
import org.broadinstitute.sting.BaseTest;
|
||||||
|
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||||
import org.testng.Assert;
|
import org.testng.Assert;
|
||||||
import org.testng.annotations.DataProvider;
|
import org.testng.annotations.DataProvider;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
@ -86,7 +87,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
||||||
static NanoSchedulerBasicTest exampleTest = null;
|
static NanoSchedulerBasicTest exampleTest = null;
|
||||||
@DataProvider(name = "NanoSchedulerBasicTest")
|
@DataProvider(name = "NanoSchedulerBasicTest")
|
||||||
public Object[][] createNanoSchedulerBasicTest() {
|
public Object[][] createNanoSchedulerBasicTest() {
|
||||||
for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) {
|
for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000, 10000000) ) {
|
||||||
for ( final int nt : Arrays.asList(1, 2, 4) ) {
|
for ( final int nt : Arrays.asList(1, 2, 4) ) {
|
||||||
for ( final int start : Arrays.asList(0) ) {
|
for ( final int start : Arrays.asList(0) ) {
|
||||||
for ( final int end : Arrays.asList(0, 1, 2, 11, 10000, 100000) ) {
|
for ( final int end : Arrays.asList(0, 1, 2, 11, 10000, 100000) ) {
|
||||||
|
|
@ -114,6 +115,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException {
|
private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException {
|
||||||
|
final SimpleTimer timer = new SimpleTimer().start();
|
||||||
final NanoScheduler<Integer, Integer, Integer> nanoScheduler =
|
final NanoScheduler<Integer, Integer, Integer> nanoScheduler =
|
||||||
new NanoScheduler<Integer, Integer, Integer>(test.bufferSize, test.nThreads);
|
new NanoScheduler<Integer, Integer, Integer>(test.bufferSize, test.nThreads);
|
||||||
|
|
||||||
|
|
@ -129,6 +131,17 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
||||||
|
|
||||||
Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks);
|
Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks);
|
||||||
nanoScheduler.shutdown();
|
nanoScheduler.shutdown();
|
||||||
|
|
||||||
|
// TODO -- need to enable only in the case where there's serious time spend in
|
||||||
|
// TODO -- read /map / reduce, otherwise the "outside" timer doesn't add up
|
||||||
|
final double myTimeEstimate = timer.getElapsedTime();
|
||||||
|
final double tolerance = 0.1;
|
||||||
|
if ( false && myTimeEstimate > 0.1 ) {
|
||||||
|
Assert.assertTrue(nanoScheduler.getTotalRuntime() > myTimeEstimate * tolerance,
|
||||||
|
"NanoScheduler said that the total runtime was " + nanoScheduler.getTotalRuntime()
|
||||||
|
+ " but the overall test time was " + myTimeEstimate + ", beyond our tolerance factor of "
|
||||||
|
+ tolerance);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME)
|
@Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package org.broadinstitute.sting.utils.nanoScheduler;
|
package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
|
|
||||||
import org.broadinstitute.sting.BaseTest;
|
import org.broadinstitute.sting.BaseTest;
|
||||||
|
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||||
import org.testng.Assert;
|
import org.testng.Assert;
|
||||||
import org.testng.annotations.DataProvider;
|
import org.testng.annotations.DataProvider;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
@ -61,7 +62,7 @@ public class ReducerThreadUnitTest extends BaseTest {
|
||||||
|
|
||||||
final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue);
|
final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue);
|
||||||
final ReducerThread<Integer, Integer> thread
|
final ReducerThread<Integer, Integer> thread
|
||||||
= new ReducerThread<Integer, Integer>(reduce, null, 0, mapResultsQueue);
|
= new ReducerThread<Integer, Integer>(reduce, new SimpleTimer(), 0, mapResultsQueue);
|
||||||
|
|
||||||
final ExecutorService es = Executors.newSingleThreadExecutor();
|
final ExecutorService es = Executors.newSingleThreadExecutor();
|
||||||
final Future<Integer> value = es.submit(thread);
|
final Future<Integer> value = es.submit(thread);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue