Merge pull request #519 from broadinstitute/ks_nc6_checkpoint_patch

Updates to timer, supporting checkpoint mechanisms
This commit is contained in:
kshakir 2014-02-14 21:47:34 +08:00
commit 16e82b8c2c
3 changed files with 134 additions and 6 deletions

View File

@ -1205,12 +1205,11 @@ public class GenomeAnalysisEngine {
// not yet initialized or not set because of testing
return false;
final long runtime = progressMeter.getRuntimeInNanosecondsUpdatedPeriodically();
if ( runtime < 0 ) throw new IllegalArgumentException("runtime must be >= 0 but got " + runtime);
if ( getArguments().maxRuntime == NO_RUNTIME_LIMIT )
return false;
else {
else {
final long runtime = progressMeter.getRuntimeInNanosecondsUpdatedPeriodically();
if ( runtime < 0 ) throw new IllegalArgumentException("runtime must be >= 0 but got " + runtime);
final long maxRuntimeNano = getRuntimeLimitInNanoseconds();
return runtime > maxRuntimeNano;
}

View File

@ -29,7 +29,11 @@ package org.broadinstitute.sting.utils;
import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import java.text.NumberFormat;
import java.util.concurrent.TimeUnit;
import static java.lang.Math.abs;
/**
* A useful simple system for timing code with nano second resolution
@ -39,14 +43,42 @@ import java.util.concurrent.TimeUnit;
* calls to avoid meaningless results of having multiple starts and stops
* called sequentially.
*
* This timer has been modified to provide better semantics for dealing with
* system-level checkpoint and restarting. Such events can cause the internal JVM
* clock to be reset, breaking timings based upon it. Whilst this is difficult to
* counter without getting explicit notice of checkpoint events, we try to moderate
* the symptoms through tracking the offset between the system clock and the JVM clock.
* If this offset grows drastically (greater than CLOCK_DRIFT), we infer a JVM restart
* and reset the timer.
*
* User: depristo
* Date: Dec 10, 2010
* Time: 9:07:44 AM
*/
public class SimpleTimer {
private final static Logger logger = Logger.getLogger(SimpleTimer.class);
protected static final double NANO_TO_SECOND_DOUBLE = 1.0 / TimeUnit.SECONDS.toNanos(1);
private static final long MILLI_TO_NANO= TimeUnit.MILLISECONDS.toNanos(1);
private static final ThreadLocal<NumberFormat> NUMBER_FORMAT = new ThreadLocal<NumberFormat>() {
@Override
protected NumberFormat initialValue() {
return NumberFormat.getIntegerInstance();
}
};
/**
* Allowable clock drift in nanoseconds.
*/
private static final long CLOCK_DRIFT = TimeUnit.SECONDS.toNanos(5);
private final String name;
/**
* The difference between system time and JVM time at last sync.
* This is used to detect JVM checkpoint/restart events, and should be
* reset when a JVM checkpoint/restart is detected.
*/
private long nanoTimeOffset;
/**
* The elapsedTimeNano time in nanoSeconds of this timer. The elapsedTimeNano time is the
* sum of times between starts/restrats and stops.
@ -77,6 +109,8 @@ public class SimpleTimer {
public SimpleTimer(final String name) {
if ( name == null ) throw new IllegalArgumentException("SimpleTimer name cannot be null");
this.name = name;
this.nanoTimeOffset = getNanoOffset();
}
/**
@ -108,6 +142,7 @@ public class SimpleTimer {
public synchronized SimpleTimer restart() {
running = true;
startTimeNano = currentTimeNano();
nanoTimeOffset = getNanoOffset();
return this;
}
@ -134,6 +169,9 @@ public class SimpleTimer {
/**
* Stops the timer. Increases the elapsedTimeNano time by difference between start and now.
* This method calls `ensureClockSync` to make sure that the JVM and system clocks
* are roughly in sync since the start of the timer. If they are not, then the time
* elapsed since the previous 'stop' will not be added to the timer.
*
* It's ok to call stop on a timer that's not running. It has no effect on the timer.
*
@ -143,7 +181,9 @@ public class SimpleTimer {
public synchronized SimpleTimer stop() {
if ( running ) {
running = false;
elapsedTimeNano += currentTimeNano() - startTimeNano;
if (ensureClockSync()) {
elapsedTimeNano += currentTimeNano() - startTimeNano;
}
}
return this;
}
@ -168,7 +208,11 @@ public class SimpleTimer {
* @return the elapsed time in nanoseconds
*/
public synchronized long getElapsedTimeNano() {
return running ? (currentTimeNano() - startTimeNano + elapsedTimeNano) : elapsedTimeNano;
if (running && ensureClockSync()) {
return currentTimeNano() - startTimeNano + elapsedTimeNano;
} else {
return elapsedTimeNano;
}
}
/**
@ -179,4 +223,39 @@ public class SimpleTimer {
public synchronized void addElapsed(final SimpleTimer toAdd) {
elapsedTimeNano += toAdd.getElapsedTimeNano();
}
/**
* Get the current offset of nano time from system time.
*/
private static long getNanoOffset() {
return System.nanoTime() - (System.currentTimeMillis() * MILLI_TO_NANO);
}
/**
* Ensure that the JVM time has remained in sync with system time.
* This will also reset the clocks to avoid gradual drift.
*
* @return true if the clocks are in sync, false otherwise
*/
private boolean ensureClockSync() {
final long currentOffset = getNanoOffset();
final long diff = abs(currentOffset - nanoTimeOffset);
final boolean ret = (diff <= CLOCK_DRIFT);
if (!ret) {
final NumberFormat numberFormat = NUMBER_FORMAT.get();
final String msg = String.format(
"Clock drift of %s - %s = %s nanoseconds detected, vs. max allowable drift of %s. " +
"Assuming checkpoint/restart event.",
numberFormat.format(currentOffset),
numberFormat.format(nanoTimeOffset),
numberFormat.format(diff),
numberFormat.format(CLOCK_DRIFT));
// Log message
logger.warn(msg);
}
// Reset the drift meter to stay in sync.
this.nanoTimeOffset = currentOffset;
return ret;
}
}

View File

@ -29,6 +29,8 @@ import org.broadinstitute.sting.BaseTest;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -123,6 +125,54 @@ public class SimpleTimerUnitTest extends BaseTest {
Assert.assertTrue(nano < maxTimeInNano, "Fast operation said to take longer than " + maxTimeInMicro + " microseconds: elapsed time in nano " + nano + " micro " + TimeUnit.NANOSECONDS.toMicros(nano));
}
@Test
public void testCheckpointRestart() throws Exception {
SimpleTimer t = new SimpleTimer(NAME);
final Field offsetField = t.getClass().getDeclaredField("nanoTimeOffset");
offsetField.setAccessible(true);
long offset = ((Long) offsetField.get(t)).longValue();
t.start();
idleLoop();
// Make it as if clock has jumped into the past
offsetField.set(t, offset + TimeUnit.SECONDS.toNanos(10));
t.stop();
offset = ((Long) offsetField.get(t)).longValue();
Assert.assertEquals(t.getElapsedTime(), 0.0, "Time over restart is not zero.");
t.start();
idleLoop();
t.stop();
offset = ((Long) offsetField.get(t)).longValue();
double elapsed = t.getElapsedTime();
Assert.assertTrue(elapsed >= 0.0, "Elapsed time is zero.");
t.restart();
// Make the clock jump again by just a little
offsetField.set(t, offset + TimeUnit.SECONDS.toNanos(1));
idleLoop();
t.stop();
offset = ((Long) offsetField.get(t)).longValue();
Assert.assertTrue(t.getElapsedTime() > elapsed, "Small clock drift causing reset.");
elapsed = t.getElapsedTime();
// Now a bigger jump, into the future this time.
t.restart();
// Make the clock jump again by a lot
offsetField.set(t, offset - TimeUnit.SECONDS.toNanos(10));
t.stop();
Assert.assertEquals(t.getElapsedTime(), elapsed, "Time added over checkpoint/restart.");
// Test without stopping
t.start();
offset = ((Long) offsetField.get(t)).longValue();
// Make it as if clock has jumped into the past
offsetField.set(t, offset + TimeUnit.SECONDS.toNanos(10));
Assert.assertEquals(t.getElapsedTime(), 0.0, "Elapsed time after C/R is not zero.");
idleLoop();
Assert.assertTrue(t.getElapsedTime() > 0.0, "Elapsed time zero after re-sync.");
}
private static void idleLoop() {
for ( int i = 0; i < 100000; i++ ) ; // idle loop to wait a tiny bit of time
}