Merge branch 'master' of ssh://gsa3.broadinstitute.org/humgen/gsa-scr1/gsa-engineering/git/unstable
This commit is contained in:
commit
2ea28499e2
|
|
@ -335,31 +335,27 @@ public class LikelihoodCalculationEngine {
|
||||||
final GATKSAMRecord read = readsForThisSample.get(iii); // BUGBUG: assumes read order in this list and haplotype likelihood list are the same!
|
final GATKSAMRecord read = readsForThisSample.get(iii); // BUGBUG: assumes read order in this list and haplotype likelihood list are the same!
|
||||||
// only count the read if it overlaps the event, otherwise it is not added to the output read list at all
|
// only count the read if it overlaps the event, otherwise it is not added to the output read list at all
|
||||||
if( callLoc.overlapsP(parser.createGenomeLoc(read)) ) {
|
if( callLoc.overlapsP(parser.createGenomeLoc(read)) ) {
|
||||||
final double likelihoods[] = new double[call.getFirst().getAlleles().size()];
|
|
||||||
int count = 0;
|
|
||||||
|
|
||||||
for( final Allele a : call.getFirst().getAlleles() ) {
|
for( final Allele a : call.getFirst().getAlleles() ) {
|
||||||
|
double maxLikelihood = Double.NEGATIVE_INFINITY;
|
||||||
for( final Haplotype h : call.getSecond().get(a) ) { // use the max likelihood from all the haplotypes which mapped to this allele (achieved via the haplotype mapper object)
|
for( final Haplotype h : call.getSecond().get(a) ) { // use the max likelihood from all the haplotypes which mapped to this allele (achieved via the haplotype mapper object)
|
||||||
final double likelihood = h.getReadLikelihoods(sample.getKey())[iii];
|
final double likelihood = h.getReadLikelihoods(sample.getKey())[iii];
|
||||||
likelihoodMap.add(read, a, likelihood);
|
if( likelihood > maxLikelihood ) {
|
||||||
|
maxLikelihood = likelihood;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
likelihoodMap.add(read, a, maxLikelihood);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* // add all filtered reads to the NO_CALL list because they weren't given any likelihoods
|
// add all filtered reads to the NO_CALL list because they weren't given any likelihoods
|
||||||
List<GATKSAMRecord> readList = alleleReadMap.get(Allele.NO_CALL);
|
for( final GATKSAMRecord read : perSampleFilteredReadList.get(sample.getKey()) ) {
|
||||||
if( readList == null ) {
|
|
||||||
readList = new ArrayList<GATKSAMRecord>();
|
|
||||||
alleleReadMap.put(Allele.NO_CALL, readList);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
/* for( final GATKSAMRecord read : perSampleFilteredReadList.get(sample.getKey()) ) {
|
|
||||||
// only count the read if it overlaps the event, otherwise it is not added to the output read list at all
|
// only count the read if it overlaps the event, otherwise it is not added to the output read list at all
|
||||||
if( callLoc.overlapsP(parser.createGenomeLoc(read)) ) {
|
if( callLoc.overlapsP(parser.createGenomeLoc(read)) ) {
|
||||||
readList.add(read);
|
for( final Allele a : call.getFirst().getAlleles() )
|
||||||
|
likelihoodMap.add(read,a,0.0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
returnMap.put(sample.getKey(), likelihoodMap);
|
returnMap.put(sample.getKey(), likelihoodMap);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,17 +21,17 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHaplotypeCallerMultiSample() {
|
public void testHaplotypeCallerMultiSample() {
|
||||||
HCTest(CEUTRIO_BAM, "", "e5b4a0627a1d69b9356f8a7cd2260e89");
|
HCTest(CEUTRIO_BAM, "", "5b751474ad0aef4cdb53f094e605f97c");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHaplotypeCallerSingleSample() {
|
public void testHaplotypeCallerSingleSample() {
|
||||||
HCTest(NA12878_BAM, "", "202d5b6edaf74f411c170099749f202f");
|
HCTest(NA12878_BAM, "", "60efcd2d2722087e900f6365985d18bf");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHaplotypeCallerMultiSampleGGA() {
|
public void testHaplotypeCallerMultiSampleGGA() {
|
||||||
HCTest(CEUTRIO_BAM, "-gt_mode GENOTYPE_GIVEN_ALLELES -alleles " + validationDataLocation + "combined.phase1.chr20.raw.indels.sites.vcf", "561931ba3919808ec471e745cb3148c7");
|
HCTest(CEUTRIO_BAM, "-gt_mode GENOTYPE_GIVEN_ALLELES -alleles " + validationDataLocation + "combined.phase1.chr20.raw.indels.sites.vcf", "71bec55320a2f07af0d54be9d7735322");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HCTestComplexVariants(String bam, String args, String md5) {
|
private void HCTestComplexVariants(String bam, String args, String md5) {
|
||||||
|
|
@ -42,7 +42,7 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHaplotypeCallerMultiSampleComplex() {
|
public void testHaplotypeCallerMultiSampleComplex() {
|
||||||
HCTestComplexVariants(CEUTRIO_BAM, "", "3424b398a9f47c8ac606a5c56eb7d8a7");
|
HCTestComplexVariants(CEUTRIO_BAM, "", "f5a809e3fbd9998f79b75bb2973209e1");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HCTestSymbolicVariants(String bam, String args, String md5) {
|
private void HCTestSymbolicVariants(String bam, String args, String md5) {
|
||||||
|
|
@ -53,7 +53,7 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHaplotypeCallerSingleSampleSymbolic() {
|
public void testHaplotypeCallerSingleSampleSymbolic() {
|
||||||
HCTestSymbolicVariants(NA12878_CHR20_BAM, "", "b71cfaea9390136c584c9671b149d573");
|
HCTestSymbolicVariants(NA12878_CHR20_BAM, "", "8043b0451a4384e678a93600b34afce7");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HCTestIndelQualityScores(String bam, String args, String md5) {
|
private void HCTestIndelQualityScores(String bam, String args, String md5) {
|
||||||
|
|
@ -64,13 +64,13 @@ public class HaplotypeCallerIntegrationTest extends WalkerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHaplotypeCallerSingleSampleIndelQualityScores() {
|
public void testHaplotypeCallerSingleSampleIndelQualityScores() {
|
||||||
HCTestIndelQualityScores(NA12878_RECALIBRATED_BAM, "", "e1f88fac91424740c0eaac1de48b3970");
|
HCTestIndelQualityScores(NA12878_RECALIBRATED_BAM, "", "ea6539e05faf10ffaf76f2d16907c47a");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void HCTestProblematicReadsModifiedInActiveRegions() {
|
public void HCTestProblematicReadsModifiedInActiveRegions() {
|
||||||
final String base = String.format("-T HaplotypeCaller -R %s -I %s", REF, privateTestDir + "haplotype-problem-4.bam") + " --no_cmdline_in_header -o %s -minPruning 3";
|
final String base = String.format("-T HaplotypeCaller -R %s -I %s", REF, privateTestDir + "haplotype-problem-4.bam") + " --no_cmdline_in_header -o %s -minPruning 3";
|
||||||
final WalkerTestSpec spec = new WalkerTestSpec(base, Arrays.asList("000fd36d5cf8090386bb2ac15e3ab0b5"));
|
final WalkerTestSpec spec = new WalkerTestSpec(base, Arrays.asList("8d092b25f40456e618eef91fdce8adf0"));
|
||||||
executeTest("HCTestProblematicReadsModifiedInActiveRegions: ", spec);
|
executeTest("HCTestProblematicReadsModifiedInActiveRegions: ", spec);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -111,7 +111,13 @@ gsa.read.gatkreportv1 <- function(lines) {
|
||||||
headerRowCount = -1;
|
headerRowCount = -1;
|
||||||
|
|
||||||
finishTable <- function() {
|
finishTable <- function() {
|
||||||
.gsa.assignGATKTableToEnvironment(tableName, tableHeader, tableRows[1:rowCount,], tableEnv);
|
if ( rowCount == 1 )
|
||||||
|
# good I hate R. Work around to avoid collapsing into an unstructured vector when
|
||||||
|
# there's only 1 row
|
||||||
|
sub <- t(as.matrix(tableRows[1:rowCount,]))
|
||||||
|
else
|
||||||
|
sub <- tableRows[1:rowCount,]
|
||||||
|
.gsa.assignGATKTableToEnvironment(tableName, tableHeader, sub, tableEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (line in lines) {
|
for (line in lines) {
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
package org.broadinstitute.sting.gatk;
|
package org.broadinstitute.sting.gatk;
|
||||||
|
|
||||||
|
import com.google.java.contract.Ensures;
|
||||||
import net.sf.picard.reference.IndexedFastaSequenceFile;
|
import net.sf.picard.reference.IndexedFastaSequenceFile;
|
||||||
import net.sf.picard.reference.ReferenceSequenceFile;
|
import net.sf.picard.reference.ReferenceSequenceFile;
|
||||||
import net.sf.samtools.SAMFileHeader;
|
import net.sf.samtools.SAMFileHeader;
|
||||||
|
|
@ -682,14 +683,14 @@ public class GenomeAnalysisEngine {
|
||||||
|
|
||||||
// if include argument isn't given, create new set of all possible intervals
|
// if include argument isn't given, create new set of all possible intervals
|
||||||
|
|
||||||
Pair<GenomeLocSortedSet, GenomeLocSortedSet> includeExcludePair = IntervalUtils.parseIntervalBindingsPair(
|
final Pair<GenomeLocSortedSet, GenomeLocSortedSet> includeExcludePair = IntervalUtils.parseIntervalBindingsPair(
|
||||||
this.referenceDataSource,
|
this.referenceDataSource,
|
||||||
argCollection.intervals,
|
argCollection.intervals,
|
||||||
argCollection.intervalSetRule, argCollection.intervalMerging, argCollection.intervalPadding,
|
argCollection.intervalSetRule, argCollection.intervalMerging, argCollection.intervalPadding,
|
||||||
argCollection.excludeIntervals);
|
argCollection.excludeIntervals);
|
||||||
|
|
||||||
GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst();
|
final GenomeLocSortedSet includeSortedSet = includeExcludePair.getFirst();
|
||||||
GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond();
|
final GenomeLocSortedSet excludeSortedSet = includeExcludePair.getSecond();
|
||||||
|
|
||||||
// if no exclude arguments, can return parseIntervalArguments directly
|
// if no exclude arguments, can return parseIntervalArguments directly
|
||||||
if ( excludeSortedSet == null )
|
if ( excludeSortedSet == null )
|
||||||
|
|
@ -700,13 +701,15 @@ public class GenomeAnalysisEngine {
|
||||||
intervals = includeSortedSet.subtractRegions(excludeSortedSet);
|
intervals = includeSortedSet.subtractRegions(excludeSortedSet);
|
||||||
|
|
||||||
// logging messages only printed when exclude (-XL) arguments are given
|
// logging messages only printed when exclude (-XL) arguments are given
|
||||||
long toPruneSize = includeSortedSet.coveredSize();
|
final long toPruneSize = includeSortedSet.coveredSize();
|
||||||
long toExcludeSize = excludeSortedSet.coveredSize();
|
final long toExcludeSize = excludeSortedSet.coveredSize();
|
||||||
long intervalSize = intervals.coveredSize();
|
final long intervalSize = intervals.coveredSize();
|
||||||
logger.info(String.format("Initial include intervals span %d loci; exclude intervals span %d loci", toPruneSize, toExcludeSize));
|
logger.info(String.format("Initial include intervals span %d loci; exclude intervals span %d loci", toPruneSize, toExcludeSize));
|
||||||
logger.info(String.format("Excluding %d loci from original intervals (%.2f%% reduction)",
|
logger.info(String.format("Excluding %d loci from original intervals (%.2f%% reduction)",
|
||||||
toPruneSize - intervalSize, (toPruneSize - intervalSize) / (0.01 * toPruneSize)));
|
toPruneSize - intervalSize, (toPruneSize - intervalSize) / (0.01 * toPruneSize)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.info(String.format("Processing %d bp from intervals", intervals.coveredSize()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -981,6 +984,22 @@ public class GenomeAnalysisEngine {
|
||||||
return this.intervals;
|
return this.intervals;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the list of regions of the genome being processed. If the user
|
||||||
|
* requested specific intervals, return those, otherwise return regions
|
||||||
|
* corresponding to the entire genome. Never returns null.
|
||||||
|
*
|
||||||
|
* @return a non-null set of intervals being processed
|
||||||
|
*/
|
||||||
|
@Ensures("result != null")
|
||||||
|
public GenomeLocSortedSet getRegionsOfGenomeBeingProcessed() {
|
||||||
|
if ( getIntervals() == null )
|
||||||
|
// if we don't have any intervals defined, create intervals from the reference itself
|
||||||
|
return GenomeLocSortedSet.createSetFromSequenceDictionary(getReferenceDataSource().getReference().getSequenceDictionary());
|
||||||
|
else
|
||||||
|
return getIntervals();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the list of filters employed by this engine.
|
* Gets the list of filters employed by this engine.
|
||||||
* @return Collection of filters (actual instances) used by this engine.
|
* @return Collection of filters (actual instances) used by this engine.
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,10 @@ import org.broadinstitute.sting.utils.GenomeLoc;
|
||||||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
|
@ -149,7 +152,12 @@ public class ReadShard extends Shard {
|
||||||
if ( read.getAlignmentEnd() > stop ) stop = read.getAlignmentEnd();
|
if ( read.getAlignmentEnd() > stop ) stop = read.getAlignmentEnd();
|
||||||
}
|
}
|
||||||
|
|
||||||
return parser.createGenomeLoc(contig, start, stop);
|
assert contig != null;
|
||||||
|
|
||||||
|
if ( contig.equals("*") ) // all reads are unmapped
|
||||||
|
return GenomeLoc.UNMAPPED;
|
||||||
|
else
|
||||||
|
return parser.createGenomeLoc(contig, start, stop);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -107,7 +107,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
|
|
||||||
this.traversalTasks = shardStrategy.iterator();
|
this.traversalTasks = shardStrategy.iterator();
|
||||||
|
|
||||||
ReduceTree reduceTree = new ReduceTree(this);
|
final ReduceTree reduceTree = new ReduceTree(this);
|
||||||
initializeWalker(walker);
|
initializeWalker(walker);
|
||||||
|
|
||||||
while (isShardTraversePending() || isTreeReducePending()) {
|
while (isShardTraversePending() || isTreeReducePending()) {
|
||||||
|
|
@ -186,7 +186,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
outputTracker.bypassThreadLocalStorage(true);
|
outputTracker.bypassThreadLocalStorage(true);
|
||||||
try {
|
try {
|
||||||
walker.onTraversalDone(result);
|
walker.onTraversalDone(result);
|
||||||
printOnTraversalDone(result);
|
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
outputTracker.bypassThreadLocalStorage(false);
|
outputTracker.bypassThreadLocalStorage(false);
|
||||||
|
|
@ -302,17 +301,13 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
if (!traversalTasks.hasNext())
|
if (!traversalTasks.hasNext())
|
||||||
throw new IllegalStateException("Cannot traverse; no pending traversals exist.");
|
throw new IllegalStateException("Cannot traverse; no pending traversals exist.");
|
||||||
|
|
||||||
Shard shard = traversalTasks.next();
|
final Shard shard = traversalTasks.next();
|
||||||
|
|
||||||
// todo -- add ownership claim here
|
// todo -- add ownership claim here
|
||||||
|
|
||||||
ShardTraverser traverser = new ShardTraverser(this,
|
final ShardTraverser traverser = new ShardTraverser(this, walker, shard, outputTracker);
|
||||||
traversalEngine,
|
|
||||||
walker,
|
|
||||||
shard,
|
|
||||||
outputTracker);
|
|
||||||
|
|
||||||
Future traverseResult = threadPool.submit(traverser);
|
final Future traverseResult = threadPool.submit(traverser);
|
||||||
|
|
||||||
// Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue.
|
// Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue.
|
||||||
reduceTree.addEntry(traverseResult);
|
reduceTree.addEntry(traverseResult);
|
||||||
|
|
@ -327,7 +322,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
protected void queueNextTreeReduce( Walker walker ) {
|
protected void queueNextTreeReduce( Walker walker ) {
|
||||||
if (reduceTasks.size() == 0)
|
if (reduceTasks.size() == 0)
|
||||||
throw new IllegalStateException("Cannot reduce; no pending reduces exist.");
|
throw new IllegalStateException("Cannot reduce; no pending reduces exist.");
|
||||||
TreeReduceTask reducer = reduceTasks.remove();
|
final TreeReduceTask reducer = reduceTasks.remove();
|
||||||
reducer.setWalker((TreeReducible) walker);
|
reducer.setWalker((TreeReducible) walker);
|
||||||
|
|
||||||
threadPool.submit(reducer);
|
threadPool.submit(reducer);
|
||||||
|
|
@ -335,7 +330,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
|
|
||||||
/** Blocks until a free slot appears in the thread queue. */
|
/** Blocks until a free slot appears in the thread queue. */
|
||||||
protected void waitForFreeQueueSlot() {
|
protected void waitForFreeQueueSlot() {
|
||||||
ThreadPoolMonitor monitor = new ThreadPoolMonitor();
|
final ThreadPoolMonitor monitor = new ThreadPoolMonitor();
|
||||||
synchronized (monitor) {
|
synchronized (monitor) {
|
||||||
threadPool.submit(monitor);
|
threadPool.submit(monitor);
|
||||||
monitor.watch();
|
monitor.watch();
|
||||||
|
|
@ -347,8 +342,8 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
*
|
*
|
||||||
* @return A new, composite future of the result of this reduce.
|
* @return A new, composite future of the result of this reduce.
|
||||||
*/
|
*/
|
||||||
public Future notifyReduce( Future lhs, Future rhs ) {
|
public Future notifyReduce( final Future lhs, final Future rhs ) {
|
||||||
TreeReduceTask reducer = new TreeReduceTask(new TreeReducer(this, lhs, rhs));
|
final TreeReduceTask reducer = new TreeReduceTask(new TreeReducer(this, lhs, rhs));
|
||||||
reduceTasks.add(reducer);
|
reduceTasks.add(reducer);
|
||||||
return reducer;
|
return reducer;
|
||||||
}
|
}
|
||||||
|
|
@ -376,7 +371,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
return this.error;
|
return this.error;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final RuntimeException toRuntimeException(final Throwable error) {
|
private RuntimeException toRuntimeException(final Throwable error) {
|
||||||
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
|
// If the error is already a Runtime, pass it along as is. Otherwise, wrap it.
|
||||||
if (error instanceof RuntimeException)
|
if (error instanceof RuntimeException)
|
||||||
return (RuntimeException)error;
|
return (RuntimeException)error;
|
||||||
|
|
@ -387,7 +382,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
||||||
|
|
||||||
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
|
/** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
|
||||||
private class TreeReduceTask extends FutureTask {
|
private class TreeReduceTask extends FutureTask {
|
||||||
private TreeReducer treeReducer = null;
|
final private TreeReducer treeReducer;
|
||||||
|
|
||||||
public TreeReduceTask( TreeReducer treeReducer ) {
|
public TreeReduceTask( TreeReducer treeReducer ) {
|
||||||
super(treeReducer);
|
super(treeReducer);
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ package org.broadinstitute.sting.gatk.executive;
|
||||||
* An interface for retrieving runtime statistics about how the hierarchical
|
* An interface for retrieving runtime statistics about how the hierarchical
|
||||||
* microscheduler is behaving.
|
* microscheduler is behaving.
|
||||||
*/
|
*/
|
||||||
public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean {
|
public interface HierarchicalMicroSchedulerMBean {
|
||||||
/**
|
/**
|
||||||
* How many tree reduces are waiting in the tree reduce queue?
|
* How many tree reduces are waiting in the tree reduce queue?
|
||||||
* @return Total number of reduces waiting in the tree reduce queue?
|
* @return Total number of reduces waiting in the tree reduce queue?
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
|
||||||
import org.broadinstitute.sting.gatk.io.DirectOutputTracker;
|
import org.broadinstitute.sting.gatk.io.DirectOutputTracker;
|
||||||
import org.broadinstitute.sting.gatk.io.OutputTracker;
|
import org.broadinstitute.sting.gatk.io.OutputTracker;
|
||||||
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
|
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
|
||||||
|
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
||||||
import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions;
|
import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions;
|
||||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||||
import org.broadinstitute.sting.utils.SampleUtils;
|
import org.broadinstitute.sting.utils.SampleUtils;
|
||||||
|
|
@ -60,7 +61,7 @@ public class LinearMicroScheduler extends MicroScheduler {
|
||||||
boolean done = walker.isDone();
|
boolean done = walker.isDone();
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
|
|
||||||
traversalEngine.startTimersIfNecessary();
|
final TraversalEngine traversalEngine = borrowTraversalEngine();
|
||||||
for (Shard shard : shardStrategy ) {
|
for (Shard shard : shardStrategy ) {
|
||||||
if ( done || shard == null ) // we ran out of shards that aren't owned
|
if ( done || shard == null ) // we ran out of shards that aren't owned
|
||||||
break;
|
break;
|
||||||
|
|
@ -95,9 +96,8 @@ public class LinearMicroScheduler extends MicroScheduler {
|
||||||
|
|
||||||
Object result = accumulator.finishTraversal();
|
Object result = accumulator.finishTraversal();
|
||||||
|
|
||||||
printOnTraversalDone(result);
|
|
||||||
|
|
||||||
outputTracker.close();
|
outputTracker.close();
|
||||||
|
returnTraversalEngine(traversalEngine);
|
||||||
cleanup();
|
cleanup();
|
||||||
executionIsDone();
|
executionIsDone();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,9 +25,11 @@
|
||||||
|
|
||||||
package org.broadinstitute.sting.gatk.executive;
|
package org.broadinstitute.sting.gatk.executive;
|
||||||
|
|
||||||
|
import com.google.java.contract.Ensures;
|
||||||
import net.sf.picard.reference.IndexedFastaSequenceFile;
|
import net.sf.picard.reference.IndexedFastaSequenceFile;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
|
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
|
||||||
|
import org.broadinstitute.sting.gatk.ReadMetrics;
|
||||||
import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource;
|
import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource;
|
||||||
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
|
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
|
||||||
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
|
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
|
||||||
|
|
@ -37,15 +39,22 @@ import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||||
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
|
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
|
||||||
import org.broadinstitute.sting.gatk.traversals.*;
|
import org.broadinstitute.sting.gatk.traversals.*;
|
||||||
import org.broadinstitute.sting.gatk.walkers.*;
|
import org.broadinstitute.sting.gatk.walkers.*;
|
||||||
|
import org.broadinstitute.sting.utils.MathUtils;
|
||||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||||
|
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
|
||||||
|
import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
|
||||||
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
|
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
|
||||||
|
|
||||||
import javax.management.JMException;
|
import javax.management.JMException;
|
||||||
import javax.management.MBeanServer;
|
import javax.management.MBeanServer;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
import java.io.File;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -55,14 +64,36 @@ import java.util.Collection;
|
||||||
* Time: 12:37:23 PM
|
* Time: 12:37:23 PM
|
||||||
*
|
*
|
||||||
* General base class for all scheduling algorithms
|
* General base class for all scheduling algorithms
|
||||||
|
* Shards and schedules data in manageable chunks.
|
||||||
|
*
|
||||||
|
* Creates N TraversalEngines for each data thread for the MicroScheduler. This is necessary
|
||||||
|
* because in the HMS case you have multiple threads executing a traversal engine independently, and
|
||||||
|
* these engines may need to create separate resources for efficiency or implementation reasons. For example,
|
||||||
|
* the nanoScheduler creates threads to implement the traversal, and this creation is instance specific.
|
||||||
|
* So each HMS thread needs to have it's own distinct copy of the traversal engine if it wants to have
|
||||||
|
* N data threads x M nano threads => N * M threads total. These are borrowed from this microscheduler
|
||||||
|
* and returned when done. Also allows us to tracks all created traversal engines so this microscheduler
|
||||||
|
* can properly shut them all down when the scheduling is done.
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/** Shards and schedules data in manageable chunks. */
|
|
||||||
public abstract class MicroScheduler implements MicroSchedulerMBean {
|
public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
// TODO -- remove me and retire non nano scheduled versions of traversals
|
// TODO -- remove me and retire non nano scheduled versions of traversals
|
||||||
private final static boolean USE_NANOSCHEDULER_FOR_EVERYTHING = true;
|
private final static boolean USE_NANOSCHEDULER_FOR_EVERYTHING = true;
|
||||||
protected static final Logger logger = Logger.getLogger(MicroScheduler.class);
|
protected static final Logger logger = Logger.getLogger(MicroScheduler.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The list of all Traversal engines we've created in this micro scheduler
|
||||||
|
*/
|
||||||
|
final List<TraversalEngine> allCreatedTraversalEngines = new LinkedList<TraversalEngine>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* All available engines. Engines are borrowed and returned when a subclass is actually
|
||||||
|
* going to execute the engine on some data. This allows us to have N copies for
|
||||||
|
* N data parallel executions, but without the dangerous code of having local
|
||||||
|
* ThreadLocal variables.
|
||||||
|
*/
|
||||||
|
final LinkedList<TraversalEngine> availableTraversalEngines = new LinkedList<TraversalEngine>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Counts the number of instances of the class that are currently alive.
|
* Counts the number of instances of the class that are currently alive.
|
||||||
*/
|
*/
|
||||||
|
|
@ -73,7 +104,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
*/
|
*/
|
||||||
protected final GenomeAnalysisEngine engine;
|
protected final GenomeAnalysisEngine engine;
|
||||||
|
|
||||||
protected final TraversalEngine traversalEngine;
|
|
||||||
protected final IndexedFastaSequenceFile reference;
|
protected final IndexedFastaSequenceFile reference;
|
||||||
|
|
||||||
private final SAMDataSource reads;
|
private final SAMDataSource reads;
|
||||||
|
|
@ -89,6 +119,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
*/
|
*/
|
||||||
ThreadEfficiencyMonitor threadEfficiencyMonitor = null;
|
ThreadEfficiencyMonitor threadEfficiencyMonitor = null;
|
||||||
|
|
||||||
|
final ProgressMeter progressMeter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
|
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
|
||||||
* selected walker.
|
* selected walker.
|
||||||
|
|
@ -103,11 +135,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
*/
|
*/
|
||||||
public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, ThreadAllocation threadAllocation) {
|
public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, ThreadAllocation threadAllocation) {
|
||||||
if ( threadAllocation.isRunningInParallelMode() ) {
|
if ( threadAllocation.isRunningInParallelMode() ) {
|
||||||
// TODO -- remove me when we fix running NCT within HMS
|
|
||||||
if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1)
|
|
||||||
throw new UserException("Currently the GATK does not support running CPU threads within data threads, " +
|
|
||||||
"please specify only one of NT and NCT");
|
|
||||||
|
|
||||||
logger.info(String.format("Running the GATK in parallel mode with %d CPU thread(s) for each of %d data thread(s)",
|
logger.info(String.format("Running the GATK in parallel mode with %d CPU thread(s) for each of %d data thread(s)",
|
||||||
threadAllocation.getNumCPUThreadsPerDataThread(), threadAllocation.getNumDataThreads()));
|
threadAllocation.getNumCPUThreadsPerDataThread(), threadAllocation.getNumDataThreads()));
|
||||||
}
|
}
|
||||||
|
|
@ -154,25 +181,25 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
this.reference = reference;
|
this.reference = reference;
|
||||||
this.rods = rods;
|
this.rods = rods;
|
||||||
|
|
||||||
if (walker instanceof ReadWalker) {
|
final File progressLogFile = engine.getArguments() == null ? null : engine.getArguments().performanceLog;
|
||||||
traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1
|
|
||||||
? new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread())
|
|
||||||
: new TraverseReads();
|
|
||||||
} else if (walker instanceof LocusWalker) {
|
|
||||||
traversalEngine = USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1
|
|
||||||
? new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread())
|
|
||||||
: new TraverseLociLinear();
|
|
||||||
} else if (walker instanceof DuplicateWalker) {
|
|
||||||
traversalEngine = new TraverseDuplicates();
|
|
||||||
} else if (walker instanceof ReadPairWalker) {
|
|
||||||
traversalEngine = new TraverseReadPairs();
|
|
||||||
} else if (walker instanceof ActiveRegionWalker) {
|
|
||||||
traversalEngine = new TraverseActiveRegions();
|
|
||||||
} else {
|
|
||||||
throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type.");
|
|
||||||
}
|
|
||||||
|
|
||||||
traversalEngine.initialize(engine);
|
// Creates uninitialized TraversalEngines appropriate for walker and threadAllocation,
|
||||||
|
// and adds it to the list of created engines for later shutdown.
|
||||||
|
for ( int i = 0; i < threadAllocation.getNumDataThreads(); i++ ) {
|
||||||
|
final TraversalEngine traversalEngine = createTraversalEngine(walker, threadAllocation);
|
||||||
|
allCreatedTraversalEngines.add(traversalEngine);
|
||||||
|
availableTraversalEngines.add(traversalEngine);
|
||||||
|
}
|
||||||
|
logger.info("Creating " + threadAllocation.getNumDataThreads() + " traversal engines");
|
||||||
|
|
||||||
|
// Create our progress meter
|
||||||
|
this.progressMeter = new ProgressMeter(progressLogFile,
|
||||||
|
availableTraversalEngines.peek().getTraversalUnits(),
|
||||||
|
engine.getRegionsOfGenomeBeingProcessed());
|
||||||
|
|
||||||
|
// Now that we have a progress meter, go through and initialize the traversal engines
|
||||||
|
for ( final TraversalEngine traversalEngine : allCreatedTraversalEngines )
|
||||||
|
traversalEngine.initialize(engine, progressMeter);
|
||||||
|
|
||||||
// JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean.
|
// JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean.
|
||||||
// To get around this limitation and since we have no job identifier at this point, register a simple counter that
|
// To get around this limitation and since we have no job identifier at this point, register a simple counter that
|
||||||
|
|
@ -188,6 +215,35 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Really make us a traversal engine of the appropriate type for walker and thread allocation
|
||||||
|
*
|
||||||
|
* @return a non-null uninitialized traversal engine
|
||||||
|
*/
|
||||||
|
@Ensures("result != null")
|
||||||
|
private TraversalEngine createTraversalEngine(final Walker walker, final ThreadAllocation threadAllocation) {
|
||||||
|
if (walker instanceof ReadWalker) {
|
||||||
|
if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 )
|
||||||
|
return new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread());
|
||||||
|
else
|
||||||
|
return new TraverseReads();
|
||||||
|
} else if (walker instanceof LocusWalker) {
|
||||||
|
if ( USE_NANOSCHEDULER_FOR_EVERYTHING || threadAllocation.getNumCPUThreadsPerDataThread() > 1 )
|
||||||
|
return new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread());
|
||||||
|
else
|
||||||
|
return new TraverseLociLinear();
|
||||||
|
} else if (walker instanceof DuplicateWalker) {
|
||||||
|
return new TraverseDuplicates();
|
||||||
|
} else if (walker instanceof ReadPairWalker) {
|
||||||
|
return new TraverseReadPairs();
|
||||||
|
} else if (walker instanceof ActiveRegionWalker) {
|
||||||
|
return new TraverseActiveRegions();
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the ThreadEfficiencyMonitor we are using to track our resource utilization, if there is one
|
* Return the ThreadEfficiencyMonitor we are using to track our resource utilization, if there is one
|
||||||
*
|
*
|
||||||
|
|
@ -231,18 +287,14 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
return (!reads.isEmpty()) ? reads.seek(shard) : new NullSAMIterator();
|
return (!reads.isEmpty()) ? reads.seek(shard) : new NullSAMIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Print summary information for the analysis.
|
|
||||||
* @param sum The final reduce output.
|
|
||||||
*/
|
|
||||||
protected void printOnTraversalDone(Object sum) {
|
|
||||||
traversalEngine.printOnTraversalDone();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Must be called by subclasses when execute is done
|
* Must be called by subclasses when execute is done
|
||||||
*/
|
*/
|
||||||
protected void executionIsDone() {
|
protected void executionIsDone() {
|
||||||
|
progressMeter.notifyDone(engine.getCumulativeMetrics().getNumIterations());
|
||||||
|
printReadFilteringStats();
|
||||||
|
shutdownTraversalEngines();
|
||||||
|
|
||||||
// Print out the threading efficiency of this HMS, if state monitoring is enabled
|
// Print out the threading efficiency of this HMS, if state monitoring is enabled
|
||||||
if ( threadEfficiencyMonitor != null ) {
|
if ( threadEfficiencyMonitor != null ) {
|
||||||
// include the master thread information
|
// include the master thread information
|
||||||
|
|
@ -251,6 +303,57 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown all of the created engines, and clear the list of created engines, dropping
|
||||||
|
* pointers to the traversal engines
|
||||||
|
*/
|
||||||
|
public synchronized void shutdownTraversalEngines() {
|
||||||
|
if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() )
|
||||||
|
throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " +
|
||||||
|
"have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size()
|
||||||
|
+ " have been returned");
|
||||||
|
|
||||||
|
for ( final TraversalEngine te : allCreatedTraversalEngines)
|
||||||
|
te.shutdown();
|
||||||
|
|
||||||
|
// horrible hack to print nano scheduling information across all nano schedulers, if any were used
|
||||||
|
NanoScheduler.printCombinedRuntimeProfile();
|
||||||
|
|
||||||
|
allCreatedTraversalEngines.clear();
|
||||||
|
availableTraversalEngines.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prints out information about number of reads observed and filtering, if any reads were used in the traversal
|
||||||
|
*
|
||||||
|
* Looks like:
|
||||||
|
*
|
||||||
|
* INFO 10:40:47,370 MicroScheduler - 22 reads were filtered out during traversal out of 101 total (21.78%)
|
||||||
|
* INFO 10:40:47,370 MicroScheduler - -> 1 reads (0.99% of total) failing BadMateFilter
|
||||||
|
* INFO 10:40:47,370 MicroScheduler - -> 20 reads (19.80% of total) failing DuplicateReadFilter
|
||||||
|
* INFO 10:40:47,370 MicroScheduler - -> 1 reads (0.99% of total) failing FailsVendorQualityCheckFilter
|
||||||
|
*/
|
||||||
|
private void printReadFilteringStats() {
|
||||||
|
final ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics();
|
||||||
|
if ( cumulativeMetrics.getNumReadsSeen() > 0 ) {
|
||||||
|
// count up the number of skipped reads by summing over all filters
|
||||||
|
long nSkippedReads = 0L;
|
||||||
|
for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values())
|
||||||
|
nSkippedReads += countsByFilter;
|
||||||
|
|
||||||
|
logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)",
|
||||||
|
nSkippedReads,
|
||||||
|
cumulativeMetrics.getNumReadsSeen(),
|
||||||
|
100.0 * MathUtils.ratio(nSkippedReads, cumulativeMetrics.getNumReadsSeen())));
|
||||||
|
|
||||||
|
for ( final Map.Entry<String, Long> filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) {
|
||||||
|
long count = filterCounts.getValue();
|
||||||
|
logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s",
|
||||||
|
count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the engine that created this microscheduler.
|
* Gets the engine that created this microscheduler.
|
||||||
* @return The engine owning this microscheduler.
|
* @return The engine owning this microscheduler.
|
||||||
|
|
@ -269,38 +372,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
*/
|
*/
|
||||||
public IndexedFastaSequenceFile getReference() { return reference; }
|
public IndexedFastaSequenceFile getReference() { return reference; }
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the filename to which performance data is currently being written.
|
|
||||||
* @return Filename to which performance data is currently being written.
|
|
||||||
*/
|
|
||||||
public String getPerformanceLogFileName() {
|
|
||||||
return traversalEngine.getPerformanceLogFileName();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the filename of the log for performance. If set,
|
|
||||||
* @param fileName filename to use when writing performance data.
|
|
||||||
*/
|
|
||||||
public void setPerformanceLogFileName(String fileName) {
|
|
||||||
traversalEngine.setPerformanceLogFileName(fileName);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the frequency with which performance data is written.
|
|
||||||
* @return Frequency, in seconds, of performance log writes.
|
|
||||||
*/
|
|
||||||
public long getPerformanceProgressPrintFrequencySeconds() {
|
|
||||||
return traversalEngine.getPerformanceProgressPrintFrequencySeconds();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How often should the performance log message be written?
|
|
||||||
* @param seconds number of seconds between messages indicating performance frequency.
|
|
||||||
*/
|
|
||||||
public void setPerformanceProgressPrintFrequencySeconds(long seconds) {
|
|
||||||
traversalEngine.setPerformanceProgressPrintFrequencySeconds(seconds);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void cleanup() {
|
protected void cleanup() {
|
||||||
try {
|
try {
|
||||||
mBeanServer.unregisterMBean(mBeanName);
|
mBeanServer.unregisterMBean(mBeanName);
|
||||||
|
|
@ -309,4 +380,38 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
throw new ReviewedStingException("Unable to unregister microscheduler with JMX", ex);
|
throw new ReviewedStingException("Unable to unregister microscheduler with JMX", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a traversal engine suitable for use in this thread.
|
||||||
|
*
|
||||||
|
* Pops the next available engine from the available ones maintained by this
|
||||||
|
* microscheduler. Note that it's a runtime error to pop a traversal engine
|
||||||
|
* from this scheduler if there are none available. Callers that
|
||||||
|
* once pop'd an engine for use must return it with returnTraversalEngine
|
||||||
|
*
|
||||||
|
* @return a non-null TraversalEngine suitable for execution in this scheduler
|
||||||
|
*/
|
||||||
|
@Ensures("result != null")
|
||||||
|
protected synchronized TraversalEngine borrowTraversalEngine() {
|
||||||
|
if ( availableTraversalEngines.isEmpty() )
|
||||||
|
throw new IllegalStateException("no traversal engines were available");
|
||||||
|
else {
|
||||||
|
return availableTraversalEngines.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a borrowed traversal engine to this MicroScheduler, for later use
|
||||||
|
* in another traversal execution
|
||||||
|
*
|
||||||
|
* @param traversalEngine the borrowed traversal engine. Must have been previously borrowed.
|
||||||
|
*/
|
||||||
|
protected synchronized void returnTraversalEngine(final TraversalEngine traversalEngine) {
|
||||||
|
if ( traversalEngine == null )
|
||||||
|
throw new IllegalArgumentException("Attempting to push a null traversal engine");
|
||||||
|
if ( ! allCreatedTraversalEngines.contains(traversalEngine) )
|
||||||
|
throw new IllegalArgumentException("Attempting to push a traversal engine not created by this MicroScheduler" + engine);
|
||||||
|
|
||||||
|
availableTraversalEngines.push(traversalEngine);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -31,27 +31,5 @@ package org.broadinstitute.sting.gatk.executive;
|
||||||
* To change this template use File | Settings | File Templates.
|
* To change this template use File | Settings | File Templates.
|
||||||
*/
|
*/
|
||||||
public interface MicroSchedulerMBean {
|
public interface MicroSchedulerMBean {
|
||||||
/**
|
// has nothing because we don't have anything we currently track
|
||||||
* Gets the filename to which performance data is currently being written.
|
|
||||||
* @return Filename to which performance data is currently being written.
|
|
||||||
*/
|
|
||||||
public String getPerformanceLogFileName();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the filename of the log for performance. If set,
|
|
||||||
* @param fileName filename to use when writing performance data.
|
|
||||||
*/
|
|
||||||
public void setPerformanceLogFileName(String fileName);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the frequency with which performance data is written.
|
|
||||||
* @return Frequency, in seconds, of performance log writes.
|
|
||||||
*/
|
|
||||||
public long getPerformanceProgressPrintFrequencySeconds();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How often should the performance log message be written?
|
|
||||||
* @param seconds number of seconds between messages indicating performance frequency.
|
|
||||||
*/
|
|
||||||
public void setPerformanceProgressPrintFrequencySeconds(long seconds);
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
/**
|
/**
|
||||||
* User: hanna
|
* User: hanna
|
||||||
* Date: Apr 29, 2009
|
* Date: Apr 29, 2009
|
||||||
|
|
@ -30,7 +29,6 @@ public class ShardTraverser implements Callable {
|
||||||
final private HierarchicalMicroScheduler microScheduler;
|
final private HierarchicalMicroScheduler microScheduler;
|
||||||
final private Walker walker;
|
final private Walker walker;
|
||||||
final private Shard shard;
|
final private Shard shard;
|
||||||
final private TraversalEngine traversalEngine;
|
|
||||||
final private ThreadLocalOutputTracker outputTracker;
|
final private ThreadLocalOutputTracker outputTracker;
|
||||||
private OutputMergeTask outputMergeTask;
|
private OutputMergeTask outputMergeTask;
|
||||||
|
|
||||||
|
|
@ -43,20 +41,18 @@ public class ShardTraverser implements Callable {
|
||||||
private boolean complete = false;
|
private boolean complete = false;
|
||||||
|
|
||||||
public ShardTraverser( HierarchicalMicroScheduler microScheduler,
|
public ShardTraverser( HierarchicalMicroScheduler microScheduler,
|
||||||
TraversalEngine traversalEngine,
|
|
||||||
Walker walker,
|
Walker walker,
|
||||||
Shard shard,
|
Shard shard,
|
||||||
ThreadLocalOutputTracker outputTracker) {
|
ThreadLocalOutputTracker outputTracker) {
|
||||||
this.microScheduler = microScheduler;
|
this.microScheduler = microScheduler;
|
||||||
this.walker = walker;
|
this.walker = walker;
|
||||||
this.traversalEngine = traversalEngine;
|
|
||||||
this.shard = shard;
|
this.shard = shard;
|
||||||
this.outputTracker = outputTracker;
|
this.outputTracker = outputTracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object call() {
|
public Object call() {
|
||||||
|
final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine();
|
||||||
try {
|
try {
|
||||||
traversalEngine.startTimersIfNecessary();
|
|
||||||
final long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
Object accumulator = walker.reduceInit();
|
Object accumulator = walker.reduceInit();
|
||||||
|
|
@ -67,7 +63,7 @@ public class ShardTraverser implements Callable {
|
||||||
|
|
||||||
for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
|
for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
|
||||||
final ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),microScheduler.getEngine().getGenomeLocParser(),iterator.getLocus(),iterator,microScheduler.reference,microScheduler.rods);
|
final ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),microScheduler.getEngine().getGenomeLocParser(),iterator.getLocus(),iterator,microScheduler.reference,microScheduler.rods);
|
||||||
accumulator = traversalEngine.traverse( walker, dataProvider, accumulator );
|
accumulator = traversalEngine.traverse(walker, dataProvider, accumulator);
|
||||||
dataProvider.close();
|
dataProvider.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -85,6 +81,7 @@ public class ShardTraverser implements Callable {
|
||||||
} finally {
|
} finally {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
complete = true;
|
complete = true;
|
||||||
|
microScheduler.returnTraversalEngine(traversalEngine);
|
||||||
notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,66 +30,29 @@ import org.broadinstitute.sting.gatk.ReadMetrics;
|
||||||
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
|
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
|
||||||
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
|
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
|
||||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||||
import org.broadinstitute.sting.utils.*;
|
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.PrintStream;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,ProviderType extends ShardDataProvider> {
|
public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,ProviderType extends ShardDataProvider> {
|
||||||
/** our log, which we want to capture anything from this class */
|
/** our log, which we want to capture anything from this class */
|
||||||
protected static final Logger logger = Logger.getLogger(TraversalEngine.class);
|
protected static final Logger logger = Logger.getLogger(TraversalEngine.class);
|
||||||
|
|
||||||
// Time in milliseconds since we initialized this engine
|
|
||||||
private static final int HISTORY_WINDOW_SIZE = 50;
|
|
||||||
|
|
||||||
/** lock object to sure updates to history are consistent across threads */
|
|
||||||
private static final Object lock = new Object();
|
|
||||||
LinkedList<ProcessingHistory> history = new LinkedList<ProcessingHistory>();
|
|
||||||
|
|
||||||
/** We use the SimpleTimer to time our run */
|
|
||||||
private SimpleTimer timer = null;
|
|
||||||
|
|
||||||
// How long can we go without printing some progress info?
|
|
||||||
private long lastProgressPrintTime = -1; // When was the last time we printed progress log?
|
|
||||||
|
|
||||||
private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000; // in milliseconds
|
|
||||||
private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0;
|
|
||||||
private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0;
|
|
||||||
private long progressPrintFrequency = 10 * 1000; // in milliseconds
|
|
||||||
private boolean progressMeterInitialized = false;
|
|
||||||
|
|
||||||
// for performance log
|
|
||||||
private static final boolean PERFORMANCE_LOG_ENABLED = true;
|
|
||||||
private final Object performanceLogLock = new Object();
|
|
||||||
private File performanceLogFile;
|
|
||||||
private PrintStream performanceLog = null;
|
|
||||||
private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log?
|
|
||||||
private final long PERFORMANCE_LOG_PRINT_FREQUENCY = progressPrintFrequency; // in milliseconds
|
|
||||||
|
|
||||||
/** Size, in bp, of the area we are processing. Updated once in the system in initial for performance reasons */
|
|
||||||
long targetSize = -1;
|
|
||||||
GenomeLocSortedSet targetIntervals = null;
|
|
||||||
|
|
||||||
protected GenomeAnalysisEngine engine;
|
protected GenomeAnalysisEngine engine;
|
||||||
|
private ProgressMeter progressMeter;
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------------------------------
|
||||||
//
|
//
|
||||||
// ABSTRACT METHODS
|
// ABSTRACT METHODS
|
||||||
//
|
//
|
||||||
// ----------------------------------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the named traversal type associated with the given traversal.
|
* Gets the named traversal type associated with the given traversal, such as loci, reads, etc.
|
||||||
|
*
|
||||||
* @return A user-friendly name for the given traversal type.
|
* @return A user-friendly name for the given traversal type.
|
||||||
*/
|
*/
|
||||||
protected abstract String getTraversalType();
|
public abstract String getTraversalUnits();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* this method must be implemented by all traversal engines
|
* this method must be implemented by all traversal engines
|
||||||
|
|
@ -104,70 +67,36 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
|
||||||
ProviderType dataProvider,
|
ProviderType dataProvider,
|
||||||
T sum);
|
T sum);
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------------------------------
|
|
||||||
//
|
|
||||||
// Common timing routines
|
|
||||||
//
|
|
||||||
// ----------------------------------------------------------------------------------------------------
|
|
||||||
/**
|
/**
|
||||||
* Initialize the traversal engine. After this point traversals can be run over the data
|
* Initialize the traversal engine. After this point traversals can be run over the data
|
||||||
|
*
|
||||||
* @param engine GenomeAnalysisEngine for this traversal
|
* @param engine GenomeAnalysisEngine for this traversal
|
||||||
|
* @param progressMeter An optional (null == optional) meter to track our progress
|
||||||
*/
|
*/
|
||||||
public void initialize(GenomeAnalysisEngine engine) {
|
public void initialize(final GenomeAnalysisEngine engine, final ProgressMeter progressMeter) {
|
||||||
if ( engine == null )
|
if ( engine == null )
|
||||||
throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!");
|
throw new ReviewedStingException("BUG: GenomeAnalysisEngine cannot be null!");
|
||||||
|
|
||||||
this.engine = engine;
|
this.engine = engine;
|
||||||
|
this.progressMeter = progressMeter;
|
||||||
if ( PERFORMANCE_LOG_ENABLED && engine.getArguments() != null && engine.getArguments().performanceLog != null ) {
|
|
||||||
synchronized(this.performanceLogLock) {
|
|
||||||
performanceLogFile = engine.getArguments().performanceLog;
|
|
||||||
createNewPerformanceLog();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we don't have any intervals defined, create intervals from the reference itself
|
|
||||||
if ( this.engine.getIntervals() == null )
|
|
||||||
targetIntervals = GenomeLocSortedSet.createSetFromSequenceDictionary(engine.getReferenceDataSource().getReference().getSequenceDictionary());
|
|
||||||
else
|
|
||||||
targetIntervals = this.engine.getIntervals();
|
|
||||||
targetSize = targetIntervals.coveredSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createNewPerformanceLog() {
|
|
||||||
synchronized(performanceLogLock) {
|
|
||||||
try {
|
|
||||||
performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog));
|
|
||||||
List<String> pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining");
|
|
||||||
performanceLog.println(Utils.join("\t", pLogHeader));
|
|
||||||
} catch (FileNotFoundException e) {
|
|
||||||
throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Should be called to indicate that we're going to process records and the timer should start ticking. This
|
|
||||||
* function should be called right before any traversal work is done, to avoid counting setup costs in the
|
|
||||||
* processing costs and inflating the estimated runtime.
|
|
||||||
*/
|
|
||||||
public void startTimersIfNecessary() {
|
|
||||||
if ( timer == null ) {
|
|
||||||
timer = new SimpleTimer("Traversal");
|
|
||||||
timer.start();
|
|
||||||
lastProgressPrintTime = timer.currentTime();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param curTime (current runtime, in millisecs)
|
* For testing only. Does not initialize the progress meter
|
||||||
* @param lastPrintTime the last time we printed, in machine milliseconds
|
|
||||||
* @param printFreq maximum permitted difference between last print and current times
|
|
||||||
*
|
*
|
||||||
* @return true if the maximum interval (in millisecs) has passed since the last printing
|
* @param engine
|
||||||
*/
|
*/
|
||||||
private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) {
|
protected void initialize(final GenomeAnalysisEngine engine) {
|
||||||
long elapsed = curTime - lastPrintTime;
|
initialize(engine, null);
|
||||||
return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS;
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called by the MicroScheduler when all work is done and the GATK is shutting down.
|
||||||
|
*
|
||||||
|
* To be used by subclasses that need to free up resources (such as threads)
|
||||||
|
*/
|
||||||
|
public void shutdown() {
|
||||||
|
// by default there's nothing to do
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -190,201 +119,15 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Forward request to printProgress
|
* Forward request to notifyOfProgress
|
||||||
*
|
*
|
||||||
* Assumes that one cycle has been completed
|
* Assumes that one cycle has been completed
|
||||||
*
|
*
|
||||||
* @param loc the location
|
* @param loc the location
|
||||||
*/
|
*/
|
||||||
public void printProgress(final GenomeLoc loc) {
|
public void printProgress(final GenomeLoc loc) {
|
||||||
// A bypass is inserted here for unit testing.
|
if ( progressMeter != null )
|
||||||
printProgress(loc, false);
|
progressMeter.notifyOfProgress(loc, engine.getCumulativeMetrics().getNumIterations());
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Utility routine that prints out process information (including timing) every N records or
|
|
||||||
* every M seconds, for N and M set in global variables.
|
|
||||||
*
|
|
||||||
* @param loc Current location, can be null if you are at the end of the traversal
|
|
||||||
* @param mustPrint If true, will print out info, regardless of nRecords or time interval
|
|
||||||
*/
|
|
||||||
private synchronized void printProgress(final GenomeLoc loc, boolean mustPrint) {
|
|
||||||
if( ! progressMeterInitialized ) {
|
|
||||||
logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]");
|
|
||||||
logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
|
|
||||||
"Location", getTraversalType(), getTraversalType()));
|
|
||||||
progressMeterInitialized = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
final long curTime = timer.currentTime();
|
|
||||||
boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency);
|
|
||||||
boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);
|
|
||||||
|
|
||||||
if ( printProgress || printLog ) {
|
|
||||||
final ProcessingHistory last = updateHistory(loc, engine.getCumulativeMetrics());
|
|
||||||
|
|
||||||
final AutoFormattingTime elapsed = new AutoFormattingTime(last.elapsedSeconds);
|
|
||||||
final AutoFormattingTime bpRate = new AutoFormattingTime(last.secondsPerMillionBP());
|
|
||||||
final AutoFormattingTime unitRate = new AutoFormattingTime(last.secondsPerMillionElements());
|
|
||||||
final double fractionGenomeTargetCompleted = last.calculateFractionGenomeTargetCompleted(targetSize);
|
|
||||||
final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted);
|
|
||||||
final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds());
|
|
||||||
final long nRecords = engine.getCumulativeMetrics().getNumIterations();
|
|
||||||
|
|
||||||
if ( printProgress ) {
|
|
||||||
lastProgressPrintTime = curTime;
|
|
||||||
|
|
||||||
// dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates
|
|
||||||
if ( estTotalRuntime.getTimeInSeconds() > TWELVE_HOURS_IN_SECONDS )
|
|
||||||
progressPrintFrequency = 60 * 1000; // in milliseconds
|
|
||||||
else if ( estTotalRuntime.getTimeInSeconds() > TWO_HOURS_IN_SECONDS )
|
|
||||||
progressPrintFrequency = 30 * 1000; // in milliseconds
|
|
||||||
else
|
|
||||||
progressPrintFrequency = 10 * 1000; // in milliseconds
|
|
||||||
|
|
||||||
final String posName = loc == null ? (mustPrint ? "done" : "unmapped reads") : String.format("%s:%d", loc.getContig(), loc.getStart());
|
|
||||||
logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s",
|
|
||||||
posName, nRecords*1.0, elapsed, unitRate,
|
|
||||||
100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( printLog ) {
|
|
||||||
lastPerformanceLogPrintTime = curTime;
|
|
||||||
synchronized(performanceLogLock) {
|
|
||||||
performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n",
|
|
||||||
elapsed.getTimeInSeconds(), nRecords, unitRate.getTimeInSeconds(), last.bpProcessed,
|
|
||||||
bpRate.getTimeInSeconds(), fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(),
|
|
||||||
timeToCompletion.getTimeInSeconds());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Keeps track of the last HISTORY_WINDOW_SIZE data points for the progress meter. Currently the
|
|
||||||
* history isn't used in any way, but in the future it'll become valuable for more accurate estimates
|
|
||||||
* for when a process will complete.
|
|
||||||
*
|
|
||||||
* @param loc our current position. If null, assumes we are done traversing
|
|
||||||
* @param metrics information about what's been processed already
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
private ProcessingHistory updateHistory(GenomeLoc loc, ReadMetrics metrics) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if ( history.size() > HISTORY_WINDOW_SIZE )
|
|
||||||
history.pop();
|
|
||||||
|
|
||||||
long nRecords = metrics.getNumIterations();
|
|
||||||
long bpProcessed = loc == null ? targetSize : targetIntervals.sizeBeforeLoc(loc); // null -> end of processing
|
|
||||||
history.add(new ProcessingHistory(timer.getElapsedTime(), loc, nRecords, bpProcessed));
|
|
||||||
|
|
||||||
return history.getLast();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Called after a traversal to print out information about the traversal process
|
|
||||||
*/
|
|
||||||
public void printOnTraversalDone() {
|
|
||||||
printProgress(null, true);
|
|
||||||
|
|
||||||
final double elapsed = timer == null ? 0 : timer.getElapsedTime();
|
|
||||||
|
|
||||||
ReadMetrics cumulativeMetrics = engine.getCumulativeMetrics();
|
|
||||||
|
|
||||||
// count up the number of skipped reads by summing over all filters
|
|
||||||
long nSkippedReads = 0L;
|
|
||||||
for ( final long countsByFilter : cumulativeMetrics.getCountsByFilter().values())
|
|
||||||
nSkippedReads += countsByFilter;
|
|
||||||
|
|
||||||
logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", elapsed, elapsed / 60, elapsed / 3600));
|
|
||||||
if ( cumulativeMetrics.getNumReadsSeen() > 0 )
|
|
||||||
logger.info(String.format("%d reads were filtered out during traversal out of %d total (%.2f%%)",
|
|
||||||
nSkippedReads,
|
|
||||||
cumulativeMetrics.getNumReadsSeen(),
|
|
||||||
100.0 * MathUtils.ratio(nSkippedReads,cumulativeMetrics.getNumReadsSeen())));
|
|
||||||
for ( Map.Entry<String, Long> filterCounts : cumulativeMetrics.getCountsByFilter().entrySet() ) {
|
|
||||||
long count = filterCounts.getValue();
|
|
||||||
logger.info(String.format(" -> %d reads (%.2f%% of total) failing %s",
|
|
||||||
count, 100.0 * MathUtils.ratio(count,cumulativeMetrics.getNumReadsSeen()), filterCounts.getKey()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( performanceLog != null ) performanceLog.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the filename to which performance data is currently being written.
|
|
||||||
* @return Filename to which performance data is currently being written.
|
|
||||||
*/
|
|
||||||
public String getPerformanceLogFileName() {
|
|
||||||
synchronized(performanceLogLock) {
|
|
||||||
return performanceLogFile.getAbsolutePath();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the filename of the log for performance. If set, will write performance data.
|
|
||||||
* @param fileName filename to use when writing performance data.
|
|
||||||
*/
|
|
||||||
public void setPerformanceLogFileName(String fileName) {
|
|
||||||
File file = new File(fileName);
|
|
||||||
|
|
||||||
synchronized(performanceLogLock) {
|
|
||||||
// Ignore multiple calls to reset the same lock.
|
|
||||||
if(performanceLogFile != null && performanceLogFile.equals(file))
|
|
||||||
return;
|
|
||||||
|
|
||||||
// Close an existing log
|
|
||||||
if(performanceLog != null) performanceLog.close();
|
|
||||||
|
|
||||||
performanceLogFile = file;
|
|
||||||
createNewPerformanceLog();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the frequency with which performance data is written.
|
|
||||||
* @return Frequency, in seconds, of performance log writes.
|
|
||||||
*/
|
|
||||||
public long getPerformanceProgressPrintFrequencySeconds() {
|
|
||||||
return progressPrintFrequency;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How often should the performance log message be written?
|
|
||||||
* @param seconds number of seconds between messages indicating performance frequency.
|
|
||||||
*/
|
|
||||||
public void setPerformanceProgressPrintFrequencySeconds(long seconds) {
|
|
||||||
progressPrintFrequency = seconds;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class ProcessingHistory {
|
|
||||||
double elapsedSeconds;
|
|
||||||
long unitsProcessed;
|
|
||||||
long bpProcessed;
|
|
||||||
GenomeLoc loc;
|
|
||||||
|
|
||||||
public ProcessingHistory(double elapsedSeconds, GenomeLoc loc, long unitsProcessed, long bpProcessed) {
|
|
||||||
this.elapsedSeconds = elapsedSeconds;
|
|
||||||
this.loc = loc;
|
|
||||||
this.unitsProcessed = unitsProcessed;
|
|
||||||
this.bpProcessed = bpProcessed;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** How long in seconds to process 1M traversal units? */
|
|
||||||
private double secondsPerMillionElements() {
|
|
||||||
return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** How long in seconds to process 1M bp on the genome? */
|
|
||||||
private double secondsPerMillionBP() {
|
|
||||||
return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** What fractoin of the target intervals have we covered? */
|
|
||||||
private double calculateFractionGenomeTargetCompleted(final long targetSize) {
|
|
||||||
return (1.0*bpProcessed) / targetSize;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ public class TraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegio
|
||||||
private final LinkedHashSet<GATKSAMRecord> myReads = new LinkedHashSet<GATKSAMRecord>();
|
private final LinkedHashSet<GATKSAMRecord> myReads = new LinkedHashSet<GATKSAMRecord>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getTraversalType() {
|
public String getTraversalUnits() {
|
||||||
return "active regions";
|
return "active regions";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ public class TraverseDuplicates<M,T> extends TraversalEngine<M,T,DuplicateWalker
|
||||||
private final boolean DEBUG = false;
|
private final boolean DEBUG = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getTraversalType() {
|
public String getTraversalUnits() {
|
||||||
return "dups";
|
return "dups";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ public abstract class TraverseLociBase<M,T> extends TraversalEngine<M,T,LocusWal
|
||||||
protected static final Logger logger = Logger.getLogger(TraversalEngine.class);
|
protected static final Logger logger = Logger.getLogger(TraversalEngine.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected final String getTraversalType() {
|
public final String getTraversalUnits() {
|
||||||
return "sites";
|
return "sites";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -92,9 +92,8 @@ public class TraverseLociNano<M,T> extends TraverseLociBase<M,T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void printOnTraversalDone() {
|
public void shutdown() {
|
||||||
nanoScheduler.shutdown();
|
nanoScheduler.shutdown();
|
||||||
super.printOnTraversalDone();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ public class TraverseReadPairs<M,T> extends TraversalEngine<M,T, ReadPairWalker<
|
||||||
protected static final Logger logger = Logger.getLogger(TraverseReadPairs.class);
|
protected static final Logger logger = Logger.getLogger(TraverseReadPairs.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getTraversalType() {
|
public String getTraversalUnits() {
|
||||||
return "read pairs";
|
return "read pairs";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ public class TraverseReads<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,Read
|
||||||
protected static final Logger logger = Logger.getLogger(TraverseReads.class);
|
protected static final Logger logger = Logger.getLogger(TraverseReads.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getTraversalType() {
|
public String getTraversalUnits() {
|
||||||
return "reads";
|
return "reads";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getTraversalType() {
|
public String getTraversalUnits() {
|
||||||
return "reads";
|
return "reads";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -135,9 +135,8 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void printOnTraversalDone() {
|
public void shutdown() {
|
||||||
nanoScheduler.shutdown();
|
nanoScheduler.shutdown();
|
||||||
super.printOnTraversalDone();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -145,4 +145,13 @@ public class SimpleTimer {
|
||||||
public synchronized long getElapsedTimeNano() {
|
public synchronized long getElapsedTimeNano() {
|
||||||
return running ? (currentTimeNano() - startTimeNano + elapsedTimeNano) : elapsedTimeNano;
|
return running ? (currentTimeNano() - startTimeNano + elapsedTimeNano) : elapsedTimeNano;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the elapsed time from toAdd to this elapsed time
|
||||||
|
*
|
||||||
|
* @param toAdd the timer whose elapsed time we want to add to this timer
|
||||||
|
*/
|
||||||
|
public synchronized void addElapsed(final SimpleTimer toAdd) {
|
||||||
|
elapsedTimeNano += toAdd.getElapsedTimeNano();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ class InputProducer<InputType> implements Runnable {
|
||||||
final SimpleTimer inputTimer,
|
final SimpleTimer inputTimer,
|
||||||
final BlockingQueue<InputValue> outputQueue) {
|
final BlockingQueue<InputValue> outputQueue) {
|
||||||
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
|
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
|
||||||
|
if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null");
|
||||||
if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null");
|
if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null");
|
||||||
|
|
||||||
this.inputReader = inputReader;
|
this.inputReader = inputReader;
|
||||||
|
|
@ -38,11 +39,16 @@ class InputProducer<InputType> implements Runnable {
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
while ( inputReader.hasNext() ) {
|
while ( true ) {
|
||||||
if ( inputTimer != null ) inputTimer.restart();
|
inputTimer.restart();
|
||||||
final InputType input = inputReader.next();
|
if ( ! inputReader.hasNext() ) {
|
||||||
if ( inputTimer != null ) inputTimer.stop();
|
inputTimer.stop();
|
||||||
outputQueue.put(new InputValue(input));
|
break;
|
||||||
|
} else {
|
||||||
|
final InputType input = inputReader.next();
|
||||||
|
inputTimer.stop();
|
||||||
|
outputQueue.put(new InputValue(input));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// add the EOF object so our consumer knows we are done in all inputs
|
// add the EOF object so our consumer knows we are done in all inputs
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,69 @@
|
||||||
|
package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
|
|
||||||
|
import com.google.java.contract.Ensures;
|
||||||
|
import com.google.java.contract.Requires;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.broadinstitute.sting.utils.AutoFormattingTime;
|
||||||
|
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holds runtime profile (input, read, map) times as tracked by NanoScheduler
|
||||||
|
*
|
||||||
|
* User: depristo
|
||||||
|
* Date: 9/10/12
|
||||||
|
* Time: 8:31 PM
|
||||||
|
*/
|
||||||
|
public class NSRuntimeProfile {
|
||||||
|
final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside");
|
||||||
|
final SimpleTimer inputTimer = new SimpleTimer("input");
|
||||||
|
final SimpleTimer mapTimer = new SimpleTimer("map");
|
||||||
|
final SimpleTimer reduceTimer = new SimpleTimer("reduce");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Combine the elapsed time information from other with this profile
|
||||||
|
*
|
||||||
|
* @param other a non-null profile
|
||||||
|
*/
|
||||||
|
public void combine(final NSRuntimeProfile other) {
|
||||||
|
outsideSchedulerTimer.addElapsed(other.outsideSchedulerTimer);
|
||||||
|
inputTimer.addElapsed(other.inputTimer);
|
||||||
|
mapTimer.addElapsed(other.mapTimer);
|
||||||
|
reduceTimer.addElapsed(other.reduceTimer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Print the runtime profiling to logger
|
||||||
|
*
|
||||||
|
* @param logger
|
||||||
|
*/
|
||||||
|
public void log(final Logger logger) {
|
||||||
|
log1(logger, "Input time", inputTimer);
|
||||||
|
log1(logger, "Map time", mapTimer);
|
||||||
|
log1(logger, "Reduce time", reduceTimer);
|
||||||
|
log1(logger, "Outside time", outsideSchedulerTimer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the total runtime for all functions of this nano scheduler
|
||||||
|
*/
|
||||||
|
@Ensures("result >= 0.0")
|
||||||
|
public double totalRuntimeInSeconds() {
|
||||||
|
return inputTimer.getElapsedTime()
|
||||||
|
+ mapTimer.getElapsedTime()
|
||||||
|
+ reduceTimer.getElapsedTime()
|
||||||
|
+ outsideSchedulerTimer.getElapsedTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Print to logger.info timing information from timer, with name label
|
||||||
|
*
|
||||||
|
* @param label the name of the timer to display. Should be human readable
|
||||||
|
* @param timer the timer whose elapsed time we will display
|
||||||
|
*/
|
||||||
|
@Requires({"label != null", "timer != null"})
|
||||||
|
private void log1(final Logger logger, final String label, final SimpleTimer timer) {
|
||||||
|
final double myTimeInSec = timer.getElapsedTime();
|
||||||
|
final double myTimePercent = myTimeInSec / totalRuntimeInSeconds() * 100;
|
||||||
|
logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,8 +3,6 @@ package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
import com.google.java.contract.Ensures;
|
import com.google.java.contract.Ensures;
|
||||||
import com.google.java.contract.Requires;
|
import com.google.java.contract.Requires;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.broadinstitute.sting.utils.AutoFormattingTime;
|
|
||||||
import org.broadinstitute.sting.utils.SimpleTimer;
|
|
||||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||||
import org.broadinstitute.sting.utils.threading.NamedThreadFactory;
|
import org.broadinstitute.sting.utils.threading.NamedThreadFactory;
|
||||||
|
|
||||||
|
|
@ -46,7 +44,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
private final static Logger logger = Logger.getLogger(NanoScheduler.class);
|
private final static Logger logger = Logger.getLogger(NanoScheduler.class);
|
||||||
private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
|
private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
|
||||||
private final static boolean LOG_MAP_TIMES = false;
|
private final static boolean LOG_MAP_TIMES = false;
|
||||||
private final static boolean TIME_CALLS = true;
|
|
||||||
|
|
||||||
private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100;
|
private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100;
|
||||||
|
|
||||||
|
|
@ -61,10 +58,15 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
boolean debug = false;
|
boolean debug = false;
|
||||||
private NSProgressFunction<InputType> progressFunction = null;
|
private NSProgressFunction<InputType> progressFunction = null;
|
||||||
|
|
||||||
final SimpleTimer outsideSchedulerTimer = TIME_CALLS ? new SimpleTimer("outside") : null;
|
/**
|
||||||
final SimpleTimer inputTimer = TIME_CALLS ? new SimpleTimer("input") : null;
|
* Tracks the combined runtime profiles across all created nano schedulers
|
||||||
final SimpleTimer mapTimer = TIME_CALLS ? new SimpleTimer("map") : null;
|
*/
|
||||||
final SimpleTimer reduceTimer = TIME_CALLS ? new SimpleTimer("reduce") : null;
|
final static private NSRuntimeProfile combinedNSRuntimeProfiler = new NSRuntimeProfile();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The profile specific to this nano scheduler
|
||||||
|
*/
|
||||||
|
final private NSRuntimeProfile myNSRuntimeProfile = new NSRuntimeProfile();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new nanoscheduler with the desire characteristics requested by the argument
|
* Create a new nanoscheduler with the desire characteristics requested by the argument
|
||||||
|
|
@ -92,7 +94,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// start timing the time spent outside of the nanoScheduler
|
// start timing the time spent outside of the nanoScheduler
|
||||||
outsideSchedulerTimer.start();
|
myNSRuntimeProfile.outsideSchedulerTimer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -119,21 +121,31 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
* After this call, execute cannot be invoked without throwing an error
|
* After this call, execute cannot be invoked without throwing an error
|
||||||
*/
|
*/
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
outsideSchedulerTimer.stop();
|
myNSRuntimeProfile.outsideSchedulerTimer.stop();
|
||||||
|
|
||||||
|
// add my timing information to the combined NS runtime profile
|
||||||
|
combinedNSRuntimeProfiler.combine(myNSRuntimeProfile);
|
||||||
|
|
||||||
if ( nThreads > 1 ) {
|
if ( nThreads > 1 ) {
|
||||||
shutdownExecutor("inputExecutor", inputExecutor);
|
shutdownExecutor("inputExecutor", inputExecutor);
|
||||||
shutdownExecutor("mapExecutor", mapExecutor);
|
shutdownExecutor("mapExecutor", mapExecutor);
|
||||||
shutdownExecutor("reduceExecutor", reduceExecutor);
|
shutdownExecutor("reduceExecutor", reduceExecutor);
|
||||||
}
|
}
|
||||||
shutdown = true;
|
|
||||||
|
|
||||||
if (TIME_CALLS) {
|
shutdown = true;
|
||||||
printTimerInfo("Input time", inputTimer);
|
}
|
||||||
printTimerInfo("Map time", mapTimer);
|
|
||||||
printTimerInfo("Reduce time", reduceTimer);
|
public void printRuntimeProfile() {
|
||||||
printTimerInfo("Outside time", outsideSchedulerTimer);
|
myNSRuntimeProfile.log(logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void printCombinedRuntimeProfile() {
|
||||||
|
if ( combinedNSRuntimeProfiler.totalRuntimeInSeconds() > 0.1 )
|
||||||
|
combinedNSRuntimeProfiler.log(logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected double getTotalRuntime() {
|
||||||
|
return myNSRuntimeProfile.totalRuntimeInSeconds();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -154,21 +166,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!");
|
throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Print to logger.info timing information from timer, with name label
|
|
||||||
*
|
|
||||||
* @param label the name of the timer to display. Should be human readable
|
|
||||||
* @param timer the timer whose elapsed time we will display
|
|
||||||
*/
|
|
||||||
@Requires({"label != null", "timer != null"})
|
|
||||||
private void printTimerInfo(final String label, final SimpleTimer timer) {
|
|
||||||
final double total = inputTimer.getElapsedTime() + mapTimer.getElapsedTime()
|
|
||||||
+ reduceTimer.getElapsedTime() + outsideSchedulerTimer.getElapsedTime();
|
|
||||||
final double myTimeInSec = timer.getElapsedTime();
|
|
||||||
final double myTimePercent = myTimeInSec / total * 100;
|
|
||||||
logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if this nanoScheduler is shutdown, or false if its still open for business
|
* @return true if this nanoScheduler is shutdown, or false if its still open for business
|
||||||
*/
|
*/
|
||||||
|
|
@ -246,7 +243,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
if ( map == null ) throw new IllegalArgumentException("map function cannot be null");
|
if ( map == null ) throw new IllegalArgumentException("map function cannot be null");
|
||||||
if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null");
|
if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null");
|
||||||
|
|
||||||
outsideSchedulerTimer.stop();
|
myNSRuntimeProfile.outsideSchedulerTimer.stop();
|
||||||
|
|
||||||
ReduceType result;
|
ReduceType result;
|
||||||
if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) {
|
if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) {
|
||||||
|
|
@ -255,7 +252,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
result = executeMultiThreaded(inputReader, map, initialValue, reduce);
|
result = executeMultiThreaded(inputReader, map, initialValue, reduce);
|
||||||
}
|
}
|
||||||
|
|
||||||
outsideSchedulerTimer.restart();
|
myNSRuntimeProfile.outsideSchedulerTimer.restart();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -272,28 +269,31 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
ReduceType sum = initialValue;
|
ReduceType sum = initialValue;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
|
||||||
// start timer to ensure that both hasNext and next are caught by the timer
|
while ( true ) {
|
||||||
if ( TIME_CALLS ) inputTimer.restart();
|
// start timer to ensure that both hasNext and next are caught by the timer
|
||||||
while ( inputReader.hasNext() ) {
|
myNSRuntimeProfile.inputTimer.restart();
|
||||||
final InputType input = inputReader.next();
|
if ( ! inputReader.hasNext() ) {
|
||||||
if ( TIME_CALLS ) inputTimer.stop();
|
myNSRuntimeProfile.inputTimer.stop();
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
final InputType input = inputReader.next();
|
||||||
|
myNSRuntimeProfile.inputTimer.stop();
|
||||||
|
|
||||||
// map
|
// map
|
||||||
if ( TIME_CALLS ) mapTimer.restart();
|
myNSRuntimeProfile.mapTimer.restart();
|
||||||
final long preMapTime = LOG_MAP_TIMES ? 0 : mapTimer.currentTimeNano();
|
final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano();
|
||||||
final MapType mapValue = map.apply(input);
|
final MapType mapValue = map.apply(input);
|
||||||
if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (mapTimer.currentTimeNano() - preMapTime));
|
if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime));
|
||||||
if ( TIME_CALLS ) mapTimer.stop();
|
myNSRuntimeProfile.mapTimer.stop();
|
||||||
|
|
||||||
if ( i++ % inputBufferSize == 0 && progressFunction != null )
|
if ( i++ % inputBufferSize == 0 && progressFunction != null )
|
||||||
progressFunction.progress(input);
|
progressFunction.progress(input);
|
||||||
|
|
||||||
// reduce
|
// reduce
|
||||||
if ( TIME_CALLS ) reduceTimer.restart();
|
myNSRuntimeProfile.reduceTimer.restart();
|
||||||
sum = reduce.apply(mapValue, sum);
|
sum = reduce.apply(mapValue, sum);
|
||||||
if ( TIME_CALLS ) reduceTimer.stop();
|
myNSRuntimeProfile.reduceTimer.stop();
|
||||||
|
}
|
||||||
if ( TIME_CALLS ) inputTimer.restart();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return sum;
|
return sum;
|
||||||
|
|
@ -321,11 +321,11 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
new LinkedBlockingDeque<Future<MapResult<MapType>>>(mapBufferSize);
|
new LinkedBlockingDeque<Future<MapResult<MapType>>>(mapBufferSize);
|
||||||
|
|
||||||
// Start running the input reader thread
|
// Start running the input reader thread
|
||||||
inputExecutor.submit(new InputProducer<InputType>(inputReader, inputTimer, inputQueue));
|
inputExecutor.submit(new InputProducer<InputType>(inputReader, myNSRuntimeProfile.inputTimer, inputQueue));
|
||||||
|
|
||||||
// Start running the reducer thread
|
// Start running the reducer thread
|
||||||
final ReducerThread<MapType, ReduceType> reducer
|
final ReducerThread<MapType, ReduceType> reducer
|
||||||
= new ReducerThread<MapType, ReduceType>(reduce, reduceTimer, initialValue, mapResultQueue);
|
= new ReducerThread<MapType, ReduceType>(reduce, myNSRuntimeProfile.reduceTimer, initialValue, mapResultQueue);
|
||||||
final Future<ReduceType> reduceResult = reduceExecutor.submit(reducer);
|
final Future<ReduceType> reduceResult = reduceExecutor.submit(reducer);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -382,10 +382,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MapResult<MapType> call() {
|
public MapResult<MapType> call() {
|
||||||
if ( TIME_CALLS ) mapTimer.restart();
|
|
||||||
if ( debug ) debugPrint("\t\tmap " + input);
|
if ( debug ) debugPrint("\t\tmap " + input);
|
||||||
|
myNSRuntimeProfile.mapTimer.restart();
|
||||||
final MapType result = map.apply(input);
|
final MapType result = map.apply(input);
|
||||||
if ( TIME_CALLS ) mapTimer.stop();
|
myNSRuntimeProfile.mapTimer.stop();
|
||||||
return new MapResult<MapType>(result, id);
|
return new MapResult<MapType>(result, id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ class ReducerThread<MapType, ReduceType> implements Callable<ReduceType> {
|
||||||
final ReduceType sum,
|
final ReduceType sum,
|
||||||
final BlockingQueue<Future<MapResult<MapType>>> mapResultQueue) {
|
final BlockingQueue<Future<MapResult<MapType>>> mapResultQueue) {
|
||||||
if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null");
|
if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null");
|
||||||
|
if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null");
|
||||||
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
|
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
|
||||||
|
|
||||||
this.reduce = reduce;
|
this.reduce = reduce;
|
||||||
|
|
@ -51,9 +52,9 @@ class ReducerThread<MapType, ReduceType> implements Callable<ReduceType> {
|
||||||
} else {
|
} else {
|
||||||
lastJobID = result.getJobID();
|
lastJobID = result.getJobID();
|
||||||
// apply reduce, keeping track of sum
|
// apply reduce, keeping track of sum
|
||||||
if ( reduceTimer != null ) reduceTimer.restart();
|
reduceTimer.restart();
|
||||||
sum = reduce.apply(result.getValue(), sum);
|
sum = reduce.apply(result.getValue(), sum);
|
||||||
if ( reduceTimer != null ) reduceTimer.stop();
|
reduceTimer.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (ExecutionException ex) {
|
} catch (ExecutionException ex) {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,310 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2010, The Broad Institute
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person
|
||||||
|
* obtaining a copy of this software and associated documentation
|
||||||
|
* files (the "Software"), to deal in the Software without
|
||||||
|
* restriction, including without limitation the rights to use,
|
||||||
|
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the
|
||||||
|
* Software is furnished to do so, subject to the following
|
||||||
|
* conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||||
|
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||||
|
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||||
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||||
|
* OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.broadinstitute.sting.utils.progressmeter;
|
||||||
|
|
||||||
|
import com.google.java.contract.Invariant;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.broadinstitute.sting.utils.*;
|
||||||
|
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A meter measuring progress on a calculation through a set of genomic regions that can
|
||||||
|
* print a few key metrics to a logger and optionally to a file
|
||||||
|
*
|
||||||
|
* The key information for assessing progress is a set of genome locs describing the total
|
||||||
|
* set of regions we will process. Whenever (at reasonable intervals) the processing unit
|
||||||
|
* can called notifyOfProgress and this logger may, depending on the metering delay, print
|
||||||
|
* a log message with the following metrics:
|
||||||
|
*
|
||||||
|
* -- Number of processed X (X = processing units)
|
||||||
|
* -- Runtime per.1M X
|
||||||
|
* -- Percent of regions to be processed completed
|
||||||
|
* -- The estimated total runtime based on previous performance
|
||||||
|
* -- The estimated time remaining for the entire process
|
||||||
|
*
|
||||||
|
* The optional file log an expanded set of metrics in tabular format
|
||||||
|
* suitable for subsequent analysis in R.
|
||||||
|
*
|
||||||
|
* This class is -- and MUST BE -- thread-safe for use in the GATK. Multiple independent
|
||||||
|
* threads executing processors will be calling notifyOfProgress() simultaneously and this
|
||||||
|
* class does (and MUST) properly sort out the timings of logs without interlacing outputs
|
||||||
|
* because of these threads.
|
||||||
|
*
|
||||||
|
* Consequently, the fundamental model for when to print the logs is time based. We basically
|
||||||
|
* print a meter message every X seconds, minutes, hours, whatever is appropriate based on the
|
||||||
|
* estimated remaining runtime.
|
||||||
|
*
|
||||||
|
* @author depristo
|
||||||
|
* @since 2010 maybe, but written in 09/12 for clarity
|
||||||
|
*/
|
||||||
|
@Invariant({
        "targetSizeInBP >= 0",
        "progressPrintFrequency > 0"
})
public class ProgressMeter {
    protected static final Logger logger = Logger.getLogger(ProgressMeter.class);

    // --------------------------------------------------------------------------------
    // static constants controlling overall system behavior
    // --------------------------------------------------------------------------------

    /**
     * Min. milliseconds after we start up the meter before we will print our first meter message
     */
    private final static long MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS = 30 * 1000;

    /**
     * How often should we print performance logging information, when we are sending this
     * information to a file? Not dynamically updated as the logger meter is.
     */
    private final static long PERFORMANCE_LOG_PRINT_FREQUENCY = 10 * 1000;

    // Thresholds (in seconds) on the estimated total runtime used to pick how
    // frequently the logger meter prints; see updateLoggerPrintFrequency().
    private final static double TWO_HOURS_IN_SECONDS = 2.0 * 60.0 * 60.0;
    private final static double TWELVE_HOURS_IN_SECONDS = 12.0 * 60.0 * 60.0;

    // --------------------------------------------------------------------------------
    // Variables we updating during running
    // --------------------------------------------------------------------------------

    /**
     * When was the last time we printed progress log? In milleseconds
     */
    private long lastProgressPrintTime = -1;

    /**
     * How frequently should we be printing our meter messages? Dynamically updated
     * depending on how long we think the run has left.
     */
    private long progressPrintFrequency = 10 * 1000; // default value

    /**
     * When was the last time we printed to the performance log? In millseconds
     */
    private long lastPerformanceLogPrintTime = -1;

    // --------------------------------------------------------------------------------
    // final variables fixed at object creation time
    // --------------------------------------------------------------------------------

    /**
     * The set of genome locs describing the total region we are processing with
     * this GATK run. Used to determine how close we are to completing the run
     */
    private final GenomeLocSortedSet regionsBeingProcessed;

    /**
     * Size, in bp, of the area we are processing, derived from regionsBeingProcessed.
     * Updated once in the system in initial for performance reasons
     */
    private final long targetSizeInBP;

    /**
     * A string describing the type of units being processes, so we can say things like
     * "we are running at X processingUnitName per second"
     */
    private final String processingUnitName;

    /**
     * A potentially null file where we print a supplementary, R readable performance log
     * file.
     */
    private final PrintStream performanceLog;

    /** We use the SimpleTimer to time our run */
    private final SimpleTimer timer = new SimpleTimer();

    /**
     * Create a new ProgressMeter
     *
     * Opens the performance log (if requested), caches the total target size in bp,
     * and immediately starts the timer via start().
     *
     * @param performanceLogFile an optional performance log file where a table of performance logs will be written
     * @param processingUnitName the name of the unit type being processed, suitable for saying X seconds per processingUnitName
     * @param processingIntervals the intervals being processed
     * @throws IllegalArgumentException if processingUnitName or processingIntervals is null
     * @throws UserException.CouldNotCreateOutputFile if the performance log file cannot be created
     */
    public ProgressMeter(final File performanceLogFile,
                         final String processingUnitName,
                         final GenomeLocSortedSet processingIntervals) {
        if ( processingUnitName == null ) throw new IllegalArgumentException("processingUnitName cannot be null");
        if ( processingIntervals == null ) throw new IllegalArgumentException("Target intervals cannot be null");

        this.processingUnitName = processingUnitName;
        this.regionsBeingProcessed = processingIntervals;

        // setup the performance logger output, if requested
        if ( performanceLogFile != null ) {
            try {
                this.performanceLog = new PrintStream(new FileOutputStream(performanceLogFile));
                // tab-separated header; one column per metric logged in notifyOfProgress
                final List<String> pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed",
                        "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining");
                performanceLog.println(Utils.join("\t", pLogHeader));
            } catch (FileNotFoundException e) {
                throw new UserException.CouldNotCreateOutputFile(performanceLogFile, e);
            }
        } else {
            performanceLog = null;
        }

        // cached for performance reasons
        targetSizeInBP = processingIntervals.coveredSize();

        // start up the timer
        start();
    }

    /**
     * Forward request to notifyOfProgress
     *
     * Assumes that one cycle has been completed
     *
     * @param loc our current location. Null means "in unmapped reads"
     * @param nTotalRecordsProcessed the total number of records we've processed
     */
    public void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) {
        notifyOfProgress(loc, false, nTotalRecordsProcessed);
    }

    // Starts the timer and records the start time as the "last print" baseline,
    // then logs the fixed-width header row for the meter output.
    private synchronized void start() {
        timer.start();
        lastProgressPrintTime = timer.currentTime();

        logger.info("[INITIALIZATION COMPLETE; STARTING PROCESSING]");
        logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
                "Location", processingUnitName, processingUnitName));
    }

    /**
     * Utility routine that prints out process information (including timing) every N records or
     * every M seconds, for N and M set in global variables.
     *
     * Synchronized to ensure that even with multiple threads calling notifyOfProgress we still
     * get one clean stream of meter logs.
     *
     * @param loc Current location, can be null if you are at the end of the processing unit
     * @param mustPrint If true, will print out info, regardless of time interval
     * @param nTotalRecordsProcessed the total number of records we've processed
     */
    private synchronized void notifyOfProgress(final GenomeLoc loc, boolean mustPrint, final long nTotalRecordsProcessed) {
        if ( nTotalRecordsProcessed < 0 ) throw new IllegalArgumentException("nTotalRecordsProcessed must be >= 0");

        // decide independently whether the logger meter and/or the performance log are due
        final long curTime = timer.currentTime();
        final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency);
        final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);

        if ( printProgress || printLog ) {
            final ProgressMeterData progressData = takeProgressSnapshot(loc, nTotalRecordsProcessed);

            final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.getElapsedSeconds());
            final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP());
            final AutoFormattingTime unitRate = new AutoFormattingTime(progressData.secondsPerMillionElements());
            final double fractionGenomeTargetCompleted = progressData.calculateFractionGenomeTargetCompleted(targetSizeInBP);
            // NOTE(review): if no bp have been processed yet, fractionGenomeTargetCompleted is 0
            // and this division produces Infinity for the runtime estimates -- confirm
            // AutoFormattingTime renders that acceptably.
            final AutoFormattingTime estTotalRuntime = new AutoFormattingTime(elapsed.getTimeInSeconds() / fractionGenomeTargetCompleted);
            final AutoFormattingTime timeToCompletion = new AutoFormattingTime(estTotalRuntime.getTimeInSeconds() - elapsed.getTimeInSeconds());

            if ( printProgress ) {
                lastProgressPrintTime = curTime;
                // re-tune the meter frequency based on the latest total-runtime estimate
                updateLoggerPrintFrequency(estTotalRuntime.getTimeInSeconds());

                // a pretty name for our position
                final String posName = loc == null
                        ? (mustPrint ? "done" : "unmapped reads")
                        : String.format("%s:%d", loc.getContig(), loc.getStart());

                logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s",
                        posName, progressData.getUnitsProcessed()*1.0, elapsed, unitRate,
                        100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));

            }

            if ( printLog ) {
                lastPerformanceLogPrintTime = curTime;
                // tab-separated row matching the header written in the constructor
                performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n",
                        elapsed.getTimeInSeconds(), progressData.getUnitsProcessed(), unitRate.getTimeInSeconds(),
                        progressData.getBpProcessed(), bpRate.getTimeInSeconds(),
                        fractionGenomeTargetCompleted, estTotalRuntime.getTimeInSeconds(),
                        timeToCompletion.getTimeInSeconds());
            }
        }
    }

    /**
     * Determine, based on remaining runtime, how often to print the meter
     *
     * @param totalRuntimeSeconds kinda obvious, no?
     */
    private void updateLoggerPrintFrequency(final double totalRuntimeSeconds) {
        // dynamically change the update rate so that short running jobs receive frequent updates while longer jobs receive fewer updates
        if ( totalRuntimeSeconds > TWELVE_HOURS_IN_SECONDS )
            progressPrintFrequency = 60 * 1000; // in milliseconds
        else if ( totalRuntimeSeconds > TWO_HOURS_IN_SECONDS )
            progressPrintFrequency = 30 * 1000; // in milliseconds
        else
            progressPrintFrequency = 10 * 1000; // in milliseconds
    }

    /**
     * Creates a new ProgressData object recording a snapshot of our progress at this instant
     *
     * @param loc our current position. If null, assumes we are done traversing
     * @param nTotalRecordsProcessed the total number of records we've processed
     * @return an immutable snapshot of elapsed time, units processed, and bp processed
     */
    private ProgressMeterData takeProgressSnapshot(final GenomeLoc loc, final long nTotalRecordsProcessed) {
        // null -> end of processing
        final long bpProcessed = loc == null ? targetSizeInBP : regionsBeingProcessed.sizeBeforeLoc(loc);
        return new ProgressMeterData(timer.getElapsedTime(), nTotalRecordsProcessed, bpProcessed);
    }

    /**
     * Should be called when processing is done
     *
     * Forces a final meter print, logs total runtime, and closes the performance log if open.
     */
    public void notifyDone(final long nTotalRecordsProcessed) {
        // print out the progress meter
        notifyOfProgress(null, true, nTotalRecordsProcessed);

        logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours",
                timer.getElapsedTime(), timer.getElapsedTime() / 60, timer.getElapsedTime() / 3600));

        if ( performanceLog != null )
            performanceLog.close();
    }

    /**
     * @param curTime (current runtime, in millisecs)
     * @param lastPrintTime the last time we printed, in machine milliseconds
     * @param printFreq maximum permitted difference between last print and current times
     *
     * @return true if the maximum interval (in millisecs) has passed since the last printing
     */
    private boolean maxElapsedIntervalForPrinting(final long curTime, long lastPrintTime, long printFreq) {
        final long elapsed = curTime - lastPrintTime;
        // NOTE(review): the MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS gate is applied on EVERY
        // call, not only before the first print (lastPrintTime resets after each print), so
        // the effective minimum interval between prints is 30s even when printFreq is 10s --
        // confirm this rate-limiting is intentional.
        return elapsed > printFreq && elapsed > MIN_ELAPSED_TIME_BEFORE_FIRST_PROGRESS;
    }
}
|
||||||
|
|
@ -0,0 +1,54 @@
|
||||||
|
package org.broadinstitute.sting.utils.progressmeter;
|
||||||
|
|
||||||
|
import com.google.java.contract.Ensures;
|
||||||
|
import com.google.java.contract.Requires;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* a snapshot of our performance, suitable for storage and later analysis
|
||||||
|
*/
|
||||||
|
class ProgressMeterData {
|
||||||
|
private final double elapsedSeconds;
|
||||||
|
private final long unitsProcessed;
|
||||||
|
private final long bpProcessed;
|
||||||
|
|
||||||
|
@Requires({"unitsProcessed >= 0", "bpProcessed >= 0", "elapsedSeconds >= 0"})
|
||||||
|
public ProgressMeterData(double elapsedSeconds, long unitsProcessed, long bpProcessed) {
|
||||||
|
this.elapsedSeconds = elapsedSeconds;
|
||||||
|
this.unitsProcessed = unitsProcessed;
|
||||||
|
this.bpProcessed = bpProcessed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ensures("result >= 0.0")
|
||||||
|
public double getElapsedSeconds() {
|
||||||
|
return elapsedSeconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ensures("result >= 0")
|
||||||
|
public long getUnitsProcessed() {
|
||||||
|
return unitsProcessed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ensures("result >= 0")
|
||||||
|
public long getBpProcessed() {
|
||||||
|
return bpProcessed;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** How long in seconds to process 1M traversal units? */
|
||||||
|
@Ensures("result >= 0.0")
|
||||||
|
public double secondsPerMillionElements() {
|
||||||
|
return (elapsedSeconds * 1000000.0) / Math.max(unitsProcessed, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** How long in seconds to process 1M bp on the genome? */
|
||||||
|
@Ensures("result >= 0.0")
|
||||||
|
public double secondsPerMillionBP() {
|
||||||
|
return (elapsedSeconds * 1000000.0) / Math.max(bpProcessed, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** What fraction of the target intervals have we covered? */
|
||||||
|
@Requires("targetSize >= 0")
|
||||||
|
@Ensures({"result >= 0.0", "result <= 1.0"})
|
||||||
|
public double calculateFractionGenomeTargetCompleted(final long targetSize) {
|
||||||
|
return (1.0*bpProcessed) / Math.max(targetSize, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -69,7 +69,7 @@ public class ArtificialReadsTraversal<M,T> extends TraversalEngine<M,T,Walker<M,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getTraversalType() {
|
public String getTraversalUnits() {
|
||||||
return "reads";
|
return "reads";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,6 @@
|
||||||
package org.broadinstitute.sting.gatk.iterators;
|
package org.broadinstitute.sting.gatk.iterators;
|
||||||
|
|
||||||
import net.sf.samtools.SAMFileHeader;
|
import net.sf.samtools.*;
|
||||||
import net.sf.samtools.SAMFileReader;
|
|
||||||
import net.sf.samtools.SAMRecord;
|
|
||||||
import net.sf.samtools.util.CloseableIterator;
|
import net.sf.samtools.util.CloseableIterator;
|
||||||
import org.broadinstitute.sting.BaseTest;
|
import org.broadinstitute.sting.BaseTest;
|
||||||
import org.broadinstitute.sting.gatk.ReadProperties;
|
import org.broadinstitute.sting.gatk.ReadProperties;
|
||||||
|
|
@ -39,57 +37,10 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest {
|
||||||
genomeLocParser = new GenomeLocParser(header.getSequenceDictionary());
|
genomeLocParser = new GenomeLocParser(header.getSequenceDictionary());
|
||||||
}
|
}
|
||||||
|
|
||||||
private final LocusIteratorByStateExperimental makeLTBS(List<SAMRecord> reads, ReadProperties readAttributes) {
|
private LocusIteratorByStateExperimental makeLTBS(List<SAMRecord> reads, ReadProperties readAttributes) {
|
||||||
return new LocusIteratorByStateExperimental(new FakeCloseableIterator<SAMRecord>(reads.iterator()), readAttributes, genomeLocParser, LocusIteratorByStateExperimental.sampleListForSAMWithoutReadGroups());
|
return new LocusIteratorByStateExperimental(new FakeCloseableIterator<SAMRecord>(reads.iterator()), readAttributes, genomeLocParser, LocusIteratorByStateExperimental.sampleListForSAMWithoutReadGroups());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ReadProperties createTestReadProperties() {
|
|
||||||
return createTestReadProperties(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ReadProperties createTestReadProperties( DownsamplingMethod downsamplingMethod ) {
|
|
||||||
return new ReadProperties(
|
|
||||||
Collections.<SAMReaderID>emptyList(),
|
|
||||||
new SAMFileHeader(),
|
|
||||||
false,
|
|
||||||
SAMFileReader.ValidationStringency.STRICT,
|
|
||||||
downsamplingMethod,
|
|
||||||
new ValidationExclusion(),
|
|
||||||
Collections.<ReadFilter>emptyList(),
|
|
||||||
Collections.<ReadTransformer>emptyList(),
|
|
||||||
false,
|
|
||||||
(byte) -1
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class FakeCloseableIterator<T> implements CloseableIterator<T> {
|
|
||||||
Iterator<T> iterator;
|
|
||||||
|
|
||||||
public FakeCloseableIterator(Iterator<T> it) {
|
|
||||||
iterator = it;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasNext() {
|
|
||||||
return iterator.hasNext();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T next() {
|
|
||||||
return iterator.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void remove() {
|
|
||||||
throw new UnsupportedOperationException("Don't remove!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testXandEQOperators() {
|
public void testXandEQOperators() {
|
||||||
final byte[] bases1 = new byte[] {'A','A','A','A','A','A','A','A','A','A'};
|
final byte[] bases1 = new byte[] {'A','A','A','A','A','A','A','A','A','A'};
|
||||||
|
|
@ -308,45 +259,36 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest {
|
||||||
// comprehensive LIBS/PileupElement tests //
|
// comprehensive LIBS/PileupElement tests //
|
||||||
////////////////////////////////////////////
|
////////////////////////////////////////////
|
||||||
|
|
||||||
private static final int IS_BEFORE_DELETED_BASE_FLAG = 1;
|
|
||||||
private static final int IS_BEFORE_DELETION_START_FLAG = 2;
|
|
||||||
private static final int IS_AFTER_DELETED_BASE_FLAG = 4;
|
|
||||||
private static final int IS_AFTER_DELETION_END_FLAG = 8;
|
|
||||||
private static final int IS_BEFORE_INSERTION_FLAG = 16;
|
|
||||||
private static final int IS_AFTER_INSERTION_FLAG = 32;
|
|
||||||
private static final int IS_NEXT_TO_SOFTCLIP_FLAG = 64;
|
|
||||||
|
|
||||||
private static class LIBSTest {
|
private static class LIBSTest {
|
||||||
|
|
||||||
|
|
||||||
final String cigar;
|
final String cigar;
|
||||||
final int readLength;
|
final int readLength;
|
||||||
final List<Integer> offsets;
|
|
||||||
final List<Integer> flags;
|
|
||||||
|
|
||||||
private LIBSTest(final String cigar, final int readLength, final List<Integer> offsets, final List<Integer> flags) {
|
private LIBSTest(final String cigar, final int readLength) {
|
||||||
this.cigar = cigar;
|
this.cigar = cigar;
|
||||||
this.readLength = readLength;
|
this.readLength = readLength;
|
||||||
this.offsets = offsets;
|
|
||||||
this.flags = flags;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@DataProvider(name = "LIBSTest")
|
@DataProvider(name = "LIBSTest")
|
||||||
public Object[][] createLIBSTestData() {
|
public Object[][] createLIBSTestData() {
|
||||||
|
|
||||||
|
//TODO -- when LIBS is fixed this should be replaced to provide all possible permutations of CIGAR strings
|
||||||
|
|
||||||
return new Object[][]{
|
return new Object[][]{
|
||||||
{new LIBSTest("1I", 1, Arrays.asList(0), Arrays.asList(IS_BEFORE_INSERTION_FLAG))},
|
{new LIBSTest("1I", 1)},
|
||||||
{new LIBSTest("10I", 10, Arrays.asList(0), Arrays.asList(IS_BEFORE_INSERTION_FLAG))},
|
{new LIBSTest("10I", 10)},
|
||||||
{new LIBSTest("2M2I2M", 6, Arrays.asList(0,1,4,5), Arrays.asList(0,IS_BEFORE_INSERTION_FLAG,IS_AFTER_INSERTION_FLAG,0))},
|
{new LIBSTest("2M2I2M", 6)},
|
||||||
{new LIBSTest("2M2I", 4, Arrays.asList(0,1), Arrays.asList(0,IS_BEFORE_INSERTION_FLAG))},
|
{new LIBSTest("2M2I", 4)},
|
||||||
//TODO -- uncomment these when LIBS is fixed
|
//TODO -- uncomment these when LIBS is fixed
|
||||||
//{new LIBSTest("2I2M", 4, Arrays.asList(2,3), Arrays.asList(IS_AFTER_INSERTION_FLAG,0))},
|
//{new LIBSTest("2I2M", 4, Arrays.asList(2,3), Arrays.asList(IS_AFTER_INSERTION_FLAG,0))},
|
||||||
//{new LIBSTest("1I1M1D1M", 3, Arrays.asList(0,1), Arrays.asList(IS_AFTER_INSERTION_FLAG | IS_BEFORE_DELETION_START_FLAG | IS_BEFORE_DELETED_BASE_FLAG,IS_AFTER_DELETED_BASE_FLAG | IS_AFTER_DELETION_END_FLAG))},
|
//{new LIBSTest("1I1M1D1M", 3, Arrays.asList(0,1), Arrays.asList(IS_AFTER_INSERTION_FLAG | IS_BEFORE_DELETION_START_FLAG | IS_BEFORE_DELETED_BASE_FLAG,IS_AFTER_DELETED_BASE_FLAG | IS_AFTER_DELETION_END_FLAG))},
|
||||||
//{new LIBSTest("1S1I1M", 3, Arrays.asList(2), Arrays.asList(IS_AFTER_INSERTION_FLAG))},
|
//{new LIBSTest("1S1I1M", 3, Arrays.asList(2), Arrays.asList(IS_AFTER_INSERTION_FLAG))},
|
||||||
{new LIBSTest("1M2D2M", 3, Arrays.asList(0,1,2), Arrays.asList(IS_BEFORE_DELETION_START_FLAG | IS_BEFORE_DELETED_BASE_FLAG,IS_AFTER_DELETED_BASE_FLAG | IS_AFTER_DELETION_END_FLAG,0))},
|
//{new LIBSTest("1M2D2M", 3)},
|
||||||
{new LIBSTest("1S1M", 2, Arrays.asList(1), Arrays.asList(IS_NEXT_TO_SOFTCLIP_FLAG))},
|
{new LIBSTest("1S1M", 2)},
|
||||||
{new LIBSTest("1M1S", 2, Arrays.asList(0), Arrays.asList(IS_NEXT_TO_SOFTCLIP_FLAG))},
|
{new LIBSTest("1M1S", 2)},
|
||||||
{new LIBSTest("1S1M1I", 3, Arrays.asList(1), Arrays.asList(IS_BEFORE_INSERTION_FLAG | IS_NEXT_TO_SOFTCLIP_FLAG))}
|
{new LIBSTest("1S1M1I", 3)}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -361,26 +303,24 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest {
|
||||||
|
|
||||||
// create the iterator by state with the fake reads and fake records
|
// create the iterator by state with the fake reads and fake records
|
||||||
li = makeLTBS(Arrays.asList(read), createTestReadProperties());
|
li = makeLTBS(Arrays.asList(read), createTestReadProperties());
|
||||||
|
final LIBS_position tester = new LIBS_position(read);
|
||||||
|
|
||||||
int offset = 0;
|
|
||||||
while ( li.hasNext() ) {
|
while ( li.hasNext() ) {
|
||||||
AlignmentContext alignmentContext = li.next();
|
AlignmentContext alignmentContext = li.next();
|
||||||
ReadBackedPileup p = alignmentContext.getBasePileup();
|
ReadBackedPileup p = alignmentContext.getBasePileup();
|
||||||
Assert.assertTrue(p.getNumberOfElements() == 1);
|
Assert.assertTrue(p.getNumberOfElements() == 1);
|
||||||
PileupElement pe = p.iterator().next();
|
PileupElement pe = p.iterator().next();
|
||||||
|
|
||||||
final int flag = params.flags.get(offset);
|
tester.stepForwardOnGenome();
|
||||||
Assert.assertEquals(pe.isBeforeDeletedBase(), (flag & IS_BEFORE_DELETED_BASE_FLAG) != 0);
|
|
||||||
Assert.assertEquals(pe.isBeforeDeletionStart(), (flag & IS_BEFORE_DELETION_START_FLAG) != 0);
|
|
||||||
Assert.assertEquals(pe.isAfterDeletedBase(), (flag & IS_AFTER_DELETED_BASE_FLAG) != 0);
|
|
||||||
Assert.assertEquals(pe.isAfterDeletionEnd(), (flag & IS_AFTER_DELETION_END_FLAG) != 0);
|
|
||||||
Assert.assertEquals(pe.isBeforeInsertion(), (flag & IS_BEFORE_INSERTION_FLAG) != 0);
|
|
||||||
Assert.assertEquals(pe.isAfterInsertion(), (flag & IS_AFTER_INSERTION_FLAG) != 0);
|
|
||||||
Assert.assertEquals(pe.isNextToSoftClip(), (flag & IS_NEXT_TO_SOFTCLIP_FLAG) != 0);
|
|
||||||
|
|
||||||
Assert.assertEquals(pe.getOffset(), params.offsets.get(offset).intValue());
|
Assert.assertEquals(pe.isBeforeDeletedBase(), tester.isBeforeDeletedBase);
|
||||||
|
Assert.assertEquals(pe.isBeforeDeletionStart(), tester.isBeforeDeletionStart);
|
||||||
offset++;
|
Assert.assertEquals(pe.isAfterDeletedBase(), tester.isAfterDeletedBase);
|
||||||
|
Assert.assertEquals(pe.isAfterDeletionEnd(), tester.isAfterDeletionEnd);
|
||||||
|
Assert.assertEquals(pe.isBeforeInsertion(), tester.isBeforeInsertion);
|
||||||
|
Assert.assertEquals(pe.isAfterInsertion(), tester.isAfterInsertion);
|
||||||
|
Assert.assertEquals(pe.isNextToSoftClip(), tester.isNextToSoftClip);
|
||||||
|
Assert.assertEquals(pe.getOffset(), tester.getCurrentReadOffset());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -543,4 +483,165 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest {
|
||||||
|
|
||||||
test.run();
|
test.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////
|
||||||
|
// End Read State Manager Tests //
|
||||||
|
///////////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////
|
||||||
|
// Helper methods / classes //
|
||||||
|
///////////////////////////////////////
|
||||||
|
|
||||||
|
/** Convenience overload: build test ReadProperties with no downsampling method. */
private static ReadProperties createTestReadProperties() {
    return createTestReadProperties(null);
}
|
||||||
|
|
||||||
|
/**
 * Build a minimal ReadProperties for unit tests: no readers, an empty SAMFileHeader,
 * STRICT validation, the supplied downsampling method, and no filters or transformers.
 *
 * @param downsamplingMethod downsampling configuration to embed; may be null
 */
private static ReadProperties createTestReadProperties( DownsamplingMethod downsamplingMethod ) {
    return new ReadProperties(
            Collections.<SAMReaderID>emptyList(),
            new SAMFileHeader(),
            false,
            SAMFileReader.ValidationStringency.STRICT,
            downsamplingMethod,
            new ValidationExclusion(),
            Collections.<ReadFilter>emptyList(),
            Collections.<ReadTransformer>emptyList(),
            false,
            // NOTE(review): positional args -- the two booleans and (byte) -1 are
            // whatever defaults the ReadProperties constructor defines; confirm
            // their meaning against that constructor's signature.
            (byte) -1
    );
}
|
||||||
|
|
||||||
|
private static class FakeCloseableIterator<T> implements CloseableIterator<T> {
|
||||||
|
Iterator<T> iterator;
|
||||||
|
|
||||||
|
public FakeCloseableIterator(Iterator<T> it) {
|
||||||
|
iterator = it;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return iterator.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T next() {
|
||||||
|
return iterator.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException("Don't remove!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Independent re-implementation of LocusIteratorByState's per-base bookkeeping,
 * used as a test oracle: walks a read's CIGAR one reference position at a time
 * and recomputes the is-before/after deletion/insertion/soft-clip flags so they
 * can be compared against the PileupElement under test.
 */
private static final class LIBS_position {

    SAMRecord read;

    // total number of CIGAR elements in the read
    final int numOperators;
    // index of the CIGAR element we are currently inside
    int currentOperatorIndex = 0;
    // 1-based position within the current CIGAR element (0 = not yet entered)
    int currentPositionOnOperator = 0;
    // read-base offset consumed so far (advanced by read-consuming operators)
    int currentReadOffset = 0;

    // flags recomputed after every stepForwardOnGenome() call
    boolean isBeforeDeletionStart = false;
    boolean isBeforeDeletedBase = false;
    boolean isAfterDeletionEnd = false;
    boolean isAfterDeletedBase = false;
    boolean isBeforeInsertion = false;
    boolean isAfterInsertion = false;
    boolean isNextToSoftClip = false;

    // true once we have seen at least one alignment-match operator (M/EQ/X);
    // leading insertions before any match are handled specially below
    boolean sawMop = false;

    public LIBS_position(final SAMRecord read) {
        this.read = read;
        numOperators = read.getCigar().numCigarElements();
    }

    // Offset of the current base within the read (clamped at 0, since
    // currentReadOffset is post-incremented past the current base).
    public int getCurrentReadOffset() {
        return Math.max(0, currentReadOffset - 1);
    }

    /**
     * Steps forward on the genome. Returns false when done reading the read, true otherwise.
     */
    public boolean stepForwardOnGenome() {
        if ( currentOperatorIndex == numOperators )
            return false;

        CigarElement curElement = read.getCigar().getCigarElement(currentOperatorIndex);
        // exhausted the current element: advance to the next one (or finish)
        if ( currentPositionOnOperator >= curElement.getLength() ) {
            if ( ++currentOperatorIndex == numOperators )
                return false;

            curElement = read.getCigar().getCigarElement(currentOperatorIndex);
            currentPositionOnOperator = 0;
        }

        switch ( curElement.getOperator() ) {
            case I: // insertion w.r.t. the reference
                // deliberate fall-through: an insertion AFTER a match (sawMop) is
                // skipped like a soft clip; a leading insertion (no match yet)
                // breaks out and is treated like a consumed base below
                if ( !sawMop )
                    break;
            case S: // soft clip
                // deliberate fall-through: S (and post-match I) consume read bases...
                currentReadOffset += curElement.getLength();
            case H: // hard clip
            case P: // padding
                // ...and H/P (plus the cases above) consume no reference bases,
                // so skip the whole element and recurse to the next one
                currentOperatorIndex++;
                return stepForwardOnGenome();

            case D: // deletion w.r.t. the reference
            case N: // reference skip (looks and gets processed just like a "deletion", just different logical meaning)
                currentPositionOnOperator++;
                break;

            case M:
            case EQ:
            case X:
                sawMop = true;
                currentReadOffset++;
                currentPositionOnOperator++;
                break;
            default:
                throw new IllegalStateException("No support for cigar op: " + curElement.getOperator());
        }

        // recompute neighborhood flags for the base we just stepped onto
        final boolean isFirstOp = currentOperatorIndex == 0;
        final boolean isLastOp = currentOperatorIndex == numOperators - 1;
        final boolean isFirstBaseOfOp = currentPositionOnOperator == 1;
        final boolean isLastBaseOfOp = currentPositionOnOperator == curElement.getLength();

        isBeforeDeletionStart = isBeforeOp(read.getCigar(), currentOperatorIndex, CigarOperator.D, isLastOp, isLastBaseOfOp);
        isBeforeDeletedBase = isBeforeDeletionStart || (!isLastBaseOfOp && curElement.getOperator() == CigarOperator.D);
        isAfterDeletionEnd = isAfterOp(read.getCigar(), currentOperatorIndex, CigarOperator.D, isFirstOp, isFirstBaseOfOp);
        isAfterDeletedBase = isAfterDeletionEnd || (!isFirstBaseOfOp && curElement.getOperator() == CigarOperator.D);
        isBeforeInsertion = isBeforeOp(read.getCigar(), currentOperatorIndex, CigarOperator.I, isLastOp, isLastBaseOfOp)
                || (!sawMop && curElement.getOperator() == CigarOperator.I); // leading insertion counts as "before"
        isAfterInsertion = isAfterOp(read.getCigar(), currentOperatorIndex, CigarOperator.I, isFirstOp, isFirstBaseOfOp);
        isNextToSoftClip = isBeforeOp(read.getCigar(), currentOperatorIndex, CigarOperator.S, isLastOp, isLastBaseOfOp)
                || isAfterOp(read.getCigar(), currentOperatorIndex, CigarOperator.S, isFirstOp, isFirstBaseOfOp);

        return true;
    }

    // True iff we are on the last base of the current element and the NEXT element is op.
    private static boolean isBeforeOp(final Cigar cigar,
                                      final int currentOperatorIndex,
                                      final CigarOperator op,
                                      final boolean isLastOp,
                                      final boolean isLastBaseOfOp) {
        return !isLastOp && isLastBaseOfOp && cigar.getCigarElement(currentOperatorIndex+1).getOperator() == op;
    }

    // True iff we are on the first base of the current element and the PREVIOUS element is op.
    private static boolean isAfterOp(final Cigar cigar,
                                     final int currentOperatorIndex,
                                     final CigarOperator op,
                                     final boolean isFirstOp,
                                     final boolean isFirstBaseOfOp) {
        return !isFirstOp && isFirstBaseOfOp && cigar.getCigarElement(currentOperatorIndex-1).getOperator() == op;
    }
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -121,8 +121,6 @@ public class TraverseReadsUnitTest extends BaseTest {
|
||||||
Object accumulator = countReadWalker.reduceInit();
|
Object accumulator = countReadWalker.reduceInit();
|
||||||
|
|
||||||
for(Shard shard: shardStrategy) {
|
for(Shard shard: shardStrategy) {
|
||||||
traversalEngine.startTimersIfNecessary();
|
|
||||||
|
|
||||||
if (shard == null) {
|
if (shard == null) {
|
||||||
fail("Shard == null");
|
fail("Shard == null");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package org.broadinstitute.sting.utils.nanoScheduler;
|
package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
|
|
||||||
import org.broadinstitute.sting.BaseTest;
|
import org.broadinstitute.sting.BaseTest;
|
||||||
|
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||||
import org.testng.Assert;
|
import org.testng.Assert;
|
||||||
import org.testng.annotations.DataProvider;
|
import org.testng.annotations.DataProvider;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
@ -42,7 +43,7 @@ public class InputProducerUnitTest extends BaseTest {
|
||||||
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
|
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
|
||||||
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize);
|
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize);
|
||||||
|
|
||||||
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), null, readQueue);
|
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new SimpleTimer(), readQueue);
|
||||||
|
|
||||||
final ExecutorService es = Executors.newSingleThreadExecutor();
|
final ExecutorService es = Executors.newSingleThreadExecutor();
|
||||||
es.submit(ip);
|
es.submit(ip);
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
|
|
||||||
import org.apache.log4j.BasicConfigurator;
|
import org.apache.log4j.BasicConfigurator;
|
||||||
import org.broadinstitute.sting.BaseTest;
|
import org.broadinstitute.sting.BaseTest;
|
||||||
|
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||||
import org.testng.Assert;
|
import org.testng.Assert;
|
||||||
import org.testng.annotations.DataProvider;
|
import org.testng.annotations.DataProvider;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
@ -86,7 +87,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
||||||
static NanoSchedulerBasicTest exampleTest = null;
|
static NanoSchedulerBasicTest exampleTest = null;
|
||||||
@DataProvider(name = "NanoSchedulerBasicTest")
|
@DataProvider(name = "NanoSchedulerBasicTest")
|
||||||
public Object[][] createNanoSchedulerBasicTest() {
|
public Object[][] createNanoSchedulerBasicTest() {
|
||||||
for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) {
|
for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000, 10000000) ) {
|
||||||
for ( final int nt : Arrays.asList(1, 2, 4) ) {
|
for ( final int nt : Arrays.asList(1, 2, 4) ) {
|
||||||
for ( final int start : Arrays.asList(0) ) {
|
for ( final int start : Arrays.asList(0) ) {
|
||||||
for ( final int end : Arrays.asList(0, 1, 2, 11, 10000, 100000) ) {
|
for ( final int end : Arrays.asList(0, 1, 2, 11, 10000, 100000) ) {
|
||||||
|
|
@ -114,6 +115,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException {
|
private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException {
|
||||||
|
final SimpleTimer timer = new SimpleTimer().start();
|
||||||
final NanoScheduler<Integer, Integer, Integer> nanoScheduler =
|
final NanoScheduler<Integer, Integer, Integer> nanoScheduler =
|
||||||
new NanoScheduler<Integer, Integer, Integer>(test.bufferSize, test.nThreads);
|
new NanoScheduler<Integer, Integer, Integer>(test.bufferSize, test.nThreads);
|
||||||
|
|
||||||
|
|
@ -129,6 +131,17 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
||||||
|
|
||||||
Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks);
|
Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks);
|
||||||
nanoScheduler.shutdown();
|
nanoScheduler.shutdown();
|
||||||
|
|
||||||
|
// TODO -- need to enable only in the case where there's serious time spend in
|
||||||
|
// TODO -- read /map / reduce, otherwise the "outside" timer doesn't add up
|
||||||
|
final double myTimeEstimate = timer.getElapsedTime();
|
||||||
|
final double tolerance = 0.1;
|
||||||
|
if ( false && myTimeEstimate > 0.1 ) {
|
||||||
|
Assert.assertTrue(nanoScheduler.getTotalRuntime() > myTimeEstimate * tolerance,
|
||||||
|
"NanoScheduler said that the total runtime was " + nanoScheduler.getTotalRuntime()
|
||||||
|
+ " but the overall test time was " + myTimeEstimate + ", beyond our tolerance factor of "
|
||||||
|
+ tolerance);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME)
|
@Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package org.broadinstitute.sting.utils.nanoScheduler;
|
package org.broadinstitute.sting.utils.nanoScheduler;
|
||||||
|
|
||||||
import org.broadinstitute.sting.BaseTest;
|
import org.broadinstitute.sting.BaseTest;
|
||||||
|
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||||
import org.testng.Assert;
|
import org.testng.Assert;
|
||||||
import org.testng.annotations.DataProvider;
|
import org.testng.annotations.DataProvider;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
@ -61,7 +62,7 @@ public class ReducerThreadUnitTest extends BaseTest {
|
||||||
|
|
||||||
final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue);
|
final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue);
|
||||||
final ReducerThread<Integer, Integer> thread
|
final ReducerThread<Integer, Integer> thread
|
||||||
= new ReducerThread<Integer, Integer>(reduce, null, 0, mapResultsQueue);
|
= new ReducerThread<Integer, Integer>(reduce, new SimpleTimer(), 0, mapResultsQueue);
|
||||||
|
|
||||||
final ExecutorService es = Executors.newSingleThreadExecutor();
|
final ExecutorService es = Executors.newSingleThreadExecutor();
|
||||||
final Future<Integer> value = es.submit(thread);
|
final Future<Integer> value = es.submit(thread);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue