Merge pull request #228 from broadinstitute/mc_fine_grained_maxruntime

Subshard timeouts in the GATK
This commit is contained in:
droazen 2013-05-15 05:25:43 -07:00
commit a733a5e9b7
9 changed files with 190 additions and 28 deletions

View File

@ -60,6 +60,7 @@ import org.broadinstitute.sting.utils.classloader.PluginManager;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.interval.IntervalUtils;
import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
import org.broadinstitute.sting.utils.recalibration.BQSRArgumentSet;
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
@ -194,6 +195,11 @@ public class GenomeAnalysisEngine {
*/
private ThreadEfficiencyMonitor threadEfficiencyMonitor = null;
/**
* The global progress meter we are using to track our progress through the genome
*/
private ProgressMeter progressMeter = null;
/**
* Set the reference metadata files to use for this traversal.
* @param referenceMetaDataFiles Collection of files and descriptors over which to traverse.
@ -202,6 +208,12 @@ public class GenomeAnalysisEngine {
this.referenceMetaDataFiles = referenceMetaDataFiles;
}
/**
* The maximum runtime of this engine, in nanoseconds, set during engine initialization
* from the GATKArgumentCollection command line value
*/
private long runtimeLimitInNanoseconds = -1;
/**
* Static random number generator and seed.
*/
@ -252,6 +264,9 @@ public class GenomeAnalysisEngine {
if (args.BQSR_RECAL_FILE != null)
setBaseRecalibration(args);
// setup the runtime limits
setupRuntimeLimits(args);
// Determine how the threads should be divided between CPU vs. IO.
determineThreadAllocation();
@ -1067,22 +1082,52 @@ public class GenomeAnalysisEngine {
return CommandLineUtils.createApproximateCommandLineArgumentString(parsingEngine,argumentProviders);
}
// -------------------------------------------------------------------------------------
//
// code for working with progress meter
//
// -------------------------------------------------------------------------------------
/**
* Register the global progress meter with this engine
*
* Calling this function more than once will result in an IllegalStateException
*
* @param meter a non-null progress meter
*/
public void registerProgressMeter(final ProgressMeter meter) {
if ( meter == null ) throw new IllegalArgumentException("Meter cannot be null");
if ( progressMeter != null ) throw new IllegalStateException("Progress meter already set");
progressMeter = meter;
}
/**
* Get the progress meter being used by this engine. May be null if no meter has been registered yet
* @return a potentially null pointer to the progress meter
*/
public ProgressMeter getProgressMeter() {
return progressMeter;
}
/**
* Does the current runtime in unit exceed the runtime limit, if one has been provided?
*
* @param runtime the runtime of this GATK instance in minutes
* @param unit the time unit of runtime
* @return false if not limit was requested or if runtime <= the limit, true otherwise
*/
public boolean exceedsRuntimeLimit(final long runtime, final TimeUnit unit) {
public boolean exceedsRuntimeLimit() {
if ( progressMeter == null )
// not yet initialized or not set because of testing
return false;
final long runtime = progressMeter.getRuntimeInNanosecondsUpdatedPeriodically();
if ( runtime < 0 ) throw new IllegalArgumentException("runtime must be >= 0 but got " + runtime);
if ( getArguments().maxRuntime == NO_RUNTIME_LIMIT )
return false;
else {
final long actualRuntimeNano = TimeUnit.NANOSECONDS.convert(runtime, unit);
final long maxRuntimeNano = getRuntimeLimitInNanoseconds();
return actualRuntimeNano > maxRuntimeNano;
return runtime > maxRuntimeNano;
}
}
@ -1090,9 +1135,22 @@ public class GenomeAnalysisEngine {
* @return the runtime limit in nanoseconds, or -1 if no limit was specified
*/
public long getRuntimeLimitInNanoseconds() {
if ( getArguments().maxRuntime == NO_RUNTIME_LIMIT )
return -1;
else
return TimeUnit.NANOSECONDS.convert(getArguments().maxRuntime, getArguments().maxRuntimeUnits);
return runtimeLimitInNanoseconds;
}
/**
* Setup the runtime limits for this engine, updating the runtimeLimitInNanoseconds
* as appropriate
*
* @param args the GATKArgumentCollection to retrieve our runtime limits from
*/
private void setupRuntimeLimits(final GATKArgumentCollection args) {
if ( args.maxRuntime == NO_RUNTIME_LIMIT )
runtimeLimitInNanoseconds = -1;
else if (args.maxRuntime < 0 )
throw new UserException.BadArgumentValue("maxRuntime", "must be >= 0 or == -1 (meaning no limit) but received negative value " + args.maxRuntime);
else {
runtimeLimitInNanoseconds = TimeUnit.NANOSECONDS.convert(args.maxRuntime, args.maxRuntimeUnits);
}
}
}

View File

@ -120,8 +120,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
*/
ThreadEfficiencyMonitor threadEfficiencyMonitor = null;
final ProgressMeter progressMeter;
/**
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
* selected walker.
@ -146,8 +144,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
logger.warn(String.format("Number of requested GATK threads %d is more than the number of " +
"available processors on this machine %d", threadAllocation.getTotalNumThreads(),
Runtime.getRuntime().availableProcessors()));
// if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1)
// throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1");
}
if ( threadAllocation.getNumDataThreads() > 1 ) {
@ -206,14 +202,14 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
availableTraversalEngines.add(traversalEngine);
}
// Create our progress meter
this.progressMeter = new ProgressMeter(progressLogFile,
// Create the progress meter, and register it with the analysis engine
engine.registerProgressMeter(new ProgressMeter(progressLogFile,
availableTraversalEngines.peek().getTraversalUnits(),
engine.getRegionsOfGenomeBeingProcessed());
engine.getRegionsOfGenomeBeingProcessed()));
// Now that we have a progress meter, go through and initialize the traversal engines
for ( final TraversalEngine traversalEngine : allCreatedTraversalEngines )
traversalEngine.initialize(engine, walker, progressMeter);
traversalEngine.initialize(engine, walker, engine.getProgressMeter());
// JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean.
// To get around this limitation and since we have no job identifier at this point, register a simple counter that
@ -282,7 +278,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* @return true if we should abort execution, or false otherwise
*/
protected boolean abortExecution() {
final boolean abort = engine.exceedsRuntimeLimit(progressMeter.getRuntimeInNanoseconds(), TimeUnit.NANOSECONDS);
final boolean abort = engine.exceedsRuntimeLimit();
if ( abort ) {
final AutoFormattingTime aft = new AutoFormattingTime(engine.getRuntimeLimitInNanoseconds(), -1, 4);
logger.info("Aborting execution (cleanly) because the runtime has exceeded the requested maximum " + aft);
@ -308,7 +304,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* Currently only starts the progress meter timer running, but other start up activities could be incorporated
*/
protected void startingExecution() {
progressMeter.start();
engine.getProgressMeter().start();
}
/**
@ -330,7 +326,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* Must be called by subclasses when execute is done
*/
protected void executionIsDone() {
progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations());
engine.getProgressMeter().notifyDone(engine.getCumulativeMetrics().getNumIterations());
printReadFilteringStats();
shutdownTraversalEngines();
@ -347,12 +343,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* pointers to the traversal engines
*/
public synchronized void shutdownTraversalEngines() {
// no longer applicable because engines are allocated to keys now
// if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() )
// throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " +
// "have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size()
// + " have been returned");
for ( final TraversalEngine te : allCreatedTraversalEngines)
te.shutdown();

View File

@ -291,6 +291,8 @@ public final class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,Activ
}
@Override
public boolean hasNext() {
if ( engine.exceedsRuntimeLimit() ) // too much time has been dedicated to doing work, just stop
return false;
if ( ! readyActiveRegions.isEmpty() )
return true;
if ( done )

View File

@ -165,7 +165,7 @@ public class TraverseLociNano<M,T> extends TraversalEngine<M,T,LocusWalker<M,T>,
@Override
public boolean hasNext() {
return locusView.hasNext();
return locusView.hasNext() && ! engine.exceedsRuntimeLimit();
}
@Override

View File

@ -133,7 +133,7 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
final Iterator<SAMRecord> readIterator = reads.iterator();
@Override public boolean hasNext() { return readIterator.hasNext(); }
@Override public boolean hasNext() { return ! engine.exceedsRuntimeLimit() && readIterator.hasNext(); }
@Override
public MapData next() {

View File

@ -149,6 +149,12 @@ public class ProgressMeter {
private Position position = new Position(PositionStatus.STARTING);
private long nTotalRecordsProcessed = 0;
/**
* The elapsed time in nanosecond, updated by the daemon thread, so that
* we don't pay any system call overhead to determine the the elapsed time.
*/
private long elapsedTimeInNanosecondUpdatedByDaemon = 0;
final ProgressMeterDaemon progressMeterDaemon;
/**
@ -225,6 +231,36 @@ public class ProgressMeter {
return timer.getElapsedTimeNano();
}
/**
* This function is just like getRuntimeInNanoseconds but it doesn't actually query the
* system timer to determine the value, but rather uses a local variable in this meter
* that is updated by the daemon thread. This means that the result is ridiculously imprecise
* for a nanosecond value (as it's only updated each pollingFrequency of the daemon) but
* it is free for clients to access, which can be critical when one wants to do tests like:
*
* for some work unit:
* do unit if getRuntimeInNanosecondsUpdatedPeriodically < X
*
* and have this operation eventually timeout but don't want to pay the system call time to
* ensure that the loop exits as soon as the elapsed time exceeds X
*
* @return the current runtime in nanoseconds
*/
@Ensures("result >= 0")
public long getRuntimeInNanosecondsUpdatedPeriodically() {
return elapsedTimeInNanosecondUpdatedByDaemon;
}
/**
* Update the period runtime variable to the current runtime in nanoseconds. Should only
* be called by the daemon thread
*/
protected void updateElapsedTimeInNanoseconds() {
elapsedTimeInNanosecondUpdatedByDaemon = getRuntimeInNanoseconds();
}
/**
* Utility routine that prints out process information (including timing) every N records or
* every M seconds, for N and M set in global variables.

View File

@ -100,6 +100,7 @@ public final class ProgressMeterDaemon extends Thread {
public void run() {
while (! done) {
meter.printProgress(false);
meter.updateElapsedTimeInNanoseconds();
try {
Thread.sleep(getPollFrequencyMilliseconds());
} catch (InterruptedException e) {

View File

@ -26,19 +26,52 @@
package org.broadinstitute.sting.gatk;
import org.broadinstitute.sting.WalkerTest;
import org.broadinstitute.sting.commandline.Argument;
import org.broadinstitute.sting.commandline.Output;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class MaxRuntimeIntegrationTest extends WalkerTest {
public static class SleepingWalker extends LocusWalker<Integer, Integer> {
@Output PrintStream out;
@Argument(fullName="sleepTime",shortName="sleepTime",doc="x", required=false)
public int sleepTime = 100;
@Override
public Integer map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) {
try {Thread.sleep(sleepTime);} catch (InterruptedException e) {};
return 1;
}
@Override public Integer reduceInit() { return 0; }
@Override public Integer reduce(Integer value, Integer sum) { return sum + value; }
@Override
public void onTraversalDone(Integer result) {
out.println(result);
}
}
private static final long STARTUP_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
private class MaxRuntimeTestProvider extends TestDataProvider {
@ -84,4 +117,35 @@ public class MaxRuntimeIntegrationTest extends WalkerTest {
+ " exceeded max. tolerated runtime " + TimeUnit.SECONDS.convert(cfg.expectedMaxRuntimeNano(), TimeUnit.NANOSECONDS)
+ " given requested runtime " + cfg.maxRuntime + " " + cfg.unit);
}
@DataProvider(name = "SubshardProvider")
public Object[][] makeSubshardProvider() {
List<Object[]> tests = new ArrayList<Object[]>();
// this functionality can be adapted to provide input data for whatever you might want in your data
tests.add(new Object[]{10});
tests.add(new Object[]{100});
tests.add(new Object[]{500});
tests.add(new Object[]{1000});
tests.add(new Object[]{2000});
return tests.toArray(new Object[][]{});
}
@Test(enabled = true, dataProvider = "SubshardProvider", timeOut = 120 * 1000)
public void testSubshardTimeout(final int sleepTime) throws Exception {
final int maxRuntime = 5000;
WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec(
"-T SleepingWalker -R " + b37KGReference
+ " -I " + privateTestDir + "NA12878.100kb.BQSRv2.example.bam -o %s"
+ " -maxRuntime " + maxRuntime + " -maxRuntimeUnits MILLISECONDS -sleepTime " + sleepTime, 1,
Collections.singletonList(""));
final File result = executeTest("Subshard max runtime ", spec).getFirst().get(0);
final int cycle = Integer.valueOf(new BufferedReader(new FileReader(result)).readLine());
final int maxCycles = (int)Math.ceil((maxRuntime * 5) / sleepTime);
logger.warn(String.format("Max cycles %d saw %d in file %s with sleepTime %d and maxRuntime %d", maxCycles, cycle, result, sleepTime, maxRuntime));
Assert.assertTrue(cycle < maxCycles, "Too many cycles seen -- saw " + cycle + " in file " + result + " but max should have been " + maxCycles);
}
}

View File

@ -84,10 +84,19 @@ public class ProgressMeterDaemonUnitTest extends BaseTest {
return tests.toArray(new Object[][]{});
}
@Test
public void testPeriodUpdateNano() {
final ProgressMeter meter = new TestingProgressMeter(10);
final long currentTime = meter.getRuntimeInNanoseconds();
meter.updateElapsedTimeInNanoseconds();
Assert.assertTrue( meter.getRuntimeInNanosecondsUpdatedPeriodically() > currentTime, "Updating the periodic runtime failed" );
}
@Test(dataProvider = "PollingData", invocationCount = 10, successPercentage = 90)
public void testProgressMeterDaemon(final long poll, final int ticks) throws InterruptedException {
final TestingProgressMeter meter = new TestingProgressMeter(poll);
final ProgressMeterDaemon daemon = meter.getProgressMeterDaemon();
Assert.assertTrue(daemon.isDaemon());
Assert.assertFalse(daemon.isDone());
@ -106,5 +115,7 @@ public class ProgressMeterDaemonUnitTest extends BaseTest {
final int tolerance = (int)Math.ceil(0.8 * meter.progressCalls.size());
Assert.assertTrue(Math.abs(meter.progressCalls.size() - ticks) <= tolerance,
"Expected " + ticks + " progress calls from daemon thread, but got " + meter.progressCalls.size() + " and a tolerance of only " + tolerance);
Assert.assertTrue(meter.getRuntimeInNanosecondsUpdatedPeriodically() > 0, "Daemon should have updated our periodic runtime");
}
}