Subshard timeouts in the GATK

-- The previous implementation of the maxRuntime would require us to wait until all of the work was completed within a shard, which can be a substantial amount of work in the case of a locus walker with 16kb shards.
-- This implementation ensures that we exit from the traversal very soon after the max runtime is exceeded, without completely all of our work within the shard.  This is done by updating all of the traversal engines to return false for hasNext() in the nano scheduled input provider.  So as soon as the timeout is exceeeded, we stop generating additional data to process, and we only have to wait until the currently executing data processing unit (locus, read, active region) completes.
-- In order to implement this timeout efficiently at this fine scale, the progress meter now lives in the genome analysis engine, and the exceedsTimeout() call in the engine looks at a periodically updated runtime variable in the meter.  This variable contains the elapsed runtime of the engine, but is updated by the progress meter daemon thread so that the engine doesn't call System.nanotime() in each cycle of the engine, which would be very expense.  Instead we basically wait for the daemon to update this variable, and so our precision of timing out is limited by the update frequency of the daemon, which is on the order of every few hundred milliseconds, totally fine for a timeout.
-- Added integration tests to ensure that subshard timeouts are working properly
This commit is contained in:
Mark DePristo 2013-05-14 17:03:36 -04:00
parent 6137e5f89b
commit 371f3752c1
9 changed files with 190 additions and 28 deletions

View File

@ -60,6 +60,7 @@ import org.broadinstitute.sting.utils.classloader.PluginManager;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.interval.IntervalUtils;
import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
import org.broadinstitute.sting.utils.recalibration.BQSRArgumentSet;
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
@ -194,6 +195,11 @@ public class GenomeAnalysisEngine {
*/
private ThreadEfficiencyMonitor threadEfficiencyMonitor = null;
/**
* The global progress meter we are using to track our progress through the genome
*/
private ProgressMeter progressMeter = null;
/**
* Set the reference metadata files to use for this traversal.
* @param referenceMetaDataFiles Collection of files and descriptors over which to traverse.
@ -202,6 +208,12 @@ public class GenomeAnalysisEngine {
this.referenceMetaDataFiles = referenceMetaDataFiles;
}
/**
* The maximum runtime of this engine, in nanoseconds, set during engine initialization
* from the GATKArgumentCollection command line value
*/
private long runtimeLimitInNanoseconds = -1;
/**
* Static random number generator and seed.
*/
@ -252,6 +264,9 @@ public class GenomeAnalysisEngine {
if (args.BQSR_RECAL_FILE != null)
setBaseRecalibration(args);
// setup the runtime limits
setupRuntimeLimits(args);
// Determine how the threads should be divided between CPU vs. IO.
determineThreadAllocation();
@ -1067,22 +1082,52 @@ public class GenomeAnalysisEngine {
return CommandLineUtils.createApproximateCommandLineArgumentString(parsingEngine,argumentProviders);
}
// -------------------------------------------------------------------------------------
//
// code for working with progress meter
//
// -------------------------------------------------------------------------------------
/**
* Register the global progress meter with this engine
*
* Calling this function more than once will result in an IllegalStateException
*
* @param meter a non-null progress meter
*/
public void registerProgressMeter(final ProgressMeter meter) {
if ( meter == null ) throw new IllegalArgumentException("Meter cannot be null");
if ( progressMeter != null ) throw new IllegalStateException("Progress meter already set");
progressMeter = meter;
}
/**
* Get the progress meter being used by this engine. May be null if no meter has been registered yet
* @return a potentially null pointer to the progress meter
*/
public ProgressMeter getProgressMeter() {
return progressMeter;
}
/**
* Does the current runtime in unit exceed the runtime limit, if one has been provided?
*
* @param runtime the runtime of this GATK instance in minutes
* @param unit the time unit of runtime
* @return false if not limit was requested or if runtime <= the limit, true otherwise
*/
public boolean exceedsRuntimeLimit(final long runtime, final TimeUnit unit) {
public boolean exceedsRuntimeLimit() {
if ( progressMeter == null )
// not yet initialized or not set because of testing
return false;
final long runtime = progressMeter.getRuntimeInNanosecondsUpdatedPeriodically();
if ( runtime < 0 ) throw new IllegalArgumentException("runtime must be >= 0 but got " + runtime);
if ( getArguments().maxRuntime == NO_RUNTIME_LIMIT )
return false;
else {
final long actualRuntimeNano = TimeUnit.NANOSECONDS.convert(runtime, unit);
final long maxRuntimeNano = getRuntimeLimitInNanoseconds();
return actualRuntimeNano > maxRuntimeNano;
return runtime > maxRuntimeNano;
}
}
@ -1090,9 +1135,22 @@ public class GenomeAnalysisEngine {
* @return the runtime limit in nanoseconds, or -1 if no limit was specified
*/
public long getRuntimeLimitInNanoseconds() {
if ( getArguments().maxRuntime == NO_RUNTIME_LIMIT )
return -1;
else
return TimeUnit.NANOSECONDS.convert(getArguments().maxRuntime, getArguments().maxRuntimeUnits);
return runtimeLimitInNanoseconds;
}
/**
* Setup the runtime limits for this engine, updating the runtimeLimitInNanoseconds
* as appropriate
*
* @param args the GATKArgumentCollection to retrieve our runtime limits from
*/
private void setupRuntimeLimits(final GATKArgumentCollection args) {
if ( args.maxRuntime == NO_RUNTIME_LIMIT )
runtimeLimitInNanoseconds = -1;
else if (args.maxRuntime < 0 )
throw new UserException.BadArgumentValue("maxRuntime", "must be >= 0 or == -1 (meaning no limit) but received negative value " + args.maxRuntime);
else {
runtimeLimitInNanoseconds = TimeUnit.NANOSECONDS.convert(args.maxRuntime, args.maxRuntimeUnits);
}
}
}

View File

@ -120,8 +120,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
*/
ThreadEfficiencyMonitor threadEfficiencyMonitor = null;
final ProgressMeter progressMeter;
/**
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
* selected walker.
@ -146,8 +144,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
logger.warn(String.format("Number of requested GATK threads %d is more than the number of " +
"available processors on this machine %d", threadAllocation.getTotalNumThreads(),
Runtime.getRuntime().availableProcessors()));
// if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1)
// throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1");
}
if ( threadAllocation.getNumDataThreads() > 1 ) {
@ -206,14 +202,14 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
availableTraversalEngines.add(traversalEngine);
}
// Create our progress meter
this.progressMeter = new ProgressMeter(progressLogFile,
// Create the progress meter, and register it with the analysis engine
engine.registerProgressMeter(new ProgressMeter(progressLogFile,
availableTraversalEngines.peek().getTraversalUnits(),
engine.getRegionsOfGenomeBeingProcessed());
engine.getRegionsOfGenomeBeingProcessed()));
// Now that we have a progress meter, go through and initialize the traversal engines
for ( final TraversalEngine traversalEngine : allCreatedTraversalEngines )
traversalEngine.initialize(engine, walker, progressMeter);
traversalEngine.initialize(engine, walker, engine.getProgressMeter());
// JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean.
// To get around this limitation and since we have no job identifier at this point, register a simple counter that
@ -282,7 +278,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* @return true if we should abort execution, or false otherwise
*/
protected boolean abortExecution() {
final boolean abort = engine.exceedsRuntimeLimit(progressMeter.getRuntimeInNanoseconds(), TimeUnit.NANOSECONDS);
final boolean abort = engine.exceedsRuntimeLimit();
if ( abort ) {
final AutoFormattingTime aft = new AutoFormattingTime(engine.getRuntimeLimitInNanoseconds(), -1, 4);
logger.info("Aborting execution (cleanly) because the runtime has exceeded the requested maximum " + aft);
@ -308,7 +304,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* Currently only starts the progress meter timer running, but other start up activities could be incorporated
*/
protected void startingExecution() {
progressMeter.start();
engine.getProgressMeter().start();
}
/**
@ -330,7 +326,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* Must be called by subclasses when execute is done
*/
protected void executionIsDone() {
progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations());
engine.getProgressMeter().notifyDone(engine.getCumulativeMetrics().getNumIterations());
printReadFilteringStats();
shutdownTraversalEngines();
@ -347,12 +343,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* pointers to the traversal engines
*/
public synchronized void shutdownTraversalEngines() {
// no longer applicable because engines are allocated to keys now
// if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() )
// throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " +
// "have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size()
// + " have been returned");
for ( final TraversalEngine te : allCreatedTraversalEngines)
te.shutdown();

View File

@ -291,6 +291,8 @@ public final class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,Activ
}
@Override
public boolean hasNext() {
if ( engine.exceedsRuntimeLimit() ) // too much time has been dedicated to doing work, just stop
return false;
if ( ! readyActiveRegions.isEmpty() )
return true;
if ( done )

View File

@ -165,7 +165,7 @@ public class TraverseLociNano<M,T> extends TraversalEngine<M,T,LocusWalker<M,T>,
@Override
public boolean hasNext() {
return locusView.hasNext();
return locusView.hasNext() && ! engine.exceedsRuntimeLimit();
}
@Override

View File

@ -133,7 +133,7 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
final Iterator<SAMRecord> readIterator = reads.iterator();
@Override public boolean hasNext() { return readIterator.hasNext(); }
@Override public boolean hasNext() { return ! engine.exceedsRuntimeLimit() && readIterator.hasNext(); }
@Override
public MapData next() {

View File

@ -149,6 +149,12 @@ public class ProgressMeter {
private Position position = new Position(PositionStatus.STARTING);
private long nTotalRecordsProcessed = 0;
/**
* The elapsed time in nanosecond, updated by the daemon thread, so that
* we don't pay any system call overhead to determine the the elapsed time.
*/
private long elapsedTimeInNanosecondUpdatedByDaemon = 0;
final ProgressMeterDaemon progressMeterDaemon;
/**
@ -225,6 +231,36 @@ public class ProgressMeter {
return timer.getElapsedTimeNano();
}
/**
* This function is just like getRuntimeInNanoseconds but it doesn't actually query the
* system timer to determine the value, but rather uses a local variable in this meter
* that is updated by the daemon thread. This means that the result is ridiculously imprecise
* for a nanosecond value (as it's only updated each pollingFrequency of the daemon) but
* it is free for clients to access, which can be critical when one wants to do tests like:
*
* for some work unit:
* do unit if getRuntimeInNanosecondsUpdatedPeriodically < X
*
* and have this operation eventually timeout but don't want to pay the system call time to
* ensure that the loop exits as soon as the elapsed time exceeds X
*
* @return the current runtime in nanoseconds
*/
@Ensures("result >= 0")
public long getRuntimeInNanosecondsUpdatedPeriodically() {
return elapsedTimeInNanosecondUpdatedByDaemon;
}
/**
* Update the period runtime variable to the current runtime in nanoseconds. Should only
* be called by the daemon thread
*/
protected void updateElapsedTimeInNanoseconds() {
elapsedTimeInNanosecondUpdatedByDaemon = getRuntimeInNanoseconds();
}
/**
* Utility routine that prints out process information (including timing) every N records or
* every M seconds, for N and M set in global variables.

View File

@ -100,6 +100,7 @@ public final class ProgressMeterDaemon extends Thread {
public void run() {
while (! done) {
meter.printProgress(false);
meter.updateElapsedTimeInNanoseconds();
try {
Thread.sleep(getPollFrequencyMilliseconds());
} catch (InterruptedException e) {

View File

@ -26,19 +26,52 @@
package org.broadinstitute.sting.gatk;
import org.broadinstitute.sting.WalkerTest;
import org.broadinstitute.sting.commandline.Argument;
import org.broadinstitute.sting.commandline.Output;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class MaxRuntimeIntegrationTest extends WalkerTest {
public static class SleepingWalker extends LocusWalker<Integer, Integer> {
@Output PrintStream out;
@Argument(fullName="sleepTime",shortName="sleepTime",doc="x", required=false)
public int sleepTime = 100;
@Override
public Integer map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) {
try {Thread.sleep(sleepTime);} catch (InterruptedException e) {};
return 1;
}
@Override public Integer reduceInit() { return 0; }
@Override public Integer reduce(Integer value, Integer sum) { return sum + value; }
@Override
public void onTraversalDone(Integer result) {
out.println(result);
}
}
private static final long STARTUP_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
private class MaxRuntimeTestProvider extends TestDataProvider {
@ -84,4 +117,35 @@ public class MaxRuntimeIntegrationTest extends WalkerTest {
+ " exceeded max. tolerated runtime " + TimeUnit.SECONDS.convert(cfg.expectedMaxRuntimeNano(), TimeUnit.NANOSECONDS)
+ " given requested runtime " + cfg.maxRuntime + " " + cfg.unit);
}
@DataProvider(name = "SubshardProvider")
public Object[][] makeSubshardProvider() {
List<Object[]> tests = new ArrayList<Object[]>();
// this functionality can be adapted to provide input data for whatever you might want in your data
tests.add(new Object[]{10});
tests.add(new Object[]{100});
tests.add(new Object[]{500});
tests.add(new Object[]{1000});
tests.add(new Object[]{2000});
return tests.toArray(new Object[][]{});
}
@Test(enabled = true, dataProvider = "SubshardProvider", timeOut = 120 * 1000)
public void testSubshardTimeout(final int sleepTime) throws Exception {
final int maxRuntime = 5000;
WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec(
"-T SleepingWalker -R " + b37KGReference
+ " -I " + privateTestDir + "NA12878.100kb.BQSRv2.example.bam -o %s"
+ " -maxRuntime " + maxRuntime + " -maxRuntimeUnits MILLISECONDS -sleepTime " + sleepTime, 1,
Collections.singletonList(""));
final File result = executeTest("Subshard max runtime ", spec).getFirst().get(0);
final int cycle = Integer.valueOf(new BufferedReader(new FileReader(result)).readLine());
final int maxCycles = (int)Math.ceil((maxRuntime * 5) / sleepTime);
logger.warn(String.format("Max cycles %d saw %d in file %s with sleepTime %d and maxRuntime %d", maxCycles, cycle, result, sleepTime, maxRuntime));
Assert.assertTrue(cycle < maxCycles, "Too many cycles seen -- saw " + cycle + " in file " + result + " but max should have been " + maxCycles);
}
}

View File

@ -84,10 +84,19 @@ public class ProgressMeterDaemonUnitTest extends BaseTest {
return tests.toArray(new Object[][]{});
}
@Test
public void testPeriodUpdateNano() {
final ProgressMeter meter = new TestingProgressMeter(10);
final long currentTime = meter.getRuntimeInNanoseconds();
meter.updateElapsedTimeInNanoseconds();
Assert.assertTrue( meter.getRuntimeInNanosecondsUpdatedPeriodically() > currentTime, "Updating the periodic runtime failed" );
}
@Test(dataProvider = "PollingData", invocationCount = 10, successPercentage = 90)
public void testProgressMeterDaemon(final long poll, final int ticks) throws InterruptedException {
final TestingProgressMeter meter = new TestingProgressMeter(poll);
final ProgressMeterDaemon daemon = meter.getProgressMeterDaemon();
Assert.assertTrue(daemon.isDaemon());
Assert.assertFalse(daemon.isDone());
@ -106,5 +115,7 @@ public class ProgressMeterDaemonUnitTest extends BaseTest {
final int tolerance = (int)Math.ceil(0.8 * meter.progressCalls.size());
Assert.assertTrue(Math.abs(meter.progressCalls.size() - ticks) <= tolerance,
"Expected " + ticks + " progress calls from daemon thread, but got " + meter.progressCalls.size() + " and a tolerance of only " + tolerance);
Assert.assertTrue(meter.getRuntimeInNanosecondsUpdatedPeriodically() > 0, "Daemon should have updated our periodic runtime");
}
}