diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 3ce8a92b7..516ea8451 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -24,6 +24,7 @@ package org.broadinstitute.sting.gatk; +import com.google.java.contract.Ensures; import net.sf.picard.reference.IndexedFastaSequenceFile; import net.sf.picard.reference.ReferenceSequenceFile; import net.sf.samtools.SAMFileHeader; @@ -682,14 +683,14 @@ public class GenomeAnalysisEngine { // if include argument isn't given, create new set of all possible intervals - Pair includeExcludePair = IntervalUtils.parseIntervalBindingsPair( + final Pair includeExcludePair = IntervalUtils.parseIntervalBindingsPair( this.referenceDataSource, argCollection.intervals, argCollection.intervalSetRule, argCollection.intervalMerging, argCollection.intervalPadding, argCollection.excludeIntervals); - GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst(); - GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond(); + final GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst(); + final GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond(); // if no exclude arguments, can return parseIntervalArguments directly if ( excludeSortedSet == null ) @@ -700,13 +701,15 @@ public class GenomeAnalysisEngine { intervals = includeSortedSet.subtractRegions(excludeSortedSet); // logging messages only printed when exclude (-XL) arguments are given - long toPruneSize = includeSortedSet.coveredSize(); - long toExcludeSize = excludeSortedSet.coveredSize(); - long intervalSize = intervals.coveredSize(); + final long toPruneSize = includeSortedSet.coveredSize(); + final long toExcludeSize = excludeSortedSet.coveredSize(); + final long intervalSize = intervals.coveredSize(); 
logger.info(String.format("Initial include intervals span %d loci; exclude intervals span %d loci", toPruneSize, toExcludeSize)); logger.info(String.format("Excluding %d loci from original intervals (%.2f%% reduction)", toPruneSize - intervalSize, (toPruneSize - intervalSize) / (0.01 * toPruneSize))); } + + logger.info(String.format("Processing %d bp from intervals", intervals.coveredSize())); } /** @@ -981,6 +984,22 @@ public class GenomeAnalysisEngine { return this.intervals; } + /** + * Get the list of regions of the genome being processed. If the user + * requested specific intervals, return those, otherwise return regions + * corresponding to the entire genome. Never returns null. + * + * @return a non-null set of intervals being processed + */ + @Ensures("result != null") + public GenomeLocSortedSet getRegionsOfGenomeBeingProcessed() { + if ( getIntervals() == null ) + // if we don't have any intervals defined, create intervals from the reference itself + return GenomeLocSortedSet.createSetFromSequenceDictionary(getReferenceDataSource().getReference().getSequenceDictionary()); + else + return getIntervals(); + } + /** * Gets the list of filters employed by this engine. * @return Collection of filters (actual instances) used by this engine. 
diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index f1d2f7b5b..486e83e60 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -186,7 +186,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar outputTracker.bypassThreadLocalStorage(true); try { walker.onTraversalDone(result); - printOnTraversalDone(result); } finally { outputTracker.bypassThreadLocalStorage(false); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java index 530285db0..87d0ad721 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java @@ -16,7 +16,7 @@ package org.broadinstitute.sting.gatk.executive; * An interface for retrieving runtime statistics about how the hierarchical * microscheduler is behaving. */ -public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean { +public interface HierarchicalMicroSchedulerMBean { /** * How many tree reduces are waiting in the tree reduce queue? * @return Total number of reduces waiting in the tree reduce queue? 
diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index ceb4a6f9b..697e908fd 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -60,7 +60,6 @@ public class LinearMicroScheduler extends MicroScheduler { boolean done = walker.isDone(); int counter = 0; - traversalEngine.startTimersIfNecessary(); for (Shard shard : shardStrategy ) { if ( done || shard == null ) // we ran out of shards that aren't owned break; @@ -95,8 +94,6 @@ public class LinearMicroScheduler extends MicroScheduler { Object result = accumulator.finishTraversal(); - printOnTraversalDone(result); - outputTracker.close(); cleanup(); executionIsDone(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index c6ef9acf1..0e8208594 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -44,6 +44,7 @@ import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.ObjectName; +import java.io.File; import java.lang.management.ManagementFactory; import java.util.Collection; @@ -89,6 +90,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ ThreadEfficiencyMonitor threadEfficiencyMonitor = null; + final TraversalProgressMeter progressMeter; + /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. 
@@ -170,9 +173,12 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { traversalEngine = new TraverseActiveRegions(); } else { throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); - } + } - traversalEngine.initialize(engine); + final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog; + this.progressMeter = new TraversalProgressMeter(engine.getCumulativeMetrics(), progressLogFile, + traversalEngine.getTraversalUnits(), engine.getRegionsOfGenomeBeingProcessed()); + traversalEngine.initialize(engine, progressMeter); // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. // To get around this limitation and since we have no job identifier at this point, register a simple counter that @@ -231,18 +237,15 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { return (!reads.isEmpty()) ? reads.seek(shard) : new NullSAMIterator(); } - /** - * Print summary information for the analysis. - * @param sum The final reduce output. - */ - protected void printOnTraversalDone(Object sum) { - traversalEngine.printOnTraversalDone(); - } - /** * Must be called by subclasses when execute is done */ protected void executionIsDone() { + progressMeter.printOnDone(); + + // TODO -- generalize to all local thread copies + traversalEngine.shutdown(); + // Print out the threading efficiency of this HMS, if state monitoring is enabled if ( threadEfficiencyMonitor != null ) { // include the master thread information @@ -269,38 +272,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ public IndexedFastaSequenceFile getReference() { return reference; } - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. 
- */ - public String getPerformanceLogFileName() { - return traversalEngine.getPerformanceLogFileName(); - } - - /** - * Set the filename of the log for performance. If set, - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName) { - traversalEngine.setPerformanceLogFileName(fileName); - } - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds() { - return traversalEngine.getPerformanceProgressPrintFrequencySeconds(); - } - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds) { - traversalEngine.setPerformanceProgressPrintFrequencySeconds(seconds); - } - protected void cleanup() { try { mBeanServer.unregisterMBean(mBeanName); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java index e510822b8..8be6b0b62 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java @@ -31,27 +31,5 @@ package org.broadinstitute.sting.gatk.executive; * To change this template use File | Settings | File Templates. */ public interface MicroSchedulerMBean { - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName(); - - /** - * Set the filename of the log for performance. If set, - * @param fileName filename to use when writing performance data. 
- */ - public void setPerformanceLogFileName(String fileName); - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds(); - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds); + // has nothing because we don't have anything we currently track } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index aefa9c12d..790c6b3ed 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -10,7 +10,6 @@ import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; /** * User: hanna * Date: Apr 29, 2009 @@ -56,7 +55,6 @@ public class ShardTraverser implements Callable { public Object call() { try { - traversalEngine.startTimersIfNecessary(); final long startTime = System.currentTimeMillis(); Object accumulator = walker.reduceInit(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index 8c617e4dc..159343bf8 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -30,66 +30,28 @@ import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import 
org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.walkers.Walker; -import org.broadinstitute.sting.utils.*; +import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.UserException; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; public abstract class TraversalEngine,ProviderType extends ShardDataProvider> { /** our log, which we want to capture anything from this class */ protected static final Logger logger = Logger.getLogger(TraversalEngine.class); - // Time in milliseconds since we initialized this engine - private static final int HISTORY_WINDOW_SIZE = 50; - - /** lock object to sure updates to history are consistent across threads */ - private static final Object lock = new Object(); - LinkedList history = new LinkedList(); - - /** We use the SimpleTimer to time our run */ - private SimpleTimer timer = null; - - // How long can we go without printing some progress info? - private long lastProgressPrintTime = -1; // When was the last time we printed progress log? 
- - private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000; // in milliseconds - private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0; - private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0; - private long progressPrintFrequency = 10 * 1000; // in milliseconds - private boolean progressMeterInitialized = false; - - // for performance log - private static final boolean PERFORMANCE_LOG_ENABLED = true; - private final Object performanceLogLock = new Object(); - private File performanceLogFile; - private PrintStream performanceLog = null; - private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log? - private final long PERFORMANCE_LOG_PRINT_FREQUENCY = progressPrintFrequency; // in milliseconds - - /** Size, in bp, of the area we are processing. Updated once in the system in initial for performance reasons */ - long targetSize = -1; - GenomeLocSortedSet targetIntervals = null; - protected GenomeAnalysisEngine engine; + private TraversalProgressMeter progressMeter; // ---------------------------------------------------------------------------------------------------- // // ABSTRACT METHODS // // ---------------------------------------------------------------------------------------------------- + /** - * Gets the named traversal type associated with the given traversal. + * Gets the named traversal type associated with the given traversal, such as loci, reads, etc. + * * @return A user-friendly name for the given traversal type. 
*/ - protected abstract String getTraversalType(); + public abstract String getTraversalUnits(); /** * this method must be implemented by all traversal engines @@ -104,70 +66,36 @@ public abstract class TraversalEngine,Provide ProviderType dataProvider, T sum); - // ---------------------------------------------------------------------------------------------------- - // - // Common timing routines - // - // ---------------------------------------------------------------------------------------------------- /** * Initialize the traversal engine. After this point traversals can be run over the data + * * @param engine GenomeAnalysisEngine for this traversal + * @param progressMeter An optional (null == optional) meter to track our progress */ - public void initialize(GenomeAnalysisEngine engine) { + public void initialize(final GenomeAnalysisEngine engine, final TraversalProgressMeter progressMeter) { if ( engine == null ) throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!"); this.engine = engine; - - if ( PERFORMANCE_LOG_ENABLED && engine.getArguments() != null && engine.getArguments().performanceLog != null ) { - synchronized(this.performanceLogLock) { - performanceLogFile = engine.getArguments().performanceLog; - createNewPerformanceLog(); - } - } - - // if we don't have any intervals defined, create intervals from the reference itself - if ( this.engine.getIntervals() == null ) - targetIntervals = GenomeLocSortedSet.createSetFromSequenceDictionary(engine.getReferenceDataSource().getReference().getSequenceDictionary()); - else - targetIntervals = this.engine.getIntervals(); - targetSize = targetIntervals.coveredSize(); - } - - private void createNewPerformanceLog() { - synchronized(performanceLogLock) { - try { - performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog)); - List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", 
"genome.fraction.complete", "est.total.runtime", "est.time.remaining"); - performanceLog.println(Utils.join("\t", pLogHeader)); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e); - } - } - } - /** - * Should be called to indicate that we're going to process records and the timer should start ticking. This - * function should be called right before any traversal work is done, to avoid counting setup costs in the - * processing costs and inflating the estimated runtime. - */ - public void startTimersIfNecessary() { - if ( timer == null ) { - timer = new SimpleTimer("Traversal"); - timer.start(); - lastProgressPrintTime = timer.currentTime(); - } + this.progressMeter = progressMeter; } /** - * @param curTime (current runtime, in millisecs) - * @param lastPrintTime the last time we printed, in machine milliseconds - * @param printFreq maximum permitted difference between last print and current times + * For testing only. Does not initialize the progress meter * - * @return true if the maximum interval (in millisecs) has passed since the last printing + * @param engine */ - private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) { - long elapsed = curTime - lastPrintTime; - return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; + protected void initialize(final GenomeAnalysisEngine engine) { + initialize(engine, null); + } + + /** + * Called by the MicroScheduler when all work is done and the GATK is shutting down. + * + * To be used by subclasses that need to free up resources (such as threads) + */ + public void shutdown() { + // by default there's nothing to do } /** @@ -197,194 +125,7 @@ public abstract class TraversalEngine,Provide * @param loc the location */ public void printProgress(final GenomeLoc loc) { - // A bypass is inserted here for unit testing. 
- printProgress(loc, false); - } - - /** - * Utility routine that prints out process information (including timing) every N records or - * every M seconds, for N and M set in global variables. - * - * @param loc Current location, can be null if you are at the end of the traversal - * @param mustPrint If true, will print out info, regardless of nRecords or time interval - */ - private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) { - if( ! progressMeterInitialized ) { - logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); - logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", - "Location", getTraversalType(), getTraversalType())); - progressMeterInitialized = true; - } - - final long curTime = timer.currentTime(); - boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); - boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); - - if ( printProgress || printLog ) { - final ProcessingHistory last = updateHistory(loc, engine.getCumulativeMetrics()); - - final AutoFormattingTime elapsed = new AutoFormattingTime(last.elapsedSeconds); - final AutoFormattingTime bpRate = new AutoFormattingTime(last.secondsPerMillionBP()); - final AutoFormattingTime unitRate = new AutoFormattingTime(last.secondsPerMillionElements()); - final double fractionGenomeTargetCompleted = last.calculateFractionGenomeTargetCompleted(targetSize); - final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted); - final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds()); - final long nRecords = engine.getCumulativeMetrics().getNumIterations(); - - if ( printProgress ) { - lastProgressPrintTime = curTime; - - // dynamically change 
the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates - if ( estTotalRuntime.getTimeInSeconds() > TWELVE_HOURS_IN_SECONDS ) - progressPrintFrequency = 60 * 1000; // in milliseconds - else if ( estTotalRuntime.getTimeInSeconds() > TWO_HOURS_IN_SECONDS ) - progressPrintFrequency = 30 * 1000; // in milliseconds - else - progressPrintFrequency = 10 * 1000; // in milliseconds - - final String posName = loc == null ? (mustPrint ? "done" : "unmapped reads") : String.format("%s:%d", loc.getContig(), loc.getStart()); - logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", - posName, nRecords*1.0, elapsed, unitRate, - 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); - - } - - if ( printLog ) { - lastPerformanceLogPrintTime = curTime; - synchronized(performanceLogLock) { - performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", - elapsed.getTimeInSeconds(), nRecords, unitRate.getTimeInSeconds(), last.bpProcessed, - bpRate.getTimeInSeconds(), fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(), - timeToCompletion.getTimeInSeconds()); - } - } - } - } - - /** - * Keeps track of the last HISTORY_WINDOW_SIZE data points for the progress meter. Currently the - * history isn't used in any way, but in the future it'll become valuable for more accurate estimates - * for when a process will complete. - * - * @param loc our current position. If null, assumes we are done traversing - * @param metrics information about what's been processed already - * @return - */ - private ProcessingHistory updateHistory(GenomeLoc loc, ReadMetrics metrics) { - synchronized (lock) { - if ( history.size() > HISTORY_WINDOW_SIZE ) - history.pop(); - - long nRecords = metrics.getNumIterations(); - long bpProcessed = loc == null ? 
targetSize : targetIntervals.sizeBeforeLoc(loc); // null -> end of processing - history.add(new ProcessingHistory(timer.getElapsedTime(), loc, nRecords, bpProcessed)); - - return history.getLast(); - } - } - - /** - * Called after a traversal to print out information about the traversal process - */ - public void printOnTraversalDone() { - printProgress(null, true); - - final double elapsed = timer == null ? 0 : timer.getElapsedTime(); - - ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics(); - - // count up the number of skipped reads by summing over all filters - long nSkippedReads = 0L; - for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) - nSkippedReads += countsByFilter; - - logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600)); - if ( cumulativeMetrics.getNumReadsSeen() > 0 ) - logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", - nSkippedReads, - cumulativeMetrics.getNumReadsSeen(), - 100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen()))); - for ( Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { - long count = filterCounts.getValue(); - logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", - count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); - } - - if ( performanceLog != null ) performanceLog.close(); - } - - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName() { - synchronized(performanceLogLock) { - return performanceLogFile.getAbsolutePath(); - } - } - - /** - * Sets the filename of the log for performance. If set, will write performance data. - * @param fileName filename to use when writing performance data. 
- */ - public void setPerformanceLogFileName(String fileName) { - File file = new File(fileName); - - synchronized(performanceLogLock) { - // Ignore multiple calls to reset the same lock. - if(performanceLogFile != null && performanceLogFile.equals(file)) - return; - - // Close an existing log - if(performanceLog != null) performanceLog.close(); - - performanceLogFile = file; - createNewPerformanceLog(); - } - } - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds() { - return progressPrintFrequency; - } - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds) { - progressPrintFrequency = seconds; - } - - private static class ProcessingHistory { - double elapsedSeconds; - long unitsProcessed; - long bpProcessed; - GenomeLoc loc; - - public ProcessingHistory(double elapsedSeconds, GenomeLoc loc, long unitsProcessed, long bpProcessed) { - this.elapsedSeconds = elapsedSeconds; - this.loc = loc; - this.unitsProcessed = unitsProcessed; - this.bpProcessed = bpProcessed; - } - - /** How long in seconds to process 1M traversal units? */ - private double secondsPerMillionElements() { - return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); - } - - /** How long in seconds to process 1M bp on the genome? */ - private double secondsPerMillionBP() { - return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); - } - - /** What fractoin of the target intervals have we covered? 
*/ - private double calculateFractionGenomeTargetCompleted(final long targetSize) { - return (1.0*bpProcessed) / targetSize; - } + if ( progressMeter != null ) progressMeter.printProgress(loc); } } + diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java new file mode 100755 index 000000000..72f20fb0c --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java @@ -0,0 +1,367 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +package org.broadinstitute.sting.gatk.traversals; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Invariant; +import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.gatk.ReadMetrics; +import org.broadinstitute.sting.utils.*; +import org.broadinstitute.sting.utils.exceptions.UserException; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * A progress meter that prints a few key metrics to a logger and optionally to a file + * + * Metrics include: + * -- Number of processed X (X = traversal units) + * -- Runtime per.1M X + * -- Percent of regions to be processed completed + * -- The estimated total runtime based on previous performance + * -- The estimated time remaining for the entire process + * + * The optional file logs an expanded set of metrics in tabular format + * suitable for subsequent analysis in R. + * + * This class is -- and MUST BE -- thread-safe for use in the GATK. Multiple independent + * threads executing traversals will be calling printProgress() simultaneously and this + * class does (and MUST) properly sort out the timings of logs without interlacing outputs + * because of these threads. + * + * Consequently, the fundamental model for when to print the logs is time based. We basically + * print a meter message every X seconds, minutes, hours, whatever is appropriate based on the + * estimated remaining runtime. 
+ * + * @author depristo + * @since 2010 maybe, but written in 09/12 for clarity + */ +@Invariant({ + "targetSizeInBP >= 0", + "progressPrintFrequency > 0" +}) +public class TraversalProgressMeter { + protected static final Logger logger = Logger.getLogger(TraversalProgressMeter.class); + + // -------------------------------------------------------------------------------- + // static constants controlling overall system behavior + // -------------------------------------------------------------------------------- + + /** + * Min. milliseconds after we start up the meter before we will print our first meter message + */ + private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000; + + /** + * How often should we print performance logging information, when we are sending this + * information to a file? Not dynamically updated as the logger meter is. + */ + private final static long PERFORMANCE_LOG_PRINT_FREQUENCY = 10 * 1000; + + private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0; + private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0; + + // -------------------------------------------------------------------------------- + // Variables we update while running + // -------------------------------------------------------------------------------- + + /** + * When was the last time we printed progress log? In milliseconds + */ + private long lastProgressPrintTime = -1; + + /** + * How frequently should we be printing our meter messages? Dynamically updated + * depending on how long we think the run has left. + */ + private long progressPrintFrequency = 10 * 1000; // default value + + /** + * When was the last time we printed to the performance log? 
In milliseconds + */ + private long lastPerformanceLogPrintTime = -1; + + // -------------------------------------------------------------------------------- + // final variables fixed at object creation time + // -------------------------------------------------------------------------------- + + /** + * The set of genome locs describing the total region we are processing with + * this GATK run. Used to determine how close we are to completing the run + */ + private final GenomeLocSortedSet regionsBeingProcessed; + + /** + * Size, in bp, of the area we are processing, derived from regionsBeingProcessed. + * Computed once at construction and cached for performance reasons + */ + private final long targetSizeInBP; + + /** + * Used to get the total number of records we've processed so far. + */ + final ReadMetrics cumulativeMetrics; + + /** + * A string describing the type of this traversal, so we can say things like + * "we are running at X traversalType per second" + */ + private final String traversalType; + + /** + * A potentially null file where we print a supplementary, R readable performance log + * file. 
+ */ + private final PrintStream performanceLog; + + /** We use the SimpleTimer to time our run */ + private final SimpleTimer timer = new SimpleTimer("Traversal"); + + /** + * Create a new TraversalProgressMeter + * + * @param cumulativeMetrics the object where the shared traversal counts are being updated + * @param performanceLogFile an optional performance log file where a table of performance logs will be written + * @param traversalUnits the name of this traversal type, suitable for saying X seconds per traversalUnits + * @param processingIntervals the intervals being processed + */ + public TraversalProgressMeter(final ReadMetrics cumulativeMetrics, + final File performanceLogFile, + final String traversalUnits, + final GenomeLocSortedSet processingIntervals) { + if ( cumulativeMetrics == null ) throw new IllegalArgumentException("cumulativeMetrics cannot be null!"); + if ( traversalUnits == null ) throw new IllegalArgumentException("traversalUnits cannot be null"); + if ( processingIntervals == null ) throw new IllegalArgumentException("Target intervals cannot be null"); + + this.cumulativeMetrics = cumulativeMetrics; + this.traversalType = traversalUnits; + this.regionsBeingProcessed = processingIntervals; + + // setup the performance logger output, if requested by the GATK engine + if ( performanceLogFile != null ) { + try { + this.performanceLog = new PrintStream(new FileOutputStream(performanceLogFile)); + final List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", + "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining"); + performanceLog.println(Utils.join("\t", pLogHeader)); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(performanceLogFile, e); + } + } else { + performanceLog = null; + } + + // cached for performance reasons + targetSizeInBP = processingIntervals.coveredSize(); + + // start up the timer + start(); + } + + /** + * 
Forward request to printProgress + * + * Assumes that one cycle has been completed + * + * @param loc the location + */ + public void printProgress(final GenomeLoc loc) { + // A bypass is inserted here for unit testing. + printProgress(loc, false); + } + + private synchronized void start() { + timer.start(); + lastProgressPrintTime = timer.currentTime(); + + logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); + logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", + "Location", traversalType, traversalType)); + } + + /** + * Utility routine that prints out process information (including timing) every N records or + * every M seconds, for N and M set in global variables. + * + * Synchronized to ensure that even with multiple threads calling printProgress we still + * get one clean stream of meter logs. + * + * @param loc Current location, can be null if you are at the end of the traversal + * @param mustPrint If true, will print out info, regardless of time interval + */ + private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) { + final long curTime = timer.currentTime(); + final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); + final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); + + if ( printProgress || printLog ) { + final ProgressData progressData = takeProgressSnapshot(loc, cumulativeMetrics); + + final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.elapsedSeconds); + final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP()); + final AutoFormattingTime unitRate = new AutoFormattingTime(progressData.secondsPerMillionElements()); + final double fractionGenomeTargetCompleted = progressData.calculateFractionGenomeTargetCompleted(targetSizeInBP); + final 
AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted); + final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds()); + + if ( printProgress ) { + lastProgressPrintTime = curTime; + updateLoggerPrintFrequency(estTotalRuntime.getTimeInSeconds()); + + // a pretty name for our position + final String posName = loc == null + ? (mustPrint ? "done" : "unmapped reads") + : String.format("%s:%d", loc.getContig(), loc.getStart()); + + logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", + posName, progressData.unitsProcessed*1.0, elapsed, unitRate, + 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); + + } + + if ( printLog ) { + lastPerformanceLogPrintTime = curTime; + performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", + elapsed.getTimeInSeconds(), progressData.unitsProcessed, unitRate.getTimeInSeconds(), + progressData.bpProcessed, bpRate.getTimeInSeconds(), + fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(), + timeToCompletion.getTimeInSeconds()); + } + } + } + + /** + * Determine, based on remaining runtime, how often to print the meter + * + * @param totalRuntimeSeconds kinda obvious, no? + */ + private void updateLoggerPrintFrequency(final double totalRuntimeSeconds) { + // dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates + if ( totalRuntimeSeconds > TWELVE_HOURS_IN_SECONDS ) + progressPrintFrequency = 60 * 1000; // in milliseconds + else if ( totalRuntimeSeconds > TWO_HOURS_IN_SECONDS ) + progressPrintFrequency = 30 * 1000; // in milliseconds + else + progressPrintFrequency = 10 * 1000; // in milliseconds + } + + /** + * Creates a new ProgressData object recording a snapshot of our progress at this instant + * + * @param loc our current position. 
If null, assumes we are done traversing + * @param metrics information about what's been processed already + * @return + */ + private ProgressData takeProgressSnapshot(final GenomeLoc loc, final ReadMetrics metrics) { + final long nRecords = metrics.getNumIterations(); + // null -> end of processing + final long bpProcessed = loc == null ? targetSizeInBP : regionsBeingProcessed.sizeBeforeLoc(loc); + return new ProgressData(timer.getElapsedTime(), nRecords, bpProcessed); + } + + /** + * Called after a traversal to print out information about the traversal process + */ + public void printOnDone() { + printProgress(null, true); + + final double elapsed = timer == null ? 0 : timer.getElapsedTime(); + + // count up the number of skipped reads by summing over all filters + long nSkippedReads = 0L; + for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) + nSkippedReads += countsByFilter; + + logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600)); + + // TODO -- move into MicroScheduler + if ( cumulativeMetrics.getNumReadsSeen() > 0 ) + logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", + nSkippedReads, + cumulativeMetrics.getNumReadsSeen(), + 100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen()))); + for ( Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { + long count = filterCounts.getValue(); + logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", + count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); + } + + if ( performanceLog != null ) performanceLog.close(); + } + + /** + * @param curTime (current runtime, in millisecs) + * @param lastPrintTime the last time we printed, in machine milliseconds + * @param printFreq maximum permitted difference between last print and current times + * + * @return true if the maximum interval 
(in millisecs) has passed since the last printing + */ + private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) { + final long elapsed = curTime - lastPrintTime; + return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; + } + + /** + * a snapshot of our performance, suitable for storage and later analysis + */ + private static class ProgressData { + final double elapsedSeconds; + final long unitsProcessed; + final long bpProcessed; + + @Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"}) + public ProgressData(double elapsedSeconds, long unitsProcessed, long bpProcessed) { + this.elapsedSeconds = elapsedSeconds; + this.unitsProcessed = unitsProcessed; + this.bpProcessed = bpProcessed; + } + + /** How long in seconds to process 1M traversal units? */ + @Ensures("result >= 0.0") + private double secondsPerMillionElements() { + return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); + } + + /** How long in seconds to process 1M bp on the genome? */ + @Ensures("result >= 0.0") + private double secondsPerMillionBP() { + return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); + } + + /** What fraction of the target intervals have we covered? 
*/ + @Requires("targetSize >= 0") + @Ensures({"result >= 0.0", "result <= 1.0"}) + private double calculateFractionGenomeTargetCompleted(final long targetSize) { + return (1.0*bpProcessed) / Math.max(targetSize, 1); + } + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java index bbd9346b3..2b7b2f9f5 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java @@ -36,7 +36,7 @@ public class TraverseActiveRegions extends TraversalEngine myReads = new LinkedHashSet(); @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "active regions"; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java index 2b45d894c..2e43ef8f8 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java @@ -54,7 +54,7 @@ public class TraverseDuplicates extends TraversalEngine extends TraversalEngine extends TraverseLociBase { } @Override - public void printOnTraversalDone() { + public void shutdown() { nanoScheduler.shutdown(); - super.printOnTraversalDone(); } /** diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java index 9b076fce4..aef3cf7d0 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java @@ -27,7 +27,7 @@ public class TraverseReadPairs extends TraversalEngine extends TraversalEngine,Read protected static 
final Logger logger = Logger.getLogger(TraverseReads.class); @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "reads"; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java index b3a0a1390..77ab0c891 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -65,7 +65,7 @@ public class TraverseReadsNano extends TraversalEngine, } @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "reads"; } @@ -135,9 +135,8 @@ public class TraverseReadsNano extends TraversalEngine, } @Override - public void printOnTraversalDone() { + public void shutdown() { nanoScheduler.shutdown(); - super.printOnTraversalDone(); } /** diff --git a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java index 475f7de21..9632a687b 100644 --- a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java +++ b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java @@ -69,7 +69,7 @@ public class ArtificialReadsTraversal extends TraversalEngine