Fix requested by Lee Lichtenstein: first check whether it is time to print a
progress message, and only then aggregate metrics. This reduces the overhead of printProgress in RealignerTargetCreator from >20% to ~3%.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5663 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
d35c7d1029
commit
54660a8c25
|
|
@ -176,7 +176,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
outputTracker.bypassThreadLocalStorage(true);
|
outputTracker.bypassThreadLocalStorage(true);
|
||||||
try {
|
try {
|
||||||
walker.onTraversalDone(result);
|
walker.onTraversalDone(result);
|
||||||
printOnTraversalDone(result,engine.getCumulativeMetrics());
|
printOnTraversalDone(result);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
outputTracker.bypassThreadLocalStorage(false);
|
outputTracker.bypassThreadLocalStorage(false);
|
||||||
|
|
|
||||||
|
|
@ -84,7 +84,7 @@ public class LinearMicroScheduler extends MicroScheduler {
|
||||||
|
|
||||||
Object result = accumulator.finishTraversal();
|
Object result = accumulator.finishTraversal();
|
||||||
|
|
||||||
printOnTraversalDone(result,engine.getCumulativeMetrics());
|
printOnTraversalDone(result);
|
||||||
|
|
||||||
outputTracker.close();
|
outputTracker.close();
|
||||||
cleanup();
|
cleanup();
|
||||||
|
|
|
||||||
|
|
@ -217,8 +217,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
* Print summary information for the analysis.
|
* Print summary information for the analysis.
|
||||||
* @param sum The final reduce output.
|
* @param sum The final reduce output.
|
||||||
*/
|
*/
|
||||||
protected void printOnTraversalDone(Object sum, ReadMetrics metrics) {
|
protected void printOnTraversalDone(Object sum) {
|
||||||
traversalEngine.printOnTraversalDone(metrics);
|
traversalEngine.printOnTraversalDone();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -119,6 +119,7 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
|
||||||
private long PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in milliseconds
|
private long PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in milliseconds
|
||||||
private final double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0;
|
private final double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0;
|
||||||
private final double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0;
|
private final double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0;
|
||||||
|
private boolean progressMeterInitialized = false;
|
||||||
|
|
||||||
// for performance log
|
// for performance log
|
||||||
private static final boolean PERFORMANCE_LOG_ENABLED = true;
|
private static final boolean PERFORMANCE_LOG_ENABLED = true;
|
||||||
|
|
@ -229,10 +230,7 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
|
||||||
*/
|
*/
|
||||||
public void printProgress(Shard shard, GenomeLoc loc) {
|
public void printProgress(Shard shard, GenomeLoc loc) {
|
||||||
// A bypass is inserted here for unit testing.
|
// A bypass is inserted here for unit testing.
|
||||||
// TODO: print metrics outside of the traversal engine to more easily handle cumulative stats.
|
printProgress(loc,shard.getReadMetrics(),false);
|
||||||
ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics() != null ? engine.getCumulativeMetrics().clone() : new ReadMetrics();
|
|
||||||
cumulativeMetrics.incrementMetrics(shard.getReadMetrics());
|
|
||||||
printProgress(loc, cumulativeMetrics, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -240,66 +238,62 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
|
||||||
* every M seconds, for N and M set in global variables.
|
* every M seconds, for N and M set in global variables.
|
||||||
*
|
*
|
||||||
* @param loc Current location, can be null if you are at the end of the traversal
|
* @param loc Current location, can be null if you are at the end of the traversal
|
||||||
* @param metrics Metrics of reads filtered in/out.
|
* @param metrics Data processed since the last cumulative metrics update; may be null when there is nothing new to add (e.g. at the end of the traversal)
|
||||||
* @param mustPrint If true, will print out info, regardless of nRecords or time interval
|
* @param mustPrint If true, will print out info, regardless of nRecords or time interval
|
||||||
*/
|
*/
|
||||||
private void printProgress(GenomeLoc loc, ReadMetrics metrics, boolean mustPrint) {
|
private void printProgress(GenomeLoc loc, ReadMetrics metrics, boolean mustPrint) {
|
||||||
final long nRecords = metrics.getNumIterations();
|
if(!progressMeterInitialized && mustPrint == false ) {
|
||||||
|
|
||||||
if ( nRecords == 1 && mustPrint == false ) {
|
|
||||||
logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]");
|
logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]");
|
||||||
logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
|
logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
|
||||||
"Location", getTraversalType(), getTraversalType()));
|
"Location", getTraversalType(), getTraversalType()));
|
||||||
|
progressMeterInitialized = true;
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
final long curTime = timer.currentTime();
|
|
||||||
boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, PROGRESS_PRINT_FREQUENCY);
|
|
||||||
boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);
|
|
||||||
|
|
||||||
if ( printProgress || printLog ) {
|
final long curTime = timer.currentTime();
|
||||||
ProcessingHistory last = updateHistory(loc, metrics);
|
boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, PROGRESS_PRINT_FREQUENCY);
|
||||||
|
boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);
|
||||||
|
|
||||||
final MyTime elapsed = new MyTime(last.elapsedSeconds);
|
if ( printProgress || printLog ) {
|
||||||
final MyTime bpRate = new MyTime(secondsPerMillionBP(last));
|
// getting and appending metrics data actually turns out to be quite a heavyweight
|
||||||
final MyTime unitRate = new MyTime(secondsPerMillionElements(last));
|
// operation. Postpone it until after determining whether to print the log message.
|
||||||
final double fractionGenomeTargetCompleted = calculateFractionGenomeTargetCompleted(last);
|
ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics() != null ? engine.getCumulativeMetrics() : new ReadMetrics();
|
||||||
final MyTime estTotalRuntime = new MyTime(elapsed.t / fractionGenomeTargetCompleted);
|
if(metrics != null)
|
||||||
final MyTime timeToCompletion = new MyTime(estTotalRuntime.t - elapsed.t);
|
cumulativeMetrics.incrementMetrics(metrics);
|
||||||
|
|
||||||
if ( printProgress ) {
|
final long nRecords = cumulativeMetrics.getNumIterations();
|
||||||
lastProgressPrintTime = curTime;
|
|
||||||
|
|
||||||
// dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates
|
ProcessingHistory last = updateHistory(loc,cumulativeMetrics);
|
||||||
if ( estTotalRuntime.t > TWELVE_HOURS_IN_SECONDS )
|
|
||||||
PROGRESS_PRINT_FREQUENCY = 60 * 1000; // in milliseconds
|
|
||||||
else if ( estTotalRuntime.t > TWO_HOURS_IN_SECONDS )
|
|
||||||
PROGRESS_PRINT_FREQUENCY = 30 * 1000; // in milliseconds
|
|
||||||
else
|
|
||||||
PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in milliseconds
|
|
||||||
|
|
||||||
// String common = String.format("%4.1e %s in %s, %s per 1M %s, %4.1f%% complete, est. runtime %s, %s remaining",
|
final MyTime elapsed = new MyTime(last.elapsedSeconds);
|
||||||
// nRecords*1.0, getTraversalType(), elapsed, unitRate,
|
final MyTime bpRate = new MyTime(secondsPerMillionBP(last));
|
||||||
// getTraversalType(), 100*fractionGenomeTargetCompleted,
|
final MyTime unitRate = new MyTime(secondsPerMillionElements(last));
|
||||||
// estTotalRuntime, timeToCompletion);
|
final double fractionGenomeTargetCompleted = calculateFractionGenomeTargetCompleted(last);
|
||||||
//
|
final MyTime estTotalRuntime = new MyTime(elapsed.t / fractionGenomeTargetCompleted);
|
||||||
// if (loc != null)
|
final MyTime timeToCompletion = new MyTime(estTotalRuntime.t - elapsed.t);
|
||||||
// logger.info(String.format("%20s: processing %s", loc, common));
|
|
||||||
// else
|
|
||||||
// logger.info(String.format("Processing %s", common));
|
|
||||||
logger.info(String.format("%15s %5.2e %s %s %4.1f%% %s %s",
|
|
||||||
loc == null ? "done with mapped reads" : loc, nRecords*1.0, elapsed, unitRate,
|
|
||||||
100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));
|
|
||||||
|
|
||||||
}
|
if ( printProgress ) {
|
||||||
|
lastProgressPrintTime = curTime;
|
||||||
|
|
||||||
if ( printLog ) {
|
// dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates
|
||||||
lastPerformanceLogPrintTime = curTime;
|
if ( estTotalRuntime.t > TWELVE_HOURS_IN_SECONDS )
|
||||||
synchronized(performanceLogLock) {
|
PROGRESS_PRINT_FREQUENCY = 60 * 1000; // in milliseconds
|
||||||
performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n",
|
else if ( estTotalRuntime.t > TWO_HOURS_IN_SECONDS )
|
||||||
elapsed.t, nRecords, unitRate.t, last.bpProcessed, bpRate.t,
|
PROGRESS_PRINT_FREQUENCY = 30 * 1000; // in milliseconds
|
||||||
fractionGenomeTargetCompleted, estTotalRuntime.t, timeToCompletion.t);
|
else
|
||||||
}
|
PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in milliseconds
|
||||||
|
|
||||||
|
logger.info(String.format("%15s %5.2e %s %s %4.1f%% %s %s",
|
||||||
|
loc == null ? "done with mapped reads" : loc, nRecords*1.0, elapsed, unitRate,
|
||||||
|
100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( printLog ) {
|
||||||
|
lastPerformanceLogPrintTime = curTime;
|
||||||
|
synchronized(performanceLogLock) {
|
||||||
|
performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n",
|
||||||
|
elapsed.t, nRecords, unitRate.t, last.bpProcessed, bpRate.t,
|
||||||
|
fractionGenomeTargetCompleted, estTotalRuntime.t, timeToCompletion.t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -345,11 +339,13 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
|
||||||
/**
|
/**
|
||||||
* Called after a traversal to print out information about the traversal process
|
* Called after a traversal to print out information about the traversal process
|
||||||
*/
|
*/
|
||||||
public void printOnTraversalDone(ReadMetrics cumulativeMetrics) {
|
public void printOnTraversalDone() {
|
||||||
printProgress(null, cumulativeMetrics, true);
|
printProgress(null, null, true);
|
||||||
|
|
||||||
final double elapsed = timer.getElapsedTime();
|
final double elapsed = timer.getElapsedTime();
|
||||||
|
|
||||||
|
ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics();
|
||||||
|
|
||||||
// count up the number of skipped reads by summing over all filters
|
// count up the number of skipped reads by summing over all filters
|
||||||
long nSkippedReads = 0L;
|
long nSkippedReads = 0L;
|
||||||
for ( Map.Entry<Class, Long> countsByFilter: cumulativeMetrics.getCountsByFilter().entrySet())
|
for ( Map.Entry<Class, Long> countsByFilter: cumulativeMetrics.getCountsByFilter().entrySet())
|
||||||
|
|
|
||||||
|
|
@ -138,7 +138,6 @@ public class TraverseReadsUnitTest extends BaseTest {
|
||||||
dataProvider.close();
|
dataProvider.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
traversalEngine.printOnTraversalDone(new ReadMetrics());
|
|
||||||
countReadWalker.onTraversalDone(accumulator);
|
countReadWalker.onTraversalDone(accumulator);
|
||||||
|
|
||||||
if (!(accumulator instanceof Integer)) {
|
if (!(accumulator instanceof Integer)) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue