diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 0e8208594..3e843de3e 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -28,6 +28,7 @@ package org.broadinstitute.sting.gatk.executive; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; +import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; @@ -37,8 +38,10 @@ import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.traversals.*; import org.broadinstitute.sting.gatk.walkers.*; +import org.broadinstitute.sting.utils.MathUtils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import javax.management.JMException; @@ -47,6 +50,7 @@ import javax.management.ObjectName; import java.io.File; import java.lang.management.ManagementFactory; import java.util.Collection; +import java.util.Map; /** @@ -90,7 +94,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ ThreadEfficiencyMonitor threadEfficiencyMonitor = null; - final TraversalProgressMeter progressMeter; + final ProgressMeter progressMeter; /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the @@ -176,8 +180,9 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog; - this.progressMeter = new TraversalProgressMeter(engine.getCumulativeMetrics(), progressLogFile, - traversalEngine.getTraversalUnits(), engine.getRegionsOfGenomeBeingProcessed()); + this.progressMeter = new ProgressMeter(progressLogFile, + traversalEngine.getTraversalUnits(), + engine.getRegionsOfGenomeBeingProcessed()); traversalEngine.initialize(engine, progressMeter); // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. @@ -241,7 +246,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * Must be called by subclasses when execute is done */ protected void executionIsDone() { - progressMeter.printOnDone(); + progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations()); + printReadFilteringStats(); // TODO -- generalize to all local thread copies traversalEngine.shutdown(); @@ -254,6 +260,37 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } } + /** + * Prints out information about number of reads observed and filtering, if any reads were used in the traversal + * + * Looks like: + * + * INFO 10:40:47,370 MicroScheduler - 22 reads were filtered out during traversal out of 101 total (21.78%) + * INFO 10:40:47,370 MicroScheduler - -> 1 reads (0.99% of total) failing BadMateFilter + * INFO 10:40:47,370 MicroScheduler - -> 20 reads (19.80% of total) failing DuplicateReadFilter + * INFO 10:40:47,370 MicroScheduler - -> 1 reads (0.99% of total) failing FailsVendorQualityCheckFilter + */ + private void printReadFilteringStats() { + final ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics(); + if ( cumulativeMetrics.getNumReadsSeen() > 0 ) { + // count up the number of skipped reads by summing over all filters + long nSkippedReads = 0L; + for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) + nSkippedReads += countsByFilter; + + logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", + nSkippedReads, + cumulativeMetrics.getNumReadsSeen(), + 100.0 * MathUtils.ratio(nSkippedReads, cumulativeMetrics.getNumReadsSeen()))); + + for ( final Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { + long count = filterCounts.getValue(); + logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", + count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); + } + } + } + /** * Gets the engine that created this microscheduler. * @return The engine owning this microscheduler. diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index 159343bf8..668bddcca 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -32,13 +32,14 @@ import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; public abstract class TraversalEngine,ProviderType extends ShardDataProvider> { /** our log, which we want to capture anything from this class */ protected static final Logger logger = Logger.getLogger(TraversalEngine.class); protected GenomeAnalysisEngine engine; - private TraversalProgressMeter progressMeter; + private ProgressMeter progressMeter; // ---------------------------------------------------------------------------------------------------- // @@ -72,7 +73,7 @@ public abstract class TraversalEngine,Provide * @param engine GenomeAnalysisEngine for this traversal * @param progressMeter An optional (null == optional) meter to track our progress */ - public void initialize(final GenomeAnalysisEngine engine, final TraversalProgressMeter progressMeter) { + public void initialize(final GenomeAnalysisEngine engine, final ProgressMeter progressMeter) { if ( engine == null ) throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!"); @@ -118,14 +119,15 @@ public abstract class TraversalEngine,Provide } /** - * Forward request to printProgress + * Forward request to notifyOfProgress * * Assumes that one cycle has been completed * * @param loc the location */ public void printProgress(final GenomeLoc loc) { - if ( progressMeter != null ) progressMeter.printProgress(loc); + if ( progressMeter != null ) + progressMeter.notifyOfProgress(loc, engine.getCumulativeMetrics().getNumIterations()); } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java similarity index 65% rename from public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java rename to public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java index 72f20fb0c..69cf52fc2 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java @@ -22,13 +22,10 @@ * OTHER DEALINGS IN THE SOFTWARE. */ -package org.broadinstitute.sting.gatk.traversals; +package org.broadinstitute.sting.utils.progressmeter; -import com.google.java.contract.Ensures; import com.google.java.contract.Invariant; -import com.google.java.contract.Requires; import org.apache.log4j.Logger; -import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.utils.*; import org.broadinstitute.sting.utils.exceptions.UserException; @@ -38,13 +35,17 @@ import java.io.FileOutputStream; import java.io.PrintStream; import java.util.Arrays; import java.util.List; -import java.util.Map; /** - * A progress meter that prints a few key metrics to a logger and optionally to a file + * A meter measuring progress on a calculation through a set of genomic regions that can + * print a few key metrics to a logger and optionally to a file * - * Metrics include: - * -- Number of processed X (X = traversal units) + * The key information for assessing progress is a set of genome locs describing the total + * set of regions we will process. Whenever (at reasonable intervals) the processing unit + * can called notifyOfProgress and this logger may, depending on the metering delay, print + * a log message with the following metrics: + * + * -- Number of processed X (X = processing units) * -- Runtime per.1M X * -- Percent of regions to be processed completed * -- The estimated total runtime based on previous performance @@ -54,7 +55,7 @@ import java.util.Map; * suitable for subsequent analysis in R. * * This class is -- and MUST BE -- thread-safe for use in the GATK. Multiple independent - * threads executing traversals will be calling printProgress() simultaneously and this + * threads executing processors will be calling notifyOfProgress() simultaneously and this * class does (and MUST) properly sort out the timings of logs without interlacing outputs * because of these threads. * @@ -69,8 +70,8 @@ import java.util.Map; "targetSizeInBP >= 0", "progressPrintFrequency > 0" }) -public class TraversalProgressMeter { - protected static final Logger logger = Logger.getLogger(TraversalProgressMeter.class); +public class ProgressMeter { + protected static final Logger logger = Logger.getLogger(ProgressMeter.class); // -------------------------------------------------------------------------------- // static constants controlling overall system behavior @@ -127,15 +128,10 @@ public class TraversalProgressMeter { private final long targetSizeInBP; /** - * Used to get the total number of records we've processed so far. + * A string describing the type of units being processes, so we can say things like + * "we are running at X processingUnitName per second" */ - final ReadMetrics cumulativeMetrics; - - /** - * A string describing the type of this traversal, so we can say things like - * "we are running at X traversalType per second" - */ - private final String traversalType; + private final String processingUnitName; /** * A potentially null file where we print a supplementary, R readable performance log @@ -144,29 +140,25 @@ public class TraversalProgressMeter { private final PrintStream performanceLog; /** We use the SimpleTimer to time our run */ - private final SimpleTimer timer = new SimpleTimer("Traversal"); + private final SimpleTimer timer = new SimpleTimer(); /** - * Create a new TraversalProgressMeter + * Create a new ProgressMeter * - * @param cumulativeMetrics the object where the shared traversal counts are being updated * @param performanceLogFile an optional performance log file where a table of performance logs will be written - * @param traversalUnits the name of this traversal type, suitable for saying X seconds per traversalUnits + * @param processingUnitName the name of the unit type being processed, suitable for saying X seconds per processingUnitName * @param processingIntervals the intervals being processed */ - public TraversalProgressMeter(final ReadMetrics cumulativeMetrics, - final File performanceLogFile, - final String traversalUnits, - final GenomeLocSortedSet processingIntervals) { - if ( cumulativeMetrics == null ) throw new IllegalArgumentException("cumulativeMetrics cannot be null!"); - if ( traversalUnits == null ) throw new IllegalArgumentException("traversalUnits cannot be null"); + public ProgressMeter(final File performanceLogFile, + final String processingUnitName, + final GenomeLocSortedSet processingIntervals) { + if ( processingUnitName == null ) throw new IllegalArgumentException("processingUnitName cannot be null"); if ( processingIntervals == null ) throw new IllegalArgumentException("Target intervals cannot be null"); - this.cumulativeMetrics = cumulativeMetrics; - this.traversalType = traversalUnits; + this.processingUnitName = processingUnitName; this.regionsBeingProcessed = processingIntervals; - // setup the performance logger output, if requested by the GATK engine + // setup the performance logger output, if requested if ( performanceLogFile != null ) { try { this.performanceLog = new PrintStream(new FileOutputStream(performanceLogFile)); @@ -188,45 +180,48 @@ public class TraversalProgressMeter { } /** - * Forward request to printProgress + * Forward request to notifyOfProgress * * Assumes that one cycle has been completed * - * @param loc the location + * @param loc our current location. Null means "in unmapped reads" + * @param nTotalRecordsProcessed the total number of records we've processed */ - public void printProgress(final GenomeLoc loc) { - // A bypass is inserted here for unit testing. - printProgress(loc, false); + public void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) { + notifyOfProgress(loc, false, nTotalRecordsProcessed); } private synchronized void start() { timer.start(); lastProgressPrintTime = timer.currentTime(); - logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); + logger.info("[INITIALIZATION COMPLETE; STARTING PROCESSING]"); logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", - "Location", traversalType, traversalType)); + "Location", processingUnitName, processingUnitName)); } /** * Utility routine that prints out process information (including timing) every N records or * every M seconds, for N and M set in global variables. * - * Synchronized to ensure that even with multiple threads calling printProgress we still + * Synchronized to ensure that even with multiple threads calling notifyOfProgress we still * get one clean stream of meter logs. * - * @param loc Current location, can be null if you are at the end of the traversal + * @param loc Current location, can be null if you are at the end of the processing unit * @param mustPrint If true, will print out info, regardless of time interval + * @param nTotalRecordsProcessed the total number of records we've processed */ - private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) { + private synchronized void notifyOfProgress(final GenomeLoc loc, boolean mustPrint, final long nTotalRecordsProcessed) { + if ( nTotalRecordsProcessed < 0 ) throw new IllegalArgumentException("nTotalRecordsProcessed must be >= 0"); + final long curTime = timer.currentTime(); final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); if ( printProgress || printLog ) { - final ProgressData progressData = takeProgressSnapshot(loc, cumulativeMetrics); + final ProgressMeterData progressData = takeProgressSnapshot(loc, nTotalRecordsProcessed); - final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.elapsedSeconds); + final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.getElapsedSeconds()); final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP()); final AutoFormattingTime unitRate = new AutoFormattingTime(progressData.secondsPerMillionElements()); final double fractionGenomeTargetCompleted = progressData.calculateFractionGenomeTargetCompleted(targetSizeInBP); @@ -243,7 +238,7 @@ public class TraversalProgressMeter { : String.format("%s:%d", loc.getContig(), loc.getStart()); logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", - posName, progressData.unitsProcessed*1.0, elapsed, unitRate, + posName, progressData.getUnitsProcessed()*1.0, elapsed, unitRate, 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); } @@ -251,8 +246,8 @@ public class TraversalProgressMeter { if ( printLog ) { lastPerformanceLogPrintTime = curTime; performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", - elapsed.getTimeInSeconds(), progressData.unitsProcessed, unitRate.getTimeInSeconds(), - progressData.bpProcessed, bpRate.getTimeInSeconds(), + elapsed.getTimeInSeconds(), progressData.getUnitsProcessed(), unitRate.getTimeInSeconds(), + progressData.getBpProcessed(), bpRate.getTimeInSeconds(), fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(), timeToCompletion.getTimeInSeconds()); } @@ -278,44 +273,27 @@ public class TraversalProgressMeter { * Creates a new ProgressData object recording a snapshot of our progress at this instant * * @param loc our current position. If null, assumes we are done traversing - * @param metrics information about what's been processed already + * @param nTotalRecordsProcessed the total number of records we've processed * @return */ - private ProgressData takeProgressSnapshot(final GenomeLoc loc, final ReadMetrics metrics) { - final long nRecords = metrics.getNumIterations(); + private ProgressMeterData takeProgressSnapshot(final GenomeLoc loc, final long nTotalRecordsProcessed) { // null -> end of processing final long bpProcessed = loc == null ? targetSizeInBP : regionsBeingProcessed.sizeBeforeLoc(loc); - return new ProgressData(timer.getElapsedTime(), nRecords, bpProcessed); + return new ProgressMeterData(timer.getElapsedTime(), nTotalRecordsProcessed, bpProcessed); } /** - * Called after a traversal to print out information about the traversal process + * Should be called when processing is done */ - public void printOnDone() { - printProgress(null, true); + public void notifyDone(final long nTotalRecordsProcessed) { + // print out the progress meter + notifyOfProgress(null, true, nTotalRecordsProcessed); - final double elapsed = timer == null ? 0 : timer.getElapsedTime(); + logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", + timer.getElapsedTime(), timer.getElapsedTime() / 60, timer.getElapsedTime() / 3600)); - // count up the number of skipped reads by summing over all filters - long nSkippedReads = 0L; - for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) - nSkippedReads += countsByFilter; - - logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600)); - - // TODO -- move into MicroScheduler - if ( cumulativeMetrics.getNumReadsSeen() > 0 ) - logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", - nSkippedReads, - cumulativeMetrics.getNumReadsSeen(), - 100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen()))); - for ( Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { - long count = filterCounts.getValue(); - logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", - count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); - } - - if ( performanceLog != null ) performanceLog.close(); + if ( performanceLog != null ) + performanceLog.close(); } /** @@ -329,39 +307,4 @@ public class TraversalProgressMeter { final long elapsed = curTime - lastPrintTime; return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; } - - /** - * a snapshot of our performance, suitable for storage and later analysis - */ - private static class ProgressData { - final double elapsedSeconds; - final long unitsProcessed; - final long bpProcessed; - - @Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"}) - public ProgressData(double elapsedSeconds, long unitsProcessed, long bpProcessed) { - this.elapsedSeconds = elapsedSeconds; - this.unitsProcessed = unitsProcessed; - this.bpProcessed = bpProcessed; - } - - /** How long in seconds to process 1M traversal units? */ - @Ensures("result >= 0.0") - private double secondsPerMillionElements() { - return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); - } - - /** How long in seconds to process 1M bp on the genome? */ - @Ensures("result >= 0.0") - private double secondsPerMillionBP() { - return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); - } - - /** What fraction of the target intervals have we covered? */ - @Requires("targetSize >= 0") - @Ensures({"result >= 0.0", "result <= 1.0"}) - private double calculateFractionGenomeTargetCompleted(final long targetSize) { - return (1.0*bpProcessed) / Math.max(targetSize, 1); - } - } } diff --git a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java new file mode 100644 index 000000000..096b55be2 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java @@ -0,0 +1,54 @@ +package org.broadinstitute.sting.utils.progressmeter; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; + +/** + * a snapshot of our performance, suitable for storage and later analysis + */ +class ProgressMeterData { + private final double elapsedSeconds; + private final long unitsProcessed; + private final long bpProcessed; + + @Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"}) + public ProgressMeterData(double elapsedSeconds, long unitsProcessed, long bpProcessed) { + this.elapsedSeconds = elapsedSeconds; + this.unitsProcessed = unitsProcessed; + this.bpProcessed = bpProcessed; + } + + @Ensures("result >= 0.0") + public double getElapsedSeconds() { + return elapsedSeconds; + } + + @Ensures("result >= 0") + public long getUnitsProcessed() { + return unitsProcessed; + } + + @Ensures("result >= 0") + public long getBpProcessed() { + return bpProcessed; + } + + /** How long in seconds to process 1M traversal units? */ + @Ensures("result >= 0.0") + public double secondsPerMillionElements() { + return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); + } + + /** How long in seconds to process 1M bp on the genome? */ + @Ensures("result >= 0.0") + public double secondsPerMillionBP() { + return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); + } + + /** What fraction of the target intervals have we covered? */ + @Requires("targetSize >= 0") + @Ensures({"result >= 0.0", "result <= 1.0"}) + public double calculateFractionGenomeTargetCompleted(final long targetSize) { + return (1.0*bpProcessed) / Math.max(targetSize, 1); + } +}