From 468ef382b761432473bbbeb62a89e12097a3488c Mon Sep 17 00:00:00 2001 From: depristo Date: Wed, 12 Jan 2011 17:32:27 +0000 Subject: [PATCH] vastly improved progress meter that estimates % of work done and time until the job finishes and time remaining. Reordered GATK core initialization order -- intervals are created before the scheduler. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4975 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/gatk/GenomeAnalysisEngine.java | 8 +- .../gatk/traversals/TraversalEngine.java | 295 +++++++++++++----- .../sting/utils/GenomeLocSortedSet.java | 21 ++ .../utils/GenomeLocSortedSetUnitTest.java | 33 +- 4 files changed, 278 insertions(+), 79 deletions(-) diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 165828013..1571de6f0 100755 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -193,6 +193,10 @@ public class GenomeAnalysisEngine { // Prepare the data for traversal. initializeDataSources(); + // initialize and validate the interval list + initializeIntervals(); + validateSuppliedIntervals(); + // our microscheduler, which is in charge of running everything MicroScheduler microScheduler = createMicroscheduler(); @@ -202,10 +206,6 @@ public class GenomeAnalysisEngine { // create the output streams " initializeOutputStreams(microScheduler.getOutputTracker()); - // initialize and validate the interval list - initializeIntervals(); - validateSuppliedIntervals(); - ShardStrategy shardStrategy = getShardStrategy(microScheduler.getReference()); // execute the microscheduler, storing the results diff --git a/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index ec6e93aae..c25ed6c24 100755 --- a/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -30,45 +30,173 @@ import org.broadinstitute.sting.gatk.datasources.shards.Shard; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; -import org.broadinstitute.sting.utils.GenomeLoc; -import org.broadinstitute.sting.utils.Utils; -import org.broadinstitute.sting.utils.MathUtils; +import org.broadinstitute.sting.utils.*; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; -import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.PrintStream; -import java.util.Arrays; -import java.util.Map; +import java.util.*; public abstract class TraversalEngine,ProviderType extends ShardDataProvider> { // Time in milliseconds since we initialized this engine - private long startTime = -1; - private long lastProgressPrintTime = -1; // When was the last time we printed our progress? + private static final int HISTORY_WINDOW_SIZE = 50; + + private static class ProcessingHistory { + double elapsedSeconds; + long unitsProcessed; + long bpProcessed; + GenomeLoc loc; + + public ProcessingHistory(double elapsedSeconds, GenomeLoc loc, long unitsProcessed, long bpProcessed) { + this.elapsedSeconds = elapsedSeconds; + this.loc = loc; + this.unitsProcessed = unitsProcessed; + this.bpProcessed = bpProcessed; + } + + } + + /** + * Simple utility class that makes it convenient to print unit adjusted times + */ + private static class MyTime { + double t; // in Seconds + int precision; // for format + + public MyTime(double t, int precision) { + this.t = t; + this.precision = precision; + } + + public MyTime(double t) { + this(t, 1); + } + + /** + * Instead of 10000 s, returns 2.8 hours + * @return + */ + public String toString() { + double unitTime = t; + String unit = "s"; + + if ( t > 120 ) { + unitTime = t / 60; // minutes + unit = "m"; + + if ( unitTime > 120 ) { + unitTime /= 60; // hours + unit = "h"; + + if ( unitTime > 100 ) { + unitTime /= 24; // days + unit = "d"; + + if ( unitTime > 20 ) { + unitTime /= 7; // days + unit = "w"; + } + } + } + } + + return String.format("%6."+precision+"f %s", unitTime, unit); + } + } + + /** lock object to sure updates to history are consistent across threads */ + private static final Object lock = new Object(); + LinkedList history = new LinkedList(); + + /** We use the SimpleTimer to time our run */ + private SimpleTimer timer = new SimpleTimer("Traversal"); // How long can we go without printing some progress info? - private final long MAX_PROGRESS_PRINT_TIME = 30 * 1000; // in seconds - private final long N_RECORDS_TO_PRINT = 1000000; + private long lastProgressPrintTime = -1; // When was the last time we printed progress log? + private final long PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in seconds // for performance log private static final boolean PERFORMANCE_LOG_ENABLED = true; private PrintStream performanceLog = null; - private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log? - private final long PERFORMANCE_LOG_PRINT_FREQUENCY = 1 * 1000; // in seconds + private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log? + private final long PERFORMANCE_LOG_PRINT_FREQUENCY = PROGRESS_PRINT_FREQUENCY; // in seconds + /** Size, in bp, of the area we are processing. Updated once in the system in initial for performance reasons */ + long targetSize = -1; + GenomeLocSortedSet targetIntervals = null; /** our log, which we want to capture anything from this class */ protected static Logger logger = Logger.getLogger(TraversalEngine.class); protected GenomeAnalysisEngine engine; + // ---------------------------------------------------------------------------------------------------- + // + // ABSTRACT METHODS + // + // ---------------------------------------------------------------------------------------------------- /** * Gets the named traversal type associated with the given traversal. * @return A user-friendly name for the given traversal type. */ protected abstract String getTraversalType(); + /** + * this method must be implemented by all traversal engines + * + * @param walker the walker to run with + * @param dataProvider the data provider that generates data given the shard + * @param sum the accumulator + * + * @return an object of the reduce type + */ + public abstract T traverse(WalkerType walker, + ProviderType dataProvider, + T sum); + + // ---------------------------------------------------------------------------------------------------- + // + // Common timing routines + // + // ---------------------------------------------------------------------------------------------------- + /** + * Initialize the traversal engine. After this point traversals can be run over the data + * @param engine GenomeAnalysisEngine for this traversal + */ + public void initialize(GenomeAnalysisEngine engine) { + if ( engine == null ) + throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!"); + + this.engine = engine; + + if ( PERFORMANCE_LOG_ENABLED && engine.getArguments() != null && engine.getArguments().performanceLog != null ) { + try { + performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog)); + List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining"); + performanceLog.println(Utils.join("\t", pLogHeader)); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e); + } + } + + // if we don't have any intervals defined, create intervals from the reference itself + if ( this.engine.getIntervals() == null ) + targetIntervals = GenomeLocSortedSet.createSetFromSequenceDictionary(engine.getReferenceDataSource().getReference().getSequenceDictionary()); + else + targetIntervals = this.engine.getIntervals(); + targetSize = targetIntervals.coveredSize(); + } + + /** + * Should be called to indicate that we're going to process records and the timer should start ticking + */ + public void startTimers() { + timer.start(); + lastProgressPrintTime = timer.currentTime(); + } + /** * @param curTime (current runtime, in millisecs) * @param lastPrintTime the last time we printed, in machine milliseconds @@ -86,7 +214,7 @@ public abstract class TraversalEngine,Provide * @param shard the given shard currently being processed. * @param loc the location */ - public void printProgress(Shard shard,GenomeLoc loc) { + public void printProgress(Shard shard, GenomeLoc loc) { // A bypass is inserted here for unit testing. // TODO: print metrics outside of the traversal engine to more easily handle cumulative stats. ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics() != null ? engine.getCumulativeMetrics().clone() : new ReadMetrics(); @@ -98,49 +226,105 @@ public abstract class TraversalEngine,Provide * Utility routine that prints out process information (including timing) every N records or * every M seconds, for N and M set in global variables. * - * @param loc Current location + * @param loc Current location, can be null if you are at the end of the traversal * @param metrics Metrics of reads filtered in/out. * @param mustPrint If true, will print out info, regardless of nRecords or time interval */ private void printProgress(GenomeLoc loc, ReadMetrics metrics, boolean mustPrint) { final long nRecords = metrics.getNumIterations(); - final long curTime = System.currentTimeMillis(); - final double elapsed = (curTime - startTime) / 1000.0; - final double secsPer1MReads = (elapsed * 1000000.0) / Math.max(nRecords, 1); - if (mustPrint - || nRecords == 1 - || nRecords % N_RECORDS_TO_PRINT == 0 - || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, MAX_PROGRESS_PRINT_TIME)) { - lastProgressPrintTime = curTime; + if ( nRecords == 1 ) { + logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); + logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", + "Location", getTraversalType(), getTraversalType())); - if ( nRecords == 1 ) - logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); - else { - if (loc != null) - logger.info(String.format("[PROGRESS] Traversed to %s, processing %,d %s in %.2f secs (%.2f secs per 1M %s)", loc, nRecords, getTraversalType(), elapsed, secsPer1MReads, getTraversalType())); - else - logger.info(String.format("[PROGRESS] Traversed %,d %s in %.2f secs (%.2f secs per 1M %s)", nRecords, getTraversalType(), elapsed, secsPer1MReads, getTraversalType())); + } + else { + final long curTime = timer.currentTime(); + boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, PROGRESS_PRINT_FREQUENCY); + boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); + + if ( printProgress || printLog ) { + lastProgressPrintTime = curTime; + ProcessingHistory last = updateHistory(loc, metrics); + + final MyTime elapsed = new MyTime(last.elapsedSeconds); + final MyTime bpRate = new MyTime(secondsPerMillionBP(last)); + final MyTime unitRate = new MyTime(secondsPerMillionElements(last)); + final double fractionGenomeTargetCompleted = calculateFractionGenomeTargetCompleted(last); + final MyTime estTotalRuntime = new MyTime(elapsed.t / fractionGenomeTargetCompleted); + final MyTime timeToCompletion = new MyTime(estTotalRuntime.t - elapsed.t); + + if ( printProgress ) { +// String common = String.format("%4.1e %s in %s, %s per 1M %s, %4.1f%% complete, est. runtime %s, %s remaining", +// nRecords*1.0, getTraversalType(), elapsed, unitRate, +// getTraversalType(), 100*fractionGenomeTargetCompleted, +// estTotalRuntime, timeToCompletion); +// +// if (loc != null) +// logger.info(String.format("%20s: processing %s", loc, common)); +// else +// logger.info(String.format("Processing %s", common)); + logger.info(String.format("%15s %5.2e %s %s %4.1f%% %s %s", + loc == null ? "done" : loc, nRecords*1.0, elapsed, unitRate, + 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); + + } + + if ( printLog ) { + lastPerformanceLogPrintTime = curTime; + performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", + elapsed.t, nRecords, unitRate.t, last.bpProcessed, bpRate.t, + fractionGenomeTargetCompleted, estTotalRuntime.t, timeToCompletion.t); + } } } + } - // - // code to process the performance log - // - if ( performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY)) { - lastPerformanceLogPrintTime = curTime; - if ( nRecords > 1 ) performanceLog.printf("%.2f\t%d\t%.2f%n", elapsed, nRecords, secsPer1MReads); + /** + * Keeps track of the last HISTORY_WINDOW_SIZE data points for the progress meter. Currently the + * history isn't used in any way, but in the future it'll become valuable for more accurate estimates + * for when a process will complete. + * + * @param loc our current position. If null, assumes we are done traversing + * @param metrics information about what's been processed already + * @return + */ + private final ProcessingHistory updateHistory(GenomeLoc loc, ReadMetrics metrics) { + synchronized (lock) { + if ( history.size() > HISTORY_WINDOW_SIZE ) + history.pop(); + + long nRecords = metrics.getNumIterations(); + long bpProcessed = loc == null ? targetSize : targetIntervals.sizeBeforeLoc(loc); // null -> end of processing + history.add(new ProcessingHistory(timer.getElapsedTime(), loc, nRecords, bpProcessed)); + + return history.getLast(); } } + /** How long in seconds to process 1M traversal units? */ + private final double secondsPerMillionElements(ProcessingHistory last) { + return (last.elapsedSeconds * 1000000.0) / Math.max(last.unitsProcessed, 1); + } + + /** How long in seconds to process 1M bp on the genome? */ + private final double secondsPerMillionBP(ProcessingHistory last) { + return (last.elapsedSeconds * 1000000.0) / Math.max(last.bpProcessed, 1); + } + + /** What fractoin of the target intervals have we covered? */ + private final double calculateFractionGenomeTargetCompleted(ProcessingHistory last) { + return (1.0*last.bpProcessed) / targetSize; + } + /** * Called after a traversal to print out information about the traversal process */ public void printOnTraversalDone(ReadMetrics cumulativeMetrics) { printProgress(null, cumulativeMetrics, true); - final long curTime = System.currentTimeMillis(); - final double elapsed = (curTime - startTime) / 1000.0; + final double elapsed = timer.getElapsedTime(); // count up the number of skipped reads by summing over all filters long nSkippedReads = 0L; @@ -161,41 +345,4 @@ public abstract class TraversalEngine,Provide if ( performanceLog != null ) performanceLog.close(); } - - /** - * Initialize the traversal engine. After this point traversals can be run over the data - * @param engine GenomeAnalysisEngine for this traversal - */ - public void initialize(GenomeAnalysisEngine engine) { - this.engine = engine; - - if ( PERFORMANCE_LOG_ENABLED && engine != null && engine.getArguments() != null && engine.getArguments().performanceLog != null ) { - try { - performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog)); - performanceLog.println(Utils.join("\t", Arrays.asList("elapsed.time", "units.processed", "processing.speed"))); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e); - } - } - } - - /** - * Should be called to indicate that we're going to process records and the timer should start ticking - */ - public void startTimers() { - lastProgressPrintTime = startTime = System.currentTimeMillis(); - } - - /** - * this method must be implemented by all traversal engines - * - * @param walker the walker to run with - * @param dataProvider the data provider that generates data given the shard - * @param sum the accumulator - * - * @return an object of the reduce type - */ - public abstract T traverse(WalkerType walker, - ProviderType dataProvider, - T sum); } diff --git a/java/src/org/broadinstitute/sting/utils/GenomeLocSortedSet.java b/java/src/org/broadinstitute/sting/utils/GenomeLocSortedSet.java index eba412e0b..efcc27cf2 100755 --- a/java/src/org/broadinstitute/sting/utils/GenomeLocSortedSet.java +++ b/java/src/org/broadinstitute/sting/utils/GenomeLocSortedSet.java @@ -97,6 +97,27 @@ public class GenomeLocSortedSet extends AbstractSet { return s; } + /** + * Return the number of bps before loc in the sorted set + * + * @param loc the location before which we are counting bases + * @return + */ + public long sizeBeforeLoc(GenomeLoc loc) { + long s = 0; + + for ( GenomeLoc e : this ) { + if ( e.isBefore(loc) ) + s += e.size(); + else if ( e.isPast(loc) ) + ; // don't do anything + else // loc is inside of s + s += loc.getStart() - e.getStart(); + } + + return s; + } + /** * determine if the collection is empty * diff --git a/java/test/org/broadinstitute/sting/utils/GenomeLocSortedSetUnitTest.java b/java/test/org/broadinstitute/sting/utils/GenomeLocSortedSetUnitTest.java index 8e4ff8a2c..3d21e654f 100755 --- a/java/test/org/broadinstitute/sting/utils/GenomeLocSortedSetUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/GenomeLocSortedSetUnitTest.java @@ -213,7 +213,38 @@ public class GenomeLocSortedSetUnitTest extends BaseTest { assertEquals(genomeLocParser.createGenomeLoc(contigOneName, 19, 20), p4); } - + private void testSizeBeforeLocX(int pos, int size) { + GenomeLoc test = genomeLocParser.createGenomeLoc(contigOneName, pos, pos); + assertEquals(mSortedSet.sizeBeforeLoc(test), size, String.format("X pos=%d size=%d", pos, size)); + } + + @Test + public void testSizeBeforeLoc() { + GenomeLoc r1 = genomeLocParser.createGenomeLoc(contigOneName, 3, 5); + GenomeLoc r2 = genomeLocParser.createGenomeLoc(contigOneName, 10, 12); + GenomeLoc r3 = genomeLocParser.createGenomeLoc(contigOneName, 16, 18); + mSortedSet.addAll(Arrays.asList(r1,r2,r3)); + + testSizeBeforeLocX(2, 0); + testSizeBeforeLocX(3, 0); + testSizeBeforeLocX(4, 1); + testSizeBeforeLocX(5, 2); + testSizeBeforeLocX(6, 3); + + testSizeBeforeLocX(10, 3); + testSizeBeforeLocX(11, 4); + testSizeBeforeLocX(12, 5); + testSizeBeforeLocX(13, 6); + testSizeBeforeLocX(15, 6); + + testSizeBeforeLocX(16, 6); + testSizeBeforeLocX(17, 7); + testSizeBeforeLocX(18, 8); + testSizeBeforeLocX(19, 9); + testSizeBeforeLocX(50, 9); + testSizeBeforeLocX(50, (int)mSortedSet.coveredSize()); + } + @Test public void fromSequenceDictionary() {