Refactor TraversalEngine to extract the progress meter functions

-- Previously these core progress metering functions were all in TraversalEngine, and available to subclasses like TraverseLoci via inheritance.  The problem here is that the upcoming data threads x cpu threads parallelism requires one master copy of the progress metering shared among all traversals, but multiple instantiations of traverse engines themselves.
-- Because the progress metering code was horrible anyway, I've refactored and vastly cleaned up and simplified all of these capabilities into the TraversalProgressMeter class.  I've simplified down the classes it uses to work (there are STILL SOME TODOs in there) so that it doesn't reach into the core GATK engine all the time.  It should be possible to write some nice tests for it now.  By making it its own class, it can protect itself from multi-threaded access with a single synchronized printProgress function instead of carrying around multiple lock objects as before
-- Cleaned up the start up of the progress meter.  It's now handled when the meter is created, so each micro scheduler doesn't have to deal with proper initialization timing any longer
-- Simplified and made clear the interface for shutting down the traversal engines.  There's now a shutdown method in TraversalEngine that's called once by the MicroScheduler when the entire traversal is over.  Nano traversals now properly shut down (this was a subtle bug I uncovered here).  The printing of the on-traversal-done metering is now handled by MicroScheduler
-- The MicroScheduler holds the single master copy of the progress meter, and doles it out to the TraversalEngines (currently 1 but in future commit there will be N).
-- Added a nice function to GenomeAnalysisEngine that returns the regions we will be processing, either the intervals requested or the whole genome.  Useful for progress meter but also probably for other infrastructure as well
-- Remove a lot of the sh*ting Bean interface getting and setting in MicroScheduler that's no longer useful.  The generic bean is just a shell interface with nothing in it.
-- By removing a lot of these bean accessors and setters many things are now final that used to be dynamic.
This commit is contained in:
Mark DePristo 2012-09-08 20:17:15 -04:00
parent 4a84ff4fce
commit 2e94a0a201
18 changed files with 441 additions and 375 deletions

View File

@ -24,6 +24,7 @@
package org.broadinstitute.sting.gatk;
import com.google.java.contract.Ensures;
import net.sf.picard.reference.IndexedFastaSequenceFile;
import net.sf.picard.reference.ReferenceSequenceFile;
import net.sf.samtools.SAMFileHeader;
@ -682,14 +683,14 @@ public class GenomeAnalysisEngine {
// if include argument isn't given, create new set of all possible intervals
Pair<GenomeLocSortedSet, GenomeLocSortedSet> includeExcludePair = IntervalUtils.parseIntervalBindingsPair(
final Pair<GenomeLocSortedSet, GenomeLocSortedSet> includeExcludePair = IntervalUtils.parseIntervalBindingsPair(
this.referenceDataSource,
argCollection.intervals,
argCollection.intervalSetRule, argCollection.intervalMerging, argCollection.intervalPadding,
argCollection.excludeIntervals);
GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst();
GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond();
final GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst();
final GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond();
// if no exclude arguments, can return parseIntervalArguments directly
if ( excludeSortedSet == null )
@ -700,13 +701,15 @@ public class GenomeAnalysisEngine {
intervals = includeSortedSet.subtractRegions(excludeSortedSet);
// logging messages only printed when exclude (-XL) arguments are given
long toPruneSize = includeSortedSet.coveredSize();
long toExcludeSize = excludeSortedSet.coveredSize();
long intervalSize = intervals.coveredSize();
final long toPruneSize = includeSortedSet.coveredSize();
final long toExcludeSize = excludeSortedSet.coveredSize();
final long intervalSize = intervals.coveredSize();
logger.info(String.format("Initial include intervals span %d loci; exclude intervals span %d loci", toPruneSize, toExcludeSize));
logger.info(String.format("Excluding %d loci from original intervals (%.2f%% reduction)",
toPruneSize - intervalSize, (toPruneSize - intervalSize) / (0.01 * toPruneSize)));
}
logger.info(String.format("Processing %d bp from intervals", intervals.coveredSize()));
}
/**
@ -981,6 +984,22 @@ public class GenomeAnalysisEngine {
return this.intervals;
}
/**
 * Returns the regions of the genome that this engine is processing.
 *
 * When the user requested specific intervals those are returned as-is;
 * otherwise the returned set is built from the sequence dictionary of the
 * reference, i.e. regions corresponding to the entire genome. Never returns null.
 *
 * @return a non-null set of intervals being processed
 */
@Ensures("result != null")
public GenomeLocSortedSet getRegionsOfGenomeBeingProcessed() {
    final GenomeLocSortedSet requestedIntervals = getIntervals();
    if ( requestedIntervals != null )
        return requestedIntervals;

    // no intervals were defined, so create them from the reference itself
    return GenomeLocSortedSet.createSetFromSequenceDictionary(
            getReferenceDataSource().getReference().getSequenceDictionary());
}
/**
* Gets the list of filters employed by this engine.
* @return Collection of filters (actual instances) used by this engine.

View File

@ -186,7 +186,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
outputTracker.bypassThreadLocalStorage(true);
try {
walker.onTraversalDone(result);
printOnTraversalDone(result);
}
finally {
outputTracker.bypassThreadLocalStorage(false);

View File

@ -16,7 +16,7 @@ package org.broadinstitute.sting.gatk.executive;
* An interface for retrieving runtime statistics about how the hierarchical
* microscheduler is behaving.
*/
public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean {
public interface HierarchicalMicroSchedulerMBean {
/**
* How many tree reduces are waiting in the tree reduce queue?
* @return Total number of reduces waiting in the tree reduce queue?

View File

@ -60,7 +60,6 @@ public class LinearMicroScheduler extends MicroScheduler {
boolean done = walker.isDone();
int counter = 0;
traversalEngine.startTimersIfNecessary();
for (Shard shard : shardStrategy ) {
if ( done || shard == null ) // we ran out of shards that aren't owned
break;
@ -95,8 +94,6 @@ public class LinearMicroScheduler extends MicroScheduler {
Object result = accumulator.finishTraversal();
printOnTraversalDone(result);
outputTracker.close();
cleanup();
executionIsDone();

View File

@ -44,6 +44,7 @@ import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.Collection;
@ -89,6 +90,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
*/
ThreadEfficiencyMonitor threadEfficiencyMonitor = null;
final TraversalProgressMeter progressMeter;
/**
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
* selected walker.
@ -170,9 +173,12 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
traversalEngine = new TraverseActiveRegions();
} else {
throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type.");
}
}
traversalEngine.initialize(engine);
final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog;
this.progressMeter = new TraversalProgressMeter(engine.getCumulativeMetrics(), progressLogFile,
traversalEngine.getTraversalUnits(), engine.getRegionsOfGenomeBeingProcessed());
traversalEngine.initialize(engine, progressMeter);
// JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean.
// To get around this limitation and since we have no job identifier at this point, register a simple counter that
@ -231,18 +237,15 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
return (!reads.isEmpty()) ? reads.seek(shard) : new NullSAMIterator();
}
/**
* Print summary information for the analysis.
* @param sum The final reduce output.
*/
protected void printOnTraversalDone(Object sum) {
traversalEngine.printOnTraversalDone();
}
/**
* Must be called by subclasses when execute is done
*/
protected void executionIsDone() {
progressMeter.printOnDone();
// TODO -- generalize to all local thread copies
traversalEngine.shutdown();
// Print out the threading efficiency of this HMS, if state monitoring is enabled
if ( threadEfficiencyMonitor != null ) {
// include the master thread information
@ -269,38 +272,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
*/
public IndexedFastaSequenceFile getReference() { return reference; }
/**
* Gets the filename to which performance data is currently being written.
* @return Filename to which performance data is currently being written.
*/
public String getPerformanceLogFileName() {
return traversalEngine.getPerformanceLogFileName();
}
/**
* Set the filename of the log for performance. If set, performance data will be written to it.
* @param fileName filename to use when writing performance data.
*/
public void setPerformanceLogFileName(String fileName) {
traversalEngine.setPerformanceLogFileName(fileName);
}
/**
* Gets the frequency with which performance data is written.
* @return Frequency, in seconds, of performance log writes.
*/
public long getPerformanceProgressPrintFrequencySeconds() {
return traversalEngine.getPerformanceProgressPrintFrequencySeconds();
}
/**
* How often should the performance log message be written?
* @param seconds number of seconds between messages indicating performance frequency.
*/
public void setPerformanceProgressPrintFrequencySeconds(long seconds) {
traversalEngine.setPerformanceProgressPrintFrequencySeconds(seconds);
}
protected void cleanup() {
try {
mBeanServer.unregisterMBean(mBeanName);

View File

@ -31,27 +31,5 @@ package org.broadinstitute.sting.gatk.executive;
* To change this template use File | Settings | File Templates.
*/
public interface MicroSchedulerMBean {
/**
* Gets the filename to which performance data is currently being written.
* @return Filename to which performance data is currently being written.
*/
public String getPerformanceLogFileName();
/**
* Set the filename of the log for performance. If set,
* @param fileName filename to use when writing performance data.
*/
public void setPerformanceLogFileName(String fileName);
/**
* Gets the frequency with which performance data is written.
* @return Frequency, in seconds, of performance log writes.
*/
public long getPerformanceProgressPrintFrequencySeconds();
/**
* How often should the performance log message be written?
* @param seconds number of seconds between messages indicating performance frequency.
*/
public void setPerformanceProgressPrintFrequencySeconds(long seconds);
// has nothing because we don't have anything we currently track
}

View File

@ -10,7 +10,6 @@ import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
/**
* User: hanna
* Date: Apr 29, 2009
@ -56,7 +55,6 @@ public class ShardTraverser implements Callable {
public Object call() {
try {
traversalEngine.startTimersIfNecessary();
final long startTime = System.currentTimeMillis();
Object accumulator = walker.reduceInit();

View File

@ -30,66 +30,28 @@ import org.broadinstitute.sting.gatk.ReadMetrics;
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.*;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,ProviderType extends ShardDataProvider> {
/** our log, which we want to capture anything from this class */
protected static final Logger logger = Logger.getLogger(TraversalEngine.class);
// Time in milliseconds since we initialized this engine
private static final int HISTORY_WINDOW_SIZE = 50;
/** lock object to ensure updates to history are consistent across threads */
private static final Object lock = new Object();
LinkedList<ProcessingHistory> history = new LinkedList<ProcessingHistory>();
/** We use the SimpleTimer to time our run */
private SimpleTimer timer = null;
// How long can we go without printing some progress info?
private long lastProgressPrintTime = -1; // When was the last time we printed progress log?
private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000; // in milliseconds
private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0;
private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0;
private long progressPrintFrequency = 10 * 1000; // in milliseconds
private boolean progressMeterInitialized = false;
// for performance log
private static final boolean PERFORMANCE_LOG_ENABLED = true;
private final Object performanceLogLock = new Object();
private File performanceLogFile;
private PrintStream performanceLog = null;
private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log?
private final long PERFORMANCE_LOG_PRINT_FREQUENCY = progressPrintFrequency; // in milliseconds
/** Size, in bp, of the area we are processing. Updated once in the system in initial for performance reasons */
long targetSize = -1;
GenomeLocSortedSet targetIntervals = null;
protected GenomeAnalysisEngine engine;
private TraversalProgressMeter progressMeter;
// ----------------------------------------------------------------------------------------------------
//
// ABSTRACT METHODS
//
// ----------------------------------------------------------------------------------------------------
/**
* Gets the named traversal type associated with the given traversal.
* Gets the named traversal type associated with the given traversal, such as loci, reads, etc.
*
* @return A user-friendly name for the given traversal type.
*/
protected abstract String getTraversalType();
public abstract String getTraversalUnits();
/**
* this method must be implemented by all traversal engines
@ -104,70 +66,36 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
ProviderType dataProvider,
T sum);
// ----------------------------------------------------------------------------------------------------
//
// Common timing routines
//
// ----------------------------------------------------------------------------------------------------
/**
* Initialize the traversal engine. After this point traversals can be run over the data
*
* @param engine GenomeAnalysisEngine for this traversal
* @param progressMeter An optional (null == optional) meter to track our progress
*/
public void initialize(GenomeAnalysisEngine engine) {
public void initialize(final GenomeAnalysisEngine engine, final TraversalProgressMeter progressMeter) {
if ( engine == null )
throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!");
this.engine = engine;
if ( PERFORMANCE_LOG_ENABLED && engine.getArguments() != null && engine.getArguments().performanceLog != null ) {
synchronized(this.performanceLogLock) {
performanceLogFile = engine.getArguments().performanceLog;
createNewPerformanceLog();
}
}
// if we don't have any intervals defined, create intervals from the reference itself
if ( this.engine.getIntervals() == null )
targetIntervals = GenomeLocSortedSet.createSetFromSequenceDictionary(engine.getReferenceDataSource().getReference().getSequenceDictionary());
else
targetIntervals = this.engine.getIntervals();
targetSize = targetIntervals.coveredSize();
}
private void createNewPerformanceLog() {
synchronized(performanceLogLock) {
try {
performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog));
List<String> pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining");
performanceLog.println(Utils.join("\t", pLogHeader));
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e);
}
}
}
/**
* Should be called to indicate that we're going to process records and the timer should start ticking. This
* function should be called right before any traversal work is done, to avoid counting setup costs in the
* processing costs and inflating the estimated runtime.
*/
public void startTimersIfNecessary() {
if ( timer == null ) {
timer = new SimpleTimer("Traversal");
timer.start();
lastProgressPrintTime = timer.currentTime();
}
this.progressMeter = progressMeter;
}
/**
* @param curTime (current runtime, in millisecs)
* @param lastPrintTime the last time we printed, in machine milliseconds
* @param printFreq maximum permitted difference between last print and current times
* For testing only. Does not initialize the progress meter
*
* @return true if the maximum interval (in millisecs) has passed since the last printing
* @param engine
*/
private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) {
long elapsed = curTime - lastPrintTime;
return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS;
protected void initialize(final GenomeAnalysisEngine engine) {
initialize(engine, null);
}
/**
* Called by the MicroScheduler when all work is done and the GATK is shutting down.
*
* To be used by subclasses that need to free up resources (such as threads).
* This base implementation is intentionally empty, so a subclass that holds
* no such resources does not need to override it.
*/
public void shutdown() {
// by default there's nothing to do
}
/**
@ -197,194 +125,7 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
* @param loc the location
*/
public void printProgress(final GenomeLoc loc) {
// A bypass is inserted here for unit testing.
printProgress(loc, false);
}
/**
* Utility routine that prints out process information (including timing) every N records or
* every M seconds, for N and M set in global variables.
*
* @param loc Current location, can be null if you are at the end of the traversal
* @param mustPrint If true, will print out info, regardless of nRecords or time interval
*/
private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) {
if( ! progressMeterInitialized ) {
logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]");
logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
"Location", getTraversalType(), getTraversalType()));
progressMeterInitialized = true;
}
final long curTime = timer.currentTime();
boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency);
boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);
if ( printProgress || printLog ) {
final ProcessingHistory last = updateHistory(loc, engine.getCumulativeMetrics());
final AutoFormattingTime elapsed = new AutoFormattingTime(last.elapsedSeconds);
final AutoFormattingTime bpRate = new AutoFormattingTime(last.secondsPerMillionBP());
final AutoFormattingTime unitRate = new AutoFormattingTime(last.secondsPerMillionElements());
final double fractionGenomeTargetCompleted = last.calculateFractionGenomeTargetCompleted(targetSize);
final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted);
final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds());
final long nRecords = engine.getCumulativeMetrics().getNumIterations();
if ( printProgress ) {
lastProgressPrintTime = curTime;
// dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates
if ( estTotalRuntime.getTimeInSeconds() > TWELVE_HOURS_IN_SECONDS )
progressPrintFrequency = 60 * 1000; // in milliseconds
else if ( estTotalRuntime.getTimeInSeconds() > TWO_HOURS_IN_SECONDS )
progressPrintFrequency = 30 * 1000; // in milliseconds
else
progressPrintFrequency = 10 * 1000; // in milliseconds
final String posName = loc == null ? (mustPrint ? "done" : "unmapped reads") : String.format("%s:%d", loc.getContig(), loc.getStart());
logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s",
posName, nRecords*1.0, elapsed, unitRate,
100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));
}
if ( printLog ) {
lastPerformanceLogPrintTime = curTime;
synchronized(performanceLogLock) {
performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n",
elapsed.getTimeInSeconds(), nRecords, unitRate.getTimeInSeconds(), last.bpProcessed,
bpRate.getTimeInSeconds(), fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(),
timeToCompletion.getTimeInSeconds());
}
}
}
}
/**
* Keeps track of the last HISTORY_WINDOW_SIZE data points for the progress meter. Currently the
* history isn't used in any way, but in the future it'll become valuable for more accurate estimates
* for when a process will complete.
*
* @param loc our current position. If null, assumes we are done traversing
* @param metrics information about what's been processed already
* @return
*/
private ProcessingHistory updateHistory(GenomeLoc loc, ReadMetrics metrics) {
synchronized (lock) {
if ( history.size() > HISTORY_WINDOW_SIZE )
history.pop();
long nRecords = metrics.getNumIterations();
long bpProcessed = loc == null ? targetSize : targetIntervals.sizeBeforeLoc(loc); // null -> end of processing
history.add(new ProcessingHistory(timer.getElapsedTime(), loc, nRecords, bpProcessed));
return history.getLast();
}
}
/**
* Called after a traversal to print out information about the traversal process
*/
public void printOnTraversalDone() {
printProgress(null, true);
final double elapsed = timer == null ? 0 : timer.getElapsedTime();
ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics();
// count up the number of skipped reads by summing over all filters
long nSkippedReads = 0L;
for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values())
nSkippedReads += countsByFilter;
logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600));
if ( cumulativeMetrics.getNumReadsSeen() > 0 )
logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)",
nSkippedReads,
cumulativeMetrics.getNumReadsSeen(),
100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen())));
for ( Map.Entry<String, Long> filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) {
long count = filterCounts.getValue();
logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s",
count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey()));
}
if ( performanceLog != null ) performanceLog.close();
}
/**
* Gets the filename to which performance data is currently being written.
* @return Filename to which performance data is currently being written.
*/
public String getPerformanceLogFileName() {
synchronized(performanceLogLock) {
return performanceLogFile.getAbsolutePath();
}
}
/**
* Sets the filename of the log for performance. If set, will write performance data.
* @param fileName filename to use when writing performance data.
*/
public void setPerformanceLogFileName(String fileName) {
File file = new File(fileName);
synchronized(performanceLogLock) {
// Ignore multiple calls to reset the same lock.
if(performanceLogFile != null && performanceLogFile.equals(file))
return;
// Close an existing log
if(performanceLog != null) performanceLog.close();
performanceLogFile = file;
createNewPerformanceLog();
}
}
/**
* Gets the frequency with which performance data is written.
* @return Frequency, in seconds, of performance log writes.
*/
public long getPerformanceProgressPrintFrequencySeconds() {
return progressPrintFrequency;
}
/**
* How often should the performance log message be written?
* @param seconds number of seconds between messages indicating performance frequency.
*/
public void setPerformanceProgressPrintFrequencySeconds(long seconds) {
progressPrintFrequency = seconds;
}
private static class ProcessingHistory {
double elapsedSeconds;
long unitsProcessed;
long bpProcessed;
GenomeLoc loc;
public ProcessingHistory(double elapsedSeconds, GenomeLoc loc, long unitsProcessed, long bpProcessed) {
this.elapsedSeconds = elapsedSeconds;
this.loc = loc;
this.unitsProcessed = unitsProcessed;
this.bpProcessed = bpProcessed;
}
/** How long in seconds to process 1M traversal units? */
private double secondsPerMillionElements() {
return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1);
}
/** How long in seconds to process 1M bp on the genome? */
private double secondsPerMillionBP() {
return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1);
}
/** What fraction of the target intervals have we covered? */
private double calculateFractionGenomeTargetCompleted(final long targetSize) {
return (1.0*bpProcessed) / targetSize;
}
if ( progressMeter != null ) progressMeter.printProgress(loc);
}
}

View File

@ -0,0 +1,367 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.traversals;
import com.google.java.contract.Ensures;
import com.google.java.contract.Invariant;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.ReadMetrics;
import org.broadinstitute.sting.utils.*;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* A progress meter that prints a few key metrics to a logger and optionally to a file
*
* Metrics include:
* -- Number of processed X (X = traversal units)
* -- Runtime per.1M X
* -- Percent of regions to be processed completed
* -- The estimated total runtime based on previous performance
* -- The estimated time remaining for the entire process
*
* The optional file logs an expanded set of metrics in tabular format
* suitable for subsequent analysis in R.
*
* This class is -- and MUST BE -- thread-safe for use in the GATK. Multiple independent
* threads executing traversals will be calling printProgress() simultaneously and this
* class does (and MUST) properly sort out the timings of logs without interlacing outputs
* because of these threads.
*
* Consequently, the fundamental model for when to print the logs is time based. We basically
* print a meter message every X seconds, minutes, hours, whatever is appropriate based on the
* estimated remaining runtime.
*
* @author depristo
* @since 2010 maybe, but written in 09/12 for clarity
*/
@Invariant({
"targetSizeInBP >= 0",
"progressPrintFrequency > 0"
})
public class TraversalProgressMeter {
protected static final Logger logger = Logger.getLogger(TraversalProgressMeter.class);
// --------------------------------------------------------------------------------
// static constants controlling overall system behavior
// --------------------------------------------------------------------------------
/**
* Min. milliseconds after we start up the meter before we will print our first meter message
*/
private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000;
/**
* How often should we print performance logging information, when we are sending this
* information to a file? Not dynamically updated as the logger meter is.
*/
private final static long PERFORMANCE_LOG_PRINT_FREQUENCY = 10 * 1000;
private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0;
private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0;
// --------------------------------------------------------------------------------
// Variables we update while running
// --------------------------------------------------------------------------------
/**
* When was the last time we printed the progress log? In milliseconds
*/
private long lastProgressPrintTime = -1;
/**
* How frequently should we be printing our meter messages? Dynamically updated
* depending on how long we think the run has left.
*/
private long progressPrintFrequency = 10 * 1000; // default value
/**
* When was the last time we printed to the performance log? In milliseconds
*/
private long lastPerformanceLogPrintTime = -1;
// --------------------------------------------------------------------------------
// final variables fixed at object creation time
// --------------------------------------------------------------------------------
/**
* The set of genome locs describing the total region we are processing with
* this GATK run. Used to determine how close we are to completing the run
*/
private final GenomeLocSortedSet regionsBeingProcessed;
/**
* Size, in bp, of the area we are processing, derived from regionsBeingProcessed.
* Computed once when the meter is created and cached for performance reasons
*/
private final long targetSizeInBP;
/**
* Used to get the total number of records we've processed so far.
*/
final ReadMetrics cumulativeMetrics;
/**
* A string describing the type of this traversal, so we can say things like
* "we are running at X traversalType per second"
*/
private final String traversalType;
/**
* A potentially null file where we print a supplementary, R readable performance log
* file.
*/
private final PrintStream performanceLog;
/** We use the SimpleTimer to time our run */
private final SimpleTimer timer = new SimpleTimer("Traversal");
/**
 * Create a new TraversalProgressMeter and start its timer
 *
 * When a performance log file is supplied it is opened here and the
 * tab-separated header row is written to it before the timer is started.
 *
 * @param cumulativeMetrics the object where the shared traversal counts are being updated
 * @param performanceLogFile an optional (may be null) performance log file where a table of performance logs will be written
 * @param traversalUnits the name of this traversal type, suitable for saying X seconds per traversalUnits
 * @param processingIntervals the intervals being processed
 */
public TraversalProgressMeter(final ReadMetrics cumulativeMetrics,
                              final File performanceLogFile,
                              final String traversalUnits,
                              final GenomeLocSortedSet processingIntervals) {
    if ( cumulativeMetrics == null ) throw new IllegalArgumentException("cumulativeMetrics cannot be null!");
    if ( traversalUnits == null ) throw new IllegalArgumentException("traversalUnits cannot be null");
    if ( processingIntervals == null ) throw new IllegalArgumentException("Target intervals cannot be null");

    this.cumulativeMetrics = cumulativeMetrics;
    this.traversalType = traversalUnits;
    this.regionsBeingProcessed = processingIntervals;

    // the performance logger output is only set up when a file was requested
    this.performanceLog = performanceLogFile == null ? null : openPerformanceLog(performanceLogFile);

    // cached for performance reasons
    this.targetSizeInBP = processingIntervals.coveredSize();

    // start up the timer
    start();
}

/**
 * Open the performance log file and write its header row
 *
 * @param performanceLogFile the non-null file to write the performance table to
 * @return a stream onto the file, with the header line already printed
 */
private static PrintStream openPerformanceLog(final File performanceLogFile) {
    try {
        final PrintStream log = new PrintStream(new FileOutputStream(performanceLogFile));
        final List<String> columnNames = Arrays.asList("elapsed.time", "units.processed", "processing.speed",
                "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining");
        log.println(Utils.join("\t", columnNames));
        return log;
    } catch (FileNotFoundException e) {
        throw new UserException.CouldNotCreateOutputFile(performanceLogFile, e);
    }
}
/**
 * Report that the traversal has reached loc.
 *
 * This is a throttled update: a line is only emitted if enough time has
 * passed since the previous one.
 *
 * @param loc the current location of the traversal
 */
public void printProgress(final GenomeLoc loc) {
    // regular updates never force output; only the elapsed-time checks decide
    final boolean forcePrint = false;
    printProgress(loc, forcePrint);
}
/**
 * Starts the timer, records the current timer time as the time of the last
 * progress print, and logs the column header of the progress meter.
 */
private synchronized void start() {
    timer.start();
    lastProgressPrintTime = timer.currentTime();

    final String columnHeader = String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
            "Location", traversalType, traversalType);
    logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]");
    logger.info(columnHeader);
}
/**
 * Prints progress information (including timing) to the logger and/or the
 * performance log whenever the respective print interval has elapsed.
 *
 * Synchronized so that even with multiple threads calling printProgress we
 * still get one clean stream of meter logs.
 *
 * @param loc Current location, can be null if you are at the end of the traversal
 * @param mustPrint If true, the logger line is printed regardless of the time interval
 */
private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) {
    final long now = timer.currentTime();
    final boolean meterDue = mustPrint || maxElapsedIntervalForPrinting(now, lastProgressPrintTime, progressPrintFrequency);
    final boolean perfLogDue = performanceLog != null && maxElapsedIntervalForPrinting(now, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);

    // nothing is due yet, so skip taking a snapshot altogether
    if ( ! meterDue && ! perfLogDue )
        return;

    final ProgressData snapshot = takeProgressSnapshot(loc, cumulativeMetrics);

    final AutoFormattingTime elapsed = new AutoFormattingTime(snapshot.elapsedSeconds);
    final AutoFormattingTime bpRate = new AutoFormattingTime(snapshot.secondsPerMillionBP());
    final AutoFormattingTime unitRate = new AutoFormattingTime(snapshot.secondsPerMillionElements());
    final double fractionDone = snapshot.calculateFractionGenomeTargetCompleted(targetSizeInBP);
    // extrapolate the total runtime from the fraction of the target covered so far
    final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionDone);
    final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds());

    if ( meterDue ) {
        lastProgressPrintTime = now;
        updateLoggerPrintFrequency(estTotalRuntime.getTimeInSeconds());

        // a pretty name for our position
        final String posName;
        if ( loc != null )
            posName = String.format("%s:%d", loc.getContig(), loc.getStart());
        else if ( mustPrint )
            posName = "done";
        else
            posName = "unmapped reads";

        final String meterLine = String.format("%15s %5.2e %s %s %5.1f%% %s %s",
                posName, (double) snapshot.unitsProcessed, elapsed, unitRate,
                100*fractionDone, estTotalRuntime, timeToCompletion);
        logger.info(meterLine);
    }

    if ( perfLogDue ) {
        lastPerformanceLogPrintTime = now;
        // one row per update, columns matching the header written in the constructor
        performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n",
                elapsed.getTimeInSeconds(),
                snapshot.unitsProcessed,
                unitRate.getTimeInSeconds(),
                snapshot.bpProcessed,
                bpRate.getTimeInSeconds(),
                fractionDone,
                estTotalRuntime.getTimeInSeconds(),
                timeToCompletion.getTimeInSeconds());
    }
}
/**
 * Determine, based on the estimated total runtime, how often to print the meter.
 *
 * Short running jobs receive frequent updates while longer jobs receive fewer:
 * every 60 seconds above twelve hours, every 30 seconds above two hours, and
 * every 10 seconds otherwise.
 *
 * @param totalRuntimeSeconds the estimated total runtime of the traversal, in seconds
 */
private void updateLoggerPrintFrequency(final double totalRuntimeSeconds) {
    final int secondsBetweenPrints;
    if ( totalRuntimeSeconds > TWELVE_HOURS_IN_SECONDS ) {
        secondsBetweenPrints = 60;
    } else if ( totalRuntimeSeconds > TWO_HOURS_IN_SECONDS ) {
        secondsBetweenPrints = 30;
    } else {
        secondsBetweenPrints = 10;
    }

    // the frequency is stored in milliseconds
    progressPrintFrequency = secondsBetweenPrints * 1000;
}
/**
 * Creates a new ProgressData object recording a snapshot of our progress at this instant
 *
 * @param loc our current position.  If null, assumes we are done traversing
 * @param metrics information about what's been processed already
 * @return a snapshot holding the elapsed time, the number of iterations recorded
 *         in metrics, and the number of bp of the target lying before loc
 */
private ProgressData takeProgressSnapshot(final GenomeLoc loc, final ReadMetrics metrics) {
    final long unitsSoFar = metrics.getNumIterations();

    final long bpSoFar;
    if ( loc == null ) {
        // null -> end of processing, so the whole target counts as processed
        bpSoFar = targetSizeInBP;
    } else {
        bpSoFar = regionsBeingProcessed.sizeBeforeLoc(loc);
    }

    return new ProgressData(timer.getElapsedTime(), unitsSoFar, bpSoFar);
}
/**
 * Called after a traversal to print out information about the traversal process.
 *
 * Forces a final "done" progress line, logs the total runtime and the counts of
 * reads removed by each filter, and finally closes the performance log (if any).
 * The performance log is closed even if one of the reporting steps throws.
 */
public void printOnDone() {
    try {
        printProgress(null, true);

        // timer is a final field initialized at declaration, so it can never be null here
        final double elapsed = timer.getElapsedTime();

        // count up the number of skipped reads by summing over all filters
        final Map<String, Long> countsByFilter = cumulativeMetrics.getCountsByFilter();
        long nSkippedReads = 0L;
        for ( final long filteredCount : countsByFilter.values() )
            nSkippedReads += filteredCount;

        logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600));

        // TODO -- move into MicroScheduler
        final long nReadsSeen = cumulativeMetrics.getNumReadsSeen();
        if ( nReadsSeen > 0 ) {
            logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)",
                    nSkippedReads,
                    nReadsSeen,
                    100.0 * MathUtils.ratio(nSkippedReads, nReadsSeen)));
        }

        // note: the per-filter breakdown is reported independently of the summary line above
        for ( final Map.Entry<String, Long> filterCounts : countsByFilter.entrySet() ) {
            final long count = filterCounts.getValue();
            logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s",
                    count, 100.0 * MathUtils.ratio(count, nReadsSeen), filterCounts.getKey()));
        }
    } finally {
        // always release the performance log, even when reporting fails
        if ( performanceLog != null ) performanceLog.close();
    }
}
/**
 * Decides whether enough time has passed since the last print to print again.
 *
 * @param curTime the current time, in milliseconds
 * @param lastPrintTime the last time we printed, in milliseconds
 * @param printFreq maximum permitted difference between last print and current times
 *
 * @return true if the time since the last printing exceeds both printFreq and
 *         MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS
 */
private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) {
    final long sinceLastPrint = curTime - lastPrintTime;

    // the regular print interval has not elapsed yet
    if ( sinceLastPrint <= printFreq )
        return false;

    return sinceLastPrint > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS;
}
/**
 * An immutable snapshot of our performance: how long we have been running and
 * how many traversal units and bp have been processed in that time.
 */
private static class ProgressData {
    /** seconds elapsed when the snapshot was taken */
    final double elapsedSeconds;
    /** number of traversal units processed when the snapshot was taken */
    final long unitsProcessed;
    /** number of bp processed when the snapshot was taken */
    final long bpProcessed;

    @Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"})
    public ProgressData(double elapsedSeconds, long unitsProcessed, long bpProcessed) {
        this.elapsedSeconds = elapsedSeconds;
        this.unitsProcessed = unitsProcessed;
        this.bpProcessed = bpProcessed;
    }

    /** How long in seconds to process 1M traversal units? */
    @Ensures("result >= 0.0")
    private double secondsPerMillionElements() {
        return secondsPerMillion(unitsProcessed);
    }

    /** How long in seconds to process 1M bp on the genome? */
    @Ensures("result >= 0.0")
    private double secondsPerMillionBP() {
        return secondsPerMillion(bpProcessed);
    }

    /** What fraction of the target intervals have we covered? */
    @Requires("targetSize >= 0")
    @Ensures({"result >= 0.0", "result <= 1.0"})
    private double calculateFractionGenomeTargetCompleted(final long targetSize) {
        // a target size of zero is treated as one to avoid dividing by zero
        final long denominator = Math.max(targetSize, 1);
        return ((double) bpProcessed) / denominator;
    }

    /** Elapsed seconds scaled to one million of count, treating a count of zero as one */
    private double secondsPerMillion(final long count) {
        final long denominator = Math.max(count, 1);
        return (elapsedSeconds * 1000000.0) / denominator;
    }
}
}

View File

@ -36,7 +36,7 @@ public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegio
private final LinkedHashSet<GATKSAMRecord> myReads = new LinkedHashSet<GATKSAMRecord>();
@Override
protected String getTraversalType() {
public String getTraversalUnits() {
return "active regions";
}

View File

@ -54,7 +54,7 @@ public class TraverseDuplicates<M,T> extends TraversalEngine<M,T,DuplicateWalker
private final boolean DEBUG = false;
@Override
protected String getTraversalType() {
public String getTraversalUnits() {
return "dups";
}

View File

@ -20,7 +20,7 @@ public abstract class TraverseLociBase<M,T> extends TraversalEngine<M,T,LocusWal
protected static final Logger logger = Logger.getLogger(TraversalEngine.class);
@Override
protected final String getTraversalType() {
public final String getTraversalUnits() {
return "sites";
}

View File

@ -92,9 +92,8 @@ public class TraverseLociNano<M,T> extends TraverseLociBase<M,T> {
}
@Override
public void printOnTraversalDone() {
public void shutdown() {
nanoScheduler.shutdown();
super.printOnTraversalDone();
}
/**

View File

@ -27,7 +27,7 @@ public class TraverseReadPairs<M,T> extends TraversalEngine<M,T, ReadPairWalker<
protected static final Logger logger = Logger.getLogger(TraverseReadPairs.class);
@Override
protected String getTraversalType() {
public String getTraversalUnits() {
return "read pairs";
}

View File

@ -50,7 +50,7 @@ public class TraverseReads<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,Read
protected static final Logger logger = Logger.getLogger(TraverseReads.class);
@Override
protected String getTraversalType() {
public String getTraversalUnits() {
return "reads";
}

View File

@ -65,7 +65,7 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
}
@Override
protected String getTraversalType() {
public String getTraversalUnits() {
return "reads";
}
@ -135,9 +135,8 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
}
@Override
public void printOnTraversalDone() {
public void shutdown() {
nanoScheduler.shutdown();
super.printOnTraversalDone();
}
/**

View File

@ -69,7 +69,7 @@ public class ArtificialReadsTraversal<M,T> extends TraversalEngine<M,T,Walker<M,
}
@Override
protected String getTraversalType() {
public String getTraversalUnits() {
return "reads";
}

View File

@ -121,8 +121,6 @@ public class TraverseReadsUnitTest extends BaseTest {
Object accumulator = countReadWalker.reduceInit();
for(Shard shard: shardStrategy) {
traversalEngine.startTimersIfNecessary();
if (shard == null) {
fail("Shard == null");
}