NanoScheduler now supports printProgress. Bugfixes to printProgress

-- TraverseReadsNano prints progress at the end of each traversal unit
-- Fix bugs in TraversalEngine printProgress
    -- Synchronize the method so we don't get multiple logged outputs when two or more HMSs call printProgress before initialization at the start!
    -- Fix the logic for mustPrint, which actually had the logic of mustNotPrint.  Now we see the done log line that was always supposed to be there
    -- Fix output formatting, as the done() line was incorrectly shifting over the % complete by 1 char as 100.0% didn't fit in %4.1f
-- Add clearer doc on -PF argument so that people know that the performance log can be generated to standard out if one wants
This commit is contained in:
Mark DePristo 2012-09-01 11:51:31 -04:00
parent 6055101df8
commit e01258b261
3 changed files with 48 additions and 13 deletions

View File

@ -41,7 +41,9 @@ import org.broadinstitute.sting.utils.interval.IntervalMergingRule;
import org.broadinstitute.sting.utils.interval.IntervalSetRule;
import java.io.File;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* @author aaron
@ -197,6 +199,12 @@ public class GATKArgumentCollection {
// performance log arguments
//
// --------------------------------------------------------------------------------------------------------------
/**
* The file name for the GATK performance log output, or null if you don't want to generate the
* detailed performance logging table. This table is suitable for importing into R or any
* other analysis software that can read tsv files
*/
@Argument(fullName = "performanceLog", shortName="PF", doc="If provided, a GATK runtime performance log will be written to this file", required = false)
public File performanceLog = null;

View File

@ -189,12 +189,26 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
/**
* Forward request to printProgress
*
* Assumes that one cycle has been completed
*
* @param shard the given shard currently being processed.
* @param loc the location
*/
public void printProgress(Shard shard, GenomeLoc loc) {
// A bypass is inserted here for unit testing.
printProgress(loc,shard.getReadMetrics(),false);
printProgress(loc,shard.getReadMetrics(),false, 1);
}
/**
* Forward request to printProgress
*
* @param shard the given shard currently being processed.
* @param loc the location
* @param nElapsedCycles the number of cycles (turns of map) that have occurred since the last call
*/
public void printProgress(Shard shard, GenomeLoc loc, int nElapsedCycles) {
// A bypass is inserted here for unit testing.
printProgress(loc,shard.getReadMetrics(),false, nElapsedCycles);
}
/**
@ -205,12 +219,16 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
* @param metrics Data processed since the last cumulative
* @param mustPrint If true, will print out info, regardless of nRecords or time interval
*/
private void printProgress(GenomeLoc loc, ReadMetrics metrics, boolean mustPrint) {
if ( mustPrint || printProgressCheckCounter++ % PRINT_PROGRESS_CHECK_FREQUENCY_IN_CYCLES != 0 )
private synchronized void printProgress(GenomeLoc loc, ReadMetrics metrics, boolean mustPrint, int nElapsedCycles) {
final int previousPrintCycle = printProgressCheckCounter / PRINT_PROGRESS_CHECK_FREQUENCY_IN_CYCLES;
final int newPrintCycle = (printProgressCheckCounter+nElapsedCycles) / PRINT_PROGRESS_CHECK_FREQUENCY_IN_CYCLES;
printProgressCheckCounter += nElapsedCycles; // keep track of our number of cycles through printProgress
if ( newPrintCycle == previousPrintCycle && ! mustPrint )
// don't do any work more often than PRINT_PROGRESS_CHECK_FREQUENCY_IN_CYCLES
return;
if(!progressMeterInitialized && mustPrint == false ) {
if( ! progressMeterInitialized ) {
logger.info("[INITIALIZATION COMPLETE; TRAVERSAL STARTING]");
logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
"Location", getTraversalType(), getTraversalType()));
@ -250,8 +268,9 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
else
PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in milliseconds
logger.info(String.format("%15s %5.2e %s %s %4.1f%% %s %s",
loc == null ? "done with mapped reads" : loc, nRecords*1.0, elapsed, unitRate,
final String posName = loc == null ? (mustPrint ? "done" : "unmapped reads") : Integer.toString(loc.getStart());
logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s",
posName, nRecords*1.0, elapsed, unitRate,
100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));
}
@ -309,7 +328,7 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
* Called after a traversal to print out information about the traversal process
*/
public void printOnTraversalDone() {
printProgress(null, null, true);
printProgress(null, null, true, 1);
final double elapsed = timer == null ? 0 : timer.getElapsedTime();

View File

@ -34,6 +34,7 @@ import org.broadinstitute.sting.gatk.datasources.providers.ReadView;
import org.broadinstitute.sting.gatk.datasources.reads.ReadShard;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.nanoScheduler.MapFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
import org.broadinstitute.sting.utils.nanoScheduler.ReduceFunction;
@ -87,9 +88,15 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
final TraverseReadsMap myMap = new TraverseReadsMap(walker);
final TraverseReadsReduce myReduce = new TraverseReadsReduce(walker);
T result = nanoScheduler.execute(aggregateMapData(dataProvider).iterator(), myMap, sum, myReduce);
// TODO -- how do we print progress?
//printProgress(dataProvider.getShard(), ???);
final List<MapData> aggregatedInputs = aggregateMapData(dataProvider);
final T result = nanoScheduler.execute(aggregatedInputs.iterator(), myMap, sum, myReduce);
final GATKSAMRecord lastRead = aggregatedInputs.get(aggregatedInputs.size() - 1).read;
final GenomeLoc locus = engine.getGenomeLocParser().createGenomeLoc(lastRead);
printProgress(dataProvider.getShard(), locus, aggregatedInputs.size());
// TODO -- how can I get done value?
// done = walker.isDone();
return result;
}
@ -165,8 +172,9 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
return walker.map(data.refContext, data.read, data.tracker);
}
}
return null; // TODO -- what should we return in the case where the walker is done or the read is filtered?
// TODO -- how can we cleanly support done and filtered. Need to return
// TODO -- a MapResult object that says the status
return null;
}
}
}