From 2d4b00833b3d0e26b2bf9d8f016f4001bc86fcce Mon Sep 17 00:00:00 2001 From: Guillermo del Angel Date: Sun, 9 Sep 2012 20:35:45 -0400 Subject: [PATCH 01/14] Bug fix for logging likelihoods in new read allele map: reads which were filtered out were being excluded from map, but they should be included in annotations --- .../LikelihoodCalculationEngine.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java index d04c1a9e2..69af66185 100644 --- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java +++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java @@ -346,20 +346,15 @@ public class LikelihoodCalculationEngine { } } } -/* // add all filtered reads to the NO_CALL list because they weren't given any likelihoods - List readList = alleleReadMap.get(Allele.NO_CALL); - if( readList == null ) { - readList = new ArrayList(); - alleleReadMap.put(Allele.NO_CALL, readList); - } - */ - /* for( final GATKSAMRecord read : perSampleFilteredReadList.get(sample.getKey()) ) { + // add all filtered reads to the NO_CALL list because they weren't given any likelihoods + for( final GATKSAMRecord read : perSampleFilteredReadList.get(sample.getKey()) ) { // only count the read if it overlaps the event, otherwise it is not added to the output read list at all if( callLoc.overlapsP(parser.createGenomeLoc(read)) ) { - readList.add(read); + for( final Allele a : call.getFirst().getAlleles() ) + likelihoodMap.add(read,a,0.0); } } - */ + returnMap.put(sample.getKey(), likelihoodMap); } From 4a84ff4fcecadc2db57d265876bee32d034e6fc4 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sat, 8 Sep 2012 19:45:56 -0400 Subject: [PATCH 02/14] Fix a nasty bug in reading GATK reports with a single line -- Old version would break during reading with (as usual) a cryptic error message -- Fixed by avoiding collapsing into a single vector type from a matrix when you subset to a single row. I believe this code confirms thats R is truly the worst programming language ever --- .../sting/utils/R/gsalib/R/gsa.read.gatkreport.R | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/public/R/src/org/broadinstitute/sting/utils/R/gsalib/R/gsa.read.gatkreport.R b/public/R/src/org/broadinstitute/sting/utils/R/gsalib/R/gsa.read.gatkreport.R index 4c228ccb4..eba94c0cb 100644 --- a/public/R/src/org/broadinstitute/sting/utils/R/gsalib/R/gsa.read.gatkreport.R +++ b/public/R/src/org/broadinstitute/sting/utils/R/gsalib/R/gsa.read.gatkreport.R @@ -111,7 +111,13 @@ gsa.read.gatkreportv1 <- function(lines) { headerRowCount = -1; finishTable <- function() { - .gsa.assignGATKTableToEnvironment(tableName, tableHeader, tableRows[1:rowCount,], tableEnv); + if ( rowCount == 1 ) + # good I hate R. Work around to avoid collapsing into an unstructured vector when + # there's only 1 row + sub <- t(as.matrix(tableRows[1:rowCount,])) + else + sub <- tableRows[1:rowCount,] + .gsa.assignGATKTableToEnvironment(tableName, tableHeader, sub, tableEnv); } for (line in lines) { From 2e94a0a201f1d024207134b2eed36068300aab0c Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sat, 8 Sep 2012 20:17:15 -0400 Subject: [PATCH 03/14] Refactor TraversalEngine to extract the progress meter functions -- Previously these core progress metering functions were all in TraversalEngine, and available to subclasses like TraverseLoci via inheritance. The problem here is that the upcoming data threads x cpu threads parallelism requires one master copy of the progress metering shared among all traversals, but multiple instantiations of traverse engines themselves. -- Because the progress metering code has horrible anyway, I've refactored and vastly cleaned up and simplified all of these capabilities into TraversalProgressMeter class. I've simplified down the classes it uses to work (STILL SOME TODOs in there) so that it doesn't reach into the core GATK engine all the time. It should be possible to write some nice tests for it now. By making it its own class, it can protect itself from multi-threaded access with a single synchronized printProgress function instead of carrying around multiple lock objects as before -- Cleaned up the start up of the progress meter. It's now handled when the meter is created, so each micro scheduler doesn't have to deal with proper initialization timing any longer -- Simplified and made clear the interface for shutting down the traversal engines. There's no a shutdown method in TraversalEngine that's called once by the MicroScheduler when the entire traversing in over. Nano traversals now properly shut down (was subtle bug I undercovered here). The printing of on traversal done metering is now handled by MicroScheduler -- The MicroScheduler holds the single master copy of the progress meter, and doles it out to the TraversalEngines (currently 1 but in future commit there will be N). -- Added a nice function to GenomeAnalysisEngine that returns the regions we will be processing, either the intervals requested or the whole genome. Useful for progress meter but also probably for other infrastructure as well -- Remove a lot of the sh*ting Bean interface getting and setting in MicroScheduler that's no longer useful. The generic bean is just a shell interface with nothing in it. -- By removing a lot of these bean accessors and setters many things are now final that used to be dynamic. --- .../sting/gatk/GenomeAnalysisEngine.java | 31 +- .../executive/HierarchicalMicroScheduler.java | 1 - .../HierarchicalMicroSchedulerMBean.java | 2 +- .../gatk/executive/LinearMicroScheduler.java | 3 - .../sting/gatk/executive/MicroScheduler.java | 55 +-- .../gatk/executive/MicroSchedulerMBean.java | 24 +- .../sting/gatk/executive/ShardTraverser.java | 2 - .../gatk/traversals/TraversalEngine.java | 309 ++------------- .../traversals/TraversalProgressMeter.java | 367 ++++++++++++++++++ .../traversals/TraverseActiveRegions.java | 2 +- .../gatk/traversals/TraverseDuplicates.java | 2 +- .../gatk/traversals/TraverseLociBase.java | 2 +- .../gatk/traversals/TraverseLociNano.java | 3 +- .../gatk/traversals/TraverseReadPairs.java | 2 +- .../sting/gatk/traversals/TraverseReads.java | 2 +- .../gatk/traversals/TraverseReadsNano.java | 5 +- .../utils/sam/ArtificialReadsTraversal.java | 2 +- .../traversals/TraverseReadsUnitTest.java | 2 - 18 files changed, 441 insertions(+), 375 deletions(-) create mode 100755 public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 3ce8a92b7..516ea8451 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -24,6 +24,7 @@ package org.broadinstitute.sting.gatk; +import com.google.java.contract.Ensures; import net.sf.picard.reference.IndexedFastaSequenceFile; import net.sf.picard.reference.ReferenceSequenceFile; import net.sf.samtools.SAMFileHeader; @@ -682,14 +683,14 @@ public class GenomeAnalysisEngine { // if include argument isn't given, create new set of all possible intervals - Pair includeExcludePair = IntervalUtils.parseIntervalBindingsPair( + final Pair includeExcludePair = IntervalUtils.parseIntervalBindingsPair( this.referenceDataSource, argCollection.intervals, argCollection.intervalSetRule, argCollection.intervalMerging, argCollection.intervalPadding, argCollection.excludeIntervals); - GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst(); - GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond(); + final GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst(); + final GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond(); // if no exclude arguments, can return parseIntervalArguments directly if ( excludeSortedSet == null ) @@ -700,13 +701,15 @@ public class GenomeAnalysisEngine { intervals = includeSortedSet.subtractRegions(excludeSortedSet); // logging messages only printed when exclude (-XL) arguments are given - long toPruneSize = includeSortedSet.coveredSize(); - long toExcludeSize = excludeSortedSet.coveredSize(); - long intervalSize = intervals.coveredSize(); + final long toPruneSize = includeSortedSet.coveredSize(); + final long toExcludeSize = excludeSortedSet.coveredSize(); + final long intervalSize = intervals.coveredSize(); logger.info(String.format("Initial include intervals span %d loci; exclude intervals span %d loci", toPruneSize, toExcludeSize)); logger.info(String.format("Excluding %d loci from original intervals (%.2f%% reduction)", toPruneSize - intervalSize, (toPruneSize - intervalSize) / (0.01 * toPruneSize))); } + + logger.info(String.format("Processing %d bp from intervals", intervals.coveredSize())); } /** @@ -981,6 +984,22 @@ public class GenomeAnalysisEngine { return this.intervals; } + /** + * Get the list of regions of the genome being processed. If the user + * requested specific intervals, return those, otherwise return regions + * corresponding to the entire genome. Never returns null. + * + * @return a non-null set of intervals being processed + */ + @Ensures("result != null") + public GenomeLocSortedSet getRegionsOfGenomeBeingProcessed() { + if ( getIntervals() == null ) + // if we don't have any intervals defined, create intervals from the reference itself + return GenomeLocSortedSet.createSetFromSequenceDictionary(getReferenceDataSource().getReference().getSequenceDictionary()); + else + return getIntervals(); + } + /** * Gets the list of filters employed by this engine. * @return Collection of filters (actual instances) used by this engine. diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index f1d2f7b5b..486e83e60 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -186,7 +186,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar outputTracker.bypassThreadLocalStorage(true); try { walker.onTraversalDone(result); - printOnTraversalDone(result); } finally { outputTracker.bypassThreadLocalStorage(false); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java index 530285db0..87d0ad721 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java @@ -16,7 +16,7 @@ package org.broadinstitute.sting.gatk.executive; * An interface for retrieving runtime statistics about how the hierarchical * microscheduler is behaving. */ -public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean { +public interface HierarchicalMicroSchedulerMBean { /** * How many tree reduces are waiting in the tree reduce queue? * @return Total number of reduces waiting in the tree reduce queue? diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index ceb4a6f9b..697e908fd 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -60,7 +60,6 @@ public class LinearMicroScheduler extends MicroScheduler { boolean done = walker.isDone(); int counter = 0; - traversalEngine.startTimersIfNecessary(); for (Shard shard : shardStrategy ) { if ( done || shard == null ) // we ran out of shards that aren't owned break; @@ -95,8 +94,6 @@ public class LinearMicroScheduler extends MicroScheduler { Object result = accumulator.finishTraversal(); - printOnTraversalDone(result); - outputTracker.close(); cleanup(); executionIsDone(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index c6ef9acf1..0e8208594 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -44,6 +44,7 @@ import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.ObjectName; +import java.io.File; import java.lang.management.ManagementFactory; import java.util.Collection; @@ -89,6 +90,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ ThreadEfficiencyMonitor threadEfficiencyMonitor = null; + final TraversalProgressMeter progressMeter; + /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. @@ -170,9 +173,12 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { traversalEngine = new TraverseActiveRegions(); } else { throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); - } + } - traversalEngine.initialize(engine); + final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog; + this.progressMeter = new TraversalProgressMeter(engine.getCumulativeMetrics(), progressLogFile, + traversalEngine.getTraversalUnits(), engine.getRegionsOfGenomeBeingProcessed()); + traversalEngine.initialize(engine, progressMeter); // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. // To get around this limitation and since we have no job identifier at this point, register a simple counter that @@ -231,18 +237,15 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { return (!reads.isEmpty()) ? reads.seek(shard) : new NullSAMIterator(); } - /** - * Print summary information for the analysis. - * @param sum The final reduce output. - */ - protected void printOnTraversalDone(Object sum) { - traversalEngine.printOnTraversalDone(); - } - /** * Must be called by subclasses when execute is done */ protected void executionIsDone() { + progressMeter.printOnDone(); + + // TODO -- generalize to all local thread copies + traversalEngine.shutdown(); + // Print out the threading efficiency of this HMS, if state monitoring is enabled if ( threadEfficiencyMonitor != null ) { // include the master thread information @@ -269,38 +272,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ public IndexedFastaSequenceFile getReference() { return reference; } - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName() { - return traversalEngine.getPerformanceLogFileName(); - } - - /** - * Set the filename of the log for performance. If set, - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName) { - traversalEngine.setPerformanceLogFileName(fileName); - } - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds() { - return traversalEngine.getPerformanceProgressPrintFrequencySeconds(); - } - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds) { - traversalEngine.setPerformanceProgressPrintFrequencySeconds(seconds); - } - protected void cleanup() { try { mBeanServer.unregisterMBean(mBeanName); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java index e510822b8..8be6b0b62 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java @@ -31,27 +31,5 @@ package org.broadinstitute.sting.gatk.executive; * To change this template use File | Settings | File Templates. */ public interface MicroSchedulerMBean { - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName(); - - /** - * Set the filename of the log for performance. If set, - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName); - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds(); - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds); + // has nothing because we don't have anything we currently track } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index aefa9c12d..790c6b3ed 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -10,7 +10,6 @@ import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; /** * User: hanna * Date: Apr 29, 2009 @@ -56,7 +55,6 @@ public class ShardTraverser implements Callable { public Object call() { try { - traversalEngine.startTimersIfNecessary(); final long startTime = System.currentTimeMillis(); Object accumulator = walker.reduceInit(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index 8c617e4dc..159343bf8 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -30,66 +30,28 @@ import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.walkers.Walker; -import org.broadinstitute.sting.utils.*; +import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.UserException; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; public abstract class TraversalEngine,ProviderType extends ShardDataProvider> { /** our log, which we want to capture anything from this class */ protected static final Logger logger = Logger.getLogger(TraversalEngine.class); - // Time in milliseconds since we initialized this engine - private static final int HISTORY_WINDOW_SIZE = 50; - - /** lock object to sure updates to history are consistent across threads */ - private static final Object lock = new Object(); - LinkedList history = new LinkedList(); - - /** We use the SimpleTimer to time our run */ - private SimpleTimer timer = null; - - // How long can we go without printing some progress info? - private long lastProgressPrintTime = -1; // When was the last time we printed progress log? - - private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000; // in milliseconds - private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0; - private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0; - private long progressPrintFrequency = 10 * 1000; // in milliseconds - private boolean progressMeterInitialized = false; - - // for performance log - private static final boolean PERFORMANCE_LOG_ENABLED = true; - private final Object performanceLogLock = new Object(); - private File performanceLogFile; - private PrintStream performanceLog = null; - private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log? - private final long PERFORMANCE_LOG_PRINT_FREQUENCY = progressPrintFrequency; // in milliseconds - - /** Size, in bp, of the area we are processing. Updated once in the system in initial for performance reasons */ - long targetSize = -1; - GenomeLocSortedSet targetIntervals = null; - protected GenomeAnalysisEngine engine; + private TraversalProgressMeter progressMeter; // ---------------------------------------------------------------------------------------------------- // // ABSTRACT METHODS // // ---------------------------------------------------------------------------------------------------- + /** - * Gets the named traversal type associated with the given traversal. + * Gets the named traversal type associated with the given traversal, such as loci, reads, etc. + * * @return A user-friendly name for the given traversal type. */ - protected abstract String getTraversalType(); + public abstract String getTraversalUnits(); /** * this method must be implemented by all traversal engines @@ -104,70 +66,36 @@ public abstract class TraversalEngine,Provide ProviderType dataProvider, T sum); - // ---------------------------------------------------------------------------------------------------- - // - // Common timing routines - // - // ---------------------------------------------------------------------------------------------------- /** * Initialize the traversal engine. After this point traversals can be run over the data + * * @param engine GenomeAnalysisEngine for this traversal + * @param progressMeter An optional (null == optional) meter to track our progress */ - public void initialize(GenomeAnalysisEngine engine) { + public void initialize(final GenomeAnalysisEngine engine, final TraversalProgressMeter progressMeter) { if ( engine == null ) throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!"); this.engine = engine; - - if ( PERFORMANCE_LOG_ENABLED && engine.getArguments() != null && engine.getArguments().performanceLog != null ) { - synchronized(this.performanceLogLock) { - performanceLogFile = engine.getArguments().performanceLog; - createNewPerformanceLog(); - } - } - - // if we don't have any intervals defined, create intervals from the reference itself - if ( this.engine.getIntervals() == null ) - targetIntervals = GenomeLocSortedSet.createSetFromSequenceDictionary(engine.getReferenceDataSource().getReference().getSequenceDictionary()); - else - targetIntervals = this.engine.getIntervals(); - targetSize = targetIntervals.coveredSize(); - } - - private void createNewPerformanceLog() { - synchronized(performanceLogLock) { - try { - performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog)); - List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining"); - performanceLog.println(Utils.join("\t", pLogHeader)); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e); - } - } - } - /** - * Should be called to indicate that we're going to process records and the timer should start ticking. This - * function should be called right before any traversal work is done, to avoid counting setup costs in the - * processing costs and inflating the estimated runtime. - */ - public void startTimersIfNecessary() { - if ( timer == null ) { - timer = new SimpleTimer("Traversal"); - timer.start(); - lastProgressPrintTime = timer.currentTime(); - } + this.progressMeter = progressMeter; } /** - * @param curTime (current runtime, in millisecs) - * @param lastPrintTime the last time we printed, in machine milliseconds - * @param printFreq maximum permitted difference between last print and current times + * For testing only. Does not initialize the progress meter * - * @return true if the maximum interval (in millisecs) has passed since the last printing + * @param engine */ - private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) { - long elapsed = curTime - lastPrintTime; - return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; + protected void initialize(final GenomeAnalysisEngine engine) { + initialize(engine, null); + } + + /** + * Called by the MicroScheduler when all work is done and the GATK is shutting down. + * + * To be used by subclasses that need to free up resources (such as threads) + */ + public void shutdown() { + // by default there's nothing to do } /** @@ -197,194 +125,7 @@ public abstract class TraversalEngine,Provide * @param loc the location */ public void printProgress(final GenomeLoc loc) { - // A bypass is inserted here for unit testing. - printProgress(loc, false); - } - - /** - * Utility routine that prints out process information (including timing) every N records or - * every M seconds, for N and M set in global variables. - * - * @param loc Current location, can be null if you are at the end of the traversal - * @param mustPrint If true, will print out info, regardless of nRecords or time interval - */ - private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) { - if( ! progressMeterInitialized ) { - logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); - logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", - "Location", getTraversalType(), getTraversalType())); - progressMeterInitialized = true; - } - - final long curTime = timer.currentTime(); - boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); - boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); - - if ( printProgress || printLog ) { - final ProcessingHistory last = updateHistory(loc, engine.getCumulativeMetrics()); - - final AutoFormattingTime elapsed = new AutoFormattingTime(last.elapsedSeconds); - final AutoFormattingTime bpRate = new AutoFormattingTime(last.secondsPerMillionBP()); - final AutoFormattingTime unitRate = new AutoFormattingTime(last.secondsPerMillionElements()); - final double fractionGenomeTargetCompleted = last.calculateFractionGenomeTargetCompleted(targetSize); - final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted); - final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds()); - final long nRecords = engine.getCumulativeMetrics().getNumIterations(); - - if ( printProgress ) { - lastProgressPrintTime = curTime; - - // dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates - if ( estTotalRuntime.getTimeInSeconds() > TWELVE_HOURS_IN_SECONDS ) - progressPrintFrequency = 60 * 1000; // in milliseconds - else if ( estTotalRuntime.getTimeInSeconds() > TWO_HOURS_IN_SECONDS ) - progressPrintFrequency = 30 * 1000; // in milliseconds - else - progressPrintFrequency = 10 * 1000; // in milliseconds - - final String posName = loc == null ? (mustPrint ? "done" : "unmapped reads") : String.format("%s:%d", loc.getContig(), loc.getStart()); - logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", - posName, nRecords*1.0, elapsed, unitRate, - 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); - - } - - if ( printLog ) { - lastPerformanceLogPrintTime = curTime; - synchronized(performanceLogLock) { - performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", - elapsed.getTimeInSeconds(), nRecords, unitRate.getTimeInSeconds(), last.bpProcessed, - bpRate.getTimeInSeconds(), fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(), - timeToCompletion.getTimeInSeconds()); - } - } - } - } - - /** - * Keeps track of the last HISTORY_WINDOW_SIZE data points for the progress meter. Currently the - * history isn't used in any way, but in the future it'll become valuable for more accurate estimates - * for when a process will complete. - * - * @param loc our current position. If null, assumes we are done traversing - * @param metrics information about what's been processed already - * @return - */ - private ProcessingHistory updateHistory(GenomeLoc loc, ReadMetrics metrics) { - synchronized (lock) { - if ( history.size() > HISTORY_WINDOW_SIZE ) - history.pop(); - - long nRecords = metrics.getNumIterations(); - long bpProcessed = loc == null ? targetSize : targetIntervals.sizeBeforeLoc(loc); // null -> end of processing - history.add(new ProcessingHistory(timer.getElapsedTime(), loc, nRecords, bpProcessed)); - - return history.getLast(); - } - } - - /** - * Called after a traversal to print out information about the traversal process - */ - public void printOnTraversalDone() { - printProgress(null, true); - - final double elapsed = timer == null ? 0 : timer.getElapsedTime(); - - ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics(); - - // count up the number of skipped reads by summing over all filters - long nSkippedReads = 0L; - for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) - nSkippedReads += countsByFilter; - - logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600)); - if ( cumulativeMetrics.getNumReadsSeen() > 0 ) - logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", - nSkippedReads, - cumulativeMetrics.getNumReadsSeen(), - 100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen()))); - for ( Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { - long count = filterCounts.getValue(); - logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", - count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); - } - - if ( performanceLog != null ) performanceLog.close(); - } - - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName() { - synchronized(performanceLogLock) { - return performanceLogFile.getAbsolutePath(); - } - } - - /** - * Sets the filename of the log for performance. If set, will write performance data. - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName) { - File file = new File(fileName); - - synchronized(performanceLogLock) { - // Ignore multiple calls to reset the same lock. - if(performanceLogFile != null && performanceLogFile.equals(file)) - return; - - // Close an existing log - if(performanceLog != null) performanceLog.close(); - - performanceLogFile = file; - createNewPerformanceLog(); - } - } - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds() { - return progressPrintFrequency; - } - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds) { - progressPrintFrequency = seconds; - } - - private static class ProcessingHistory { - double elapsedSeconds; - long unitsProcessed; - long bpProcessed; - GenomeLoc loc; - - public ProcessingHistory(double elapsedSeconds, GenomeLoc loc, long unitsProcessed, long bpProcessed) { - this.elapsedSeconds = elapsedSeconds; - this.loc = loc; - this.unitsProcessed = unitsProcessed; - this.bpProcessed = bpProcessed; - } - - /** How long in seconds to process 1M traversal units? */ - private double secondsPerMillionElements() { - return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); - } - - /** How long in seconds to process 1M bp on the genome? */ - private double secondsPerMillionBP() { - return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); - } - - /** What fractoin of the target intervals have we covered? */ - private double calculateFractionGenomeTargetCompleted(final long targetSize) { - return (1.0*bpProcessed) / targetSize; - } + if ( progressMeter != null ) progressMeter.printProgress(loc); } } + diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java new file mode 100755 index 000000000..72f20fb0c --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java @@ -0,0 +1,367 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.gatk.traversals; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Invariant; +import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.gatk.ReadMetrics; +import org.broadinstitute.sting.utils.*; +import org.broadinstitute.sting.utils.exceptions.UserException; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * A progress meter that prints a few key metrics to a logger and optionally to a file + * + * Metrics include: + * -- Number of processed X (X = traversal units) + * -- Runtime per.1M X + * -- Percent of regions to be processed completed + * -- The estimated total runtime based on previous performance + * -- The estimated time remaining for the entire process + * + * The optional file log an expanded set of metrics in tabular format + * suitable for subsequent analysis in R. + * + * This class is -- and MUST BE -- thread-safe for use in the GATK. Multiple independent + * threads executing traversals will be calling printProgress() simultaneously and this + * class does (and MUST) properly sort out the timings of logs without interlacing outputs + * because of these threads. + * + * Consequently, the fundamental model for when to print the logs is time based. We basically + * print a meter message every X seconds, minutes, hours, whatever is appropriate based on the + * estimated remaining runtime. + * + * @author depristo + * @since 2010 maybe, but written in 09/12 for clarity + */ +@Invariant({ + "targetSizeInBP >= 0", + "progressPrintFrequency > 0" +}) +public class TraversalProgressMeter { + protected static final Logger logger = Logger.getLogger(TraversalProgressMeter.class); + + // -------------------------------------------------------------------------------- + // static constants controlling overall system behavior + // -------------------------------------------------------------------------------- + + /** + * Min. milliseconds after we start up the meter before we will print our first meter message + */ + private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000; + + /** + * How often should we print performance logging information, when we are sending this + * information to a file? Not dynamically updated as the logger meter is. + */ + private final static long PERFORMANCE_LOG_PRINT_FREQUENCY = 10 * 1000; + + private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0; + private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0; + + // -------------------------------------------------------------------------------- + // Variables we updating during running + // -------------------------------------------------------------------------------- + + /** + * When was the last time we printed progress log? In milleseconds + */ + private long lastProgressPrintTime = -1; + + /** + * How frequently should we be printing our meter messages? Dynamically updated + * depending on how long we think the run has left. + */ + private long progressPrintFrequency = 10 * 1000; // default value + + /** + * When was the last time we printed to the performance log? In millseconds + */ + private long lastPerformanceLogPrintTime = -1; + + // -------------------------------------------------------------------------------- + // final variables fixed at object creation time + // -------------------------------------------------------------------------------- + + /** + * The set of genome locs describing the total region we are processing with + * this GATK run. Used to determine how close we are to completing the run + */ + private final GenomeLocSortedSet regionsBeingProcessed; + + /** + * Size, in bp, of the area we are processing, derived from regionsBeingProcessed. + * Updated once in the system in initial for performance reasons + */ + private final long targetSizeInBP; + + /** + * Used to get the total number of records we've processed so far. + */ + final ReadMetrics cumulativeMetrics; + + /** + * A string describing the type of this traversal, so we can say things like + * "we are running at X traversalType per second" + */ + private final String traversalType; + + /** + * A potentially null file where we print a supplementary, R readable performance log + * file. + */ + private final PrintStream performanceLog; + + /** We use the SimpleTimer to time our run */ + private final SimpleTimer timer = new SimpleTimer("Traversal"); + + /** + * Create a new TraversalProgressMeter + * + * @param cumulativeMetrics the object where the shared traversal counts are being updated + * @param performanceLogFile an optional performance log file where a table of performance logs will be written + * @param traversalUnits the name of this traversal type, suitable for saying X seconds per traversalUnits + * @param processingIntervals the intervals being processed + */ + public TraversalProgressMeter(final ReadMetrics cumulativeMetrics, + final File performanceLogFile, + final String traversalUnits, + final GenomeLocSortedSet processingIntervals) { + if ( cumulativeMetrics == null ) throw new IllegalArgumentException("cumulativeMetrics cannot be null!"); + if ( traversalUnits == null ) throw new IllegalArgumentException("traversalUnits cannot be null"); + if ( processingIntervals == null ) throw new IllegalArgumentException("Target intervals cannot be null"); + + this.cumulativeMetrics = cumulativeMetrics; + this.traversalType = traversalUnits; + this.regionsBeingProcessed = processingIntervals; + + // setup the performance logger output, if requested by the GATK engine + if ( performanceLogFile != null ) { + try { + this.performanceLog = new PrintStream(new FileOutputStream(performanceLogFile)); + final List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", + "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining"); + performanceLog.println(Utils.join("\t", pLogHeader)); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(performanceLogFile, e); + } + } else { + performanceLog = null; + } + + // cached for performance reasons + targetSizeInBP = processingIntervals.coveredSize(); + + // start up the timer + start(); + } + + /** + * Forward request to printProgress + * + * Assumes that one cycle has been completed + * + * @param loc the location + */ + public void printProgress(final GenomeLoc loc) { + // A bypass is inserted here for unit testing. + printProgress(loc, false); + } + + private synchronized void start() { + timer.start(); + lastProgressPrintTime = timer.currentTime(); + + logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); + logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", + "Location", traversalType, traversalType)); + } + + /** + * Utility routine that prints out process information (including timing) every N records or + * every M seconds, for N and M set in global variables. + * + * Synchronized to ensure that even with multiple threads calling printProgress we still + * get one clean stream of meter logs. + * + * @param loc Current location, can be null if you are at the end of the traversal + * @param mustPrint If true, will print out info, regardless of time interval + */ + private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) { + final long curTime = timer.currentTime(); + final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); + final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); + + if ( printProgress || printLog ) { + final ProgressData progressData = takeProgressSnapshot(loc, cumulativeMetrics); + + final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.elapsedSeconds); + final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP()); + final AutoFormattingTime unitRate = new AutoFormattingTime(progressData.secondsPerMillionElements()); + final double fractionGenomeTargetCompleted = progressData.calculateFractionGenomeTargetCompleted(targetSizeInBP); + final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted); + final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds()); + + if ( printProgress ) { + lastProgressPrintTime = curTime; + updateLoggerPrintFrequency(estTotalRuntime.getTimeInSeconds()); + + // a pretty name for our position + final String posName = loc == null + ? (mustPrint ? "done" : "unmapped reads") + : String.format("%s:%d", loc.getContig(), loc.getStart()); + + logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", + posName, progressData.unitsProcessed*1.0, elapsed, unitRate, + 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); + + } + + if ( printLog ) { + lastPerformanceLogPrintTime = curTime; + performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", + elapsed.getTimeInSeconds(), progressData.unitsProcessed, unitRate.getTimeInSeconds(), + progressData.bpProcessed, bpRate.getTimeInSeconds(), + fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(), + timeToCompletion.getTimeInSeconds()); + } + } + } + + /** + * Determine, based on remaining runtime, how often to print the meter + * + * @param totalRuntimeSeconds kinda obvious, no? + */ + private void updateLoggerPrintFrequency(final double totalRuntimeSeconds) { + // dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates + if ( totalRuntimeSeconds > TWELVE_HOURS_IN_SECONDS ) + progressPrintFrequency = 60 * 1000; // in milliseconds + else if ( totalRuntimeSeconds > TWO_HOURS_IN_SECONDS ) + progressPrintFrequency = 30 * 1000; // in milliseconds + else + progressPrintFrequency = 10 * 1000; // in milliseconds + } + + /** + * Creates a new ProgressData object recording a snapshot of our progress at this instant + * + * @param loc our current position. If null, assumes we are done traversing + * @param metrics information about what's been processed already + * @return + */ + private ProgressData takeProgressSnapshot(final GenomeLoc loc, final ReadMetrics metrics) { + final long nRecords = metrics.getNumIterations(); + // null -> end of processing + final long bpProcessed = loc == null ? targetSizeInBP : regionsBeingProcessed.sizeBeforeLoc(loc); + return new ProgressData(timer.getElapsedTime(), nRecords, bpProcessed); + } + + /** + * Called after a traversal to print out information about the traversal process + */ + public void printOnDone() { + printProgress(null, true); + + final double elapsed = timer == null ? 0 : timer.getElapsedTime(); + + // count up the number of skipped reads by summing over all filters + long nSkippedReads = 0L; + for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) + nSkippedReads += countsByFilter; + + logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600)); + + // TODO -- move into MicroScheduler + if ( cumulativeMetrics.getNumReadsSeen() > 0 ) + logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", + nSkippedReads, + cumulativeMetrics.getNumReadsSeen(), + 100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen()))); + for ( Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { + long count = filterCounts.getValue(); + logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", + count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); + } + + if ( performanceLog != null ) performanceLog.close(); + } + + /** + * @param curTime (current runtime, in millisecs) + * @param lastPrintTime the last time we printed, in machine milliseconds + * @param printFreq maximum permitted difference between last print and current times + * + * @return true if the maximum interval (in millisecs) has passed since the last printing + */ + private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) { + final long elapsed = curTime - lastPrintTime; + return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; + } + + /** + * a snapshot of our performance, suitable for storage and later analysis + */ + private static class ProgressData { + final double elapsedSeconds; + final long unitsProcessed; + final long bpProcessed; + + @Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"}) + public ProgressData(double elapsedSeconds, long unitsProcessed, long bpProcessed) { + this.elapsedSeconds = elapsedSeconds; + this.unitsProcessed = unitsProcessed; + this.bpProcessed = bpProcessed; + } + + /** How long in seconds to process 1M traversal units? */ + @Ensures("result >= 0.0") + private double secondsPerMillionElements() { + return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); + } + + /** How long in seconds to process 1M bp on the genome? */ + @Ensures("result >= 0.0") + private double secondsPerMillionBP() { + return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); + } + + /** What fraction of the target intervals have we covered? */ + @Requires("targetSize >= 0") + @Ensures({"result >= 0.0", "result <= 1.0"}) + private double calculateFractionGenomeTargetCompleted(final long targetSize) { + return (1.0*bpProcessed) / Math.max(targetSize, 1); + } + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java index bbd9346b3..2b7b2f9f5 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java @@ -36,7 +36,7 @@ public class TraverseActiveRegions extends TraversalEngine myReads = new LinkedHashSet(); @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "active regions"; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java index 2b45d894c..2e43ef8f8 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java @@ -54,7 +54,7 @@ public class TraverseDuplicates extends TraversalEngine extends TraversalEngine extends TraverseLociBase { } @Override - public void printOnTraversalDone() { + public void shutdown() { nanoScheduler.shutdown(); - super.printOnTraversalDone(); } /** diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java index 9b076fce4..aef3cf7d0 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java @@ -27,7 +27,7 @@ public class TraverseReadPairs extends TraversalEngine extends TraversalEngine,Read protected static final Logger logger = Logger.getLogger(TraverseReads.class); @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "reads"; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java index b3a0a1390..77ab0c891 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -65,7 +65,7 @@ public class TraverseReadsNano extends TraversalEngine, } @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "reads"; } @@ -135,9 +135,8 @@ public class TraverseReadsNano extends TraversalEngine, } @Override - public void printOnTraversalDone() { + public void shutdown() { nanoScheduler.shutdown(); - super.printOnTraversalDone(); } /** diff --git a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java index 475f7de21..9632a687b 100644 --- a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java +++ b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java @@ -69,7 +69,7 @@ public class ArtificialReadsTraversal extends TraversalEngine Date: Sun, 9 Sep 2012 11:00:36 -0400 Subject: [PATCH 05/14] Final cleanup of TraversalProgressMeters, moved to utils.progressmeter -- TraversalProgressMeter now completely generalized, named ProgressMeter in utils.progressmeter. Now just takes "nRecordsProcessed" as an argument to print reads. Completely removes dependence on complex data structures from TraversalProgressMeter. Can be used to measure progress on any task with processing units in genomic locations. -- a fairly simple, class with no dependency on GATK engine or other features. -- Currently only used by the TraversalEngine / MicroScheduler but could be used for any purpose now, really. --- .../sting/gatk/executive/MicroScheduler.java | 45 ++++- .../gatk/traversals/TraversalEngine.java | 10 +- .../progressmeter/ProgressMeter.java} | 163 ++++++------------ .../progressmeter/ProgressMeterData.java | 54 ++++++ 4 files changed, 154 insertions(+), 118 deletions(-) rename public/java/src/org/broadinstitute/sting/{gatk/traversals/TraversalProgressMeter.java => utils/progressmeter/ProgressMeter.java} (65%) create mode 100644 public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 0e8208594..3e843de3e 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -28,6 +28,7 @@ package org.broadinstitute.sting.gatk.executive; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; +import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; @@ -37,8 +38,10 @@ import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.traversals.*; import org.broadinstitute.sting.gatk.walkers.*; +import org.broadinstitute.sting.utils.MathUtils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import javax.management.JMException; @@ -47,6 +50,7 @@ import javax.management.ObjectName; import java.io.File; import java.lang.management.ManagementFactory; import java.util.Collection; +import java.util.Map; /** @@ -90,7 +94,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ ThreadEfficiencyMonitor threadEfficiencyMonitor = null; - final TraversalProgressMeter progressMeter; + final ProgressMeter progressMeter; /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the @@ -176,8 +180,9 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog; - this.progressMeter = new TraversalProgressMeter(engine.getCumulativeMetrics(), progressLogFile, - traversalEngine.getTraversalUnits(), engine.getRegionsOfGenomeBeingProcessed()); + this.progressMeter = new ProgressMeter(progressLogFile, + traversalEngine.getTraversalUnits(), + engine.getRegionsOfGenomeBeingProcessed()); traversalEngine.initialize(engine, progressMeter); // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. @@ -241,7 +246,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * Must be called by subclasses when execute is done */ protected void executionIsDone() { - progressMeter.printOnDone(); + progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations()); + printReadFilteringStats(); // TODO -- generalize to all local thread copies traversalEngine.shutdown(); @@ -254,6 +260,37 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } } + /** + * Prints out information about number of reads observed and filtering, if any reads were used in the traversal + * + * Looks like: + * + * INFO 10:40:47,370 MicroScheduler - 22 reads were filtered out during traversal out of 101 total (21.78%) + * INFO 10:40:47,370 MicroScheduler - -> 1 reads (0.99% of total) failing BadMateFilter + * INFO 10:40:47,370 MicroScheduler - -> 20 reads (19.80% of total) failing DuplicateReadFilter + * INFO 10:40:47,370 MicroScheduler - -> 1 reads (0.99% of total) failing FailsVendorQualityCheckFilter + */ + private void printReadFilteringStats() { + final ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics(); + if ( cumulativeMetrics.getNumReadsSeen() > 0 ) { + // count up the number of skipped reads by summing over all filters + long nSkippedReads = 0L; + for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) + nSkippedReads += countsByFilter; + + logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", + nSkippedReads, + cumulativeMetrics.getNumReadsSeen(), + 100.0 * MathUtils.ratio(nSkippedReads, cumulativeMetrics.getNumReadsSeen()))); + + for ( final Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { + long count = filterCounts.getValue(); + logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", + count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); + } + } + } + /** * Gets the engine that created this microscheduler. * @return The engine owning this microscheduler. diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index 159343bf8..668bddcca 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -32,13 +32,14 @@ import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; public abstract class TraversalEngine,ProviderType extends ShardDataProvider> { /** our log, which we want to capture anything from this class */ protected static final Logger logger = Logger.getLogger(TraversalEngine.class); protected GenomeAnalysisEngine engine; - private TraversalProgressMeter progressMeter; + private ProgressMeter progressMeter; // ---------------------------------------------------------------------------------------------------- // @@ -72,7 +73,7 @@ public abstract class TraversalEngine,Provide * @param engine GenomeAnalysisEngine for this traversal * @param progressMeter An optional (null == optional) meter to track our progress */ - public void initialize(final GenomeAnalysisEngine engine, final TraversalProgressMeter progressMeter) { + public void initialize(final GenomeAnalysisEngine engine, final ProgressMeter progressMeter) { if ( engine == null ) throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!"); @@ -118,14 +119,15 @@ public abstract class TraversalEngine,Provide } /** - * Forward request to printProgress + * Forward request to notifyOfProgress * * Assumes that one cycle has been completed * * @param loc the location */ public void printProgress(final GenomeLoc loc) { - if ( progressMeter != null ) progressMeter.printProgress(loc); + if ( progressMeter != null ) + progressMeter.notifyOfProgress(loc, engine.getCumulativeMetrics().getNumIterations()); } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java similarity index 65% rename from public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java rename to public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java index 72f20fb0c..69cf52fc2 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalProgressMeter.java +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java @@ -22,13 +22,10 @@ * OTHER DEALINGS IN THE SOFTWARE. */ -package org.broadinstitute.sting.gatk.traversals; +package org.broadinstitute.sting.utils.progressmeter; -import com.google.java.contract.Ensures; import com.google.java.contract.Invariant; -import com.google.java.contract.Requires; import org.apache.log4j.Logger; -import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.utils.*; import org.broadinstitute.sting.utils.exceptions.UserException; @@ -38,13 +35,17 @@ import java.io.FileOutputStream; import java.io.PrintStream; import java.util.Arrays; import java.util.List; -import java.util.Map; /** - * A progress meter that prints a few key metrics to a logger and optionally to a file + * A meter measuring progress on a calculation through a set of genomic regions that can + * print a few key metrics to a logger and optionally to a file * - * Metrics include: - * -- Number of processed X (X = traversal units) + * The key information for assessing progress is a set of genome locs describing the total + * set of regions we will process. Whenever (at reasonable intervals) the processing unit + * can called notifyOfProgress and this logger may, depending on the metering delay, print + * a log message with the following metrics: + * + * -- Number of processed X (X = processing units) * -- Runtime per.1M X * -- Percent of regions to be processed completed * -- The estimated total runtime based on previous performance @@ -54,7 +55,7 @@ import java.util.Map; * suitable for subsequent analysis in R. * * This class is -- and MUST BE -- thread-safe for use in the GATK. Multiple independent - * threads executing traversals will be calling printProgress() simultaneously and this + * threads executing processors will be calling notifyOfProgress() simultaneously and this * class does (and MUST) properly sort out the timings of logs without interlacing outputs * because of these threads. * @@ -69,8 +70,8 @@ import java.util.Map; "targetSizeInBP >= 0", "progressPrintFrequency > 0" }) -public class TraversalProgressMeter { - protected static final Logger logger = Logger.getLogger(TraversalProgressMeter.class); +public class ProgressMeter { + protected static final Logger logger = Logger.getLogger(ProgressMeter.class); // -------------------------------------------------------------------------------- // static constants controlling overall system behavior @@ -127,15 +128,10 @@ public class TraversalProgressMeter { private final long targetSizeInBP; /** - * Used to get the total number of records we've processed so far. + * A string describing the type of units being processes, so we can say things like + * "we are running at X processingUnitName per second" */ - final ReadMetrics cumulativeMetrics; - - /** - * A string describing the type of this traversal, so we can say things like - * "we are running at X traversalType per second" - */ - private final String traversalType; + private final String processingUnitName; /** * A potentially null file where we print a supplementary, R readable performance log @@ -144,29 +140,25 @@ public class TraversalProgressMeter { private final PrintStream performanceLog; /** We use the SimpleTimer to time our run */ - private final SimpleTimer timer = new SimpleTimer("Traversal"); + private final SimpleTimer timer = new SimpleTimer(); /** - * Create a new TraversalProgressMeter + * Create a new ProgressMeter * - * @param cumulativeMetrics the object where the shared traversal counts are being updated * @param performanceLogFile an optional performance log file where a table of performance logs will be written - * @param traversalUnits the name of this traversal type, suitable for saying X seconds per traversalUnits + * @param processingUnitName the name of the unit type being processed, suitable for saying X seconds per processingUnitName * @param processingIntervals the intervals being processed */ - public TraversalProgressMeter(final ReadMetrics cumulativeMetrics, - final File performanceLogFile, - final String traversalUnits, - final GenomeLocSortedSet processingIntervals) { - if ( cumulativeMetrics == null ) throw new IllegalArgumentException("cumulativeMetrics cannot be null!"); - if ( traversalUnits == null ) throw new IllegalArgumentException("traversalUnits cannot be null"); + public ProgressMeter(final File performanceLogFile, + final String processingUnitName, + final GenomeLocSortedSet processingIntervals) { + if ( processingUnitName == null ) throw new IllegalArgumentException("processingUnitName cannot be null"); if ( processingIntervals == null ) throw new IllegalArgumentException("Target intervals cannot be null"); - this.cumulativeMetrics = cumulativeMetrics; - this.traversalType = traversalUnits; + this.processingUnitName = processingUnitName; this.regionsBeingProcessed = processingIntervals; - // setup the performance logger output, if requested by the GATK engine + // setup the performance logger output, if requested if ( performanceLogFile != null ) { try { this.performanceLog = new PrintStream(new FileOutputStream(performanceLogFile)); @@ -188,45 +180,48 @@ public class TraversalProgressMeter { } /** - * Forward request to printProgress + * Forward request to notifyOfProgress * * Assumes that one cycle has been completed * - * @param loc the location + * @param loc our current location. Null means "in unmapped reads" + * @param nTotalRecordsProcessed the total number of records we've processed */ - public void printProgress(final GenomeLoc loc) { - // A bypass is inserted here for unit testing. - printProgress(loc, false); + public void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) { + notifyOfProgress(loc, false, nTotalRecordsProcessed); } private synchronized void start() { timer.start(); lastProgressPrintTime = timer.currentTime(); - logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); + logger.info("[INITIALIZATION COMPLETE; STARTING PROCESSING]"); logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", - "Location", traversalType, traversalType)); + "Location", processingUnitName, processingUnitName)); } /** * Utility routine that prints out process information (including timing) every N records or * every M seconds, for N and M set in global variables. * - * Synchronized to ensure that even with multiple threads calling printProgress we still + * Synchronized to ensure that even with multiple threads calling notifyOfProgress we still * get one clean stream of meter logs. * - * @param loc Current location, can be null if you are at the end of the traversal + * @param loc Current location, can be null if you are at the end of the processing unit * @param mustPrint If true, will print out info, regardless of time interval + * @param nTotalRecordsProcessed the total number of records we've processed */ - private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) { + private synchronized void notifyOfProgress(final GenomeLoc loc, boolean mustPrint, final long nTotalRecordsProcessed) { + if ( nTotalRecordsProcessed < 0 ) throw new IllegalArgumentException("nTotalRecordsProcessed must be >= 0"); + final long curTime = timer.currentTime(); final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); if ( printProgress || printLog ) { - final ProgressData progressData = takeProgressSnapshot(loc, cumulativeMetrics); + final ProgressMeterData progressData = takeProgressSnapshot(loc, nTotalRecordsProcessed); - final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.elapsedSeconds); + final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.getElapsedSeconds()); final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP()); final AutoFormattingTime unitRate = new AutoFormattingTime(progressData.secondsPerMillionElements()); final double fractionGenomeTargetCompleted = progressData.calculateFractionGenomeTargetCompleted(targetSizeInBP); @@ -243,7 +238,7 @@ public class TraversalProgressMeter { : String.format("%s:%d", loc.getContig(), loc.getStart()); logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", - posName, progressData.unitsProcessed*1.0, elapsed, unitRate, + posName, progressData.getUnitsProcessed()*1.0, elapsed, unitRate, 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); } @@ -251,8 +246,8 @@ public class TraversalProgressMeter { if ( printLog ) { lastPerformanceLogPrintTime = curTime; performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", - elapsed.getTimeInSeconds(), progressData.unitsProcessed, unitRate.getTimeInSeconds(), - progressData.bpProcessed, bpRate.getTimeInSeconds(), + elapsed.getTimeInSeconds(), progressData.getUnitsProcessed(), unitRate.getTimeInSeconds(), + progressData.getBpProcessed(), bpRate.getTimeInSeconds(), fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(), timeToCompletion.getTimeInSeconds()); } @@ -278,44 +273,27 @@ public class TraversalProgressMeter { * Creates a new ProgressData object recording a snapshot of our progress at this instant * * @param loc our current position. If null, assumes we are done traversing - * @param metrics information about what's been processed already + * @param nTotalRecordsProcessed the total number of records we've processed * @return */ - private ProgressData takeProgressSnapshot(final GenomeLoc loc, final ReadMetrics metrics) { - final long nRecords = metrics.getNumIterations(); + private ProgressMeterData takeProgressSnapshot(final GenomeLoc loc, final long nTotalRecordsProcessed) { // null -> end of processing final long bpProcessed = loc == null ? targetSizeInBP : regionsBeingProcessed.sizeBeforeLoc(loc); - return new ProgressData(timer.getElapsedTime(), nRecords, bpProcessed); + return new ProgressMeterData(timer.getElapsedTime(), nTotalRecordsProcessed, bpProcessed); } /** - * Called after a traversal to print out information about the traversal process + * Should be called when processing is done */ - public void printOnDone() { - printProgress(null, true); + public void notifyDone(final long nTotalRecordsProcessed) { + // print out the progress meter + notifyOfProgress(null, true, nTotalRecordsProcessed); - final double elapsed = timer == null ? 0 : timer.getElapsedTime(); + logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", + timer.getElapsedTime(), timer.getElapsedTime() / 60, timer.getElapsedTime() / 3600)); - // count up the number of skipped reads by summing over all filters - long nSkippedReads = 0L; - for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) - nSkippedReads += countsByFilter; - - logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600)); - - // TODO -- move into MicroScheduler - if ( cumulativeMetrics.getNumReadsSeen() > 0 ) - logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", - nSkippedReads, - cumulativeMetrics.getNumReadsSeen(), - 100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen()))); - for ( Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { - long count = filterCounts.getValue(); - logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", - count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); - } - - if ( performanceLog != null ) performanceLog.close(); + if ( performanceLog != null ) + performanceLog.close(); } /** @@ -329,39 +307,4 @@ public class TraversalProgressMeter { final long elapsed = curTime - lastPrintTime; return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; } - - /** - * a snapshot of our performance, suitable for storage and later analysis - */ - private static class ProgressData { - final double elapsedSeconds; - final long unitsProcessed; - final long bpProcessed; - - @Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"}) - public ProgressData(double elapsedSeconds, long unitsProcessed, long bpProcessed) { - this.elapsedSeconds = elapsedSeconds; - this.unitsProcessed = unitsProcessed; - this.bpProcessed = bpProcessed; - } - - /** How long in seconds to process 1M traversal units? */ - @Ensures("result >= 0.0") - private double secondsPerMillionElements() { - return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); - } - - /** How long in seconds to process 1M bp on the genome? */ - @Ensures("result >= 0.0") - private double secondsPerMillionBP() { - return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); - } - - /** What fraction of the target intervals have we covered? */ - @Requires("targetSize >= 0") - @Ensures({"result >= 0.0", "result <= 1.0"}) - private double calculateFractionGenomeTargetCompleted(final long targetSize) { - return (1.0*bpProcessed) / Math.max(targetSize, 1); - } - } } diff --git a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java new file mode 100644 index 000000000..096b55be2 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java @@ -0,0 +1,54 @@ +package org.broadinstitute.sting.utils.progressmeter; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; + +/** + * a snapshot of our performance, suitable for storage and later analysis + */ +class ProgressMeterData { + private final double elapsedSeconds; + private final long unitsProcessed; + private final long bpProcessed; + + @Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"}) + public ProgressMeterData(double elapsedSeconds, long unitsProcessed, long bpProcessed) { + this.elapsedSeconds = elapsedSeconds; + this.unitsProcessed = unitsProcessed; + this.bpProcessed = bpProcessed; + } + + @Ensures("result >= 0.0") + public double getElapsedSeconds() { + return elapsedSeconds; + } + + @Ensures("result >= 0") + public long getUnitsProcessed() { + return unitsProcessed; + } + + @Ensures("result >= 0") + public long getBpProcessed() { + return bpProcessed; + } + + /** How long in seconds to process 1M traversal units? */ + @Ensures("result >= 0.0") + public double secondsPerMillionElements() { + return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); + } + + /** How long in seconds to process 1M bp on the genome? */ + @Ensures("result >= 0.0") + public double secondsPerMillionBP() { + return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); + } + + /** What fraction of the target intervals have we covered? */ + @Requires("targetSize >= 0") + @Ensures({"result >= 0.0", "result <= 1.0"}) + public double calculateFractionGenomeTargetCompleted(final long targetSize) { + return (1.0*bpProcessed) / Math.max(targetSize, 1); + } +} From f713d400e2865834980aeb36553f5c89d31aaab2 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sun, 9 Sep 2012 16:52:52 -0400 Subject: [PATCH 06/14] Fixed GSA-515 Nanoscheduler GSA-555 / Make NT and NCT work together -- Can now say -nt 4 and -nct 4 to get 16 threads running for you! -- TraversalEngines are now ThreadLocal variables in the MicroScheduler. -- Misc. code cleanup, final variables, some contracts. --- .../executive/HierarchicalMicroScheduler.java | 24 ++- .../gatk/executive/LinearMicroScheduler.java | 8 +- .../sting/gatk/executive/MicroScheduler.java | 147 ++++++++++++++---- .../sting/gatk/executive/ShardTraverser.java | 6 +- 4 files changed, 134 insertions(+), 51 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 486e83e60..1bac72f3e 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -107,7 +107,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar this.traversalTasks = shardStrategy.iterator(); - ReduceTree reduceTree = new ReduceTree(this); + final ReduceTree reduceTree = new ReduceTree(this); initializeWalker(walker); while (isShardTraversePending() || isTreeReducePending()) { @@ -301,17 +301,13 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar if (!traversalTasks.hasNext()) throw new IllegalStateException("Cannot traverse; no pending traversals exist."); - Shard shard = traversalTasks.next(); + final Shard shard = traversalTasks.next(); // todo -- add ownership claim here - ShardTraverser traverser = new ShardTraverser(this, - traversalEngine, - walker, - shard, - outputTracker); + final ShardTraverser traverser = new ShardTraverser(this, walker, shard, outputTracker); - Future traverseResult = threadPool.submit(traverser); + final Future traverseResult = threadPool.submit(traverser); // Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue. reduceTree.addEntry(traverseResult); @@ -326,7 +322,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar protected void queueNextTreeReduce( Walker walker ) { if (reduceTasks.size() == 0) throw new IllegalStateException("Cannot reduce; no pending reduces exist."); - TreeReduceTask reducer = reduceTasks.remove(); + final TreeReduceTask reducer = reduceTasks.remove(); reducer.setWalker((TreeReducible) walker); threadPool.submit(reducer); @@ -334,7 +330,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** Blocks until a free slot appears in the thread queue. */ protected void waitForFreeQueueSlot() { - ThreadPoolMonitor monitor = new ThreadPoolMonitor(); + final ThreadPoolMonitor monitor = new ThreadPoolMonitor(); synchronized (monitor) { threadPool.submit(monitor); monitor.watch(); @@ -346,8 +342,8 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * * @return A new, composite future of the result of this reduce. */ - public Future notifyReduce( Future lhs, Future rhs ) { - TreeReduceTask reducer = new TreeReduceTask(new TreeReducer(this, lhs, rhs)); + public Future notifyReduce( final Future lhs, final Future rhs ) { + final TreeReduceTask reducer = new TreeReduceTask(new TreeReducer(this, lhs, rhs)); reduceTasks.add(reducer); return reducer; } @@ -375,7 +371,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar return this.error; } - private final RuntimeException toRuntimeException(final Throwable error) { + private RuntimeException toRuntimeException(final Throwable error) { // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. if (error instanceof RuntimeException) return (RuntimeException)error; @@ -386,7 +382,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */ private class TreeReduceTask extends FutureTask { - private TreeReducer treeReducer = null; + final private TreeReducer treeReducer; public TreeReduceTask( TreeReducer treeReducer ) { super(treeReducer); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 697e908fd..60f7317ba 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -69,7 +69,7 @@ public class LinearMicroScheduler extends MicroScheduler { getReadIterator(shard), shard.getGenomeLocs(), SampleUtils.getSAMFileSamples(engine)); for(WindowMaker.WindowMakerIterator iterator: windowMaker) { ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods); - Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); + Object result = getTraversalEngine().traverse(walker, dataProvider, accumulator.getReduceInit()); accumulator.accumulate(dataProvider,result); dataProvider.close(); if ( walker.isDone() ) break; @@ -78,7 +78,7 @@ public class LinearMicroScheduler extends MicroScheduler { } else { ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); - Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); + Object result = getTraversalEngine().traverse(walker, dataProvider, accumulator.getReduceInit()); accumulator.accumulate(dataProvider,result); dataProvider.close(); } @@ -87,8 +87,8 @@ public class LinearMicroScheduler extends MicroScheduler { } // Special function call to empty out the work queue. Ugly for now but will be cleaned up when we eventually push this functionality more into the engine - if( traversalEngine instanceof TraverseActiveRegions ) { - final Object result = ((TraverseActiveRegions) traversalEngine).endTraversal(walker, accumulator.getReduceInit()); + if( getTraversalEngine() instanceof TraverseActiveRegions ) { + final Object result = ((TraverseActiveRegions) getTraversalEngine()).endTraversal(walker, accumulator.getReduceInit()); accumulator.accumulate(null, result); // Assumes only used with StandardAccumulator } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 3e843de3e..4024b247d 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -25,6 +25,7 @@ package org.broadinstitute.sting.gatk.executive; +import com.google.java.contract.Ensures; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; @@ -50,6 +51,8 @@ import javax.management.ObjectName; import java.io.File; import java.lang.management.ManagementFactory; import java.util.Collection; +import java.util.LinkedList; +import java.util.List; import java.util.Map; @@ -78,7 +81,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ protected final GenomeAnalysisEngine engine; - protected final TraversalEngine traversalEngine; + private final TraversalEngineCreator traversalEngineCreator; protected final IndexedFastaSequenceFile reference; private final SAMDataSource reads; @@ -110,11 +113,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, ThreadAllocation threadAllocation) { if ( threadAllocation.isRunningInParallelMode() ) { - // TODO -- remove me when we fix running NCT within HMS - if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1) - throw new UserException("Currently the GATK does not support running CPU threads within data threads, " + - "please specify only one of NT and NCT"); - logger.info(String.format("Running the GATK in parallel mode with %d CPU thread(s) for each of %d data thread(s)", threadAllocation.getNumCPUThreadsPerDataThread(), threadAllocation.getNumDataThreads())); } @@ -160,30 +158,12 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { this.reads = reads; this.reference = reference; this.rods = rods; - - if (walker instanceof ReadWalker) { - traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 - ? new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread()) - : new TraverseReads(); - } else if (walker instanceof LocusWalker) { - traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 - ? new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread()) - : new TraverseLociLinear(); - } else if (walker instanceof DuplicateWalker) { - traversalEngine = new TraverseDuplicates(); - } else if (walker instanceof ReadPairWalker) { - traversalEngine = new TraverseReadPairs(); - } else if (walker instanceof ActiveRegionWalker) { - traversalEngine = new TraverseActiveRegions(); - } else { - throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); - } + this.traversalEngineCreator = new TraversalEngineCreator(walker, threadAllocation); final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog; this.progressMeter = new ProgressMeter(progressLogFile, - traversalEngine.getTraversalUnits(), + traversalEngineCreator.getTraversalUnits(), engine.getRegionsOfGenomeBeingProcessed()); - traversalEngine.initialize(engine, progressMeter); // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. // To get around this limitation and since we have no job identifier at this point, register a simple counter that @@ -249,8 +229,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations()); printReadFilteringStats(); - // TODO -- generalize to all local thread copies - traversalEngine.shutdown(); + for ( final TraversalEngine te : traversalEngineCreator.getCreatedEngines() ) + te.shutdown(); // Print out the threading efficiency of this HMS, if state monitoring is enabled if ( threadEfficiencyMonitor != null ) { @@ -317,4 +297,115 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { throw new ReviewedStingException("Unable to unregister microscheduler with JMX", ex); } } + + /** + * Returns a traversal engine suitable for use in this thread. + * + * May create a new traversal engine for this thread, if this is the first + * time this thread ever asked for a TraversalEngine. + * + * @return a non-null TraversalEngine suitable for execution in this scheduler + */ + public TraversalEngine getTraversalEngine() { + return traversalEngineCreator.get(); + } + + /** + * ThreadLocal TraversalEngine creator + * + * TraversalEngines are thread local variables to the MicroScheduler. This is necessary + * because in the HMS case you have multiple threads executing a traversal engine independently, and + * these engines may need to create separate resources for efficiency or implementation reasons. For example, + * the nanoScheduler creates threads to implement the traversal, and this creation is instance specific. + * So each HMS thread needs to have it's own distinct copy of the traversal engine if it wants to have + * N data threads x M nano threads => N * M threads total. + * + * This class also tracks all created traversal engines so this microscheduler can properly + * shut them all down when the scheduling is done. + */ + private class TraversalEngineCreator extends ThreadLocal { + final List createdEngines = new LinkedList(); + final Walker walker; + final ThreadAllocation threadAllocation; + + /** + * Creates an initialized TraversalEngine appropriate for walker and threadAllocation, + * and adds it to the list of created engines for later shutdown. + * + * @return a non-null traversal engine + */ + @Override + protected synchronized TraversalEngine initialValue() { + final TraversalEngine traversalEngine = createEngine(); + traversalEngine.initialize(engine, progressMeter); + createdEngines.add(traversalEngine); + return traversalEngine; + } + + /** + * Returns the traversal units for traversal engines created here. + * + * This (unfortunately) creates an uninitialized tmp. TraversalEngine so we can get + * it's traversal units, and then immediately shuts it down... + * + * @return the traversal unit as returned by getTraversalUnits of TraversalEngines created here + */ + protected String getTraversalUnits() { + final TraversalEngine tmp = createEngine(); + final String units = tmp.getTraversalUnits(); + tmp.shutdown(); + return units; + } + + /** + * Really make us a traversal engine of the appropriate type for walker and thread allocation + * + * @return a non-null uninitialized traversal engine + */ + @Ensures("result != null") + protected TraversalEngine createEngine() { + if (walker instanceof ReadWalker) { + if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ) + return new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread()); + else + return new TraverseReads(); + } else if (walker instanceof LocusWalker) { + if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ) + return new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread()); + else + return new TraverseLociLinear(); + } else if (walker instanceof DuplicateWalker) { + return new TraverseDuplicates(); + } else if (walker instanceof ReadPairWalker) { + return new TraverseReadPairs(); + } else if (walker instanceof ActiveRegionWalker) { + return new TraverseActiveRegions(); + } else { + throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); + } + } + + /** + * Create a TraversalEngineCreator that makes TraversalEngines appropriate for walker and threadAllocation + * + * @param walker the walker we need traversal engines for + * @param threadAllocation what kind of threading will we use in the traversal? + */ + @com.google.java.contract.Requires({"walker != null", "threadAllocation != null"}) + public TraversalEngineCreator(final Walker walker, final ThreadAllocation threadAllocation) { + super(); + this.walker = walker; + this.threadAllocation = threadAllocation; + } + + /** + * Get the list of all traversal engines we've created + * + * @return a non-null list of engines created so far + */ + @Ensures("result != null") + public List getCreatedEngines() { + return createdEngines; + } + } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index 790c6b3ed..e8f15ebef 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -5,7 +5,6 @@ import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvide import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; -import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; @@ -29,7 +28,6 @@ public class ShardTraverser implements Callable { final private HierarchicalMicroScheduler microScheduler; final private Walker walker; final private Shard shard; - final private TraversalEngine traversalEngine; final private ThreadLocalOutputTracker outputTracker; private OutputMergeTask outputMergeTask; @@ -42,13 +40,11 @@ public class ShardTraverser implements Callable { private boolean complete = false; public ShardTraverser( HierarchicalMicroScheduler microScheduler, - TraversalEngine traversalEngine, Walker walker, Shard shard, ThreadLocalOutputTracker outputTracker) { this.microScheduler = microScheduler; this.walker = walker; - this.traversalEngine = traversalEngine; this.shard = shard; this.outputTracker = outputTracker; } @@ -65,7 +61,7 @@ public class ShardTraverser implements Callable { for(WindowMaker.WindowMakerIterator iterator: windowMaker) { final ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),microScheduler.getEngine().getGenomeLocParser(),iterator.getLocus(),iterator,microScheduler.reference,microScheduler.rods); - accumulator = traversalEngine.traverse( walker, dataProvider, accumulator ); + accumulator = microScheduler.getTraversalEngine().traverse(walker, dataProvider, accumulator); dataProvider.close(); } From 195cf6df7e8b687b097c444acd64a53ad48656e7 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Mon, 10 Sep 2012 14:17:41 -0400 Subject: [PATCH 07/14] Attempting to fix out of memory errors with new traversal engine creator --- .../sting/gatk/executive/MicroScheduler.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 4024b247d..893548a9b 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -81,7 +81,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ protected final GenomeAnalysisEngine engine; - private final TraversalEngineCreator traversalEngineCreator; + private TraversalEngineCreator traversalEngineCreator; protected final IndexedFastaSequenceFile reference; private final SAMDataSource reads; @@ -229,8 +229,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations()); printReadFilteringStats(); - for ( final TraversalEngine te : traversalEngineCreator.getCreatedEngines() ) - te.shutdown(); + traversalEngineCreator.shutdown(); + traversalEngineCreator = null; // Print out the threading efficiency of this HMS, if state monitoring is enabled if ( threadEfficiencyMonitor != null ) { @@ -399,13 +399,13 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } /** - * Get the list of all traversal engines we've created - * - * @return a non-null list of engines created so far + * Shutdown all of the created engines, and clear the list of created engines, dropping + * pointers to the traversal engines */ - @Ensures("result != null") - public List getCreatedEngines() { - return createdEngines; + public synchronized void shutdown() { + for ( final TraversalEngine te : traversalEngineCreator.createdEngines ) + te.shutdown(); + createdEngines.clear(); } } } From 641c6a361e944b28206f0466a3e67f3137d61ed6 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Mon, 10 Sep 2012 16:16:19 -0400 Subject: [PATCH 08/14] Fix nasty memory leak in new data thread x cpu thread parallelism -- Basically you cannot safely use instance specific ThreadLocal variables, as these cannot be safely cleaned up. The old implementation kept pointers to old writers, with huge tribble block indexes, and eventually we crashed out of integration tests -- See http://weblogs.java.net/blog/jjviana/archive/2010/06/10/threadlocal-thread-pool-bad-idea-or-dealing-apparent-glassfish-memor for more information -- New implementation uses a borrow/return schedule with a list of N TraversalEngines managed by the MicroScheduler directly. --- .../gatk/executive/LinearMicroScheduler.java | 11 +- .../sting/gatk/executive/MicroScheduler.java | 212 +++++++++--------- .../sting/gatk/executive/ShardTraverser.java | 5 +- 3 files changed, 118 insertions(+), 110 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 60f7317ba..09b18bfe1 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -11,6 +11,7 @@ import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.io.DirectOutputTracker; import org.broadinstitute.sting.gatk.io.OutputTracker; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; +import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.SampleUtils; @@ -60,6 +61,7 @@ public class LinearMicroScheduler extends MicroScheduler { boolean done = walker.isDone(); int counter = 0; + final TraversalEngine traversalEngine = borrowTraversalEngine(); for (Shard shard : shardStrategy ) { if ( done || shard == null ) // we ran out of shards that aren't owned break; @@ -69,7 +71,7 @@ public class LinearMicroScheduler extends MicroScheduler { getReadIterator(shard), shard.getGenomeLocs(), SampleUtils.getSAMFileSamples(engine)); for(WindowMaker.WindowMakerIterator iterator: windowMaker) { ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods); - Object result = getTraversalEngine().traverse(walker, dataProvider, accumulator.getReduceInit()); + Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); accumulator.accumulate(dataProvider,result); dataProvider.close(); if ( walker.isDone() ) break; @@ -78,7 +80,7 @@ public class LinearMicroScheduler extends MicroScheduler { } else { ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); - Object result = getTraversalEngine().traverse(walker, dataProvider, accumulator.getReduceInit()); + Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); accumulator.accumulate(dataProvider,result); dataProvider.close(); } @@ -87,14 +89,15 @@ public class LinearMicroScheduler extends MicroScheduler { } // Special function call to empty out the work queue. Ugly for now but will be cleaned up when we eventually push this functionality more into the engine - if( getTraversalEngine() instanceof TraverseActiveRegions ) { - final Object result = ((TraverseActiveRegions) getTraversalEngine()).endTraversal(walker, accumulator.getReduceInit()); + if( traversalEngine instanceof TraverseActiveRegions ) { + final Object result = ((TraverseActiveRegions) traversalEngine).endTraversal(walker, accumulator.getReduceInit()); accumulator.accumulate(null, result); // Assumes only used with StandardAccumulator } Object result = accumulator.finishTraversal(); outputTracker.close(); + returnTraversalEngine(traversalEngine); cleanup(); executionIsDone(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 893548a9b..030f8d0f2 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -63,14 +63,36 @@ import java.util.Map; * Time: 12:37:23 PM * * General base class for all scheduling algorithms + * Shards and schedules data in manageable chunks. + * + * Creates N TraversalEngines for each data thread for the MicroScheduler. This is necessary + * because in the HMS case you have multiple threads executing a traversal engine independently, and + * these engines may need to create separate resources for efficiency or implementation reasons. For example, + * the nanoScheduler creates threads to implement the traversal, and this creation is instance specific. + * So each HMS thread needs to have it's own distinct copy of the traversal engine if it wants to have + * N data threads x M nano threads => N * M threads total. These are borrowed from this microscheduler + * and returned when done. Also allows us to tracks all created traversal engines so this microscheduler + * can properly shut them all down when the scheduling is done. + * */ - -/** Shards and schedules data in manageable chunks. */ public abstract class MicroScheduler implements MicroSchedulerMBean { // TODO -- remove me and retire non nano scheduled versions of traversals private final static boolean USE_NANOSCHEDULER_FOR_EVERYTHING = true; protected static final Logger logger = Logger.getLogger(MicroScheduler.class); + /** + * The list of all Traversal engines we've created in this micro scheduler + */ + final List allCreatedTraversalEngines = new LinkedList(); + + /** + * All available engines. Engines are borrowed and returned when a subclass is actually + * going to execute the engine on some data. This allows us to have N copies for + * N data parallel executions, but without the dangerous code of having local + * ThreadLocal variables. + */ + final LinkedList availableTraversalEngines = new LinkedList(); + /** * Counts the number of instances of the class that are currently alive. */ @@ -81,7 +103,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ protected final GenomeAnalysisEngine engine; - private TraversalEngineCreator traversalEngineCreator; protected final IndexedFastaSequenceFile reference; private final SAMDataSource reads; @@ -158,13 +179,27 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { this.reads = reads; this.reference = reference; this.rods = rods; - this.traversalEngineCreator = new TraversalEngineCreator(walker, threadAllocation); final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog; + + // Creates uninitialized TraversalEngines appropriate for walker and threadAllocation, + // and adds it to the list of created engines for later shutdown. + for ( int i = 0; i < threadAllocation.getNumDataThreads(); i++ ) { + final TraversalEngine traversalEngine = createTraversalEngine(walker, threadAllocation); + allCreatedTraversalEngines.add(traversalEngine); + availableTraversalEngines.add(traversalEngine); + } + logger.info("Creating " + threadAllocation.getNumDataThreads() + " traversal engines"); + + // Create our progress meter this.progressMeter = new ProgressMeter(progressLogFile, - traversalEngineCreator.getTraversalUnits(), + availableTraversalEngines.peek().getTraversalUnits(), engine.getRegionsOfGenomeBeingProcessed()); + // Now that we have a progress meter, go through and initialize the traversal engines + for ( final TraversalEngine traversalEngine : allCreatedTraversalEngines ) + traversalEngine.initialize(engine, progressMeter); + // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. // To get around this limitation and since we have no job identifier at this point, register a simple counter that // will count the number of instances of this object that have been created in this JVM. @@ -179,6 +214,35 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } } + /** + * Really make us a traversal engine of the appropriate type for walker and thread allocation + * + * @return a non-null uninitialized traversal engine + */ + @Ensures("result != null") + private TraversalEngine createTraversalEngine(final Walker walker, final ThreadAllocation threadAllocation) { + if (walker instanceof ReadWalker) { + if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ) + return new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread()); + else + return new TraverseReads(); + } else if (walker instanceof LocusWalker) { + if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ) + return new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread()); + else + return new TraverseLociLinear(); + } else if (walker instanceof DuplicateWalker) { + return new TraverseDuplicates(); + } else if (walker instanceof ReadPairWalker) { + return new TraverseReadPairs(); + } else if (walker instanceof ActiveRegionWalker) { + return new TraverseActiveRegions(); + } else { + throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); + } + } + + /** * Return the ThreadEfficiencyMonitor we are using to track our resource utilization, if there is one * @@ -228,9 +292,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { protected void executionIsDone() { progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations()); printReadFilteringStats(); - - traversalEngineCreator.shutdown(); - traversalEngineCreator = null; + shutdownTraversalEngines(); // Print out the threading efficiency of this HMS, if state monitoring is enabled if ( threadEfficiencyMonitor != null ) { @@ -240,6 +302,23 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } } + /** + * Shutdown all of the created engines, and clear the list of created engines, dropping + * pointers to the traversal engines + */ + public synchronized void shutdownTraversalEngines() { + if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() ) + throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " + + "have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size() + + " have been returned"); + + for ( final TraversalEngine te : allCreatedTraversalEngines) + te.shutdown(); + + allCreatedTraversalEngines.clear(); + availableTraversalEngines.clear(); + } + /** * Prints out information about number of reads observed and filtering, if any reads were used in the traversal * @@ -301,111 +380,34 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { /** * Returns a traversal engine suitable for use in this thread. * - * May create a new traversal engine for this thread, if this is the first - * time this thread ever asked for a TraversalEngine. + * Pops the next available engine from the available ones maintained by this + * microscheduler. Note that it's a runtime error to pop a traversal engine + * from this scheduler if there are none available. Callers that + * once pop'd an engine for use must return it with returnTraversalEngine * * @return a non-null TraversalEngine suitable for execution in this scheduler */ - public TraversalEngine getTraversalEngine() { - return traversalEngineCreator.get(); + @Ensures("result != null") + protected synchronized TraversalEngine borrowTraversalEngine() { + if ( availableTraversalEngines.isEmpty() ) + throw new IllegalStateException("no traversal engines were available"); + else { + return availableTraversalEngines.pop(); + } } /** - * ThreadLocal TraversalEngine creator + * Return a borrowed traversal engine to this MicroScheduler, for later use + * in another traversal execution * - * TraversalEngines are thread local variables to the MicroScheduler. This is necessary - * because in the HMS case you have multiple threads executing a traversal engine independently, and - * these engines may need to create separate resources for efficiency or implementation reasons. For example, - * the nanoScheduler creates threads to implement the traversal, and this creation is instance specific. - * So each HMS thread needs to have it's own distinct copy of the traversal engine if it wants to have - * N data threads x M nano threads => N * M threads total. - * - * This class also tracks all created traversal engines so this microscheduler can properly - * shut them all down when the scheduling is done. + * @param traversalEngine the borrowed traversal engine. Must have been previously borrowed. */ - private class TraversalEngineCreator extends ThreadLocal { - final List createdEngines = new LinkedList(); - final Walker walker; - final ThreadAllocation threadAllocation; + protected synchronized void returnTraversalEngine(final TraversalEngine traversalEngine) { + if ( traversalEngine == null ) + throw new IllegalArgumentException("Attempting to push a null traversal engine"); + if ( ! allCreatedTraversalEngines.contains(traversalEngine) ) + throw new IllegalArgumentException("Attempting to push a traversal engine not created by this MicroScheduler" + engine); - /** - * Creates an initialized TraversalEngine appropriate for walker and threadAllocation, - * and adds it to the list of created engines for later shutdown. - * - * @return a non-null traversal engine - */ - @Override - protected synchronized TraversalEngine initialValue() { - final TraversalEngine traversalEngine = createEngine(); - traversalEngine.initialize(engine, progressMeter); - createdEngines.add(traversalEngine); - return traversalEngine; - } - - /** - * Returns the traversal units for traversal engines created here. - * - * This (unfortunately) creates an uninitialized tmp. TraversalEngine so we can get - * it's traversal units, and then immediately shuts it down... - * - * @return the traversal unit as returned by getTraversalUnits of TraversalEngines created here - */ - protected String getTraversalUnits() { - final TraversalEngine tmp = createEngine(); - final String units = tmp.getTraversalUnits(); - tmp.shutdown(); - return units; - } - - /** - * Really make us a traversal engine of the appropriate type for walker and thread allocation - * - * @return a non-null uninitialized traversal engine - */ - @Ensures("result != null") - protected TraversalEngine createEngine() { - if (walker instanceof ReadWalker) { - if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ) - return new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread()); - else - return new TraverseReads(); - } else if (walker instanceof LocusWalker) { - if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ) - return new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread()); - else - return new TraverseLociLinear(); - } else if (walker instanceof DuplicateWalker) { - return new TraverseDuplicates(); - } else if (walker instanceof ReadPairWalker) { - return new TraverseReadPairs(); - } else if (walker instanceof ActiveRegionWalker) { - return new TraverseActiveRegions(); - } else { - throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); - } - } - - /** - * Create a TraversalEngineCreator that makes TraversalEngines appropriate for walker and threadAllocation - * - * @param walker the walker we need traversal engines for - * @param threadAllocation what kind of threading will we use in the traversal? - */ - @com.google.java.contract.Requires({"walker != null", "threadAllocation != null"}) - public TraversalEngineCreator(final Walker walker, final ThreadAllocation threadAllocation) { - super(); - this.walker = walker; - this.threadAllocation = threadAllocation; - } - - /** - * Shutdown all of the created engines, and clear the list of created engines, dropping - * pointers to the traversal engines - */ - public synchronized void shutdown() { - for ( final TraversalEngine te : traversalEngineCreator.createdEngines ) - te.shutdown(); - createdEngines.clear(); - } + availableTraversalEngines.push(traversalEngine); } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index e8f15ebef..d632892d5 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -5,6 +5,7 @@ import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvide import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; +import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; @@ -50,6 +51,7 @@ public class ShardTraverser implements Callable { } public Object call() { + final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine(); try { final long startTime = System.currentTimeMillis(); @@ -61,7 +63,7 @@ public class ShardTraverser implements Callable { for(WindowMaker.WindowMakerIterator iterator: windowMaker) { final ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),microScheduler.getEngine().getGenomeLocParser(),iterator.getLocus(),iterator,microScheduler.reference,microScheduler.rods); - accumulator = microScheduler.getTraversalEngine().traverse(walker, dataProvider, accumulator); + accumulator = traversalEngine.traverse(walker, dataProvider, accumulator); dataProvider.close(); } @@ -79,6 +81,7 @@ public class ShardTraverser implements Callable { } finally { synchronized(this) { complete = true; + microScheduler.returnTraversalEngine(traversalEngine); notifyAll(); } } From d6e42d839cde59bc815e4a151fce912c638c15c9 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Mon, 10 Sep 2012 16:39:49 -0400 Subject: [PATCH 09/14] Fixes GSA-558 GATK ReadShards don't handle unmapped reads correctly. --- .../sting/gatk/datasources/reads/ReadShard.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java index fd1ee9859..def27b20d 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java @@ -8,7 +8,10 @@ import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; /** * @@ -149,7 +152,12 @@ public class ReadShard extends Shard { if ( read.getAlignmentEnd() > stop ) stop = read.getAlignmentEnd(); } - return parser.createGenomeLoc(contig, start, stop); + assert contig != null; + + if ( contig.equals("*") ) // all reads are unmapped + return GenomeLoc.UNMAPPED; + else + return parser.createGenomeLoc(contig, start, stop); } } } From e25e617d1a47f050247b456a709dcbff13402243 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Tue, 11 Sep 2012 07:38:34 -0400 Subject: [PATCH 11/14] Fixes GSA-515 Nanoscheduler GSA-560 / Fix display of NanoScheduler and MonitoringEfficiency -- Now prints out a single combined NanoScheduler runtime profile report across all nano schedulers in use. So now if you run with -nt 4 you'll get one combined NanoScheduler profiler across all 4 instances of the NanoScheduler within TraverseXNano. --- .../sting/gatk/executive/MicroScheduler.java | 4 + .../sting/utils/SimpleTimer.java | 9 ++ .../utils/nanoScheduler/InputProducer.java | 16 ++- .../utils/nanoScheduler/NSRuntimeProfile.java | 69 +++++++++++ .../utils/nanoScheduler/NanoScheduler.java | 112 +++++++++--------- .../utils/nanoScheduler/ReducerThread.java | 5 +- .../nanoScheduler/InputProducerUnitTest.java | 3 +- .../nanoScheduler/NanoSchedulerUnitTest.java | 15 ++- .../nanoScheduler/ReducerThreadUnitTest.java | 3 +- 9 files changed, 170 insertions(+), 66 deletions(-) create mode 100644 public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 030f8d0f2..a78ab4375 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -42,6 +42,7 @@ import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.utils.MathUtils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; @@ -315,6 +316,9 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { for ( final TraversalEngine te : allCreatedTraversalEngines) te.shutdown(); + // horrible hack to print nano scheduling information across all nano schedulers, if any were used + NanoScheduler.printCombinedRuntimeProfile(); + allCreatedTraversalEngines.clear(); availableTraversalEngines.clear(); } diff --git a/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java b/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java index b3a9986c5..4c54d4126 100644 --- a/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java +++ b/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java @@ -145,4 +145,13 @@ public class SimpleTimer { public synchronized long getElapsedTimeNano() { return running ? (currentTimeNano() - startTimeNano + elapsedTimeNano) : elapsedTimeNano; } + + /** + * Add the elapsed time from toAdd to this elapsed time + * + * @param toAdd the timer whose elapsed time we want to add to this timer + */ + public synchronized void addElapsed(final SimpleTimer toAdd) { + elapsedTimeNano += toAdd.getElapsedTimeNano(); + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index 29dddbc49..f5eb53456 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -29,6 +29,7 @@ class InputProducer implements Runnable { final SimpleTimer inputTimer, final BlockingQueue outputQueue) { if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); + if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null"); if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null"); this.inputReader = inputReader; @@ -38,11 +39,16 @@ class InputProducer implements Runnable { public void run() { try { - while ( inputReader.hasNext() ) { - if ( inputTimer != null ) inputTimer.restart(); - final InputType input = inputReader.next(); - if ( inputTimer != null ) inputTimer.stop(); - outputQueue.put(new InputValue(input)); + while ( true ) { + inputTimer.restart(); + if ( ! inputReader.hasNext() ) { + inputTimer.stop(); + break; + } else { + final InputType input = inputReader.next(); + inputTimer.stop(); + outputQueue.put(new InputValue(input)); + } } // add the EOF object so our consumer knows we are done in all inputs diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java new file mode 100644 index 000000000..874434eae --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java @@ -0,0 +1,69 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.AutoFormattingTime; +import org.broadinstitute.sting.utils.SimpleTimer; + +/** + * Holds runtime profile (input, read, map) times as tracked by NanoScheduler + * + * User: depristo + * Date: 9/10/12 + * Time: 8:31 PM + */ +public class NSRuntimeProfile { + final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside"); + final SimpleTimer inputTimer = new SimpleTimer("input"); + final SimpleTimer mapTimer = new SimpleTimer("map"); + final SimpleTimer reduceTimer = new SimpleTimer("reduce"); + + /** + * Combine the elapsed time information from other with this profile + * + * @param other a non-null profile + */ + public void combine(final NSRuntimeProfile other) { + outsideSchedulerTimer.addElapsed(other.outsideSchedulerTimer); + inputTimer.addElapsed(other.inputTimer); + mapTimer.addElapsed(other.mapTimer); + reduceTimer.addElapsed(other.reduceTimer); + } + + /** + * Print the runtime profiling to logger + * + * @param logger + */ + public void log(final Logger logger) { + log1(logger, "Input time", inputTimer); + log1(logger, "Map time", mapTimer); + log1(logger, "Reduce time", reduceTimer); + log1(logger, "Outside time", outsideSchedulerTimer); + } + + /** + * @return the total runtime for all functions of this nano scheduler + */ + @Ensures("result >= 0.0") + public double totalRuntimeInSeconds() { + return inputTimer.getElapsedTime() + + mapTimer.getElapsedTime() + + reduceTimer.getElapsedTime() + + outsideSchedulerTimer.getElapsedTime(); + } + + /** + * Print to logger.info timing information from timer, with name label + * + * @param label the name of the timer to display. Should be human readable + * @param timer the timer whose elapsed time we will display + */ + @Requires({"label != null", "timer != null"}) + private void log1(final Logger logger, final String label, final SimpleTimer timer) { + final double myTimeInSec = timer.getElapsedTime(); + final double myTimePercent = myTimeInSec / totalRuntimeInSeconds() * 100; + logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent)); + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 664fb7b9b..bb9afa879 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -3,8 +3,6 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import com.google.java.contract.Requires; import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.AutoFormattingTime; -import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.threading.NamedThreadFactory; @@ -46,7 +44,6 @@ public class NanoScheduler { private final static Logger logger = Logger.getLogger(NanoScheduler.class); private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; private final static boolean LOG_MAP_TIMES = false; - private final static boolean TIME_CALLS = true; private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100; @@ -61,10 +58,15 @@ public class NanoScheduler { boolean debug = false; private NSProgressFunction progressFunction = null; - final SimpleTimer outsideSchedulerTimer = TIME_CALLS ? new SimpleTimer("outside") : null; - final SimpleTimer inputTimer = TIME_CALLS ? new SimpleTimer("input") : null; - final SimpleTimer mapTimer = TIME_CALLS ? new SimpleTimer("map") : null; - final SimpleTimer reduceTimer = TIME_CALLS ? new SimpleTimer("reduce") : null; + /** + * Tracks the combined runtime profiles across all created nano schedulers + */ + final static private NSRuntimeProfile combinedNSRuntimeProfiler = new NSRuntimeProfile(); + + /** + * The profile specific to this nano scheduler + */ + final private NSRuntimeProfile myNSRuntimeProfile = new NSRuntimeProfile(); /** * Create a new nanoscheduler with the desire characteristics requested by the argument @@ -92,7 +94,7 @@ public class NanoScheduler { } // start timing the time spent outside of the nanoScheduler - outsideSchedulerTimer.start(); + myNSRuntimeProfile.outsideSchedulerTimer.start(); } /** @@ -119,21 +121,31 @@ public class NanoScheduler { * After this call, execute cannot be invoked without throwing an error */ public void shutdown() { - outsideSchedulerTimer.stop(); + myNSRuntimeProfile.outsideSchedulerTimer.stop(); + + // add my timing information to the combined NS runtime profile + combinedNSRuntimeProfiler.combine(myNSRuntimeProfile); if ( nThreads > 1 ) { shutdownExecutor("inputExecutor", inputExecutor); shutdownExecutor("mapExecutor", mapExecutor); shutdownExecutor("reduceExecutor", reduceExecutor); } - shutdown = true; - if (TIME_CALLS) { - printTimerInfo("Input time", inputTimer); - printTimerInfo("Map time", mapTimer); - printTimerInfo("Reduce time", reduceTimer); - printTimerInfo("Outside time", outsideSchedulerTimer); - } + shutdown = true; + } + + public void printRuntimeProfile() { + myNSRuntimeProfile.log(logger); + } + + public static void printCombinedRuntimeProfile() { + if ( combinedNSRuntimeProfiler.totalRuntimeInSeconds() > 0.1 ) + combinedNSRuntimeProfiler.log(logger); + } + + protected double getTotalRuntime() { + return myNSRuntimeProfile.totalRuntimeInSeconds(); } /** @@ -154,21 +166,6 @@ public class NanoScheduler { throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!"); } - /** - * Print to logger.info timing information from timer, with name label - * - * @param label the name of the timer to display. Should be human readable - * @param timer the timer whose elapsed time we will display - */ - @Requires({"label != null", "timer != null"}) - private void printTimerInfo(final String label, final SimpleTimer timer) { - final double total = inputTimer.getElapsedTime() + mapTimer.getElapsedTime() - + reduceTimer.getElapsedTime() + outsideSchedulerTimer.getElapsedTime(); - final double myTimeInSec = timer.getElapsedTime(); - final double myTimePercent = myTimeInSec / total * 100; - logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent)); - } - /** * @return true if this nanoScheduler is shutdown, or false if its still open for business */ @@ -246,7 +243,7 @@ public class NanoScheduler { if ( map == null ) throw new IllegalArgumentException("map function cannot be null"); if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null"); - outsideSchedulerTimer.stop(); + myNSRuntimeProfile.outsideSchedulerTimer.stop(); ReduceType result; if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) { @@ -255,7 +252,7 @@ public class NanoScheduler { result = executeMultiThreaded(inputReader, map, initialValue, reduce); } - outsideSchedulerTimer.restart(); + myNSRuntimeProfile.outsideSchedulerTimer.restart(); return result; } @@ -272,28 +269,31 @@ public class NanoScheduler { ReduceType sum = initialValue; int i = 0; - // start timer to ensure that both hasNext and next are caught by the timer - if ( TIME_CALLS ) inputTimer.restart(); - while ( inputReader.hasNext() ) { - final InputType input = inputReader.next(); - if ( TIME_CALLS ) inputTimer.stop(); + while ( true ) { + // start timer to ensure that both hasNext and next are caught by the timer + myNSRuntimeProfile.inputTimer.restart(); + if ( ! inputReader.hasNext() ) { + myNSRuntimeProfile.inputTimer.stop(); + break; + } else { + final InputType input = inputReader.next(); + myNSRuntimeProfile.inputTimer.stop(); - // map - if ( TIME_CALLS ) mapTimer.restart(); - final long preMapTime = LOG_MAP_TIMES ? 0 : mapTimer.currentTimeNano(); - final MapType mapValue = map.apply(input); - if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (mapTimer.currentTimeNano() - preMapTime)); - if ( TIME_CALLS ) mapTimer.stop(); + // map + myNSRuntimeProfile.mapTimer.restart(); + final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano(); + final MapType mapValue = map.apply(input); + if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime)); + myNSRuntimeProfile.mapTimer.stop(); - if ( i++ % inputBufferSize == 0 && progressFunction != null ) - progressFunction.progress(input); + if ( i++ % inputBufferSize == 0 && progressFunction != null ) + progressFunction.progress(input); - // reduce - if ( TIME_CALLS ) reduceTimer.restart(); - sum = reduce.apply(mapValue, sum); - if ( TIME_CALLS ) reduceTimer.stop(); - - if ( TIME_CALLS ) inputTimer.restart(); + // reduce + myNSRuntimeProfile.reduceTimer.restart(); + sum = reduce.apply(mapValue, sum); + myNSRuntimeProfile.reduceTimer.stop(); + } } return sum; @@ -321,11 +321,11 @@ public class NanoScheduler { new LinkedBlockingDeque>>(mapBufferSize); // Start running the input reader thread - inputExecutor.submit(new InputProducer(inputReader, inputTimer, inputQueue)); + inputExecutor.submit(new InputProducer(inputReader, myNSRuntimeProfile.inputTimer, inputQueue)); // Start running the reducer thread final ReducerThread reducer - = new ReducerThread(reduce, reduceTimer, initialValue, mapResultQueue); + = new ReducerThread(reduce, myNSRuntimeProfile.reduceTimer, initialValue, mapResultQueue); final Future reduceResult = reduceExecutor.submit(reducer); try { @@ -382,10 +382,10 @@ public class NanoScheduler { @Override public MapResult call() { - if ( TIME_CALLS ) mapTimer.restart(); if ( debug ) debugPrint("\t\tmap " + input); + myNSRuntimeProfile.mapTimer.restart(); final MapType result = map.apply(input); - if ( TIME_CALLS ) mapTimer.stop(); + myNSRuntimeProfile.mapTimer.stop(); return new MapResult(result, id); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java index 506e45453..dcdba3490 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java @@ -29,6 +29,7 @@ class ReducerThread implements Callable { final ReduceType sum, final BlockingQueue>> mapResultQueue) { if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null"); + if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null"); if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); this.reduce = reduce; @@ -51,9 +52,9 @@ class ReducerThread implements Callable { } else { lastJobID = result.getJobID(); // apply reduce, keeping track of sum - if ( reduceTimer != null ) reduceTimer.restart(); + reduceTimer.restart(); sum = reduce.apply(result.getValue(), sum); - if ( reduceTimer != null ) reduceTimer.stop(); + reduceTimer.stop(); } } } catch (ExecutionException ex) { diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index b3365c13c..b3986e74e 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -42,7 +43,7 @@ public class InputProducerUnitTest extends BaseTest { final LinkedBlockingDeque.InputValue> readQueue = new LinkedBlockingDeque.InputValue>(queueSize); - final InputProducer ip = new InputProducer(elements.iterator(), null, readQueue); + final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); es.submit(ip); diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 47dcc1d5e..a0ab493c1 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -2,6 +2,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.apache.log4j.BasicConfigurator; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -86,7 +87,7 @@ public class NanoSchedulerUnitTest extends BaseTest { static NanoSchedulerBasicTest exampleTest = null; @DataProvider(name = "NanoSchedulerBasicTest") public Object[][] createNanoSchedulerBasicTest() { - for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) { + for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000, 10000000) ) { for ( final int nt : Arrays.asList(1, 2, 4) ) { for ( final int start : Arrays.asList(0) ) { for ( final int end : Arrays.asList(0, 1, 2, 11, 10000, 100000) ) { @@ -114,6 +115,7 @@ public class NanoSchedulerUnitTest extends BaseTest { } private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { + final SimpleTimer timer = new SimpleTimer().start(); final NanoScheduler nanoScheduler = new NanoScheduler(test.bufferSize, test.nThreads); @@ -129,6 +131,17 @@ public class NanoSchedulerUnitTest extends BaseTest { Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks); nanoScheduler.shutdown(); + + // TODO -- need to enable only in the case where there's serious time spend in + // TODO -- read /map / reduce, otherwise the "outside" timer doesn't add up + final double myTimeEstimate = timer.getElapsedTime(); + final double tolerance = 0.1; + if ( false && myTimeEstimate > 0.1 ) { + Assert.assertTrue(nanoScheduler.getTotalRuntime() > myTimeEstimate * tolerance, + "NanoScheduler said that the total runtime was " + nanoScheduler.getTotalRuntime() + + " but the overall test time was " + myTimeEstimate + ", beyond our tolerance factor of " + + tolerance); + } } @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME) diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java index 61d1330bc..08771e9ec 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -61,7 +62,7 @@ public class ReducerThreadUnitTest extends BaseTest { final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue); final ReducerThread thread - = new ReducerThread(reduce, null, 0, mapResultsQueue); + = new ReducerThread(reduce, new SimpleTimer(), 0, mapResultsQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); final Future value = es.submit(thread); From 6fad0f25bb88fc201c7575ecf5a94f588eab33f7 Mon Sep 17 00:00:00 2001 From: David Roazen Date: Tue, 11 Sep 2012 10:34:14 -0400 Subject: [PATCH 12/14] Merge Eric's LocusIteratorByStateUnitTest changes into LocusIteratorByStateExperimentalUnitTest --- ...usIteratorByStateExperimentalUnitTest.java | 269 ++++++++++++------ 1 file changed, 185 insertions(+), 84 deletions(-) diff --git a/public/java/test/org/broadinstitute/sting/gatk/iterators/LocusIteratorByStateExperimentalUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/iterators/LocusIteratorByStateExperimentalUnitTest.java index c148bcf84..9d592cd26 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/iterators/LocusIteratorByStateExperimentalUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/iterators/LocusIteratorByStateExperimentalUnitTest.java @@ -1,8 +1,6 @@ package org.broadinstitute.sting.gatk.iterators; -import net.sf.samtools.SAMFileHeader; -import net.sf.samtools.SAMFileReader; -import net.sf.samtools.SAMRecord; +import net.sf.samtools.*; import net.sf.samtools.util.CloseableIterator; import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.gatk.ReadProperties; @@ -39,57 +37,10 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest { genomeLocParser = new GenomeLocParser(header.getSequenceDictionary()); } - private final LocusIteratorByStateExperimental makeLTBS(List reads, ReadProperties readAttributes) { + private LocusIteratorByStateExperimental makeLTBS(List reads, ReadProperties readAttributes) { return new LocusIteratorByStateExperimental(new FakeCloseableIterator(reads.iterator()), readAttributes, genomeLocParser, LocusIteratorByStateExperimental.sampleListForSAMWithoutReadGroups()); } - private static ReadProperties createTestReadProperties() { - return createTestReadProperties(null); - } - - private static ReadProperties createTestReadProperties( DownsamplingMethod downsamplingMethod ) { - return new ReadProperties( - Collections.emptyList(), - new SAMFileHeader(), - false, - SAMFileReader.ValidationStringency.STRICT, - downsamplingMethod, - new ValidationExclusion(), - Collections.emptyList(), - Collections.emptyList(), - false, - (byte) -1 - ); - } - - private static class FakeCloseableIterator implements CloseableIterator { - Iterator iterator; - - public FakeCloseableIterator(Iterator it) { - iterator = it; - } - - @Override - public void close() { - return; - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public T next() { - return iterator.next(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Don't remove!"); - } - } - @Test public void testXandEQOperators() { final byte[] bases1 = new byte[] {'A','A','A','A','A','A','A','A','A','A'}; @@ -308,45 +259,36 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest { // comprehensive LIBS/PileupElement tests // //////////////////////////////////////////// - private static final int IS_BEFORE_DELETED_BASE_FLAG = 1; - private static final int IS_BEFORE_DELETION_START_FLAG = 2; - private static final int IS_AFTER_DELETED_BASE_FLAG = 4; - private static final int IS_AFTER_DELETION_END_FLAG = 8; - private static final int IS_BEFORE_INSERTION_FLAG = 16; - private static final int IS_AFTER_INSERTION_FLAG = 32; - private static final int IS_NEXT_TO_SOFTCLIP_FLAG = 64; - private static class LIBSTest { final String cigar; final int readLength; - final List offsets; - final List flags; - private LIBSTest(final String cigar, final int readLength, final List offsets, final List flags) { + private LIBSTest(final String cigar, final int readLength) { this.cigar = cigar; this.readLength = readLength; - this.offsets = offsets; - this.flags = flags; } } @DataProvider(name = "LIBSTest") public Object[][] createLIBSTestData() { + + //TODO -- when LIBS is fixed this should be replaced to provide all possible permutations of CIGAR strings + return new Object[][]{ - {new LIBSTest("1I", 1, Arrays.asList(0), Arrays.asList(IS_BEFORE_INSERTION_FLAG))}, - {new LIBSTest("10I", 10, Arrays.asList(0), Arrays.asList(IS_BEFORE_INSERTION_FLAG))}, - {new LIBSTest("2M2I2M", 6, Arrays.asList(0,1,4,5), Arrays.asList(0,IS_BEFORE_INSERTION_FLAG,IS_AFTER_INSERTION_FLAG,0))}, - {new LIBSTest("2M2I", 4, Arrays.asList(0,1), Arrays.asList(0,IS_BEFORE_INSERTION_FLAG))}, + {new LIBSTest("1I", 1)}, + {new LIBSTest("10I", 10)}, + {new LIBSTest("2M2I2M", 6)}, + {new LIBSTest("2M2I", 4)}, //TODO -- uncomment these when LIBS is fixed //{new LIBSTest("2I2M", 4, Arrays.asList(2,3), Arrays.asList(IS_AFTER_INSERTION_FLAG,0))}, //{new LIBSTest("1I1M1D1M", 3, Arrays.asList(0,1), Arrays.asList(IS_AFTER_INSERTION_FLAG | IS_BEFORE_DELETION_START_FLAG | IS_BEFORE_DELETED_BASE_FLAG,IS_AFTER_DELETED_BASE_FLAG | IS_AFTER_DELETION_END_FLAG))}, //{new LIBSTest("1S1I1M", 3, Arrays.asList(2), Arrays.asList(IS_AFTER_INSERTION_FLAG))}, - {new LIBSTest("1M2D2M", 3, Arrays.asList(0,1,2), Arrays.asList(IS_BEFORE_DELETION_START_FLAG | IS_BEFORE_DELETED_BASE_FLAG,IS_AFTER_DELETED_BASE_FLAG | IS_AFTER_DELETION_END_FLAG,0))}, - {new LIBSTest("1S1M", 2, Arrays.asList(1), Arrays.asList(IS_NEXT_TO_SOFTCLIP_FLAG))}, - {new LIBSTest("1M1S", 2, Arrays.asList(0), Arrays.asList(IS_NEXT_TO_SOFTCLIP_FLAG))}, - {new LIBSTest("1S1M1I", 3, Arrays.asList(1), Arrays.asList(IS_BEFORE_INSERTION_FLAG | IS_NEXT_TO_SOFTCLIP_FLAG))} + //{new LIBSTest("1M2D2M", 3)}, + {new LIBSTest("1S1M", 2)}, + {new LIBSTest("1M1S", 2)}, + {new LIBSTest("1S1M1I", 3)} }; } @@ -361,26 +303,24 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest { // create the iterator by state with the fake reads and fake records li = makeLTBS(Arrays.asList(read), createTestReadProperties()); + final LIBS_position tester = new LIBS_position(read); - int offset = 0; while ( li.hasNext() ) { AlignmentContext alignmentContext = li.next(); ReadBackedPileup p = alignmentContext.getBasePileup(); Assert.assertTrue(p.getNumberOfElements() == 1); PileupElement pe = p.iterator().next(); - final int flag = params.flags.get(offset); - Assert.assertEquals(pe.isBeforeDeletedBase(), (flag & IS_BEFORE_DELETED_BASE_FLAG) != 0); - Assert.assertEquals(pe.isBeforeDeletionStart(), (flag & IS_BEFORE_DELETION_START_FLAG) != 0); - Assert.assertEquals(pe.isAfterDeletedBase(), (flag & IS_AFTER_DELETED_BASE_FLAG) != 0); - Assert.assertEquals(pe.isAfterDeletionEnd(), (flag & IS_AFTER_DELETION_END_FLAG) != 0); - Assert.assertEquals(pe.isBeforeInsertion(), (flag & IS_BEFORE_INSERTION_FLAG) != 0); - Assert.assertEquals(pe.isAfterInsertion(), (flag & IS_AFTER_INSERTION_FLAG) != 0); - Assert.assertEquals(pe.isNextToSoftClip(), (flag & IS_NEXT_TO_SOFTCLIP_FLAG) != 0); + tester.stepForwardOnGenome(); - Assert.assertEquals(pe.getOffset(), params.offsets.get(offset).intValue()); - - offset++; + Assert.assertEquals(pe.isBeforeDeletedBase(), tester.isBeforeDeletedBase); + Assert.assertEquals(pe.isBeforeDeletionStart(), tester.isBeforeDeletionStart); + Assert.assertEquals(pe.isAfterDeletedBase(), tester.isAfterDeletedBase); + Assert.assertEquals(pe.isAfterDeletionEnd(), tester.isAfterDeletionEnd); + Assert.assertEquals(pe.isBeforeInsertion(), tester.isBeforeInsertion); + Assert.assertEquals(pe.isAfterInsertion(), tester.isAfterInsertion); + Assert.assertEquals(pe.isNextToSoftClip(), tester.isNextToSoftClip); + Assert.assertEquals(pe.getOffset(), tester.getCurrentReadOffset()); } } @@ -543,4 +483,165 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest { test.run(); } + + /////////////////////////////////////// + // End Read State Manager Tests // + /////////////////////////////////////// + + + + /////////////////////////////////////// + // Helper methods / classes // + /////////////////////////////////////// + + private static ReadProperties createTestReadProperties() { + return createTestReadProperties(null); + } + + private static ReadProperties createTestReadProperties( DownsamplingMethod downsamplingMethod ) { + return new ReadProperties( + Collections.emptyList(), + new SAMFileHeader(), + false, + SAMFileReader.ValidationStringency.STRICT, + downsamplingMethod, + new ValidationExclusion(), + Collections.emptyList(), + Collections.emptyList(), + false, + (byte) -1 + ); + } + + private static class FakeCloseableIterator implements CloseableIterator { + Iterator iterator; + + public FakeCloseableIterator(Iterator it) { + iterator = it; + } + + @Override + public void close() {} + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Don't remove!"); + } + } + + private static final class LIBS_position { + + SAMRecord read; + + final int numOperators; + int currentOperatorIndex = 0; + int currentPositionOnOperator = 0; + int currentReadOffset = 0; + + boolean isBeforeDeletionStart = false; + boolean isBeforeDeletedBase = false; + boolean isAfterDeletionEnd = false; + boolean isAfterDeletedBase = false; + boolean isBeforeInsertion = false; + boolean isAfterInsertion = false; + boolean isNextToSoftClip = false; + + boolean sawMop = false; + + public LIBS_position(final SAMRecord read) { + this.read = read; + numOperators = read.getCigar().numCigarElements(); + } + + public int getCurrentReadOffset() { + return Math.max(0, currentReadOffset - 1); + } + + /** + * Steps forward on the genome. Returns false when done reading the read, true otherwise. + */ + public boolean stepForwardOnGenome() { + if ( currentOperatorIndex == numOperators ) + return false; + + CigarElement curElement = read.getCigar().getCigarElement(currentOperatorIndex); + if ( currentPositionOnOperator >= curElement.getLength() ) { + if ( ++currentOperatorIndex == numOperators ) + return false; + + curElement = read.getCigar().getCigarElement(currentOperatorIndex); + currentPositionOnOperator = 0; + } + + switch ( curElement.getOperator() ) { + case I: // insertion w.r.t. the reference + if ( !sawMop ) + break; + case S: // soft clip + currentReadOffset += curElement.getLength(); + case H: // hard clip + case P: // padding + currentOperatorIndex++; + return stepForwardOnGenome(); + + case D: // deletion w.r.t. the reference + case N: // reference skip (looks and gets processed just like a "deletion", just different logical meaning) + currentPositionOnOperator++; + break; + + case M: + case EQ: + case X: + sawMop = true; + currentReadOffset++; + currentPositionOnOperator++; + break; + default: + throw new IllegalStateException("No support for cigar op: " + curElement.getOperator()); + } + + final boolean isFirstOp = currentOperatorIndex == 0; + final boolean isLastOp = currentOperatorIndex == numOperators - 1; + final boolean isFirstBaseOfOp = currentPositionOnOperator == 1; + final boolean isLastBaseOfOp = currentPositionOnOperator == curElement.getLength(); + + isBeforeDeletionStart = isBeforeOp(read.getCigar(), currentOperatorIndex, CigarOperator.D, isLastOp, isLastBaseOfOp); + isBeforeDeletedBase = isBeforeDeletionStart || (!isLastBaseOfOp && curElement.getOperator() == CigarOperator.D); + isAfterDeletionEnd = isAfterOp(read.getCigar(), currentOperatorIndex, CigarOperator.D, isFirstOp, isFirstBaseOfOp); + isAfterDeletedBase = isAfterDeletionEnd || (!isFirstBaseOfOp && curElement.getOperator() == CigarOperator.D); + isBeforeInsertion = isBeforeOp(read.getCigar(), currentOperatorIndex, CigarOperator.I, isLastOp, isLastBaseOfOp) + || (!sawMop && curElement.getOperator() == CigarOperator.I); + isAfterInsertion = isAfterOp(read.getCigar(), currentOperatorIndex, CigarOperator.I, isFirstOp, isFirstBaseOfOp); + isNextToSoftClip = isBeforeOp(read.getCigar(), currentOperatorIndex, CigarOperator.S, isLastOp, isLastBaseOfOp) + || isAfterOp(read.getCigar(), currentOperatorIndex, CigarOperator.S, isFirstOp, isFirstBaseOfOp); + + return true; + } + + private static boolean isBeforeOp(final Cigar cigar, + final int currentOperatorIndex, + final CigarOperator op, + final boolean isLastOp, + final boolean isLastBaseOfOp) { + return !isLastOp && isLastBaseOfOp && cigar.getCigarElement(currentOperatorIndex+1).getOperator() == op; + } + + private static boolean isAfterOp(final Cigar cigar, + final int currentOperatorIndex, + final CigarOperator op, + final boolean isFirstOp, + final boolean isFirstBaseOfOp) { + return !isFirstOp && isFirstBaseOfOp && cigar.getCigarElement(currentOperatorIndex-1).getOperator() == op; + } + } } From 13831106d570ce029fbc04ebab7aeb06a6ed1557 Mon Sep 17 00:00:00 2001 From: Guillermo del Angel Date: Tue, 11 Sep 2012 11:01:26 -0400 Subject: [PATCH 13/14] Fix GSA-535: storing likelihoods in allele map was busted when running HaplotypeCaller, only the last likelihood of a haplotype was being stored, as opposed to the max likelihood of all haplotypes mapping to an allele --- .../LikelihoodCalculationEngine.java | 9 +++++---- .../HaplotypeCallerIntegrationTest.java | 14 +++++++------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java index 69af66185..db289ecab 100644 --- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java +++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java @@ -335,14 +335,15 @@ public class LikelihoodCalculationEngine { final GATKSAMRecord read = readsForThisSample.get(iii); // BUGBUG: assumes read order in this list and haplotype likelihood list are the same! // only count the read if it overlaps the event, otherwise it is not added to the output read list at all if( callLoc.overlapsP(parser.createGenomeLoc(read)) ) { - final double likelihoods[] = new double[call.getFirst().getAlleles().size()]; - int count = 0; - for( final Allele a : call.getFirst().getAlleles() ) { + double maxLikelihood = Double.NEGATIVE_INFINITY; for( final Haplotype h : call.getSecond().get(a) ) { // use the max likelihood from all the haplotypes which mapped to this allele (achieved via the haplotype mapper object) final double likelihood = h.getReadLikelihoods(sample.getKey())[iii]; - likelihoodMap.add(read, a, likelihood); + if( likelihood > maxLikelihood ) { + maxLikelihood = likelihood; + } } + likelihoodMap.add(read, a, maxLikelihood); } } } diff --git a/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerIntegrationTest.java b/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerIntegrationTest.java index b5359af46..b45c027a7 100644 --- a/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerIntegrationTest.java +++ b/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerIntegrationTest.java @@ -21,17 +21,17 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest { @Test public void testHaplotypeCallerMultiSample() { - HCTest(CEUTRIO_BAM, "", "e5b4a0627a1d69b9356f8a7cd2260e89"); + HCTest(CEUTRIO_BAM, "", "5b751474ad0aef4cdb53f094e605f97c"); } @Test public void testHaplotypeCallerSingleSample() { - HCTest(NA12878_BAM, "", "202d5b6edaf74f411c170099749f202f"); + HCTest(NA12878_BAM, "", "60efcd2d2722087e900f6365985d18bf"); } @Test public void testHaplotypeCallerMultiSampleGGA() { - HCTest(CEUTRIO_BAM, "-gt_mode GENOTYPE_GIVEN_ALLELES -alleles " + validationDataLocation + "combined.phase1.chr20.raw.indels.sites.vcf", "561931ba3919808ec471e745cb3148c7"); + HCTest(CEUTRIO_BAM, "-gt_mode GENOTYPE_GIVEN_ALLELES -alleles " + validationDataLocation + "combined.phase1.chr20.raw.indels.sites.vcf", "71bec55320a2f07af0d54be9d7735322"); } private void HCTestComplexVariants(String bam, String args, String md5) { @@ -42,7 +42,7 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest { @Test public void testHaplotypeCallerMultiSampleComplex() { - HCTestComplexVariants(CEUTRIO_BAM, "", "3424b398a9f47c8ac606a5c56eb7d8a7"); + HCTestComplexVariants(CEUTRIO_BAM, "", "f5a809e3fbd9998f79b75bb2973209e1"); } private void HCTestSymbolicVariants(String bam, String args, String md5) { @@ -53,7 +53,7 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest { @Test public void testHaplotypeCallerSingleSampleSymbolic() { - HCTestSymbolicVariants(NA12878_CHR20_BAM, "", "b71cfaea9390136c584c9671b149d573"); + HCTestSymbolicVariants(NA12878_CHR20_BAM, "", "8043b0451a4384e678a93600b34afce7"); } private void HCTestIndelQualityScores(String bam, String args, String md5) { @@ -64,13 +64,13 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest { @Test public void testHaplotypeCallerSingleSampleIndelQualityScores() { - HCTestIndelQualityScores(NA12878_RECALIBRATED_BAM, "", "e1f88fac91424740c0eaac1de48b3970"); + HCTestIndelQualityScores(NA12878_RECALIBRATED_BAM, "", "ea6539e05faf10ffaf76f2d16907c47a"); } @Test public void HCTestProblematicReadsModifiedInActiveRegions() { final String base = String.format("-T HaplotypeCaller -R %s -I %s", REF, privateTestDir + "haplotype-problem-4.bam") + " --no_cmdline_in_header -o %s -minPruning 3"; - final WalkerTestSpec spec = new WalkerTestSpec(base, Arrays.asList("000fd36d5cf8090386bb2ac15e3ab0b5")); + final WalkerTestSpec spec = new WalkerTestSpec(base, Arrays.asList("8d092b25f40456e618eef91fdce8adf0")); executeTest("HCTestProblematicReadsModifiedInActiveRegions: ", spec); } }