diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java index d04c1a9e2..db289ecab 100644 --- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java +++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java @@ -335,31 +335,27 @@ public class LikelihoodCalculationEngine { final GATKSAMRecord read = readsForThisSample.get(iii); // BUGBUG: assumes read order in this list and haplotype likelihood list are the same! // only count the read if it overlaps the event, otherwise it is not added to the output read list at all if( callLoc.overlapsP(parser.createGenomeLoc(read)) ) { - final double likelihoods[] = new double[call.getFirst().getAlleles().size()]; - int count = 0; - for( final Allele a : call.getFirst().getAlleles() ) { + double maxLikelihood = Double.NEGATIVE_INFINITY; for( final Haplotype h : call.getSecond().get(a) ) { // use the max likelihood from all the haplotypes which mapped to this allele (achieved via the haplotype mapper object) final double likelihood = h.getReadLikelihoods(sample.getKey())[iii]; - likelihoodMap.add(read, a, likelihood); + if( likelihood > maxLikelihood ) { + maxLikelihood = likelihood; + } } + likelihoodMap.add(read, a, maxLikelihood); } } } -/* // add all filtered reads to the NO_CALL list because they weren't given any likelihoods - List readList = alleleReadMap.get(Allele.NO_CALL); - if( readList == null ) { - readList = new ArrayList(); - alleleReadMap.put(Allele.NO_CALL, readList); - } - */ - /* for( final GATKSAMRecord read : perSampleFilteredReadList.get(sample.getKey()) ) { + // add all filtered reads to the NO_CALL list because they weren't given any likelihoods + for( final GATKSAMRecord read : perSampleFilteredReadList.get(sample.getKey()) ) { // only count the read if it overlaps the event, otherwise it is not added to the output read list at all if( callLoc.overlapsP(parser.createGenomeLoc(read)) ) { - readList.add(read); + for( final Allele a : call.getFirst().getAlleles() ) + likelihoodMap.add(read,a,0.0); } } - */ + returnMap.put(sample.getKey(), likelihoodMap); } diff --git a/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerIntegrationTest.java b/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerIntegrationTest.java index b5359af46..b45c027a7 100644 --- a/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerIntegrationTest.java +++ b/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerIntegrationTest.java @@ -21,17 +21,17 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest { @Test public void testHaplotypeCallerMultiSample() { - HCTest(CEUTRIO_BAM, "", "e5b4a0627a1d69b9356f8a7cd2260e89"); + HCTest(CEUTRIO_BAM, "", "5b751474ad0aef4cdb53f094e605f97c"); } @Test public void testHaplotypeCallerSingleSample() { - HCTest(NA12878_BAM, "", "202d5b6edaf74f411c170099749f202f"); + HCTest(NA12878_BAM, "", "60efcd2d2722087e900f6365985d18bf"); } @Test public void testHaplotypeCallerMultiSampleGGA() { - HCTest(CEUTRIO_BAM, "-gt_mode GENOTYPE_GIVEN_ALLELES -alleles " + validationDataLocation + "combined.phase1.chr20.raw.indels.sites.vcf", "561931ba3919808ec471e745cb3148c7"); + HCTest(CEUTRIO_BAM, "-gt_mode GENOTYPE_GIVEN_ALLELES -alleles " + validationDataLocation + "combined.phase1.chr20.raw.indels.sites.vcf", "71bec55320a2f07af0d54be9d7735322"); } private void HCTestComplexVariants(String bam, String args, String md5) { @@ -42,7 +42,7 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest { @Test public void testHaplotypeCallerMultiSampleComplex() { - HCTestComplexVariants(CEUTRIO_BAM, "", "3424b398a9f47c8ac606a5c56eb7d8a7"); + HCTestComplexVariants(CEUTRIO_BAM, "", "f5a809e3fbd9998f79b75bb2973209e1"); } private void HCTestSymbolicVariants(String bam, String args, String md5) { @@ -53,7 +53,7 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest { @Test public void testHaplotypeCallerSingleSampleSymbolic() { - HCTestSymbolicVariants(NA12878_CHR20_BAM, "", "b71cfaea9390136c584c9671b149d573"); + HCTestSymbolicVariants(NA12878_CHR20_BAM, "", "8043b0451a4384e678a93600b34afce7"); } private void HCTestIndelQualityScores(String bam, String args, String md5) { @@ -64,13 +64,13 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest { @Test public void testHaplotypeCallerSingleSampleIndelQualityScores() { - HCTestIndelQualityScores(NA12878_RECALIBRATED_BAM, "", "e1f88fac91424740c0eaac1de48b3970"); + HCTestIndelQualityScores(NA12878_RECALIBRATED_BAM, "", "ea6539e05faf10ffaf76f2d16907c47a"); } @Test public void HCTestProblematicReadsModifiedInActiveRegions() { final String base = String.format("-T HaplotypeCaller -R %s -I %s", REF, privateTestDir + "haplotype-problem-4.bam") + " --no_cmdline_in_header -o %s -minPruning 3"; - final WalkerTestSpec spec = new WalkerTestSpec(base, Arrays.asList("000fd36d5cf8090386bb2ac15e3ab0b5")); + final WalkerTestSpec spec = new WalkerTestSpec(base, Arrays.asList("8d092b25f40456e618eef91fdce8adf0")); executeTest("HCTestProblematicReadsModifiedInActiveRegions: ", spec); } } diff --git a/public/R/src/org/broadinstitute/sting/utils/R/gsalib/R/gsa.read.gatkreport.R b/public/R/src/org/broadinstitute/sting/utils/R/gsalib/R/gsa.read.gatkreport.R index 4c228ccb4..eba94c0cb 100644 --- a/public/R/src/org/broadinstitute/sting/utils/R/gsalib/R/gsa.read.gatkreport.R +++ b/public/R/src/org/broadinstitute/sting/utils/R/gsalib/R/gsa.read.gatkreport.R @@ -111,7 +111,13 @@ gsa.read.gatkreportv1 <- function(lines) { headerRowCount = -1; finishTable <- function() { - .gsa.assignGATKTableToEnvironment(tableName, tableHeader, tableRows[1:rowCount,], tableEnv); + if ( rowCount == 1 ) + # good I hate R. Work around to avoid collapsing into an unstructured vector when + # there's only 1 row + sub <- t(as.matrix(tableRows[1:rowCount,])) + else + sub <- tableRows[1:rowCount,] + .gsa.assignGATKTableToEnvironment(tableName, tableHeader, sub, tableEnv); } for (line in lines) { diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 3ce8a92b7..516ea8451 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -24,6 +24,7 @@ package org.broadinstitute.sting.gatk; +import com.google.java.contract.Ensures; import net.sf.picard.reference.IndexedFastaSequenceFile; import net.sf.picard.reference.ReferenceSequenceFile; import net.sf.samtools.SAMFileHeader; @@ -682,14 +683,14 @@ public class GenomeAnalysisEngine { // if include argument isn't given, create new set of all possible intervals - Pair includeExcludePair = IntervalUtils.parseIntervalBindingsPair( + final Pair includeExcludePair = IntervalUtils.parseIntervalBindingsPair( this.referenceDataSource, argCollection.intervals, argCollection.intervalSetRule, argCollection.intervalMerging, argCollection.intervalPadding, argCollection.excludeIntervals); - GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst(); - GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond(); + final GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst(); + final GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond(); // if no exclude arguments, can return parseIntervalArguments directly if ( excludeSortedSet == null ) @@ -700,13 +701,15 @@ public class GenomeAnalysisEngine { intervals = includeSortedSet.subtractRegions(excludeSortedSet); // logging messages only printed when exclude (-XL) arguments are given - long toPruneSize = includeSortedSet.coveredSize(); - long toExcludeSize = excludeSortedSet.coveredSize(); - long intervalSize = intervals.coveredSize(); + final long toPruneSize = includeSortedSet.coveredSize(); + final long toExcludeSize = excludeSortedSet.coveredSize(); + final long intervalSize = intervals.coveredSize(); logger.info(String.format("Initial include intervals span %d loci; exclude intervals span %d loci", toPruneSize, toExcludeSize)); logger.info(String.format("Excluding %d loci from original intervals (%.2f%% reduction)", toPruneSize - intervalSize, (toPruneSize - intervalSize) / (0.01 * toPruneSize))); } + + logger.info(String.format("Processing %d bp from intervals", intervals.coveredSize())); } /** @@ -981,6 +984,22 @@ public class GenomeAnalysisEngine { return this.intervals; } + /** + * Get the list of regions of the genome being processed. If the user + * requested specific intervals, return those, otherwise return regions + * corresponding to the entire genome. Never returns null. + * + * @return a non-null set of intervals being processed + */ + @Ensures("result != null") + public GenomeLocSortedSet getRegionsOfGenomeBeingProcessed() { + if ( getIntervals() == null ) + // if we don't have any intervals defined, create intervals from the reference itself + return GenomeLocSortedSet.createSetFromSequenceDictionary(getReferenceDataSource().getReference().getSequenceDictionary()); + else + return getIntervals(); + } + /** * Gets the list of filters employed by this engine. * @return Collection of filters (actual instances) used by this engine. diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java index fd1ee9859..def27b20d 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ReadShard.java @@ -8,7 +8,10 @@ import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; /** * @@ -149,7 +152,12 @@ public class ReadShard extends Shard { if ( read.getAlignmentEnd() > stop ) stop = read.getAlignmentEnd(); } - return parser.createGenomeLoc(contig, start, stop); + assert contig != null; + + if ( contig.equals("*") ) // all reads are unmapped + return GenomeLoc.UNMAPPED; + else + return parser.createGenomeLoc(contig, start, stop); } } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index f1d2f7b5b..1bac72f3e 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -107,7 +107,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar this.traversalTasks = shardStrategy.iterator(); - ReduceTree reduceTree = new ReduceTree(this); + final ReduceTree reduceTree = new ReduceTree(this); initializeWalker(walker); while (isShardTraversePending() || isTreeReducePending()) { @@ -186,7 +186,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar outputTracker.bypassThreadLocalStorage(true); try { walker.onTraversalDone(result); - printOnTraversalDone(result); } finally { outputTracker.bypassThreadLocalStorage(false); @@ -302,17 +301,13 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar if (!traversalTasks.hasNext()) throw new IllegalStateException("Cannot traverse; no pending traversals exist."); - Shard shard = traversalTasks.next(); + final Shard shard = traversalTasks.next(); // todo -- add ownership claim here - ShardTraverser traverser = new ShardTraverser(this, - traversalEngine, - walker, - shard, - outputTracker); + final ShardTraverser traverser = new ShardTraverser(this, walker, shard, outputTracker); - Future traverseResult = threadPool.submit(traverser); + final Future traverseResult = threadPool.submit(traverser); // Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue. reduceTree.addEntry(traverseResult); @@ -327,7 +322,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar protected void queueNextTreeReduce( Walker walker ) { if (reduceTasks.size() == 0) throw new IllegalStateException("Cannot reduce; no pending reduces exist."); - TreeReduceTask reducer = reduceTasks.remove(); + final TreeReduceTask reducer = reduceTasks.remove(); reducer.setWalker((TreeReducible) walker); threadPool.submit(reducer); @@ -335,7 +330,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** Blocks until a free slot appears in the thread queue. */ protected void waitForFreeQueueSlot() { - ThreadPoolMonitor monitor = new ThreadPoolMonitor(); + final ThreadPoolMonitor monitor = new ThreadPoolMonitor(); synchronized (monitor) { threadPool.submit(monitor); monitor.watch(); @@ -347,8 +342,8 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * * @return A new, composite future of the result of this reduce. */ - public Future notifyReduce( Future lhs, Future rhs ) { - TreeReduceTask reducer = new TreeReduceTask(new TreeReducer(this, lhs, rhs)); + public Future notifyReduce( final Future lhs, final Future rhs ) { + final TreeReduceTask reducer = new TreeReduceTask(new TreeReducer(this, lhs, rhs)); reduceTasks.add(reducer); return reducer; } @@ -376,7 +371,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar return this.error; } - private final RuntimeException toRuntimeException(final Throwable error) { + private RuntimeException toRuntimeException(final Throwable error) { // If the error is already a Runtime, pass it along as is. Otherwise, wrap it. if (error instanceof RuntimeException) return (RuntimeException)error; @@ -387,7 +382,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */ private class TreeReduceTask extends FutureTask { - private TreeReducer treeReducer = null; + final private TreeReducer treeReducer; public TreeReduceTask( TreeReducer treeReducer ) { super(treeReducer); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java index 530285db0..87d0ad721 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java @@ -16,7 +16,7 @@ package org.broadinstitute.sting.gatk.executive; * An interface for retrieving runtime statistics about how the hierarchical * microscheduler is behaving. */ -public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean { +public interface HierarchicalMicroSchedulerMBean { /** * How many tree reduces are waiting in the tree reduce queue? * @return Total number of reduces waiting in the tree reduce queue? diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index ceb4a6f9b..09b18bfe1 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -11,6 +11,7 @@ import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.io.DirectOutputTracker; import org.broadinstitute.sting.gatk.io.OutputTracker; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; +import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.SampleUtils; @@ -60,7 +61,7 @@ public class LinearMicroScheduler extends MicroScheduler { boolean done = walker.isDone(); int counter = 0; - traversalEngine.startTimersIfNecessary(); + final TraversalEngine traversalEngine = borrowTraversalEngine(); for (Shard shard : shardStrategy ) { if ( done || shard == null ) // we ran out of shards that aren't owned break; @@ -95,9 +96,8 @@ public class LinearMicroScheduler extends MicroScheduler { Object result = accumulator.finishTraversal(); - printOnTraversalDone(result); - outputTracker.close(); + returnTraversalEngine(traversalEngine); cleanup(); executionIsDone(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index c6ef9acf1..a78ab4375 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -25,9 +25,11 @@ package org.broadinstitute.sting.gatk.executive; +import com.google.java.contract.Ensures; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; +import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; @@ -37,15 +39,22 @@ import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.traversals.*; import org.broadinstitute.sting.gatk.walkers.*; +import org.broadinstitute.sting.utils.MathUtils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; +import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.ObjectName; +import java.io.File; import java.lang.management.ManagementFactory; import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; /** @@ -55,14 +64,36 @@ import java.util.Collection; * Time: 12:37:23 PM * * General base class for all scheduling algorithms + * Shards and schedules data in manageable chunks. + * + * Creates N TraversalEngines for each data thread for the MicroScheduler. This is necessary + * because in the HMS case you have multiple threads executing a traversal engine independently, and + * these engines may need to create separate resources for efficiency or implementation reasons. For example, + * the nanoScheduler creates threads to implement the traversal, and this creation is instance specific. + * So each HMS thread needs to have it's own distinct copy of the traversal engine if it wants to have + * N data threads x M nano threads => N * M threads total. These are borrowed from this microscheduler + * and returned when done. Also allows us to tracks all created traversal engines so this microscheduler + * can properly shut them all down when the scheduling is done. + * */ - -/** Shards and schedules data in manageable chunks. */ public abstract class MicroScheduler implements MicroSchedulerMBean { // TODO -- remove me and retire non nano scheduled versions of traversals private final static boolean USE_NANOSCHEDULER_FOR_EVERYTHING = true; protected static final Logger logger = Logger.getLogger(MicroScheduler.class); + /** + * The list of all Traversal engines we've created in this micro scheduler + */ + final List allCreatedTraversalEngines = new LinkedList(); + + /** + * All available engines. Engines are borrowed and returned when a subclass is actually + * going to execute the engine on some data. This allows us to have N copies for + * N data parallel executions, but without the dangerous code of having local + * ThreadLocal variables. + */ + final LinkedList availableTraversalEngines = new LinkedList(); + /** * Counts the number of instances of the class that are currently alive. */ @@ -73,7 +104,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ protected final GenomeAnalysisEngine engine; - protected final TraversalEngine traversalEngine; protected final IndexedFastaSequenceFile reference; private final SAMDataSource reads; @@ -89,6 +119,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ ThreadEfficiencyMonitor threadEfficiencyMonitor = null; + final ProgressMeter progressMeter; + /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. @@ -103,11 +135,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, ThreadAllocation threadAllocation) { if ( threadAllocation.isRunningInParallelMode() ) { - // TODO -- remove me when we fix running NCT within HMS - if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1) - throw new UserException("Currently the GATK does not support running CPU threads within data threads, " + - "please specify only one of NT and NCT"); - logger.info(String.format("Running the GATK in parallel mode with %d CPU thread(s) for each of %d data thread(s)", threadAllocation.getNumCPUThreadsPerDataThread(), threadAllocation.getNumDataThreads())); } @@ -154,25 +181,25 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { this.reference = reference; this.rods = rods; - if (walker instanceof ReadWalker) { - traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 - ? new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread()) - : new TraverseReads(); - } else if (walker instanceof LocusWalker) { - traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 - ? new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread()) - : new TraverseLociLinear(); - } else if (walker instanceof DuplicateWalker) { - traversalEngine = new TraverseDuplicates(); - } else if (walker instanceof ReadPairWalker) { - traversalEngine = new TraverseReadPairs(); - } else if (walker instanceof ActiveRegionWalker) { - traversalEngine = new TraverseActiveRegions(); - } else { - throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); - } + final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog; - traversalEngine.initialize(engine); + // Creates uninitialized TraversalEngines appropriate for walker and threadAllocation, + // and adds it to the list of created engines for later shutdown. + for ( int i = 0; i < threadAllocation.getNumDataThreads(); i++ ) { + final TraversalEngine traversalEngine = createTraversalEngine(walker, threadAllocation); + allCreatedTraversalEngines.add(traversalEngine); + availableTraversalEngines.add(traversalEngine); + } + logger.info("Creating " + threadAllocation.getNumDataThreads() + " traversal engines"); + + // Create our progress meter + this.progressMeter = new ProgressMeter(progressLogFile, + availableTraversalEngines.peek().getTraversalUnits(), + engine.getRegionsOfGenomeBeingProcessed()); + + // Now that we have a progress meter, go through and initialize the traversal engines + for ( final TraversalEngine traversalEngine : allCreatedTraversalEngines ) + traversalEngine.initialize(engine, progressMeter); // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. // To get around this limitation and since we have no job identifier at this point, register a simple counter that @@ -188,6 +215,35 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } } + /** + * Really make us a traversal engine of the appropriate type for walker and thread allocation + * + * @return a non-null uninitialized traversal engine + */ + @Ensures("result != null") + private TraversalEngine createTraversalEngine(final Walker walker, final ThreadAllocation threadAllocation) { + if (walker instanceof ReadWalker) { + if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ) + return new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread()); + else + return new TraverseReads(); + } else if (walker instanceof LocusWalker) { + if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 ) + return new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread()); + else + return new TraverseLociLinear(); + } else if (walker instanceof DuplicateWalker) { + return new TraverseDuplicates(); + } else if (walker instanceof ReadPairWalker) { + return new TraverseReadPairs(); + } else if (walker instanceof ActiveRegionWalker) { + return new TraverseActiveRegions(); + } else { + throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); + } + } + + /** * Return the ThreadEfficiencyMonitor we are using to track our resource utilization, if there is one * @@ -231,18 +287,14 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { return (!reads.isEmpty()) ? reads.seek(shard) : new NullSAMIterator(); } - /** - * Print summary information for the analysis. - * @param sum The final reduce output. - */ - protected void printOnTraversalDone(Object sum) { - traversalEngine.printOnTraversalDone(); - } - /** * Must be called by subclasses when execute is done */ protected void executionIsDone() { + progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations()); + printReadFilteringStats(); + shutdownTraversalEngines(); + // Print out the threading efficiency of this HMS, if state monitoring is enabled if ( threadEfficiencyMonitor != null ) { // include the master thread information @@ -251,6 +303,57 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } } + /** + * Shutdown all of the created engines, and clear the list of created engines, dropping + * pointers to the traversal engines + */ + public synchronized void shutdownTraversalEngines() { + if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() ) + throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " + + "have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size() + + " have been returned"); + + for ( final TraversalEngine te : allCreatedTraversalEngines) + te.shutdown(); + + // horrible hack to print nano scheduling information across all nano schedulers, if any were used + NanoScheduler.printCombinedRuntimeProfile(); + + allCreatedTraversalEngines.clear(); + availableTraversalEngines.clear(); + } + + /** + * Prints out information about number of reads observed and filtering, if any reads were used in the traversal + * + * Looks like: + * + * INFO 10:40:47,370 MicroScheduler - 22 reads were filtered out during traversal out of 101 total (21.78%) + * INFO 10:40:47,370 MicroScheduler - -> 1 reads (0.99% of total) failing BadMateFilter + * INFO 10:40:47,370 MicroScheduler - -> 20 reads (19.80% of total) failing DuplicateReadFilter + * INFO 10:40:47,370 MicroScheduler - -> 1 reads (0.99% of total) failing FailsVendorQualityCheckFilter + */ + private void printReadFilteringStats() { + final ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics(); + if ( cumulativeMetrics.getNumReadsSeen() > 0 ) { + // count up the number of skipped reads by summing over all filters + long nSkippedReads = 0L; + for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) + nSkippedReads += countsByFilter; + + logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", + nSkippedReads, + cumulativeMetrics.getNumReadsSeen(), + 100.0 * MathUtils.ratio(nSkippedReads, cumulativeMetrics.getNumReadsSeen()))); + + for ( final Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { + long count = filterCounts.getValue(); + logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", + count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); + } + } + } + /** * Gets the engine that created this microscheduler. * @return The engine owning this microscheduler. @@ -269,38 +372,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ public IndexedFastaSequenceFile getReference() { return reference; } - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName() { - return traversalEngine.getPerformanceLogFileName(); - } - - /** - * Set the filename of the log for performance. If set, - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName) { - traversalEngine.setPerformanceLogFileName(fileName); - } - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds() { - return traversalEngine.getPerformanceProgressPrintFrequencySeconds(); - } - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds) { - traversalEngine.setPerformanceProgressPrintFrequencySeconds(seconds); - } - protected void cleanup() { try { mBeanServer.unregisterMBean(mBeanName); @@ -309,4 +380,38 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { throw new ReviewedStingException("Unable to unregister microscheduler with JMX", ex); } } + + /** + * Returns a traversal engine suitable for use in this thread. + * + * Pops the next available engine from the available ones maintained by this + * microscheduler. Note that it's a runtime error to pop a traversal engine + * from this scheduler if there are none available. Callers that + * once pop'd an engine for use must return it with returnTraversalEngine + * + * @return a non-null TraversalEngine suitable for execution in this scheduler + */ + @Ensures("result != null") + protected synchronized TraversalEngine borrowTraversalEngine() { + if ( availableTraversalEngines.isEmpty() ) + throw new IllegalStateException("no traversal engines were available"); + else { + return availableTraversalEngines.pop(); + } + } + + /** + * Return a borrowed traversal engine to this MicroScheduler, for later use + * in another traversal execution + * + * @param traversalEngine the borrowed traversal engine. Must have been previously borrowed. + */ + protected synchronized void returnTraversalEngine(final TraversalEngine traversalEngine) { + if ( traversalEngine == null ) + throw new IllegalArgumentException("Attempting to push a null traversal engine"); + if ( ! allCreatedTraversalEngines.contains(traversalEngine) ) + throw new IllegalArgumentException("Attempting to push a traversal engine not created by this MicroScheduler" + engine); + + availableTraversalEngines.push(traversalEngine); + } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java index e510822b8..8be6b0b62 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java @@ -31,27 +31,5 @@ package org.broadinstitute.sting.gatk.executive; * To change this template use File | Settings | File Templates. */ public interface MicroSchedulerMBean { - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName(); - - /** - * Set the filename of the log for performance. If set, - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName); - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds(); - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds); + // has nothing because we don't have anything we currently track } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index aefa9c12d..d632892d5 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -10,7 +10,6 @@ import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; /** * User: hanna * Date: Apr 29, 2009 @@ -30,7 +29,6 @@ public class ShardTraverser implements Callable { final private HierarchicalMicroScheduler microScheduler; final private Walker walker; final private Shard shard; - final private TraversalEngine traversalEngine; final private ThreadLocalOutputTracker outputTracker; private OutputMergeTask outputMergeTask; @@ -43,20 +41,18 @@ public class ShardTraverser implements Callable { private boolean complete = false; public ShardTraverser( HierarchicalMicroScheduler microScheduler, - TraversalEngine traversalEngine, Walker walker, Shard shard, ThreadLocalOutputTracker outputTracker) { this.microScheduler = microScheduler; this.walker = walker; - this.traversalEngine = traversalEngine; this.shard = shard; this.outputTracker = outputTracker; } public Object call() { + final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine(); try { - traversalEngine.startTimersIfNecessary(); final long startTime = System.currentTimeMillis(); Object accumulator = walker.reduceInit(); @@ -67,7 +63,7 @@ public class ShardTraverser implements Callable { for(WindowMaker.WindowMakerIterator iterator: windowMaker) { final ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),microScheduler.getEngine().getGenomeLocParser(),iterator.getLocus(),iterator,microScheduler.reference,microScheduler.rods); - accumulator = traversalEngine.traverse( walker, dataProvider, accumulator ); + accumulator = traversalEngine.traverse(walker, dataProvider, accumulator); dataProvider.close(); } @@ -85,6 +81,7 @@ public class ShardTraverser implements Callable { } finally { synchronized(this) { complete = true; + microScheduler.returnTraversalEngine(traversalEngine); notifyAll(); } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index 8c617e4dc..668bddcca 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -30,66 +30,29 @@ import org.broadinstitute.sting.gatk.ReadMetrics; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.walkers.Walker; -import org.broadinstitute.sting.utils.*; +import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.UserException; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; public abstract class TraversalEngine,ProviderType extends ShardDataProvider> { /** our log, which we want to capture anything from this class */ protected static final Logger logger = Logger.getLogger(TraversalEngine.class); - // Time in milliseconds since we initialized this engine - private static final int HISTORY_WINDOW_SIZE = 50; - - /** lock object to sure updates to history are consistent across threads */ - private static final Object lock = new Object(); - LinkedList history = new LinkedList(); - - /** We use the SimpleTimer to time our run */ - private SimpleTimer timer = null; - - // How long can we go without printing some progress info? - private long lastProgressPrintTime = -1; // When was the last time we printed progress log? - - private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000; // in milliseconds - private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0; - private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0; - private long progressPrintFrequency = 10 * 1000; // in milliseconds - private boolean progressMeterInitialized = false; - - // for performance log - private static final boolean PERFORMANCE_LOG_ENABLED = true; - private final Object performanceLogLock = new Object(); - private File performanceLogFile; - private PrintStream performanceLog = null; - private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log? - private final long PERFORMANCE_LOG_PRINT_FREQUENCY = progressPrintFrequency; // in milliseconds - - /** Size, in bp, of the area we are processing. Updated once in the system in initial for performance reasons */ - long targetSize = -1; - GenomeLocSortedSet targetIntervals = null; - protected GenomeAnalysisEngine engine; + private ProgressMeter progressMeter; // ---------------------------------------------------------------------------------------------------- // // ABSTRACT METHODS // // ---------------------------------------------------------------------------------------------------- + /** - * Gets the named traversal type associated with the given traversal. + * Gets the named traversal type associated with the given traversal, such as loci, reads, etc. + * * @return A user-friendly name for the given traversal type. */ - protected abstract String getTraversalType(); + public abstract String getTraversalUnits(); /** * this method must be implemented by all traversal engines @@ -104,70 +67,36 @@ public abstract class TraversalEngine,Provide ProviderType dataProvider, T sum); - // ---------------------------------------------------------------------------------------------------- - // - // Common timing routines - // - // ---------------------------------------------------------------------------------------------------- /** * Initialize the traversal engine. After this point traversals can be run over the data + * * @param engine GenomeAnalysisEngine for this traversal + * @param progressMeter An optional (null == optional) meter to track our progress */ - public void initialize(GenomeAnalysisEngine engine) { + public void initialize(final GenomeAnalysisEngine engine, final ProgressMeter progressMeter) { if ( engine == null ) throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!"); this.engine = engine; - - if ( PERFORMANCE_LOG_ENABLED && engine.getArguments() != null && engine.getArguments().performanceLog != null ) { - synchronized(this.performanceLogLock) { - performanceLogFile = engine.getArguments().performanceLog; - createNewPerformanceLog(); - } - } - - // if we don't have any intervals defined, create intervals from the reference itself - if ( this.engine.getIntervals() == null ) - targetIntervals = GenomeLocSortedSet.createSetFromSequenceDictionary(engine.getReferenceDataSource().getReference().getSequenceDictionary()); - else - targetIntervals = this.engine.getIntervals(); - targetSize = targetIntervals.coveredSize(); - } - - private void createNewPerformanceLog() { - synchronized(performanceLogLock) { - try { - performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog)); - List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining"); - performanceLog.println(Utils.join("\t", pLogHeader)); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e); - } - } - } - /** - * Should be called to indicate that we're going to process records and the timer should start ticking. This - * function should be called right before any traversal work is done, to avoid counting setup costs in the - * processing costs and inflating the estimated runtime. - */ - public void startTimersIfNecessary() { - if ( timer == null ) { - timer = new SimpleTimer("Traversal"); - timer.start(); - lastProgressPrintTime = timer.currentTime(); - } + this.progressMeter = progressMeter; } /** - * @param curTime (current runtime, in millisecs) - * @param lastPrintTime the last time we printed, in machine milliseconds - * @param printFreq maximum permitted difference between last print and current times + * For testing only. Does not initialize the progress meter * - * @return true if the maximum interval (in millisecs) has passed since the last printing + * @param engine */ - private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) { - long elapsed = curTime - lastPrintTime; - return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; + protected void initialize(final GenomeAnalysisEngine engine) { + initialize(engine, null); + } + + /** + * Called by the MicroScheduler when all work is done and the GATK is shutting down. + * + * To be used by subclasses that need to free up resources (such as threads) + */ + public void shutdown() { + // by default there's nothing to do } /** @@ -190,201 +119,15 @@ public abstract class TraversalEngine,Provide } /** - * Forward request to printProgress + * Forward request to notifyOfProgress * * Assumes that one cycle has been completed * * @param loc the location */ public void printProgress(final GenomeLoc loc) { - // A bypass is inserted here for unit testing. - printProgress(loc, false); - } - - /** - * Utility routine that prints out process information (including timing) every N records or - * every M seconds, for N and M set in global variables. - * - * @param loc Current location, can be null if you are at the end of the traversal - * @param mustPrint If true, will print out info, regardless of nRecords or time interval - */ - private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) { - if( ! progressMeterInitialized ) { - logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]"); - logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", - "Location", getTraversalType(), getTraversalType())); - progressMeterInitialized = true; - } - - final long curTime = timer.currentTime(); - boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); - boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); - - if ( printProgress || printLog ) { - final ProcessingHistory last = updateHistory(loc, engine.getCumulativeMetrics()); - - final AutoFormattingTime elapsed = new AutoFormattingTime(last.elapsedSeconds); - final AutoFormattingTime bpRate = new AutoFormattingTime(last.secondsPerMillionBP()); - final AutoFormattingTime unitRate = new AutoFormattingTime(last.secondsPerMillionElements()); - final double fractionGenomeTargetCompleted = last.calculateFractionGenomeTargetCompleted(targetSize); - final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted); - final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds()); - final long nRecords = engine.getCumulativeMetrics().getNumIterations(); - - if ( printProgress ) { - lastProgressPrintTime = curTime; - - // dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates - if ( estTotalRuntime.getTimeInSeconds() > TWELVE_HOURS_IN_SECONDS ) - progressPrintFrequency = 60 * 1000; // in milliseconds - else if ( estTotalRuntime.getTimeInSeconds() > TWO_HOURS_IN_SECONDS ) - progressPrintFrequency = 30 * 1000; // in milliseconds - else - progressPrintFrequency = 10 * 1000; // in milliseconds - - final String posName = loc == null ? (mustPrint ? "done" : "unmapped reads") : String.format("%s:%d", loc.getContig(), loc.getStart()); - logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", - posName, nRecords*1.0, elapsed, unitRate, - 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); - - } - - if ( printLog ) { - lastPerformanceLogPrintTime = curTime; - synchronized(performanceLogLock) { - performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", - elapsed.getTimeInSeconds(), nRecords, unitRate.getTimeInSeconds(), last.bpProcessed, - bpRate.getTimeInSeconds(), fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(), - timeToCompletion.getTimeInSeconds()); - } - } - } - } - - /** - * Keeps track of the last HISTORY_WINDOW_SIZE data points for the progress meter. Currently the - * history isn't used in any way, but in the future it'll become valuable for more accurate estimates - * for when a process will complete. - * - * @param loc our current position. If null, assumes we are done traversing - * @param metrics information about what's been processed already - * @return - */ - private ProcessingHistory updateHistory(GenomeLoc loc, ReadMetrics metrics) { - synchronized (lock) { - if ( history.size() > HISTORY_WINDOW_SIZE ) - history.pop(); - - long nRecords = metrics.getNumIterations(); - long bpProcessed = loc == null ? targetSize : targetIntervals.sizeBeforeLoc(loc); // null -> end of processing - history.add(new ProcessingHistory(timer.getElapsedTime(), loc, nRecords, bpProcessed)); - - return history.getLast(); - } - } - - /** - * Called after a traversal to print out information about the traversal process - */ - public void printOnTraversalDone() { - printProgress(null, true); - - final double elapsed = timer == null ? 0 : timer.getElapsedTime(); - - ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics(); - - // count up the number of skipped reads by summing over all filters - long nSkippedReads = 0L; - for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values()) - nSkippedReads += countsByFilter; - - logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600)); - if ( cumulativeMetrics.getNumReadsSeen() > 0 ) - logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)", - nSkippedReads, - cumulativeMetrics.getNumReadsSeen(), - 100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen()))); - for ( Map.Entry filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) { - long count = filterCounts.getValue(); - logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s", - count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey())); - } - - if ( performanceLog != null ) performanceLog.close(); - } - - /** - * Gets the filename to which performance data is currently being written. - * @return Filename to which performance data is currently being written. - */ - public String getPerformanceLogFileName() { - synchronized(performanceLogLock) { - return performanceLogFile.getAbsolutePath(); - } - } - - /** - * Sets the filename of the log for performance. If set, will write performance data. - * @param fileName filename to use when writing performance data. - */ - public void setPerformanceLogFileName(String fileName) { - File file = new File(fileName); - - synchronized(performanceLogLock) { - // Ignore multiple calls to reset the same lock. - if(performanceLogFile != null && performanceLogFile.equals(file)) - return; - - // Close an existing log - if(performanceLog != null) performanceLog.close(); - - performanceLogFile = file; - createNewPerformanceLog(); - } - } - - /** - * Gets the frequency with which performance data is written. - * @return Frequency, in seconds, of performance log writes. - */ - public long getPerformanceProgressPrintFrequencySeconds() { - return progressPrintFrequency; - } - - /** - * How often should the performance log message be written? - * @param seconds number of seconds between messages indicating performance frequency. - */ - public void setPerformanceProgressPrintFrequencySeconds(long seconds) { - progressPrintFrequency = seconds; - } - - private static class ProcessingHistory { - double elapsedSeconds; - long unitsProcessed; - long bpProcessed; - GenomeLoc loc; - - public ProcessingHistory(double elapsedSeconds, GenomeLoc loc, long unitsProcessed, long bpProcessed) { - this.elapsedSeconds = elapsedSeconds; - this.loc = loc; - this.unitsProcessed = unitsProcessed; - this.bpProcessed = bpProcessed; - } - - /** How long in seconds to process 1M traversal units? */ - private double secondsPerMillionElements() { - return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); - } - - /** How long in seconds to process 1M bp on the genome? */ - private double secondsPerMillionBP() { - return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); - } - - /** What fractoin of the target intervals have we covered? */ - private double calculateFractionGenomeTargetCompleted(final long targetSize) { - return (1.0*bpProcessed) / targetSize; - } + if ( progressMeter != null ) + progressMeter.notifyOfProgress(loc, engine.getCumulativeMetrics().getNumIterations()); } } + diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java index bbd9346b3..2b7b2f9f5 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java @@ -36,7 +36,7 @@ public class TraverseActiveRegions extends TraversalEngine myReads = new LinkedHashSet(); @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "active regions"; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java index 2b45d894c..2e43ef8f8 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java @@ -54,7 +54,7 @@ public class TraverseDuplicates extends TraversalEngine extends TraversalEngine extends TraverseLociBase { } @Override - public void printOnTraversalDone() { + public void shutdown() { nanoScheduler.shutdown(); - super.printOnTraversalDone(); } /** diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java index 9b076fce4..aef3cf7d0 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java @@ -27,7 +27,7 @@ public class TraverseReadPairs extends TraversalEngine extends TraversalEngine,Read protected static final Logger logger = Logger.getLogger(TraverseReads.class); @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "reads"; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java index b3a0a1390..77ab0c891 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -65,7 +65,7 @@ public class TraverseReadsNano extends TraversalEngine, } @Override - protected String getTraversalType() { + public String getTraversalUnits() { return "reads"; } @@ -135,9 +135,8 @@ public class TraverseReadsNano extends TraversalEngine, } @Override - public void printOnTraversalDone() { + public void shutdown() { nanoScheduler.shutdown(); - super.printOnTraversalDone(); } /** diff --git a/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java b/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java index b3a9986c5..4c54d4126 100644 --- a/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java +++ b/public/java/src/org/broadinstitute/sting/utils/SimpleTimer.java @@ -145,4 +145,13 @@ public class SimpleTimer { public synchronized long getElapsedTimeNano() { return running ? (currentTimeNano() - startTimeNano + elapsedTimeNano) : elapsedTimeNano; } + + /** + * Add the elapsed time from toAdd to this elapsed time + * + * @param toAdd the timer whose elapsed time we want to add to this timer + */ + public synchronized void addElapsed(final SimpleTimer toAdd) { + elapsedTimeNano += toAdd.getElapsedTimeNano(); + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index 29dddbc49..f5eb53456 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -29,6 +29,7 @@ class InputProducer implements Runnable { final SimpleTimer inputTimer, final BlockingQueue outputQueue) { if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); + if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null"); if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null"); this.inputReader = inputReader; @@ -38,11 +39,16 @@ class InputProducer implements Runnable { public void run() { try { - while ( inputReader.hasNext() ) { - if ( inputTimer != null ) inputTimer.restart(); - final InputType input = inputReader.next(); - if ( inputTimer != null ) inputTimer.stop(); - outputQueue.put(new InputValue(input)); + while ( true ) { + inputTimer.restart(); + if ( ! inputReader.hasNext() ) { + inputTimer.stop(); + break; + } else { + final InputType input = inputReader.next(); + inputTimer.stop(); + outputQueue.put(new InputValue(input)); + } } // add the EOF object so our consumer knows we are done in all inputs diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java new file mode 100644 index 000000000..874434eae --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java @@ -0,0 +1,69 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.AutoFormattingTime; +import org.broadinstitute.sting.utils.SimpleTimer; + +/** + * Holds runtime profile (input, read, map) times as tracked by NanoScheduler + * + * User: depristo + * Date: 9/10/12 + * Time: 8:31 PM + */ +public class NSRuntimeProfile { + final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside"); + final SimpleTimer inputTimer = new SimpleTimer("input"); + final SimpleTimer mapTimer = new SimpleTimer("map"); + final SimpleTimer reduceTimer = new SimpleTimer("reduce"); + + /** + * Combine the elapsed time information from other with this profile + * + * @param other a non-null profile + */ + public void combine(final NSRuntimeProfile other) { + outsideSchedulerTimer.addElapsed(other.outsideSchedulerTimer); + inputTimer.addElapsed(other.inputTimer); + mapTimer.addElapsed(other.mapTimer); + reduceTimer.addElapsed(other.reduceTimer); + } + + /** + * Print the runtime profiling to logger + * + * @param logger + */ + public void log(final Logger logger) { + log1(logger, "Input time", inputTimer); + log1(logger, "Map time", mapTimer); + log1(logger, "Reduce time", reduceTimer); + log1(logger, "Outside time", outsideSchedulerTimer); + } + + /** + * @return the total runtime for all functions of this nano scheduler + */ + @Ensures("result >= 0.0") + public double totalRuntimeInSeconds() { + return inputTimer.getElapsedTime() + + mapTimer.getElapsedTime() + + reduceTimer.getElapsedTime() + + outsideSchedulerTimer.getElapsedTime(); + } + + /** + * Print to logger.info timing information from timer, with name label + * + * @param label the name of the timer to display. Should be human readable + * @param timer the timer whose elapsed time we will display + */ + @Requires({"label != null", "timer != null"}) + private void log1(final Logger logger, final String label, final SimpleTimer timer) { + final double myTimeInSec = timer.getElapsedTime(); + final double myTimePercent = myTimeInSec / totalRuntimeInSeconds() * 100; + logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent)); + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 664fb7b9b..bb9afa879 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -3,8 +3,6 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import com.google.java.contract.Requires; import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.AutoFormattingTime; -import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.threading.NamedThreadFactory; @@ -46,7 +44,6 @@ public class NanoScheduler { private final static Logger logger = Logger.getLogger(NanoScheduler.class); private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; private final static boolean LOG_MAP_TIMES = false; - private final static boolean TIME_CALLS = true; private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100; @@ -61,10 +58,15 @@ public class NanoScheduler { boolean debug = false; private NSProgressFunction progressFunction = null; - final SimpleTimer outsideSchedulerTimer = TIME_CALLS ? new SimpleTimer("outside") : null; - final SimpleTimer inputTimer = TIME_CALLS ? new SimpleTimer("input") : null; - final SimpleTimer mapTimer = TIME_CALLS ? new SimpleTimer("map") : null; - final SimpleTimer reduceTimer = TIME_CALLS ? new SimpleTimer("reduce") : null; + /** + * Tracks the combined runtime profiles across all created nano schedulers + */ + final static private NSRuntimeProfile combinedNSRuntimeProfiler = new NSRuntimeProfile(); + + /** + * The profile specific to this nano scheduler + */ + final private NSRuntimeProfile myNSRuntimeProfile = new NSRuntimeProfile(); /** * Create a new nanoscheduler with the desire characteristics requested by the argument @@ -92,7 +94,7 @@ public class NanoScheduler { } // start timing the time spent outside of the nanoScheduler - outsideSchedulerTimer.start(); + myNSRuntimeProfile.outsideSchedulerTimer.start(); } /** @@ -119,21 +121,31 @@ public class NanoScheduler { * After this call, execute cannot be invoked without throwing an error */ public void shutdown() { - outsideSchedulerTimer.stop(); + myNSRuntimeProfile.outsideSchedulerTimer.stop(); + + // add my timing information to the combined NS runtime profile + combinedNSRuntimeProfiler.combine(myNSRuntimeProfile); if ( nThreads > 1 ) { shutdownExecutor("inputExecutor", inputExecutor); shutdownExecutor("mapExecutor", mapExecutor); shutdownExecutor("reduceExecutor", reduceExecutor); } - shutdown = true; - if (TIME_CALLS) { - printTimerInfo("Input time", inputTimer); - printTimerInfo("Map time", mapTimer); - printTimerInfo("Reduce time", reduceTimer); - printTimerInfo("Outside time", outsideSchedulerTimer); - } + shutdown = true; + } + + public void printRuntimeProfile() { + myNSRuntimeProfile.log(logger); + } + + public static void printCombinedRuntimeProfile() { + if ( combinedNSRuntimeProfiler.totalRuntimeInSeconds() > 0.1 ) + combinedNSRuntimeProfiler.log(logger); + } + + protected double getTotalRuntime() { + return myNSRuntimeProfile.totalRuntimeInSeconds(); } /** @@ -154,21 +166,6 @@ public class NanoScheduler { throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!"); } - /** - * Print to logger.info timing information from timer, with name label - * - * @param label the name of the timer to display. Should be human readable - * @param timer the timer whose elapsed time we will display - */ - @Requires({"label != null", "timer != null"}) - private void printTimerInfo(final String label, final SimpleTimer timer) { - final double total = inputTimer.getElapsedTime() + mapTimer.getElapsedTime() - + reduceTimer.getElapsedTime() + outsideSchedulerTimer.getElapsedTime(); - final double myTimeInSec = timer.getElapsedTime(); - final double myTimePercent = myTimeInSec / total * 100; - logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent)); - } - /** * @return true if this nanoScheduler is shutdown, or false if its still open for business */ @@ -246,7 +243,7 @@ public class NanoScheduler { if ( map == null ) throw new IllegalArgumentException("map function cannot be null"); if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null"); - outsideSchedulerTimer.stop(); + myNSRuntimeProfile.outsideSchedulerTimer.stop(); ReduceType result; if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) { @@ -255,7 +252,7 @@ public class NanoScheduler { result = executeMultiThreaded(inputReader, map, initialValue, reduce); } - outsideSchedulerTimer.restart(); + myNSRuntimeProfile.outsideSchedulerTimer.restart(); return result; } @@ -272,28 +269,31 @@ public class NanoScheduler { ReduceType sum = initialValue; int i = 0; - // start timer to ensure that both hasNext and next are caught by the timer - if ( TIME_CALLS ) inputTimer.restart(); - while ( inputReader.hasNext() ) { - final InputType input = inputReader.next(); - if ( TIME_CALLS ) inputTimer.stop(); + while ( true ) { + // start timer to ensure that both hasNext and next are caught by the timer + myNSRuntimeProfile.inputTimer.restart(); + if ( ! inputReader.hasNext() ) { + myNSRuntimeProfile.inputTimer.stop(); + break; + } else { + final InputType input = inputReader.next(); + myNSRuntimeProfile.inputTimer.stop(); - // map - if ( TIME_CALLS ) mapTimer.restart(); - final long preMapTime = LOG_MAP_TIMES ? 0 : mapTimer.currentTimeNano(); - final MapType mapValue = map.apply(input); - if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (mapTimer.currentTimeNano() - preMapTime)); - if ( TIME_CALLS ) mapTimer.stop(); + // map + myNSRuntimeProfile.mapTimer.restart(); + final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano(); + final MapType mapValue = map.apply(input); + if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime)); + myNSRuntimeProfile.mapTimer.stop(); - if ( i++ % inputBufferSize == 0 && progressFunction != null ) - progressFunction.progress(input); + if ( i++ % inputBufferSize == 0 && progressFunction != null ) + progressFunction.progress(input); - // reduce - if ( TIME_CALLS ) reduceTimer.restart(); - sum = reduce.apply(mapValue, sum); - if ( TIME_CALLS ) reduceTimer.stop(); - - if ( TIME_CALLS ) inputTimer.restart(); + // reduce + myNSRuntimeProfile.reduceTimer.restart(); + sum = reduce.apply(mapValue, sum); + myNSRuntimeProfile.reduceTimer.stop(); + } } return sum; @@ -321,11 +321,11 @@ public class NanoScheduler { new LinkedBlockingDeque>>(mapBufferSize); // Start running the input reader thread - inputExecutor.submit(new InputProducer(inputReader, inputTimer, inputQueue)); + inputExecutor.submit(new InputProducer(inputReader, myNSRuntimeProfile.inputTimer, inputQueue)); // Start running the reducer thread final ReducerThread reducer - = new ReducerThread(reduce, reduceTimer, initialValue, mapResultQueue); + = new ReducerThread(reduce, myNSRuntimeProfile.reduceTimer, initialValue, mapResultQueue); final Future reduceResult = reduceExecutor.submit(reducer); try { @@ -382,10 +382,10 @@ public class NanoScheduler { @Override public MapResult call() { - if ( TIME_CALLS ) mapTimer.restart(); if ( debug ) debugPrint("\t\tmap " + input); + myNSRuntimeProfile.mapTimer.restart(); final MapType result = map.apply(input); - if ( TIME_CALLS ) mapTimer.stop(); + myNSRuntimeProfile.mapTimer.stop(); return new MapResult(result, id); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java index 506e45453..dcdba3490 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java @@ -29,6 +29,7 @@ class ReducerThread implements Callable { final ReduceType sum, final BlockingQueue>> mapResultQueue) { if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null"); + if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null"); if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); this.reduce = reduce; @@ -51,9 +52,9 @@ class ReducerThread implements Callable { } else { lastJobID = result.getJobID(); // apply reduce, keeping track of sum - if ( reduceTimer != null ) reduceTimer.restart(); + reduceTimer.restart(); sum = reduce.apply(result.getValue(), sum); - if ( reduceTimer != null ) reduceTimer.stop(); + reduceTimer.stop(); } } } catch (ExecutionException ex) { diff --git a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java new file mode 100755 index 000000000..69cf52fc2 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java @@ -0,0 +1,310 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.utils.progressmeter; + +import com.google.java.contract.Invariant; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.*; +import org.broadinstitute.sting.utils.exceptions.UserException; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; + +/** + * A meter measuring progress on a calculation through a set of genomic regions that can + * print a few key metrics to a logger and optionally to a file + * + * The key information for assessing progress is a set of genome locs describing the total + * set of regions we will process. Whenever (at reasonable intervals) the processing unit + * can called notifyOfProgress and this logger may, depending on the metering delay, print + * a log message with the following metrics: + * + * -- Number of processed X (X = processing units) + * -- Runtime per.1M X + * -- Percent of regions to be processed completed + * -- The estimated total runtime based on previous performance + * -- The estimated time remaining for the entire process + * + * The optional file log an expanded set of metrics in tabular format + * suitable for subsequent analysis in R. + * + * This class is -- and MUST BE -- thread-safe for use in the GATK. Multiple independent + * threads executing processors will be calling notifyOfProgress() simultaneously and this + * class does (and MUST) properly sort out the timings of logs without interlacing outputs + * because of these threads. + * + * Consequently, the fundamental model for when to print the logs is time based. We basically + * print a meter message every X seconds, minutes, hours, whatever is appropriate based on the + * estimated remaining runtime. + * + * @author depristo + * @since 2010 maybe, but written in 09/12 for clarity + */ +@Invariant({ + "targetSizeInBP >= 0", + "progressPrintFrequency > 0" +}) +public class ProgressMeter { + protected static final Logger logger = Logger.getLogger(ProgressMeter.class); + + // -------------------------------------------------------------------------------- + // static constants controlling overall system behavior + // -------------------------------------------------------------------------------- + + /** + * Min. milliseconds after we start up the meter before we will print our first meter message + */ + private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000; + + /** + * How often should we print performance logging information, when we are sending this + * information to a file? Not dynamically updated as the logger meter is. + */ + private final static long PERFORMANCE_LOG_PRINT_FREQUENCY = 10 * 1000; + + private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0; + private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0; + + // -------------------------------------------------------------------------------- + // Variables we updating during running + // -------------------------------------------------------------------------------- + + /** + * When was the last time we printed progress log? In milleseconds + */ + private long lastProgressPrintTime = -1; + + /** + * How frequently should we be printing our meter messages? Dynamically updated + * depending on how long we think the run has left. + */ + private long progressPrintFrequency = 10 * 1000; // default value + + /** + * When was the last time we printed to the performance log? In millseconds + */ + private long lastPerformanceLogPrintTime = -1; + + // -------------------------------------------------------------------------------- + // final variables fixed at object creation time + // -------------------------------------------------------------------------------- + + /** + * The set of genome locs describing the total region we are processing with + * this GATK run. Used to determine how close we are to completing the run + */ + private final GenomeLocSortedSet regionsBeingProcessed; + + /** + * Size, in bp, of the area we are processing, derived from regionsBeingProcessed. + * Updated once in the system in initial for performance reasons + */ + private final long targetSizeInBP; + + /** + * A string describing the type of units being processes, so we can say things like + * "we are running at X processingUnitName per second" + */ + private final String processingUnitName; + + /** + * A potentially null file where we print a supplementary, R readable performance log + * file. + */ + private final PrintStream performanceLog; + + /** We use the SimpleTimer to time our run */ + private final SimpleTimer timer = new SimpleTimer(); + + /** + * Create a new ProgressMeter + * + * @param performanceLogFile an optional performance log file where a table of performance logs will be written + * @param processingUnitName the name of the unit type being processed, suitable for saying X seconds per processingUnitName + * @param processingIntervals the intervals being processed + */ + public ProgressMeter(final File performanceLogFile, + final String processingUnitName, + final GenomeLocSortedSet processingIntervals) { + if ( processingUnitName == null ) throw new IllegalArgumentException("processingUnitName cannot be null"); + if ( processingIntervals == null ) throw new IllegalArgumentException("Target intervals cannot be null"); + + this.processingUnitName = processingUnitName; + this.regionsBeingProcessed = processingIntervals; + + // setup the performance logger output, if requested + if ( performanceLogFile != null ) { + try { + this.performanceLog = new PrintStream(new FileOutputStream(performanceLogFile)); + final List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", + "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining"); + performanceLog.println(Utils.join("\t", pLogHeader)); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(performanceLogFile, e); + } + } else { + performanceLog = null; + } + + // cached for performance reasons + targetSizeInBP = processingIntervals.coveredSize(); + + // start up the timer + start(); + } + + /** + * Forward request to notifyOfProgress + * + * Assumes that one cycle has been completed + * + * @param loc our current location. Null means "in unmapped reads" + * @param nTotalRecordsProcessed the total number of records we've processed + */ + public void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) { + notifyOfProgress(loc, false, nTotalRecordsProcessed); + } + + private synchronized void start() { + timer.start(); + lastProgressPrintTime = timer.currentTime(); + + logger.info("[INITIALIZATION COMPLETE; STARTING PROCESSING]"); + logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", + "Location", processingUnitName, processingUnitName)); + } + + /** + * Utility routine that prints out process information (including timing) every N records or + * every M seconds, for N and M set in global variables. + * + * Synchronized to ensure that even with multiple threads calling notifyOfProgress we still + * get one clean stream of meter logs. + * + * @param loc Current location, can be null if you are at the end of the processing unit + * @param mustPrint If true, will print out info, regardless of time interval + * @param nTotalRecordsProcessed the total number of records we've processed + */ + private synchronized void notifyOfProgress(final GenomeLoc loc, boolean mustPrint, final long nTotalRecordsProcessed) { + if ( nTotalRecordsProcessed < 0 ) throw new IllegalArgumentException("nTotalRecordsProcessed must be >= 0"); + + final long curTime = timer.currentTime(); + final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); + final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); + + if ( printProgress || printLog ) { + final ProgressMeterData progressData = takeProgressSnapshot(loc, nTotalRecordsProcessed); + + final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.getElapsedSeconds()); + final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP()); + final AutoFormattingTime unitRate = new AutoFormattingTime(progressData.secondsPerMillionElements()); + final double fractionGenomeTargetCompleted = progressData.calculateFractionGenomeTargetCompleted(targetSizeInBP); + final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted); + final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds()); + + if ( printProgress ) { + lastProgressPrintTime = curTime; + updateLoggerPrintFrequency(estTotalRuntime.getTimeInSeconds()); + + // a pretty name for our position + final String posName = loc == null + ? (mustPrint ? "done" : "unmapped reads") + : String.format("%s:%d", loc.getContig(), loc.getStart()); + + logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", + posName, progressData.getUnitsProcessed()*1.0, elapsed, unitRate, + 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); + + } + + if ( printLog ) { + lastPerformanceLogPrintTime = curTime; + performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", + elapsed.getTimeInSeconds(), progressData.getUnitsProcessed(), unitRate.getTimeInSeconds(), + progressData.getBpProcessed(), bpRate.getTimeInSeconds(), + fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(), + timeToCompletion.getTimeInSeconds()); + } + } + } + + /** + * Determine, based on remaining runtime, how often to print the meter + * + * @param totalRuntimeSeconds kinda obvious, no? + */ + private void updateLoggerPrintFrequency(final double totalRuntimeSeconds) { + // dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates + if ( totalRuntimeSeconds > TWELVE_HOURS_IN_SECONDS ) + progressPrintFrequency = 60 * 1000; // in milliseconds + else if ( totalRuntimeSeconds > TWO_HOURS_IN_SECONDS ) + progressPrintFrequency = 30 * 1000; // in milliseconds + else + progressPrintFrequency = 10 * 1000; // in milliseconds + } + + /** + * Creates a new ProgressData object recording a snapshot of our progress at this instant + * + * @param loc our current position. If null, assumes we are done traversing + * @param nTotalRecordsProcessed the total number of records we've processed + * @return + */ + private ProgressMeterData takeProgressSnapshot(final GenomeLoc loc, final long nTotalRecordsProcessed) { + // null -> end of processing + final long bpProcessed = loc == null ? targetSizeInBP : regionsBeingProcessed.sizeBeforeLoc(loc); + return new ProgressMeterData(timer.getElapsedTime(), nTotalRecordsProcessed, bpProcessed); + } + + /** + * Should be called when processing is done + */ + public void notifyDone(final long nTotalRecordsProcessed) { + // print out the progress meter + notifyOfProgress(null, true, nTotalRecordsProcessed); + + logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", + timer.getElapsedTime(), timer.getElapsedTime() / 60, timer.getElapsedTime() / 3600)); + + if ( performanceLog != null ) + performanceLog.close(); + } + + /** + * @param curTime (current runtime, in millisecs) + * @param lastPrintTime the last time we printed, in machine milliseconds + * @param printFreq maximum permitted difference between last print and current times + * + * @return true if the maximum interval (in millisecs) has passed since the last printing + */ + private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) { + final long elapsed = curTime - lastPrintTime; + return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS; + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java new file mode 100644 index 000000000..096b55be2 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterData.java @@ -0,0 +1,54 @@ +package org.broadinstitute.sting.utils.progressmeter; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; + +/** + * a snapshot of our performance, suitable for storage and later analysis + */ +class ProgressMeterData { + private final double elapsedSeconds; + private final long unitsProcessed; + private final long bpProcessed; + + @Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"}) + public ProgressMeterData(double elapsedSeconds, long unitsProcessed, long bpProcessed) { + this.elapsedSeconds = elapsedSeconds; + this.unitsProcessed = unitsProcessed; + this.bpProcessed = bpProcessed; + } + + @Ensures("result >= 0.0") + public double getElapsedSeconds() { + return elapsedSeconds; + } + + @Ensures("result >= 0") + public long getUnitsProcessed() { + return unitsProcessed; + } + + @Ensures("result >= 0") + public long getBpProcessed() { + return bpProcessed; + } + + /** How long in seconds to process 1M traversal units? */ + @Ensures("result >= 0.0") + public double secondsPerMillionElements() { + return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1); + } + + /** How long in seconds to process 1M bp on the genome? */ + @Ensures("result >= 0.0") + public double secondsPerMillionBP() { + return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1); + } + + /** What fraction of the target intervals have we covered? */ + @Requires("targetSize >= 0") + @Ensures({"result >= 0.0", "result <= 1.0"}) + public double calculateFractionGenomeTargetCompleted(final long targetSize) { + return (1.0*bpProcessed) / Math.max(targetSize, 1); + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java index 475f7de21..9632a687b 100644 --- a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java +++ b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialReadsTraversal.java @@ -69,7 +69,7 @@ public class ArtificialReadsTraversal extends TraversalEngine reads, ReadProperties readAttributes) { + private LocusIteratorByStateExperimental makeLTBS(List reads, ReadProperties readAttributes) { return new LocusIteratorByStateExperimental(new FakeCloseableIterator(reads.iterator()), readAttributes, genomeLocParser, LocusIteratorByStateExperimental.sampleListForSAMWithoutReadGroups()); } - private static ReadProperties createTestReadProperties() { - return createTestReadProperties(null); - } - - private static ReadProperties createTestReadProperties( DownsamplingMethod downsamplingMethod ) { - return new ReadProperties( - Collections.emptyList(), - new SAMFileHeader(), - false, - SAMFileReader.ValidationStringency.STRICT, - downsamplingMethod, - new ValidationExclusion(), - Collections.emptyList(), - Collections.emptyList(), - false, - (byte) -1 - ); - } - - private static class FakeCloseableIterator implements CloseableIterator { - Iterator iterator; - - public FakeCloseableIterator(Iterator it) { - iterator = it; - } - - @Override - public void close() { - return; - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public T next() { - return iterator.next(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Don't remove!"); - } - } - @Test public void testXandEQOperators() { final byte[] bases1 = new byte[] {'A','A','A','A','A','A','A','A','A','A'}; @@ -308,45 +259,36 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest { // comprehensive LIBS/PileupElement tests // //////////////////////////////////////////// - private static final int IS_BEFORE_DELETED_BASE_FLAG = 1; - private static final int IS_BEFORE_DELETION_START_FLAG = 2; - private static final int IS_AFTER_DELETED_BASE_FLAG = 4; - private static final int IS_AFTER_DELETION_END_FLAG = 8; - private static final int IS_BEFORE_INSERTION_FLAG = 16; - private static final int IS_AFTER_INSERTION_FLAG = 32; - private static final int IS_NEXT_TO_SOFTCLIP_FLAG = 64; - private static class LIBSTest { final String cigar; final int readLength; - final List offsets; - final List flags; - private LIBSTest(final String cigar, final int readLength, final List offsets, final List flags) { + private LIBSTest(final String cigar, final int readLength) { this.cigar = cigar; this.readLength = readLength; - this.offsets = offsets; - this.flags = flags; } } @DataProvider(name = "LIBSTest") public Object[][] createLIBSTestData() { + + //TODO -- when LIBS is fixed this should be replaced to provide all possible permutations of CIGAR strings + return new Object[][]{ - {new LIBSTest("1I", 1, Arrays.asList(0), Arrays.asList(IS_BEFORE_INSERTION_FLAG))}, - {new LIBSTest("10I", 10, Arrays.asList(0), Arrays.asList(IS_BEFORE_INSERTION_FLAG))}, - {new LIBSTest("2M2I2M", 6, Arrays.asList(0,1,4,5), Arrays.asList(0,IS_BEFORE_INSERTION_FLAG,IS_AFTER_INSERTION_FLAG,0))}, - {new LIBSTest("2M2I", 4, Arrays.asList(0,1), Arrays.asList(0,IS_BEFORE_INSERTION_FLAG))}, + {new LIBSTest("1I", 1)}, + {new LIBSTest("10I", 10)}, + {new LIBSTest("2M2I2M", 6)}, + {new LIBSTest("2M2I", 4)}, //TODO -- uncomment these when LIBS is fixed //{new LIBSTest("2I2M", 4, Arrays.asList(2,3), Arrays.asList(IS_AFTER_INSERTION_FLAG,0))}, //{new LIBSTest("1I1M1D1M", 3, Arrays.asList(0,1), Arrays.asList(IS_AFTER_INSERTION_FLAG | IS_BEFORE_DELETION_START_FLAG | IS_BEFORE_DELETED_BASE_FLAG,IS_AFTER_DELETED_BASE_FLAG | IS_AFTER_DELETION_END_FLAG))}, //{new LIBSTest("1S1I1M", 3, Arrays.asList(2), Arrays.asList(IS_AFTER_INSERTION_FLAG))}, - {new LIBSTest("1M2D2M", 3, Arrays.asList(0,1,2), Arrays.asList(IS_BEFORE_DELETION_START_FLAG | IS_BEFORE_DELETED_BASE_FLAG,IS_AFTER_DELETED_BASE_FLAG | IS_AFTER_DELETION_END_FLAG,0))}, - {new LIBSTest("1S1M", 2, Arrays.asList(1), Arrays.asList(IS_NEXT_TO_SOFTCLIP_FLAG))}, - {new LIBSTest("1M1S", 2, Arrays.asList(0), Arrays.asList(IS_NEXT_TO_SOFTCLIP_FLAG))}, - {new LIBSTest("1S1M1I", 3, Arrays.asList(1), Arrays.asList(IS_BEFORE_INSERTION_FLAG | IS_NEXT_TO_SOFTCLIP_FLAG))} + //{new LIBSTest("1M2D2M", 3)}, + {new LIBSTest("1S1M", 2)}, + {new LIBSTest("1M1S", 2)}, + {new LIBSTest("1S1M1I", 3)} }; } @@ -361,26 +303,24 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest { // create the iterator by state with the fake reads and fake records li = makeLTBS(Arrays.asList(read), createTestReadProperties()); + final LIBS_position tester = new LIBS_position(read); - int offset = 0; while ( li.hasNext() ) { AlignmentContext alignmentContext = li.next(); ReadBackedPileup p = alignmentContext.getBasePileup(); Assert.assertTrue(p.getNumberOfElements() == 1); PileupElement pe = p.iterator().next(); - final int flag = params.flags.get(offset); - Assert.assertEquals(pe.isBeforeDeletedBase(), (flag & IS_BEFORE_DELETED_BASE_FLAG) != 0); - Assert.assertEquals(pe.isBeforeDeletionStart(), (flag & IS_BEFORE_DELETION_START_FLAG) != 0); - Assert.assertEquals(pe.isAfterDeletedBase(), (flag & IS_AFTER_DELETED_BASE_FLAG) != 0); - Assert.assertEquals(pe.isAfterDeletionEnd(), (flag & IS_AFTER_DELETION_END_FLAG) != 0); - Assert.assertEquals(pe.isBeforeInsertion(), (flag & IS_BEFORE_INSERTION_FLAG) != 0); - Assert.assertEquals(pe.isAfterInsertion(), (flag & IS_AFTER_INSERTION_FLAG) != 0); - Assert.assertEquals(pe.isNextToSoftClip(), (flag & IS_NEXT_TO_SOFTCLIP_FLAG) != 0); + tester.stepForwardOnGenome(); - Assert.assertEquals(pe.getOffset(), params.offsets.get(offset).intValue()); - - offset++; + Assert.assertEquals(pe.isBeforeDeletedBase(), tester.isBeforeDeletedBase); + Assert.assertEquals(pe.isBeforeDeletionStart(), tester.isBeforeDeletionStart); + Assert.assertEquals(pe.isAfterDeletedBase(), tester.isAfterDeletedBase); + Assert.assertEquals(pe.isAfterDeletionEnd(), tester.isAfterDeletionEnd); + Assert.assertEquals(pe.isBeforeInsertion(), tester.isBeforeInsertion); + Assert.assertEquals(pe.isAfterInsertion(), tester.isAfterInsertion); + Assert.assertEquals(pe.isNextToSoftClip(), tester.isNextToSoftClip); + Assert.assertEquals(pe.getOffset(), tester.getCurrentReadOffset()); } } @@ -543,4 +483,165 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest { test.run(); } + + /////////////////////////////////////// + // End Read State Manager Tests // + /////////////////////////////////////// + + + + /////////////////////////////////////// + // Helper methods / classes // + /////////////////////////////////////// + + private static ReadProperties createTestReadProperties() { + return createTestReadProperties(null); + } + + private static ReadProperties createTestReadProperties( DownsamplingMethod downsamplingMethod ) { + return new ReadProperties( + Collections.emptyList(), + new SAMFileHeader(), + false, + SAMFileReader.ValidationStringency.STRICT, + downsamplingMethod, + new ValidationExclusion(), + Collections.emptyList(), + Collections.emptyList(), + false, + (byte) -1 + ); + } + + private static class FakeCloseableIterator implements CloseableIterator { + Iterator iterator; + + public FakeCloseableIterator(Iterator it) { + iterator = it; + } + + @Override + public void close() {} + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Don't remove!"); + } + } + + private static final class LIBS_position { + + SAMRecord read; + + final int numOperators; + int currentOperatorIndex = 0; + int currentPositionOnOperator = 0; + int currentReadOffset = 0; + + boolean isBeforeDeletionStart = false; + boolean isBeforeDeletedBase = false; + boolean isAfterDeletionEnd = false; + boolean isAfterDeletedBase = false; + boolean isBeforeInsertion = false; + boolean isAfterInsertion = false; + boolean isNextToSoftClip = false; + + boolean sawMop = false; + + public LIBS_position(final SAMRecord read) { + this.read = read; + numOperators = read.getCigar().numCigarElements(); + } + + public int getCurrentReadOffset() { + return Math.max(0, currentReadOffset - 1); + } + + /** + * Steps forward on the genome. Returns false when done reading the read, true otherwise. + */ + public boolean stepForwardOnGenome() { + if ( currentOperatorIndex == numOperators ) + return false; + + CigarElement curElement = read.getCigar().getCigarElement(currentOperatorIndex); + if ( currentPositionOnOperator >= curElement.getLength() ) { + if ( ++currentOperatorIndex == numOperators ) + return false; + + curElement = read.getCigar().getCigarElement(currentOperatorIndex); + currentPositionOnOperator = 0; + } + + switch ( curElement.getOperator() ) { + case I: // insertion w.r.t. the reference + if ( !sawMop ) + break; + case S: // soft clip + currentReadOffset += curElement.getLength(); + case H: // hard clip + case P: // padding + currentOperatorIndex++; + return stepForwardOnGenome(); + + case D: // deletion w.r.t. the reference + case N: // reference skip (looks and gets processed just like a "deletion", just different logical meaning) + currentPositionOnOperator++; + break; + + case M: + case EQ: + case X: + sawMop = true; + currentReadOffset++; + currentPositionOnOperator++; + break; + default: + throw new IllegalStateException("No support for cigar op: " + curElement.getOperator()); + } + + final boolean isFirstOp = currentOperatorIndex == 0; + final boolean isLastOp = currentOperatorIndex == numOperators - 1; + final boolean isFirstBaseOfOp = currentPositionOnOperator == 1; + final boolean isLastBaseOfOp = currentPositionOnOperator == curElement.getLength(); + + isBeforeDeletionStart = isBeforeOp(read.getCigar(), currentOperatorIndex, CigarOperator.D, isLastOp, isLastBaseOfOp); + isBeforeDeletedBase = isBeforeDeletionStart || (!isLastBaseOfOp && curElement.getOperator() == CigarOperator.D); + isAfterDeletionEnd = isAfterOp(read.getCigar(), currentOperatorIndex, CigarOperator.D, isFirstOp, isFirstBaseOfOp); + isAfterDeletedBase = isAfterDeletionEnd || (!isFirstBaseOfOp && curElement.getOperator() == CigarOperator.D); + isBeforeInsertion = isBeforeOp(read.getCigar(), currentOperatorIndex, CigarOperator.I, isLastOp, isLastBaseOfOp) + || (!sawMop && curElement.getOperator() == CigarOperator.I); + isAfterInsertion = isAfterOp(read.getCigar(), currentOperatorIndex, CigarOperator.I, isFirstOp, isFirstBaseOfOp); + isNextToSoftClip = isBeforeOp(read.getCigar(), currentOperatorIndex, CigarOperator.S, isLastOp, isLastBaseOfOp) + || isAfterOp(read.getCigar(), currentOperatorIndex, CigarOperator.S, isFirstOp, isFirstBaseOfOp); + + return true; + } + + private static boolean isBeforeOp(final Cigar cigar, + final int currentOperatorIndex, + final CigarOperator op, + final boolean isLastOp, + final boolean isLastBaseOfOp) { + return !isLastOp && isLastBaseOfOp && cigar.getCigarElement(currentOperatorIndex+1).getOperator() == op; + } + + private static boolean isAfterOp(final Cigar cigar, + final int currentOperatorIndex, + final CigarOperator op, + final boolean isFirstOp, + final boolean isFirstBaseOfOp) { + return !isFirstOp && isFirstBaseOfOp && cigar.getCigarElement(currentOperatorIndex-1).getOperator() == op; + } + } } diff --git a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseReadsUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseReadsUnitTest.java index 7845515d8..46f77c283 100755 --- a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseReadsUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseReadsUnitTest.java @@ -121,8 +121,6 @@ public class TraverseReadsUnitTest extends BaseTest { Object accumulator = countReadWalker.reduceInit(); for(Shard shard: shardStrategy) { - traversalEngine.startTimersIfNecessary(); - if (shard == null) { fail("Shard == null"); } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index b3365c13c..b3986e74e 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -42,7 +43,7 @@ public class InputProducerUnitTest extends BaseTest { final LinkedBlockingDeque.InputValue> readQueue = new LinkedBlockingDeque.InputValue>(queueSize); - final InputProducer ip = new InputProducer(elements.iterator(), null, readQueue); + final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); es.submit(ip); diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 47dcc1d5e..a0ab493c1 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -2,6 +2,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.apache.log4j.BasicConfigurator; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -86,7 +87,7 @@ public class NanoSchedulerUnitTest extends BaseTest { static NanoSchedulerBasicTest exampleTest = null; @DataProvider(name = "NanoSchedulerBasicTest") public Object[][] createNanoSchedulerBasicTest() { - for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) { + for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000, 10000000) ) { for ( final int nt : Arrays.asList(1, 2, 4) ) { for ( final int start : Arrays.asList(0) ) { for ( final int end : Arrays.asList(0, 1, 2, 11, 10000, 100000) ) { @@ -114,6 +115,7 @@ public class NanoSchedulerUnitTest extends BaseTest { } private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { + final SimpleTimer timer = new SimpleTimer().start(); final NanoScheduler nanoScheduler = new NanoScheduler(test.bufferSize, test.nThreads); @@ -129,6 +131,17 @@ public class NanoSchedulerUnitTest extends BaseTest { Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks); nanoScheduler.shutdown(); + + // TODO -- need to enable only in the case where there's serious time spend in + // TODO -- read /map / reduce, otherwise the "outside" timer doesn't add up + final double myTimeEstimate = timer.getElapsedTime(); + final double tolerance = 0.1; + if ( false && myTimeEstimate > 0.1 ) { + Assert.assertTrue(nanoScheduler.getTotalRuntime() > myTimeEstimate * tolerance, + "NanoScheduler said that the total runtime was " + nanoScheduler.getTotalRuntime() + + " but the overall test time was " + myTimeEstimate + ", beyond our tolerance factor of " + + tolerance); + } } @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME) diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java index 61d1330bc..08771e9ec 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -61,7 +62,7 @@ public class ReducerThreadUnitTest extends BaseTest { final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue); final ReducerThread thread - = new ReducerThread(reduce, null, 0, mapResultsQueue); + = new ReducerThread(reduce, new SimpleTimer(), 0, mapResultsQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); final Future value = es.submit(thread);