From 2e94a0a201f1d024207134b2eed36068300aab0c Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sat, 8 Sep 2012 20:17:15 -0400 Subject: [PATCH] Refactor TraversalEngine to extract the progress meter functions -- Previously these core progress metering functions were all in TraversalEngine, and available to subclasses like TraverseLoci via inheritance. The problem here is that the upcoming data threads x cpu threads parallelism requires one master copy of the progress metering shared among all traversals, but multiple instantiations of traverse engines themselves. -- Because the progress metering code was horrible anyway, I've refactored and vastly cleaned up and simplified all of these capabilities into a TraversalProgressMeter class. I've simplified down the classes it uses to work (STILL SOME TODOs in there) so that it doesn't reach into the core GATK engine all the time. It should be possible to write some nice tests for it now. By making it its own class, it can protect itself from multi-threaded access with a single synchronized printProgress function instead of carrying around multiple lock objects as before -- Cleaned up the start up of the progress meter. It's now handled when the meter is created, so each micro scheduler doesn't have to deal with proper initialization timing any longer -- Simplified and made clear the interface for shutting down the traversal engines. There's now a shutdown method in TraversalEngine that's called once by the MicroScheduler when the entire traversal is over. Nano traversals now properly shut down (was a subtle bug I uncovered here). The printing of the on-traversal-done metering is now handled by MicroScheduler -- The MicroScheduler holds the single master copy of the progress meter, and doles it out to the TraversalEngines (currently 1 but in future commit there will be N). -- Added a nice function to GenomeAnalysisEngine that returns the regions we will be processing, either the intervals requested or the whole genome. 
Useful for progress meter but also probably for other infrastructure as well -- Remove a lot of the sh*ting Bean interface getting and setting in MicroScheduler that's no longer useful. The generic bean is just a shell interface with nothing in it. -- By removing a lot of these bean accessors and setters many things are now final that used to be dynamic. --- .../sting/gatk/GenomeAnalysisEngine.java | 31 +- .../executive/HierarchicalMicroScheduler.java | 1 - .../HierarchicalMicroSchedulerMBean.java | 2 +- .../gatk/executive/LinearMicroScheduler.java | 3 - .../sting/gatk/executive/MicroScheduler.java | 55 +-- .../gatk/executive/MicroSchedulerMBean.java | 24 +- .../sting/gatk/executive/ShardTraverser.java | 2 - .../gatk/traversals/TraversalEngine.java | 309 ++------------- .../traversals/TraversalProgressMeter.java | 367 ++++++++++++++++++ .../traversals/TraverseActiveRegions.java | 2 +- .../gatk/traversals/TraverseDuplicates.java | 2 +- .../gatk/traversals/TraverseLociBase.java | 2 +- .../gatk/traversals/TraverseLociNano.java | 3 +- .../gatk/traversals/TraverseReadPairs.java | 2 +- .../sting/gatk/traversals/TraverseReads.java | 2 +- .../gatk/traversals/TraverseReadsNano.java | 5 +- .../utils/sam/ArtificialReadsTraversal.java | 2 +- .../traversals/TraverseReadsUnitTest.java | 2 - 18 files changed, 441 insertions(+), 375 deletions(-) create mode 100755 public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 3ce8a92b7..516ea8451 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -24,6 +24,7 @@ package org.broadinstitute.sting.gatk; +import com.google.java.contract.Ensures; import net.sf.picard.reference.IndexedFastaSequenceFile; import 
net.sf.picard.reference.ReferenceSequenceFile; import net.sf.samtools.SAMFileHeader; @@ -682,14 +683,14 @@ public class GenomeAnalysisEngine { // if include argument isn't given, create new set of all possible intervals - Pair includeExcludePair = IntervalUtils.parseIntervalBindingsPair( + final Pair includeExcludePair = IntervalUtils.parseIntervalBindingsPair( this.referenceDataSource, argCollection.intervals, argCollection.intervalSetRule, argCollection.intervalMerging, argCollection.intervalPadding, argCollection.excludeIntervals); - GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst(); - GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond(); + final GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst(); + final GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond(); // if no exclude arguments, can return parseIntervalArguments directly if ( excludeSortedSet == null ) @@ -700,13 +701,15 @@ public class GenomeAnalysisEngine { intervals = includeSortedSet.subtractRegions(excludeSortedSet); // logging messages only printed when exclude (-XL) arguments are given - long toPruneSize = includeSortedSet.coveredSize(); - long toExcludeSize = excludeSortedSet.coveredSize(); - long intervalSize = intervals.coveredSize(); + final long toPruneSize = includeSortedSet.coveredSize(); + final long toExcludeSize = excludeSortedSet.coveredSize(); + final long intervalSize = intervals.coveredSize(); logger.info(String.format("Initial include intervals span %d loci; exclude intervals span %d loci", toPruneSize, toExcludeSize)); logger.info(String.format("Excluding %d loci from original intervals (%.2f%% reduction)", toPruneSize - intervalSize, (toPruneSize - intervalSize) / (0.01 * toPruneSize))); } + + logger.info(String.format("Processing %d bp from intervals", intervals.coveredSize())); } /** @@ -981,6 +984,22 @@ public class GenomeAnalysisEngine { return this.intervals; } + /** + * Get the list of regions of the genome 
being processed. If the user + * requested specific intervals, return those, otherwise return regions + * corresponding to the entire genome. Never returns null. + * + * @return a non-null set of intervals being processed + */ + @Ensures("result != null") + public GenomeLocSortedSet getRegionsOfGenomeBeingProcessed() { + if ( getIntervals() == null ) + // if we don't have any intervals defined, create intervals from the reference itself + return GenomeLocSortedSet.createSetFromSequenceDictionary(getReferenceDataSource().getReference().getSequenceDictionary()); + else + return getIntervals(); + } + /** * Gets the list of filters employed by this engine. * @return Collection of filters (actual instances) used by this engine. diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index f1d2f7b5b..486e83e60 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -186,7 +186,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar outputTracker.bypassThreadLocalStorage(true); try { walker.onTraversalDone(result); - printOnTraversalDone(result); } finally { outputTracker.bypassThreadLocalStorage(false); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java index 530285db0..87d0ad721 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java @@ -16,7 +16,7 @@ package org.broadinstitute.sting.gatk.executive; * An interface for retrieving runtime statistics about how the hierarchical * 
microscheduler is behaving. */ -public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean { +public interface HierarchicalMicroSchedulerMBean { /** * How many tree reduces are waiting in the tree reduce queue? * @return Total number of reduces waiting in the tree reduce queue? diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index ceb4a6f9b..697e908fd 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -60,7 +60,6 @@ public class LinearMicroScheduler extends MicroScheduler { boolean done = walker.isDone(); int counter = 0; - traversalEngine.startTimersIfNecessary(); for (Shard shard : shardStrategy ) { if ( done || shard == null ) // we ran out of shards that aren't owned break; @@ -95,8 +94,6 @@ public class LinearMicroScheduler extends MicroScheduler { Object result = accumulator.finishTraversal(); - printOnTraversalDone(result); - outputTracker.close(); cleanup(); executionIsDone(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index c6ef9acf1..0e8208594 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -44,6 +44,7 @@ import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.ObjectName; +import java.io.File; import java.lang.management.ManagementFactory; import java.util.Collection; @@ -89,6 +90,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ ThreadEfficiencyMonitor threadEfficiencyMonitor = null; + 
final TraversalProgressMeter progressMeter; + /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. @@ -170,9 +173,12 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { traversalEngine = new TraverseActiveRegions(); } else { throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); - } + } - traversalEngine.initialize(engine); + final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog; + this.progressMeter = new TraversalProgressMeter(engine.getCumulativeMetrics(), progressLogFile, + traversalEngine.getTraversalUnits(), engine.getRegionsOfGenomeBeingProcessed()); + traversalEngine.initialize(engine, progressMeter); // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. // To get around this limitation and since we have no job identifier at this point, register a simple counter that @@ -231,18 +237,15 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { return (!reads.isEmpty()) ? reads.seek(shard) : new NullSAMIterator(); } - /** - * Print summary information for the analysis. - * @param sum The final reduce output. 
- */ - protected void printOnTraversalDone(Object sum) { - traversalEngine.printOnTraversalDone(); - } - /** * Must be called by subclasses when execute is done */ protected void executionIsDone() { + progressMeter.printOnDone(); + + // TODO -- generalize to all local thread copies + traversalEngine.shutdown(); + // Print out the threading efficiency of this HMS, if state monitoring is enabled if ( threadEfficiencyMonitor != null ) { // include the master thread information @@ -269,38 +272,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ public IndexedFastaSequenceFile getReference() { return reference; } - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName() { - return traversalEngine.getPerformanceLogFileName(); - } - - /** - * Set the filename of the log for performance. If set, - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName) { - traversalEngine.setPerformanceLogFileName(fileName); - } - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds() { - return traversalEngine.getPerformanceProgressPrintFrequencySeconds(); - } - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. 
- */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds) { - traversalEngine.setPerformanceProgressPrintFrequencySeconds(seconds); - } - protected void cleanup() { try { mBeanServer.unregisterMBean(mBeanName); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java index e510822b8..8be6b0b62 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java @@ -31,27 +31,5 @@ package org.broadinstitute.sting.gatk.executive; * To change this template use File | Settings | File Templates. */ public interface MicroSchedulerMBean { - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName(); - - /** - * Set the filename of the log for performance. If set, - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName); - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds(); - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. 
- */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds); + // has nothing because we don't have anything we currently track } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index aefa9c12d..790c6b3ed 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -10,7 +10,6 @@ import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; /** * User: hanna * Date: Apr 29, 2009 @@ -56,7 +55,6 @@ public class ShardTraverser implements Callable { public Object call() { try { - traversalEngine.startTimersIfNecessary(); final long startTime = System.currentTimeMillis(); Object accumulator = walker.reduceInit(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index 8c617e4dc..159343bf8 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -30,66 +30,28 @@ import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.walkers.Walker; -import org.broadinstitute.sting.utils.*; +import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.UserException; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import 
java.io.PrintStream; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; public abstract class TraversalEngine,ProviderType extends ShardDataProvider> { /** our log, which we want to capture anything from this class */ protected static final Logger logger = Logger.getLogger(TraversalEngine.class); - // Time in milliseconds since we initialized this engine - private static final int HISTORY_WINDOW_SIZE = 50; - - /** lock object to sure updates to history are consistent across threads */ - private static final Object lock = new Object(); - LinkedList history = new LinkedList(); - - /** We use the SimpleTimer to time our run */ - private SimpleTimer timer = null; - - // How long can we go without printing some progress info? - private long lastProgressPrintTime = -1; // When was the last time we printed progress log? - - private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000; // in milliseconds - private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0; - private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0; - private long progressPrintFrequency = 10 * 1000; // in milliseconds - private boolean progressMeterInitialized = false; - - // for performance log - private static final boolean PERFORMANCE_LOG_ENABLED = true; - private final Object performanceLogLock = new Object(); - private File performanceLogFile; - private PrintStream performanceLog = null; - private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log? - private final long PERFORMANCE_LOG_PRINT_FREQUENCY = progressPrintFrequency; // in milliseconds - - /** Size, in bp, of the area we are processing. 
Updated once in the system in initial for performance reasons */ - long targetSize = -1; - GenomeLocSortedSet targetIntervals = null; - protected GenomeAnalysisEngine engine; + private TraversalProgressMeter progressMeter; // ---------------------------------------------------------------------------------------------------- // // ABSTRACT METHODS // // ---------------------------------------------------------------------------------------------------- + /** - * Gets the named traversal type associated with the given traversal. + * Gets the named traversal type associated with the given traversal, such as loci, reads, etc. + * * @return A user-friendly name for the given traversal type. */ - protected abstract String getTraversalType(); + public abstract String getTraversalUnits(); /** * this method must be implemented by all traversal engines @@ -104,70 +66,36 @@ public abstract class TraversalEngine,Provide ProviderType dataProvider, T sum); - // ---------------------------------------------------------------------------------------------------- - // - // Common timing routines - // - // ---------------------------------------------------------------------------------------------------- /** * Initialize the traversal engine. 
After this point traversals can be run over the data + * * @param engine GenomeAnalysisEngine for this traversal + * @param progressMeter An optional (null == optional) meter to track our progress */ - public void initialize(GenomeAnalysisEngine engine) { + public void initialize(final GenomeAnalysisEngine engine, final TraversalProgressMeter progressMeter) { if ( engine == null ) throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!"); this.engine = engine; - - if ( PERFORMANCE_LOG_ENABLED && engine.getArguments() != null && engine.getArguments().performanceLog != null ) { - synchronized(this.performanceLogLock) { - performanceLogFile = engine.getArguments().performanceLog; - createNewPerformanceLog(); - } - } - - // if we don't have any intervals defined, create intervals from the reference itself - if ( this.engine.getIntervals() == null ) - targetIntervals = GenomeLocSortedSet.createSetFromSequenceDictionary(engine.getReferenceDataSource().getReference().getSequenceDictionary()); - else - targetIntervals = this.engine.getIntervals(); - targetSize = targetIntervals.coveredSize(); - } - - private void createNewPerformanceLog() { - synchronized(performanceLogLock) { - try { - performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog)); - List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining"); - performanceLog.println(Utils.join("\t", pLogHeader)); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e); - } - } - } - /** - * Should be called to indicate that we're going to process records and the timer should start ticking. This - * function should be called right before any traversal work is done, to avoid counting setup costs in the - * processing costs and inflating the estimated runtime. 
- */ - public void startTimersIfNecessary() { - if ( timer == null ) { - timer = new SimpleTimer("Traversal"); - timer.start(); - lastProgressPrintTime = timer.currentTime(); - } + this.progressMeter = progressMeter; } /** - * @param curTime (current runtime, in millisecs) - * @param lastPrintTime the last time we printed, in machine milliseconds - * @param printFreq maximum permitted difference between last print and current times + * For testing only. Does not initialize the progress meter * - * @return true if the maximum interval (in millisecs) has passed since the last printing + * @param engine */ - private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) { - long elapsed = curTime - lastPrintTime; - return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; + protected void initialize(final GenomeAnalysisEngine engine) { + initialize(engine, null); + } + + /** + * Called by the MicroScheduler when all work is done and the GATK is shutting down. + * + * To be used by subclasses that need to free up resources (such as threads) + */ + public void shutdown() { + // by default there's nothing to do } /** @@ -197,194 +125,7 @@ public abstract class TraversalEngine,Provide * @param loc the location */ public void printProgress(final GenomeLoc loc) { - // A bypass is inserted here for unit testing. - printProgress(loc, false); - } - - /** - * Utility routine that prints out process information (including timing) every N records or - * every M seconds, for N and M set in global variables. - * - * @param loc Current location, can be null if you are at the end of the traversal - * @param mustPrint If true, will print out info, regardless of nRecords or time interval - */ - private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) { - if( ! 
progressMeterInitialized ) { - logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); - logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", - "Location", getTraversalType(), getTraversalType())); - progressMeterInitialized = true; - } - - final long curTime = timer.currentTime(); - boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); - boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); - - if ( printProgress || printLog ) { - final ProcessingHistory last = updateHistory(loc, engine.getCumulativeMetrics()); - - final AutoFormattingTime elapsed = new AutoFormattingTime(last.elapsedSeconds); - final AutoFormattingTime bpRate = new AutoFormattingTime(last.secondsPerMillionBP()); - final AutoFormattingTime unitRate = new AutoFormattingTime(last.secondsPerMillionElements()); - final double fractionGenomeTargetCompleted = last.calculateFractionGenomeTargetCompleted(targetSize); - final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted); - final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds()); - final long nRecords = engine.getCumulativeMetrics().getNumIterations(); - - if ( printProgress ) { - lastProgressPrintTime = curTime; - - // dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates - if ( estTotalRuntime.getTimeInSeconds() > TWELVE_HOURS_IN_SECONDS ) - progressPrintFrequency = 60 * 1000; // in milliseconds - else if ( estTotalRuntime.getTimeInSeconds() > TWO_HOURS_IN_SECONDS ) - progressPrintFrequency = 30 * 1000; // in milliseconds - else - progressPrintFrequency = 10 * 1000; // in milliseconds - - final String posName = loc == null ? 
(mustPrint ? "done" : "unmapped reads") : String.format("%s:%d", loc.getContig(), loc.getStart()); - logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", - posName, nRecords*1.0, elapsed, unitRate, - 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); - - } - - if ( printLog ) { - lastPerformanceLogPrintTime = curTime; - synchronized(performanceLogLock) { - performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", - elapsed.getTimeInSeconds(), nRecords, unitRate.getTimeInSeconds(), last.bpProcessed, - bpRate.getTimeInSeconds(), fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(), - timeToCompletion.getTimeInSeconds()); - } - } - } - } - - /** - * Keeps track of the last HISTORY_WINDOW_SIZE data points for the progress meter. Currently the - * history isn't used in any way, but in the future it'll become valuable for more accurate estimates - * for when a process will complete. - * - * @param loc our current position. If null, assumes we are done traversing - * @param metrics information about what's been processed already - * @return - */ - private ProcessingHistory updateHistory(GenomeLoc loc, ReadMetrics metrics) { - synchronized (lock) { - if ( history.size() > HISTORY_WINDOW_SIZE ) - history.pop(); - - long nRecords = metrics.getNumIterations(); - long bpProcessed = loc == null ? targetSize : targetIntervals.sizeBeforeLoc(loc); // null -> end of processing - history.add(new ProcessingHistory(timer.getElapsedTime(), loc, nRecords, bpProcessed)); - - return history.getLast(); - } - } - - /** - * Called after a traversal to print out information about the traversal process - */ - public void printOnTraversalDone() { - printProgress(null, true); - - final double elapsed = timer == null ? 
0 : timer.getElapsedTime(); - - ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics(); - - // count up the number of skipped reads by summing over all filters - long nSkippedReads = 0L; - for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) - nSkippedReads += countsByFilter; - - logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600)); - if ( cumulativeMetrics.getNumReadsSeen() > 0 ) - logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", - nSkippedReads, - cumulativeMetrics.getNumReadsSeen(), - 100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen()))); - for ( Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { - long count = filterCounts.getValue(); - logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", - count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); - } - - if ( performanceLog != null ) performanceLog.close(); - } - - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName() { - synchronized(performanceLogLock) { - return performanceLogFile.getAbsolutePath(); - } - } - - /** - * Sets the filename of the log for performance. If set, will write performance data. - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName) { - File file = new File(fileName); - - synchronized(performanceLogLock) { - // Ignore multiple calls to reset the same lock. 
- if(performanceLogFile != null && performanceLogFile.equals(file)) - return; - - // Close an existing log - if(performanceLog != null) performanceLog.close(); - - performanceLogFile = file; - createNewPerformanceLog(); - } - } - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds() { - return progressPrintFrequency; - } - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds) { - progressPrintFrequency = seconds; - } - - private static class ProcessingHistory { - double elapsedSeconds; - long unitsProcessed; - long bpProcessed; - GenomeLoc loc; - - public ProcessingHistory(double elapsedSeconds, GenomeLoc loc, long unitsProcessed, long bpProcessed) { - this.elapsedSeconds = elapsedSeconds; - this.loc = loc; - this.unitsProcessed = unitsProcessed; - this.bpProcessed = bpProcessed; - } - - /** How long in seconds to process 1M traversal units? */ - private double secondsPerMillionElements() { - return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); - } - - /** How long in seconds to process 1M bp on the genome? */ - private double secondsPerMillionBP() { - return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); - } - - /** What fractoin of the target intervals have we covered? 
*/
-    private double calculateFractionGenomeTargetCompleted(final long targetSize) {
-        return (1.0*bpProcessed) / targetSize;
-    }
+        if ( progressMeter != null ) progressMeter.printProgress(loc);
     }
 }
+
diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java
new file mode 100755
index 000000000..72f20fb0c
--- /dev/null
+++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java
@@ -0,0 +1,367 @@
+/*
+ * Copyright (c) 2010, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.gatk.traversals;
+
+import com.google.java.contract.Ensures;
+import com.google.java.contract.Invariant;
+import com.google.java.contract.Requires;
+import org.apache.log4j.Logger;
+import org.broadinstitute.sting.gatk.ReadMetrics;
+import org.broadinstitute.sting.utils.*;
+import org.broadinstitute.sting.utils.exceptions.UserException;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A progress meter that prints a few key metrics to a logger and optionally to a file
+ *
+ * Metrics include:
+ *   -- Number of processed X (X = traversal units)
+ *   -- Runtime per.1M X
+ *   -- Percent of regions to be processed completed
+ *   -- The estimated total runtime based on previous performance
+ *   -- The estimated time remaining for the entire process
+ *
+ * The optional file logs an expanded set of metrics in tabular format
+ * suitable for subsequent analysis in R.
+ *
+ * This class is -- and MUST BE -- thread-safe for use in the GATK.  Multiple independent
+ * threads executing traversals will be calling printProgress() simultaneously and this
+ * class does (and MUST) properly sort out the timings of logs without interlacing outputs
+ * because of these threads.
+ *
+ * Consequently, the fundamental model for when to print the logs is time based.  We basically
+ * print a meter message every X seconds, minutes, hours, whatever is appropriate based on the
+ * estimated remaining runtime.
+ *
+ * @author depristo
+ * @since 2010 maybe, but written in 09/12 for clarity
+ */
+@Invariant({
+        "targetSizeInBP >= 0",
+        "progressPrintFrequency > 0"
+})
+public class TraversalProgressMeter {
+    protected static final Logger logger = Logger.getLogger(TraversalProgressMeter.class);
+
+    // --------------------------------------------------------------------------------
+    // static constants controlling overall system behavior
+    // --------------------------------------------------------------------------------
+
+    /**
+     * Min. milliseconds after we start up the meter before we will print our first meter message
+     */
+    private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000;
+
+    /**
+     * How often should we print performance logging information, when we are sending this
+     * information to a file?  In milliseconds.  Not dynamically updated as the logger meter is.
+     */
+    private final static long PERFORMANCE_LOG_PRINT_FREQUENCY = 10 * 1000;
+
+    private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0;
+    private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0;
+
+    // --------------------------------------------------------------------------------
+    // Variables we update while running
+    // --------------------------------------------------------------------------------
+
+    /**
+     * When was the last time we printed the progress log?  In milliseconds
+     */
+    private long lastProgressPrintTime = -1;
+
+    /**
+     * How frequently should we be printing our meter messages?  In milliseconds.  Dynamically
+     * updated by updateLoggerPrintFrequency() from the estimated total runtime.
+     */
+    private long progressPrintFrequency = 10 * 1000; // default value
+
+    /**
+     * When was the last time we printed to the performance log?  In milliseconds
+     */
+    private long lastPerformanceLogPrintTime = -1;
+
+    // --------------------------------------------------------------------------------
+    // final variables fixed at object creation time
+    // --------------------------------------------------------------------------------
+
+    /**
+     * The set of genome locs describing the total region we are processing with
+     * this GATK run.  Used to determine how close we are to completing the run
+     */
+    private final GenomeLocSortedSet regionsBeingProcessed;
+
+    /**
+     * Size, in bp, of the area we are processing, derived from regionsBeingProcessed.
+     * Computed once in the constructor and cached for performance reasons
+     */
+    private final long targetSizeInBP;
+
+    /**
+     * Used to get the total number of records we've processed so far.
+     */
+    final ReadMetrics cumulativeMetrics;
+
+    /**
+     * A string describing the type of this traversal, so we can say things like
+     * "we are running at X traversalType per second"
+     */
+    private final String traversalType;
+
+    /**
+     * A potentially null stream where we print a supplementary, R readable performance log
+     * file.  Null when no performance log file was passed to the constructor.
+     */
+    private final PrintStream performanceLog;
+
+    /** We use the SimpleTimer to time our run */
+    private final SimpleTimer timer = new SimpleTimer("Traversal");
+
+    /**
+     * Create a new TraversalProgressMeter, write the log headers, and start its timer
+     *
+     * @param cumulativeMetrics the object where the shared traversal counts are being updated; must not be null
+     * @param performanceLogFile an optional (nullable) performance log file where a table of performance logs will be written
+     * @param traversalUnits the name of this traversal type, suitable for saying X seconds per traversalUnits; must not be null
+     * @param processingIntervals the intervals being processed; must not be null
+     */
+    public TraversalProgressMeter(final ReadMetrics cumulativeMetrics,
+                                  final File performanceLogFile,
+                                  final String traversalUnits,
+                                  final GenomeLocSortedSet processingIntervals) {
+        if ( cumulativeMetrics == null ) throw new IllegalArgumentException("cumulativeMetrics cannot be null!");
+        if ( traversalUnits == null ) throw new IllegalArgumentException("traversalUnits cannot be null");
+        if ( processingIntervals == null ) throw new IllegalArgumentException("Target intervals cannot be null");
+
+        this.cumulativeMetrics = cumulativeMetrics;
+        this.traversalType = traversalUnits;
+        this.regionsBeingProcessed = processingIntervals;
+
+        // setup the performance logger output, if requested by the GATK engine
+        if ( performanceLogFile != null ) {
+            try {
+                this.performanceLog = new PrintStream(new FileOutputStream(performanceLogFile));
+                final List<String> pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed",
+                        "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining");
+                performanceLog.println(Utils.join("\t", pLogHeader));
+            } catch (FileNotFoundException e) {
+                throw new UserException.CouldNotCreateOutputFile(performanceLogFile, e);
+            }
+        } else {
+            performanceLog = null;
+        }
+
+        // cached for performance reasons
+        targetSizeInBP = processingIntervals.coveredSize();
+
+        // start up the timer
+        start();
+    }
+
+    /**
+     * Forward request to printProgress, printing only if enough time has elapsed
+     *
+     * Assumes that one cycle has been completed
+     *
+     * @param loc the current location; null is treated as the end of the processing intervals
+     */
+    public void printProgress(final GenomeLoc loc) {
+        // A bypass is inserted here for unit testing.
+        printProgress(loc, false);
+    }
+
+    private synchronized void start() {
+        timer.start();
+        lastProgressPrintTime = timer.currentTime();
+
+        logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]");
+        logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
+                "Location", traversalType, traversalType));
+    }
+
+    /**
+     * Prints progress information (including timing) to the logger and/or the performance log
+     * once the corresponding print interval has elapsed since the previous print.
+     *
+     * Synchronized to ensure that even with multiple threads calling printProgress we still
+     * get one clean stream of meter logs.
+     *
+     * @param loc Current location, can be null if you are at the end of the traversal
+     * @param mustPrint If true, will print out info, regardless of time interval
+     */
+    private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) {
+        final long curTime = timer.currentTime();
+        final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency);
+        final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);
+
+        if ( printProgress || printLog ) {
+            final ProgressData progressData = takeProgressSnapshot(loc, cumulativeMetrics);
+
+            final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.elapsedSeconds);
+            final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP());
+            final AutoFormattingTime unitRate = new AutoFormattingTime(progressData.secondsPerMillionElements());
+            final double fractionGenomeTargetCompleted = progressData.calculateFractionGenomeTargetCompleted(targetSizeInBP);
+            final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted); // NOTE(review): not finite while the completed fraction is 0
+            final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds());
+
+            if ( printProgress ) {
+                lastProgressPrintTime = curTime;
+                updateLoggerPrintFrequency(estTotalRuntime.getTimeInSeconds());
+
+                // a pretty name for our position
+                final String posName = loc == null
+                        ? (mustPrint ? "done" : "unmapped reads")
+                        : String.format("%s:%d", loc.getContig(), loc.getStart());
+
+                logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s",
+                        posName, progressData.unitsProcessed*1.0, elapsed, unitRate,
+                        100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));
+
+            }
+
+            if ( printLog ) {
+                lastPerformanceLogPrintTime = curTime;
+                performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n",
+                        elapsed.getTimeInSeconds(), progressData.unitsProcessed, unitRate.getTimeInSeconds(),
+                        progressData.bpProcessed, bpRate.getTimeInSeconds(),
+                        fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(),
+                        timeToCompletion.getTimeInSeconds());
+            }
+        }
+    }
+
+    /**
+     * Determine, based on the estimated total runtime, how often to print the meter
+     *
+     * @param totalRuntimeSeconds the estimated total runtime of the traversal, in seconds
+     */
+    private void updateLoggerPrintFrequency(final double totalRuntimeSeconds) {
+        // dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates
+        if ( totalRuntimeSeconds > TWELVE_HOURS_IN_SECONDS )
+            progressPrintFrequency = 60 * 1000; // in milliseconds
+        else if ( totalRuntimeSeconds > TWO_HOURS_IN_SECONDS )
+            progressPrintFrequency = 30 * 1000; // in milliseconds
+        else
+            progressPrintFrequency = 10 * 1000; // in milliseconds
+    }
+
+    /**
+     * Creates a new ProgressData object recording a snapshot of our progress at this instant
+     *
+     * @param loc our current position.  If null, assumes we are done traversing
+     * @param metrics information about what's been processed already
+     * @return a snapshot holding the timer's elapsed time, the iteration count from metrics, and the bp processed so far
+     */
+    private ProgressData takeProgressSnapshot(final GenomeLoc loc, final ReadMetrics metrics) {
+        final long nRecords = metrics.getNumIterations();
+        // null -> end of processing
+        final long bpProcessed = loc == null ? targetSizeInBP : regionsBeingProcessed.sizeBeforeLoc(loc);
+        return new ProgressData(timer.getElapsedTime(), nRecords, bpProcessed);
+    }
+
+    /**
+     * Called after a traversal to print the final meter line, runtime and filter summaries, and close the performance log
+     */
+    public void printOnDone() {
+        printProgress(null, true);
+
+        final double elapsed = timer.getElapsedTime(); // timer is a final, inline-initialized field, so it is never null
+
+        // count up the number of skipped reads by summing over all filters
+        long nSkippedReads = 0L;
+        for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values())
+            nSkippedReads += countsByFilter;
+
+        logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600));
+
+        // TODO -- move into MicroScheduler
+        if ( cumulativeMetrics.getNumReadsSeen() > 0 )
+            logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)",
+                    nSkippedReads,
+                    cumulativeMetrics.getNumReadsSeen(),
+                    100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen())));
+        for ( Map.Entry<?, Long> filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) {
+            long count = filterCounts.getValue();
+            logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s",
+                    count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey()));
+        }
+
+        if ( performanceLog != null ) performanceLog.close();
+    }
+
+    /**
+     * @param curTime the current time, in milliseconds, as reported by the timer
+     * @param lastPrintTime the last time we printed, in machine milliseconds
+     * @param printFreq maximum permitted difference between last print and current times
+     *
+     * @return true if the maximum interval (in millisecs) has passed since the last printing
+     */
+    private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) {
+        final long elapsed = curTime - lastPrintTime;
+        return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; // NOTE(review): both limits are measured from the last print, so the 30s floor applies to every print, not only the first
+    }
+
+    /**
+     * a snapshot of our performance, suitable for storage and later analysis
+     */
+    private static class ProgressData {
+        final double elapsedSeconds;
+        final long unitsProcessed;
+        final long bpProcessed;
+
+        @Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"})
+        public ProgressData(double elapsedSeconds, long unitsProcessed, long bpProcessed) {
+            this.elapsedSeconds = elapsedSeconds;
+            this.unitsProcessed = unitsProcessed;
+            this.bpProcessed = bpProcessed;
+        }
+
+        /** How long in seconds to process 1M traversal units?  A count of 0 is treated as 1 to avoid dividing by zero */
+        @Ensures("result >= 0.0")
+        private double secondsPerMillionElements() {
+            return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1);
+        }
+
+        /** How long in seconds to process 1M bp on the genome?  A count of 0 is treated as 1 to avoid dividing by zero */
+        @Ensures("result >= 0.0")
+        private double secondsPerMillionBP() {
+            return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1);
+        }
+
+        /** What fraction of the target intervals have we covered?  A target size of 0 is treated as 1 */
+        @Requires("targetSize >= 0")
+        @Ensures({"result >= 0.0", "result <= 1.0"})
+        private double calculateFractionGenomeTargetCompleted(final long targetSize) {
+            return (1.0*bpProcessed) / Math.max(targetSize, 1);
+        }
+    }
+}
diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java
index bbd9346b3..2b7b2f9f5 100644
--- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java
@@ -36,7 +36,7 @@ public class TraverseActiveRegions extends TraversalEngine myReads = new LinkedHashSet();
     @Override
-    protected String getTraversalType() {
+    public String getTraversalUnits() {
         return "active regions";
     }
diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java
index 2b45d894c..2e43ef8f8 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java
@@ -54,7 +54,7 @@ public class TraverseDuplicates extends TraversalEngine extends TraversalEngine extends TraverseLociBase {
     }
     @Override
-    public void printOnTraversalDone() {
+    public void shutdown() {
         nanoScheduler.shutdown();
-        super.printOnTraversalDone();
     }
     /**
diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java
index 9b076fce4..aef3cf7d0 100644
--- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java
@@ -27,7 +27,7 @@ public class TraverseReadPairs extends TraversalEngine extends TraversalEngine,Read
     protected static
final Logger logger = Logger.getLogger(TraverseReads.class); @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "reads"; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java index b3a0a1390..77ab0c891 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -65,7 +65,7 @@ public class TraverseReadsNano extends TraversalEngine, } @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "reads"; } @@ -135,9 +135,8 @@ public class TraverseReadsNano extends TraversalEngine, } @Override - public void printOnTraversalDone() { + public void shutdown() { nanoScheduler.shutdown(); - super.printOnTraversalDone(); } /** diff --git a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java index 475f7de21..9632a687b 100644 --- a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java +++ b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java @@ -69,7 +69,7 @@ public class ArtificialReadsTraversal extends TraversalEngine