diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 9dcba25ff..314de29c7 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -60,6 +60,7 @@ import org.broadinstitute.sting.utils.classloader.PluginManager; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.interval.IntervalUtils; +import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; import org.broadinstitute.sting.utils.recalibration.BQSRArgumentSet; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; @@ -194,6 +195,11 @@ public class GenomeAnalysisEngine { */ private ThreadEfficiencyMonitor threadEfficiencyMonitor = null; + /** + * The global progress meter we are using to track our progress through the genome + */ + private ProgressMeter progressMeter = null; + /** * Set the reference metadata files to use for this traversal. * @param referenceMetaDataFiles Collection of files and descriptors over which to traverse. @@ -202,6 +208,12 @@ public class GenomeAnalysisEngine { this.referenceMetaDataFiles = referenceMetaDataFiles; } + /** + * The maximum runtime of this engine, in nanoseconds, set during engine initialization + * from the GATKArgumentCollection command line value + */ + private long runtimeLimitInNanoseconds = -1; + /** * Static random number generator and seed. */ @@ -252,6 +264,9 @@ public class GenomeAnalysisEngine { if (args.BQSR_RECAL_FILE != null) setBaseRecalibration(args); + // setup the runtime limits + setupRuntimeLimits(args); + // Determine how the threads should be divided between CPU vs. IO. determineThreadAllocation(); @@ -1067,22 +1082,52 @@ public class GenomeAnalysisEngine { return CommandLineUtils.createApproximateCommandLineArgumentString(parsingEngine,argumentProviders); } + // ------------------------------------------------------------------------------------- + // + // code for working with progress meter + // + // ------------------------------------------------------------------------------------- + + /** + * Register the global progress meter with this engine + * + * Calling this function more than once will result in an IllegalStateException + * + * @param meter a non-null progress meter + */ + public void registerProgressMeter(final ProgressMeter meter) { + if ( meter == null ) throw new IllegalArgumentException("Meter cannot be null"); + if ( progressMeter != null ) throw new IllegalStateException("Progress meter already set"); + + progressMeter = meter; + } + + /** + * Get the progress meter being used by this engine. May be null if no meter has been registered yet + * @return a potentially null pointer to the progress meter + */ + public ProgressMeter getProgressMeter() { + return progressMeter; + } + /** * Does the current runtime in unit exceed the runtime limit, if one has been provided? * - * @param runtime the runtime of this GATK instance in minutes - * @param unit the time unit of runtime * @return false if not limit was requested or if runtime <= the limit, true otherwise */ - public boolean exceedsRuntimeLimit(final long runtime, final TimeUnit unit) { + public boolean exceedsRuntimeLimit() { + if ( progressMeter == null ) + // not yet initialized or not set because of testing + return false; + + final long runtime = progressMeter.getRuntimeInNanosecondsUpdatedPeriodically(); if ( runtime < 0 ) throw new IllegalArgumentException("runtime must be >= 0 but got " + runtime); if ( getArguments().maxRuntime == NO_RUNTIME_LIMIT ) return false; else { - final long actualRuntimeNano = TimeUnit.NANOSECONDS.convert(runtime, unit); final long maxRuntimeNano = getRuntimeLimitInNanoseconds(); - return actualRuntimeNano > maxRuntimeNano; + return runtime > maxRuntimeNano; } } @@ -1090,9 +1135,22 @@ public class GenomeAnalysisEngine { * @return the runtime limit in nanoseconds, or -1 if no limit was specified */ public long getRuntimeLimitInNanoseconds() { - if ( getArguments().maxRuntime == NO_RUNTIME_LIMIT ) - return -1; - else - return TimeUnit.NANOSECONDS.convert(getArguments().maxRuntime, getArguments().maxRuntimeUnits); + return runtimeLimitInNanoseconds; + } + + /** + * Setup the runtime limits for this engine, updating the runtimeLimitInNanoseconds + * as appropriate + * + * @param args the GATKArgumentCollection to retrieve our runtime limits from + */ + private void setupRuntimeLimits(final GATKArgumentCollection args) { + if ( args.maxRuntime == NO_RUNTIME_LIMIT ) + runtimeLimitInNanoseconds = -1; + else if (args.maxRuntime < 0 ) + throw new UserException.BadArgumentValue("maxRuntime", "must be >= 0 or == -1 (meaning no limit) but received negative value " + args.maxRuntime); + else { + runtimeLimitInNanoseconds = TimeUnit.NANOSECONDS.convert(args.maxRuntime, args.maxRuntimeUnits); + } } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 23b084d66..4ffdc88d8 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -120,8 +120,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ ThreadEfficiencyMonitor threadEfficiencyMonitor = null; - final ProgressMeter progressMeter; - /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. @@ -146,8 +144,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { logger.warn(String.format("Number of requested GATK threads %d is more than the number of " + "available processors on this machine %d", threadAllocation.getTotalNumThreads(), Runtime.getRuntime().availableProcessors())); -// if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1) -// throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1"); } if ( threadAllocation.getNumDataThreads() > 1 ) { @@ -206,14 +202,14 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { availableTraversalEngines.add(traversalEngine); } - // Create our progress meter - this.progressMeter = new ProgressMeter(progressLogFile, + // Create the progress meter, and register it with the analysis engine + engine.registerProgressMeter(new ProgressMeter(progressLogFile, availableTraversalEngines.peek().getTraversalUnits(), - engine.getRegionsOfGenomeBeingProcessed()); + engine.getRegionsOfGenomeBeingProcessed())); // Now that we have a progress meter, go through and initialize the traversal engines for ( final TraversalEngine traversalEngine : allCreatedTraversalEngines ) - traversalEngine.initialize(engine, walker, progressMeter); + traversalEngine.initialize(engine, walker, engine.getProgressMeter()); // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. // To get around this limitation and since we have no job identifier at this point, register a simple counter that @@ -282,7 +278,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * @return true if we should abort execution, or false otherwise */ protected boolean abortExecution() { - final boolean abort = engine.exceedsRuntimeLimit(progressMeter.getRuntimeInNanoseconds(), TimeUnit.NANOSECONDS); + final boolean abort = engine.exceedsRuntimeLimit(); if ( abort ) { final AutoFormattingTime aft = new AutoFormattingTime(engine.getRuntimeLimitInNanoseconds(), -1, 4); logger.info("Aborting execution (cleanly) because the runtime has exceeded the requested maximum " + aft); @@ -308,7 +304,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * Currently only starts the progress meter timer running, but other start up activities could be incorporated */ protected void startingExecution() { - progressMeter.start(); + engine.getProgressMeter().start(); } /** @@ -330,7 +326,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * Must be called by subclasses when execute is done */ protected void executionIsDone() { - progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations()); + engine.getProgressMeter().notifyDone(engine.getCumulativeMetrics().getNumIterations()); printReadFilteringStats(); shutdownTraversalEngines(); @@ -347,12 +343,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * pointers to the traversal engines */ public synchronized void shutdownTraversalEngines() { - // no longer applicable because engines are allocated to keys now -// if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() ) -// throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " + -// "have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size() -// + " have been returned"); - for ( final TraversalEngine te : allCreatedTraversalEngines) te.shutdown(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java index b47a355be..b1e5b907f 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java @@ -291,6 +291,8 @@ public final class TraverseActiveRegions extends TraversalEngine extends TraversalEngine, @Override public boolean hasNext() { - return locusView.hasNext(); + return locusView.hasNext() && ! engine.exceedsRuntimeLimit(); } @Override diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java index 40b3a1812..09c79a168 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -133,7 +133,7 @@ public class TraverseReadsNano extends TraversalEngine, final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider); final Iterator readIterator = reads.iterator(); - @Override public boolean hasNext() { return readIterator.hasNext(); } + @Override public boolean hasNext() { return ! engine.exceedsRuntimeLimit() && readIterator.hasNext(); } @Override public MapData next() { diff --git a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java index f76490552..9d1011c8f 100644 --- a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java @@ -149,6 +149,12 @@ public class ProgressMeter { private Position position = new Position(PositionStatus.STARTING); private long nTotalRecordsProcessed = 0; + /** + * The elapsed time in nanosecond, updated by the daemon thread, so that + * we don't pay any system call overhead to determine the the elapsed time. + */ + private long elapsedTimeInNanosecondUpdatedByDaemon = 0; + final ProgressMeterDaemon progressMeterDaemon; /** @@ -225,6 +231,36 @@ public class ProgressMeter { return timer.getElapsedTimeNano(); } + /** + * This function is just like getRuntimeInNanoseconds but it doesn't actually query the + * system timer to determine the value, but rather uses a local variable in this meter + * that is updated by the daemon thread. This means that the result is ridiculously imprecise + * for a nanosecond value (as it's only updated each pollingFrequency of the daemon) but + * it is free for clients to access, which can be critical when one wants to do tests like: + * + * for some work unit: + * do unit if getRuntimeInNanosecondsUpdatedPeriodically < X + * + * and have this operation eventually timeout but don't want to pay the system call time to + * ensure that the loop exits as soon as the elapsed time exceeds X + * + * @return the current runtime in nanoseconds + */ + @Ensures("result >= 0") + public long getRuntimeInNanosecondsUpdatedPeriodically() { + return elapsedTimeInNanosecondUpdatedByDaemon; + } + + /** + * Update the period runtime variable to the current runtime in nanoseconds. Should only + * be called by the daemon thread + */ + protected void updateElapsedTimeInNanoseconds() { + elapsedTimeInNanosecondUpdatedByDaemon = getRuntimeInNanoseconds(); + } + + + /** * Utility routine that prints out process information (including timing) every N records or * every M seconds, for N and M set in global variables. diff --git a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemon.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemon.java index 30abef8b8..38316e537 100644 --- a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemon.java +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemon.java @@ -100,6 +100,7 @@ public final class ProgressMeterDaemon extends Thread { public void run() { while (! done) { meter.printProgress(false); + meter.updateElapsedTimeInNanoseconds(); try { Thread.sleep(getPollFrequencyMilliseconds()); } catch (InterruptedException e) { diff --git a/public/java/test/org/broadinstitute/sting/gatk/MaxRuntimeIntegrationTest.java b/public/java/test/org/broadinstitute/sting/gatk/MaxRuntimeIntegrationTest.java index e6176dbe8..5b3f1e790 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/MaxRuntimeIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/MaxRuntimeIntegrationTest.java @@ -26,19 +26,52 @@ package org.broadinstitute.sting.gatk; import org.broadinstitute.sting.WalkerTest; +import org.broadinstitute.sting.commandline.Argument; +import org.broadinstitute.sting.commandline.Output; +import org.broadinstitute.sting.gatk.contexts.AlignmentContext; +import org.broadinstitute.sting.gatk.contexts.ReferenceContext; +import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; +import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.PrintStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; /** * */ public class MaxRuntimeIntegrationTest extends WalkerTest { + public static class SleepingWalker extends LocusWalker { + @Output PrintStream out; + + @Argument(fullName="sleepTime",shortName="sleepTime",doc="x", required=false) + public int sleepTime = 100; + + @Override + public Integer map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) { + try {Thread.sleep(sleepTime);} catch (InterruptedException e) {}; + return 1; + } + + @Override public Integer reduceInit() { return 0; } + @Override public Integer reduce(Integer value, Integer sum) { return sum + value; } + + @Override + public void onTraversalDone(Integer result) { + out.println(result); + } + } + private static final long STARTUP_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS); private class MaxRuntimeTestProvider extends TestDataProvider { @@ -84,4 +117,35 @@ public class MaxRuntimeIntegrationTest extends WalkerTest { + " exceeded max. tolerated runtime " + TimeUnit.SECONDS.convert(cfg.expectedMaxRuntimeNano(), TimeUnit.NANOSECONDS) + " given requested runtime " + cfg.maxRuntime + " " + cfg.unit); } + + @DataProvider(name = "SubshardProvider") + public Object[][] makeSubshardProvider() { + List tests = new ArrayList(); + + // this functionality can be adapted to provide input data for whatever you might want in your data + tests.add(new Object[]{10}); + tests.add(new Object[]{100}); + tests.add(new Object[]{500}); + tests.add(new Object[]{1000}); + tests.add(new Object[]{2000}); + + return tests.toArray(new Object[][]{}); + } + + @Test(enabled = true, dataProvider = "SubshardProvider", timeOut = 120 * 1000) + public void testSubshardTimeout(final int sleepTime) throws Exception { + final int maxRuntime = 5000; + + WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec( + "-T SleepingWalker -R " + b37KGReference + + " -I " + privateTestDir + "NA12878.100kb.BQSRv2.example.bam -o %s" + + " -maxRuntime " + maxRuntime + " -maxRuntimeUnits MILLISECONDS -sleepTime " + sleepTime, 1, + Collections.singletonList("")); + final File result = executeTest("Subshard max runtime ", spec).getFirst().get(0); + final int cycle = Integer.valueOf(new BufferedReader(new FileReader(result)).readLine()); + + final int maxCycles = (int)Math.ceil((maxRuntime * 5) / sleepTime); + logger.warn(String.format("Max cycles %d saw %d in file %s with sleepTime %d and maxRuntime %d", maxCycles, cycle, result, sleepTime, maxRuntime)); + Assert.assertTrue(cycle < maxCycles, "Too many cycles seen -- saw " + cycle + " in file " + result + " but max should have been " + maxCycles); + } } \ No newline at end of file diff --git a/public/java/test/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemonUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemonUnitTest.java index d127a2937..767646963 100644 --- a/public/java/test/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemonUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemonUnitTest.java @@ -84,10 +84,19 @@ public class ProgressMeterDaemonUnitTest extends BaseTest { return tests.toArray(new Object[][]{}); } + @Test + public void testPeriodUpdateNano() { + final ProgressMeter meter = new TestingProgressMeter(10); + final long currentTime = meter.getRuntimeInNanoseconds(); + meter.updateElapsedTimeInNanoseconds(); + Assert.assertTrue( meter.getRuntimeInNanosecondsUpdatedPeriodically() > currentTime, "Updating the periodic runtime failed" ); + } + @Test(dataProvider = "PollingData", invocationCount = 10, successPercentage = 90) public void testProgressMeterDaemon(final long poll, final int ticks) throws InterruptedException { final TestingProgressMeter meter = new TestingProgressMeter(poll); final ProgressMeterDaemon daemon = meter.getProgressMeterDaemon(); + Assert.assertTrue(daemon.isDaemon()); Assert.assertFalse(daemon.isDone()); @@ -106,5 +115,7 @@ public class ProgressMeterDaemonUnitTest extends BaseTest { final int tolerance = (int)Math.ceil(0.8 * meter.progressCalls.size()); Assert.assertTrue(Math.abs(meter.progressCalls.size() - ticks) <= tolerance, "Expected " + ticks + " progress calls from daemon thread, but got " + meter.progressCalls.size() + " and a tolerance of only " + tolerance); + + Assert.assertTrue(meter.getRuntimeInNanosecondsUpdatedPeriodically() > 0, "Daemon should have updated our periodic runtime"); } }