vastly improved progress meter that estimates % of work done and time until the job finishes and time remaining. Reordered GATK core initialization order -- intervals are created before the scheduler.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4975 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2011-01-12 17:32:27 +00:00
parent bdd382198c
commit 468ef382b7
4 changed files with 278 additions and 79 deletions

View File

@ -193,6 +193,10 @@ public class GenomeAnalysisEngine {
// Prepare the data for traversal.
initializeDataSources();
// initialize and validate the interval list
initializeIntervals();
validateSuppliedIntervals();
// our microscheduler, which is in charge of running everything
MicroScheduler microScheduler = createMicroscheduler();
@ -202,10 +206,6 @@ public class GenomeAnalysisEngine {
// create the output streams "
initializeOutputStreams(microScheduler.getOutputTracker());
// initialize and validate the interval list
initializeIntervals();
validateSuppliedIntervals();
ShardStrategy shardStrategy = getShardStrategy(microScheduler.getReference());
// execute the microscheduler, storing the results

View File

@ -30,45 +30,173 @@ import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.gatk.ReadMetrics;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.Utils;
import org.broadinstitute.sting.utils.MathUtils;
import org.broadinstitute.sting.utils.*;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Map;
import java.util.*;
public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,ProviderType extends ShardDataProvider> {
// Time in milliseconds since we initialized this engine
private long startTime = -1;
private long lastProgressPrintTime = -1; // When was the last time we printed our progress?
private static final int HISTORY_WINDOW_SIZE = 50;
private static class ProcessingHistory {
double elapsedSeconds;
long unitsProcessed;
long bpProcessed;
GenomeLoc loc;
public ProcessingHistory(double elapsedSeconds, GenomeLoc loc, long unitsProcessed, long bpProcessed) {
this.elapsedSeconds = elapsedSeconds;
this.loc = loc;
this.unitsProcessed = unitsProcessed;
this.bpProcessed = bpProcessed;
}
}
/**
* Simple utility class that makes it convenient to print unit adjusted times
*/
private static class MyTime {
double t; // in Seconds
int precision; // for format
public MyTime(double t, int precision) {
this.t = t;
this.precision = precision;
}
public MyTime(double t) {
this(t, 1);
}
/**
* Instead of 10000 s, returns 2.8 hours
* @return
*/
public String toString() {
double unitTime = t;
String unit = "s";
if ( t > 120 ) {
unitTime = t / 60; // minutes
unit = "m";
if ( unitTime > 120 ) {
unitTime /= 60; // hours
unit = "h";
if ( unitTime > 100 ) {
unitTime /= 24; // days
unit = "d";
if ( unitTime > 20 ) {
unitTime /= 7; // days
unit = "w";
}
}
}
}
return String.format("%6."+precision+"f %s", unitTime, unit);
}
}
/** lock object to sure updates to history are consistent across threads */
private static final Object lock = new Object();
LinkedList<ProcessingHistory> history = new LinkedList<ProcessingHistory>();
/** We use the SimpleTimer to time our run */
private SimpleTimer timer = new SimpleTimer("Traversal");
// How long can we go without printing some progress info?
private final long MAX_PROGRESS_PRINT_TIME = 30 * 1000; // in seconds
private final long N_RECORDS_TO_PRINT = 1000000;
private long lastProgressPrintTime = -1; // When was the last time we printed progress log?
private final long PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in seconds
// for performance log
private static final boolean PERFORMANCE_LOG_ENABLED = true;
private PrintStream performanceLog = null;
private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log?
private final long PERFORMANCE_LOG_PRINT_FREQUENCY = 1 * 1000; // in seconds
private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log?
private final long PERFORMANCE_LOG_PRINT_FREQUENCY = PROGRESS_PRINT_FREQUENCY; // in seconds
/** Size, in bp, of the area we are processing. Updated once in the system in initial for performance reasons */
long targetSize = -1;
GenomeLocSortedSet targetIntervals = null;
/** our log, which we want to capture anything from this class */
protected static Logger logger = Logger.getLogger(TraversalEngine.class);
protected GenomeAnalysisEngine engine;
// ----------------------------------------------------------------------------------------------------
//
// ABSTRACT METHODS
//
// ----------------------------------------------------------------------------------------------------
/**
* Gets the named traversal type associated with the given traversal.
* @return A user-friendly name for the given traversal type.
*/
protected abstract String getTraversalType();
/**
* this method must be implemented by all traversal engines
*
* @param walker the walker to run with
* @param dataProvider the data provider that generates data given the shard
* @param sum the accumulator
*
* @return an object of the reduce type
*/
public abstract T traverse(WalkerType walker,
ProviderType dataProvider,
T sum);
// ----------------------------------------------------------------------------------------------------
//
// Common timing routines
//
// ----------------------------------------------------------------------------------------------------
/**
* Initialize the traversal engine. After this point traversals can be run over the data
* @param engine GenomeAnalysisEngine for this traversal
*/
public void initialize(GenomeAnalysisEngine engine) {
if ( engine == null )
throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!");
this.engine = engine;
if ( PERFORMANCE_LOG_ENABLED && engine.getArguments() != null && engine.getArguments().performanceLog != null ) {
try {
performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog));
List<String> pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining");
performanceLog.println(Utils.join("\t", pLogHeader));
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e);
}
}
// if we don't have any intervals defined, create intervals from the reference itself
if ( this.engine.getIntervals() == null )
targetIntervals = GenomeLocSortedSet.createSetFromSequenceDictionary(engine.getReferenceDataSource().getReference().getSequenceDictionary());
else
targetIntervals = this.engine.getIntervals();
targetSize = targetIntervals.coveredSize();
}
/**
* Should be called to indicate that we're going to process records and the timer should start ticking
*/
public void startTimers() {
timer.start();
lastProgressPrintTime = timer.currentTime();
}
/**
* @param curTime (current runtime, in millisecs)
* @param lastPrintTime the last time we printed, in machine milliseconds
@ -86,7 +214,7 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
* @param shard the given shard currently being processed.
* @param loc the location
*/
public void printProgress(Shard shard,GenomeLoc loc) {
public void printProgress(Shard shard, GenomeLoc loc) {
// A bypass is inserted here for unit testing.
// TODO: print metrics outside of the traversal engine to more easily handle cumulative stats.
ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics() != null ? engine.getCumulativeMetrics().clone() : new ReadMetrics();
@ -98,49 +226,105 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
* Utility routine that prints out process information (including timing) every N records or
* every M seconds, for N and M set in global variables.
*
* @param loc Current location
* @param loc Current location, can be null if you are at the end of the traversal
* @param metrics Metrics of reads filtered in/out.
* @param mustPrint If true, will print out info, regardless of nRecords or time interval
*/
private void printProgress(GenomeLoc loc, ReadMetrics metrics, boolean mustPrint) {
final long nRecords = metrics.getNumIterations();
final long curTime = System.currentTimeMillis();
final double elapsed = (curTime - startTime) / 1000.0;
final double secsPer1MReads = (elapsed * 1000000.0) / Math.max(nRecords, 1);
if (mustPrint
|| nRecords == 1
|| nRecords % N_RECORDS_TO_PRINT == 0
|| maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, MAX_PROGRESS_PRINT_TIME)) {
lastProgressPrintTime = curTime;
if ( nRecords == 1 ) {
logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]");
logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
"Location", getTraversalType(), getTraversalType()));
if ( nRecords == 1 )
logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]");
else {
if (loc != null)
logger.info(String.format("[PROGRESS] Traversed to %s, processing %,d %s in %.2f secs (%.2f secs per 1M %s)", loc, nRecords, getTraversalType(), elapsed, secsPer1MReads, getTraversalType()));
else
logger.info(String.format("[PROGRESS] Traversed %,d %s in %.2f secs (%.2f secs per 1M %s)", nRecords, getTraversalType(), elapsed, secsPer1MReads, getTraversalType()));
}
else {
final long curTime = timer.currentTime();
boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, PROGRESS_PRINT_FREQUENCY);
boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);
if ( printProgress || printLog ) {
lastProgressPrintTime = curTime;
ProcessingHistory last = updateHistory(loc, metrics);
final MyTime elapsed = new MyTime(last.elapsedSeconds);
final MyTime bpRate = new MyTime(secondsPerMillionBP(last));
final MyTime unitRate = new MyTime(secondsPerMillionElements(last));
final double fractionGenomeTargetCompleted = calculateFractionGenomeTargetCompleted(last);
final MyTime estTotalRuntime = new MyTime(elapsed.t / fractionGenomeTargetCompleted);
final MyTime timeToCompletion = new MyTime(estTotalRuntime.t - elapsed.t);
if ( printProgress ) {
// String common = String.format("%4.1e %s in %s, %s per 1M %s, %4.1f%% complete, est. runtime %s, %s remaining",
// nRecords*1.0, getTraversalType(), elapsed, unitRate,
// getTraversalType(), 100*fractionGenomeTargetCompleted,
// estTotalRuntime, timeToCompletion);
//
// if (loc != null)
// logger.info(String.format("%20s: processing %s", loc, common));
// else
// logger.info(String.format("Processing %s", common));
logger.info(String.format("%15s %5.2e %s %s %4.1f%% %s %s",
loc == null ? "done" : loc, nRecords*1.0, elapsed, unitRate,
100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));
}
if ( printLog ) {
lastPerformanceLogPrintTime = curTime;
performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n",
elapsed.t, nRecords, unitRate.t, last.bpProcessed, bpRate.t,
fractionGenomeTargetCompleted, estTotalRuntime.t, timeToCompletion.t);
}
}
}
}
//
// code to process the performance log
//
if ( performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY)) {
lastPerformanceLogPrintTime = curTime;
if ( nRecords > 1 ) performanceLog.printf("%.2f\t%d\t%.2f%n", elapsed, nRecords, secsPer1MReads);
/**
* Keeps track of the last HISTORY_WINDOW_SIZE data points for the progress meter. Currently the
* history isn't used in any way, but in the future it'll become valuable for more accurate estimates
* for when a process will complete.
*
* @param loc our current position. If null, assumes we are done traversing
* @param metrics information about what's been processed already
* @return
*/
private final ProcessingHistory updateHistory(GenomeLoc loc, ReadMetrics metrics) {
synchronized (lock) {
if ( history.size() > HISTORY_WINDOW_SIZE )
history.pop();
long nRecords = metrics.getNumIterations();
long bpProcessed = loc == null ? targetSize : targetIntervals.sizeBeforeLoc(loc); // null -> end of processing
history.add(new ProcessingHistory(timer.getElapsedTime(), loc, nRecords, bpProcessed));
return history.getLast();
}
}
/** How long in seconds to process 1M traversal units? */
private final double secondsPerMillionElements(ProcessingHistory last) {
return (last.elapsedSeconds * 1000000.0) / Math.max(last.unitsProcessed, 1);
}
/** How long in seconds to process 1M bp on the genome? */
private final double secondsPerMillionBP(ProcessingHistory last) {
return (last.elapsedSeconds * 1000000.0) / Math.max(last.bpProcessed, 1);
}
/** What fractoin of the target intervals have we covered? */
private final double calculateFractionGenomeTargetCompleted(ProcessingHistory last) {
return (1.0*last.bpProcessed) / targetSize;
}
/**
* Called after a traversal to print out information about the traversal process
*/
public void printOnTraversalDone(ReadMetrics cumulativeMetrics) {
printProgress(null, cumulativeMetrics, true);
final long curTime = System.currentTimeMillis();
final double elapsed = (curTime - startTime) / 1000.0;
final double elapsed = timer.getElapsedTime();
// count up the number of skipped reads by summing over all filters
long nSkippedReads = 0L;
@ -161,41 +345,4 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
if ( performanceLog != null ) performanceLog.close();
}
/**
* Initialize the traversal engine. After this point traversals can be run over the data
* @param engine GenomeAnalysisEngine for this traversal
*/
public void initialize(GenomeAnalysisEngine engine) {
this.engine = engine;
if ( PERFORMANCE_LOG_ENABLED && engine != null && engine.getArguments() != null && engine.getArguments().performanceLog != null ) {
try {
performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog));
performanceLog.println(Utils.join("\t", Arrays.asList("elapsed.time", "units.processed", "processing.speed")));
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e);
}
}
}
/**
* Should be called to indicate that we're going to process records and the timer should start ticking
*/
public void startTimers() {
lastProgressPrintTime = startTime = System.currentTimeMillis();
}
/**
* this method must be implemented by all traversal engines
*
* @param walker the walker to run with
* @param dataProvider the data provider that generates data given the shard
* @param sum the accumulator
*
* @return an object of the reduce type
*/
public abstract T traverse(WalkerType walker,
ProviderType dataProvider,
T sum);
}

View File

@ -97,6 +97,27 @@ public class GenomeLocSortedSet extends AbstractSet<GenomeLoc> {
return s;
}
/**
* Return the number of bps before loc in the sorted set
*
* @param loc the location before which we are counting bases
* @return
*/
public long sizeBeforeLoc(GenomeLoc loc) {
long s = 0;
for ( GenomeLoc e : this ) {
if ( e.isBefore(loc) )
s += e.size();
else if ( e.isPast(loc) )
; // don't do anything
else // loc is inside of s
s += loc.getStart() - e.getStart();
}
return s;
}
/**
* determine if the collection is empty
*

View File

@ -213,7 +213,38 @@ public class GenomeLocSortedSetUnitTest extends BaseTest {
assertEquals(genomeLocParser.createGenomeLoc(contigOneName, 19, 20), p4);
}
private void testSizeBeforeLocX(int pos, int size) {
GenomeLoc test = genomeLocParser.createGenomeLoc(contigOneName, pos, pos);
assertEquals(mSortedSet.sizeBeforeLoc(test), size, String.format("X pos=%d size=%d", pos, size));
}
@Test
public void testSizeBeforeLoc() {
GenomeLoc r1 = genomeLocParser.createGenomeLoc(contigOneName, 3, 5);
GenomeLoc r2 = genomeLocParser.createGenomeLoc(contigOneName, 10, 12);
GenomeLoc r3 = genomeLocParser.createGenomeLoc(contigOneName, 16, 18);
mSortedSet.addAll(Arrays.asList(r1,r2,r3));
testSizeBeforeLocX(2, 0);
testSizeBeforeLocX(3, 0);
testSizeBeforeLocX(4, 1);
testSizeBeforeLocX(5, 2);
testSizeBeforeLocX(6, 3);
testSizeBeforeLocX(10, 3);
testSizeBeforeLocX(11, 4);
testSizeBeforeLocX(12, 5);
testSizeBeforeLocX(13, 6);
testSizeBeforeLocX(15, 6);
testSizeBeforeLocX(16, 6);
testSizeBeforeLocX(17, 7);
testSizeBeforeLocX(18, 8);
testSizeBeforeLocX(19, 9);
testSizeBeforeLocX(50, 9);
testSizeBeforeLocX(50, (int)mSortedSet.coveredSize());
}
@Test
public void fromSequenceDictionary() {