Merge branch 'master' of github.com:broadinstitute/gsa-unstable
This commit is contained in:
commit
00c23bf704
|
|
@ -436,7 +436,7 @@ public class UnifiedGenotyperIntegrationTest extends WalkerTest {
|
|||
@Test
|
||||
public void testNsInCigar() {
|
||||
WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec(
|
||||
"-T UnifiedGenotyper -R " + b37KGReference + " --no_cmdline_in_header -I " + validationDataLocation + "testWithNs.bam -o %s -L 8:141813600-141813700 -out_mode EMIT_ALL_SITES", 1,
|
||||
"-T UnifiedGenotyper -R " + b37KGReference + " --no_cmdline_in_header -I " + privateTestDir + "testWithNs.bam -o %s -L 8:141813600-141813700 -out_mode EMIT_ALL_SITES", 1,
|
||||
Arrays.asList("32f18ba50406cd8c8069ba07f2f89558"));
|
||||
executeTest("test calling on reads with Ns in CIGAR", spec);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,6 @@ import org.broadinstitute.sting.utils.AutoFormattingTime;
|
|||
import org.broadinstitute.sting.utils.MathUtils;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
|
||||
import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
|
||||
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
|
||||
|
||||
|
|
@ -346,9 +345,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
for ( final TraversalEngine te : allCreatedTraversalEngines)
|
||||
te.shutdown();
|
||||
|
||||
// horrible hack to print nano scheduling information across all nano schedulers, if any were used
|
||||
NanoScheduler.printCombinedRuntimeProfile();
|
||||
|
||||
allCreatedTraversalEngines.clear();
|
||||
availableTraversalEngines.clear();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -495,4 +495,14 @@ public class GenomeLoc implements Comparable<GenomeLoc>, Serializable, HasGenome
|
|||
public long sizeOfOverlap( final GenomeLoc that ) {
|
||||
return ( this.overlapsP(that) ? Math.min( getStop(), that.getStop() ) - Math.max( getStart(), that.getStart() ) + 1L : 0L );
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the maximum GenomeLoc of this and other
|
||||
* @param other another non-null genome loc
|
||||
* @return the max of this and other
|
||||
*/
|
||||
public GenomeLoc max(final GenomeLoc other) {
|
||||
final int cmp = this.compareTo(other);
|
||||
return cmp == -1 ? other : this;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package org.broadinstitute.sting.utils.nanoScheduler;
|
|||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
|
||||
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
|
@ -19,11 +18,6 @@ class InputProducer<InputType> implements Runnable {
|
|||
*/
|
||||
final Iterator<InputType> inputReader;
|
||||
|
||||
/**
|
||||
* Our timer (may be null) that we use to track our input costs
|
||||
*/
|
||||
final SimpleTimer inputTimer;
|
||||
|
||||
/**
|
||||
* Where we put our input values for consumption
|
||||
*/
|
||||
|
|
@ -51,16 +45,13 @@ class InputProducer<InputType> implements Runnable {
|
|||
|
||||
public InputProducer(final Iterator<InputType> inputReader,
|
||||
final MultiThreadedErrorTracker errorTracker,
|
||||
final SimpleTimer inputTimer,
|
||||
final BlockingQueue<InputValue> outputQueue) {
|
||||
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
|
||||
if ( errorTracker == null ) throw new IllegalArgumentException("errorTracker cannot be null");
|
||||
if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null");
|
||||
if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null");
|
||||
|
||||
this.inputReader = inputReader;
|
||||
this.errorTracker = errorTracker;
|
||||
this.inputTimer = inputTimer;
|
||||
this.outputQueue = outputQueue;
|
||||
}
|
||||
|
||||
|
|
@ -94,16 +85,15 @@ class InputProducer<InputType> implements Runnable {
|
|||
* @throws InterruptedException
|
||||
*/
|
||||
private synchronized InputType readNextItem() throws InterruptedException {
|
||||
inputTimer.restart();
|
||||
if ( ! inputReader.hasNext() ) {
|
||||
// we are done, mark ourselves as such and return null
|
||||
readLastValue = true;
|
||||
inputTimer.stop();
|
||||
return null;
|
||||
} else {
|
||||
// get the next value, and return it
|
||||
final InputType input = inputReader.next();
|
||||
inputTimer.stop();
|
||||
if ( input == null )
|
||||
throw new IllegalStateException("inputReader.next() returned a null value, breaking our contract");
|
||||
nRead++;
|
||||
return input;
|
||||
}
|
||||
|
|
@ -121,6 +111,9 @@ class InputProducer<InputType> implements Runnable {
|
|||
final InputType value = readNextItem();
|
||||
|
||||
if ( value == null ) {
|
||||
if ( ! readLastValue )
|
||||
throw new IllegalStateException("value == null but readLastValue is false!");
|
||||
|
||||
// add the EOF object so our consumer knows we are done in all inputs
|
||||
// note that we do not increase inputID here, so that variable indicates the ID
|
||||
// of the last real value read from the queue
|
||||
|
|
@ -133,8 +126,10 @@ class InputProducer<InputType> implements Runnable {
|
|||
}
|
||||
|
||||
latch.countDown();
|
||||
} catch (Exception ex) {
|
||||
} catch (Throwable ex) {
|
||||
errorTracker.notifyOfError(ex);
|
||||
} finally {
|
||||
// logger.info("Exiting input thread readLastValue = " + readLastValue);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,67 +0,0 @@
|
|||
package org.broadinstitute.sting.utils.nanoScheduler;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.broadinstitute.sting.utils.AutoFormattingTime;
|
||||
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||
|
||||
/**
|
||||
* Holds runtime profile (input, read, map) times as tracked by NanoScheduler
|
||||
*
|
||||
* User: depristo
|
||||
* Date: 9/10/12
|
||||
* Time: 8:31 PM
|
||||
*/
|
||||
public class NSRuntimeProfile {
|
||||
final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside");
|
||||
final SimpleTimer inputTimer = new SimpleTimer("input");
|
||||
final SimpleTimer mapTimer = new SimpleTimer("map");
|
||||
final SimpleTimer reduceTimer = new SimpleTimer("reduce");
|
||||
|
||||
/**
|
||||
* Combine the elapsed time information from other with this profile
|
||||
*
|
||||
* @param other a non-null profile
|
||||
*/
|
||||
public void combine(final NSRuntimeProfile other) {
|
||||
outsideSchedulerTimer.addElapsed(other.outsideSchedulerTimer);
|
||||
inputTimer.addElapsed(other.inputTimer);
|
||||
mapTimer.addElapsed(other.mapTimer);
|
||||
reduceTimer.addElapsed(other.reduceTimer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Print the runtime profiling to logger
|
||||
*
|
||||
* @param logger
|
||||
*/
|
||||
public void log(final Logger logger) {
|
||||
log1(logger, "Input time", inputTimer);
|
||||
log1(logger, "Map time", mapTimer);
|
||||
log1(logger, "Reduce time", reduceTimer);
|
||||
log1(logger, "Outside time", outsideSchedulerTimer);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the total runtime for all functions of this nano scheduler
|
||||
*/
|
||||
//@Ensures("result >= 0.0")
|
||||
public double totalRuntimeInSeconds() {
|
||||
return inputTimer.getElapsedTime()
|
||||
+ mapTimer.getElapsedTime()
|
||||
+ reduceTimer.getElapsedTime()
|
||||
+ outsideSchedulerTimer.getElapsedTime();
|
||||
}
|
||||
|
||||
/**
|
||||
* Print to logger.info timing information from timer, with name label
|
||||
*
|
||||
* @param label the name of the timer to display. Should be human readable
|
||||
* @param timer the timer whose elapsed time we will display
|
||||
*/
|
||||
//@Requires({"label != null", "timer != null"})
|
||||
private void log1(final Logger logger, final String label, final SimpleTimer timer) {
|
||||
final double myTimeInSec = timer.getElapsedTime();
|
||||
final double myTimePercent = myTimeInSec / totalRuntimeInSeconds() * 100;
|
||||
logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent));
|
||||
}
|
||||
}
|
||||
|
|
@ -57,16 +57,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
boolean debug = false;
|
||||
private NSProgressFunction<InputType> progressFunction = null;
|
||||
|
||||
/**
|
||||
* Tracks the combined runtime profiles across all created nano schedulers
|
||||
*/
|
||||
final static private NSRuntimeProfile combinedNSRuntimeProfiler = new NSRuntimeProfile();
|
||||
|
||||
/**
|
||||
* The profile specific to this nano scheduler
|
||||
*/
|
||||
final private NSRuntimeProfile myNSRuntimeProfile = new NSRuntimeProfile();
|
||||
|
||||
/**
|
||||
* Create a new nanoscheduler with the desired characteristics requested by the argument
|
||||
*
|
||||
|
|
@ -94,9 +84,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
|
||||
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d"));
|
||||
}
|
||||
|
||||
// start timing the time spent outside of the nanoScheduler
|
||||
myNSRuntimeProfile.outsideSchedulerTimer.start();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -123,11 +110,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
* After this call, execute cannot be invoked without throwing an error
|
||||
*/
|
||||
public void shutdown() {
|
||||
myNSRuntimeProfile.outsideSchedulerTimer.stop();
|
||||
|
||||
// add my timing information to the combined NS runtime profile
|
||||
combinedNSRuntimeProfiler.combine(myNSRuntimeProfile);
|
||||
|
||||
if ( nThreads > 1 ) {
|
||||
shutdownExecutor("inputExecutor", inputExecutor);
|
||||
shutdownExecutor("mapExecutor", mapExecutor);
|
||||
|
|
@ -137,19 +119,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
shutdown = true;
|
||||
}
|
||||
|
||||
public void printRuntimeProfile() {
|
||||
myNSRuntimeProfile.log(logger);
|
||||
}
|
||||
|
||||
public static void printCombinedRuntimeProfile() {
|
||||
if ( combinedNSRuntimeProfiler.totalRuntimeInSeconds() > 0.1 )
|
||||
combinedNSRuntimeProfiler.log(logger);
|
||||
}
|
||||
|
||||
protected double getTotalRuntime() {
|
||||
return myNSRuntimeProfile.totalRuntimeInSeconds();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to cleanly shutdown an execution service, checking that the execution
|
||||
* state is clean when it's done.
|
||||
|
|
@ -245,8 +214,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
if ( map == null ) throw new IllegalArgumentException("map function cannot be null");
|
||||
if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null");
|
||||
|
||||
myNSRuntimeProfile.outsideSchedulerTimer.stop();
|
||||
|
||||
ReduceType result;
|
||||
if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) {
|
||||
result = executeSingleThreaded(inputReader, map, initialValue, reduce);
|
||||
|
|
@ -254,7 +221,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
result = executeMultiThreaded(inputReader, map, initialValue, reduce);
|
||||
}
|
||||
|
||||
myNSRuntimeProfile.outsideSchedulerTimer.restart();
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -273,28 +239,19 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
|
||||
while ( true ) {
|
||||
// start timer to ensure that both hasNext and next are caught by the timer
|
||||
myNSRuntimeProfile.inputTimer.restart();
|
||||
if ( ! inputReader.hasNext() ) {
|
||||
myNSRuntimeProfile.inputTimer.stop();
|
||||
break;
|
||||
} else {
|
||||
final InputType input = inputReader.next();
|
||||
myNSRuntimeProfile.inputTimer.stop();
|
||||
|
||||
// map
|
||||
myNSRuntimeProfile.mapTimer.restart();
|
||||
final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano();
|
||||
final MapType mapValue = map.apply(input);
|
||||
if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime));
|
||||
myNSRuntimeProfile.mapTimer.stop();
|
||||
|
||||
if ( i++ % this.bufferSize == 0 && progressFunction != null )
|
||||
if ( progressFunction != null )
|
||||
progressFunction.progress(input);
|
||||
|
||||
// reduce
|
||||
myNSRuntimeProfile.reduceTimer.restart();
|
||||
sum = reduce.apply(mapValue, sum);
|
||||
myNSRuntimeProfile.reduceTimer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -320,6 +277,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
while ( true ) {
|
||||
// check that no errors occurred while we were waiting
|
||||
handleErrors();
|
||||
// checkForDeadlocks();
|
||||
|
||||
try {
|
||||
final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS);
|
||||
|
|
@ -341,6 +299,26 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
}
|
||||
}
|
||||
|
||||
// private void checkForDeadlocks() {
|
||||
// if ( deadLockCheckCounter++ % 100 == 0 ) {
|
||||
// logger.info("Checking for deadlocks...");
|
||||
// final ThreadMXBean bean = ManagementFactory.getThreadMXBean();
|
||||
// final long[] threadIds = bean.findDeadlockedThreads(); // Returns null if no threads are deadlocked.
|
||||
//
|
||||
// if (threadIds != null) {
|
||||
// final ThreadInfo[] infos = bean.getThreadInfo(threadIds);
|
||||
//
|
||||
// logger.error("!!! Deadlock detected !!!!");
|
||||
// for (final ThreadInfo info : infos) {
|
||||
// logger.error("Thread " + info);
|
||||
// for ( final StackTraceElement elt : info.getStackTrace() ) {
|
||||
// logger.error("\t" + elt.toString());
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
private void handleErrors() {
|
||||
if ( errorTracker.hasAnErrorOccurred() ) {
|
||||
masterExecutor.shutdownNow();
|
||||
|
|
@ -380,7 +358,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
|
||||
// Create the input producer and start it running
|
||||
final InputProducer<InputType> inputProducer =
|
||||
new InputProducer<InputType>(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue);
|
||||
new InputProducer<InputType>(inputReader, errorTracker, inputQueue);
|
||||
inputExecutor.submit(inputProducer);
|
||||
|
||||
// a priority queue that stores up to bufferSize elements
|
||||
|
|
@ -389,7 +367,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
new PriorityBlockingQueue<MapResult<MapType>>();
|
||||
|
||||
final Reducer<MapType, ReduceType> reducer
|
||||
= new Reducer<MapType, ReduceType>(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue);
|
||||
= new Reducer<MapType, ReduceType>(reduce, errorTracker, initialValue);
|
||||
|
||||
try {
|
||||
int nSubmittedJobs = 0;
|
||||
|
|
@ -408,7 +386,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
|
||||
// wait for all of the input and map threads to finish
|
||||
return waitForCompletion(inputProducer, reducer);
|
||||
} catch (Exception ex) {
|
||||
} catch (Throwable ex) {
|
||||
// logger.warn("Reduce job got exception " + ex);
|
||||
errorTracker.notifyOfError(ex);
|
||||
return initialValue;
|
||||
}
|
||||
|
|
@ -486,16 +465,12 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
final InputType input = inputWrapper.getValue();
|
||||
|
||||
// map
|
||||
myNSRuntimeProfile.mapTimer.restart();
|
||||
final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano();
|
||||
final MapType mapValue = map.apply(input);
|
||||
if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime));
|
||||
myNSRuntimeProfile.mapTimer.stop();
|
||||
|
||||
// enqueue the result into the mapResultQueue
|
||||
result = new MapResult<MapType>(mapValue, jobID);
|
||||
|
||||
if ( jobID % bufferSize == 0 && progressFunction != null )
|
||||
if ( progressFunction != null )
|
||||
progressFunction.progress(input);
|
||||
} else {
|
||||
// push back the EOF marker so other waiting threads can read it
|
||||
|
|
@ -508,7 +483,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
mapResultQueue.put(result);
|
||||
|
||||
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue);
|
||||
} catch (Exception ex) {
|
||||
} catch (Throwable ex) {
|
||||
// logger.warn("Map job got exception " + ex);
|
||||
errorTracker.notifyOfError(ex);
|
||||
} finally {
|
||||
// we finished a map job, release the job queue semaphore
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import com.google.java.contract.Ensures;
|
|||
import com.google.java.contract.Requires;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
|
||||
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
|
|
@ -34,7 +33,6 @@ class Reducer<MapType, ReduceType> {
|
|||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final NSReduceFunction<MapType, ReduceType> reduce;
|
||||
final SimpleTimer reduceTimer;
|
||||
final MultiThreadedErrorTracker errorTracker;
|
||||
|
||||
/**
|
||||
|
|
@ -61,20 +59,16 @@ class Reducer<MapType, ReduceType> {
|
|||
* reduceTimer
|
||||
*
|
||||
* @param reduce the reduce function to apply
|
||||
* @param reduceTimer the timer to time the reduce function call
|
||||
* @param initialSum the initial reduce sum
|
||||
*/
|
||||
public Reducer(final NSReduceFunction<MapType, ReduceType> reduce,
|
||||
final MultiThreadedErrorTracker errorTracker,
|
||||
final SimpleTimer reduceTimer,
|
||||
final ReduceType initialSum) {
|
||||
if ( errorTracker == null ) throw new IllegalArgumentException("Error tracker cannot be null");
|
||||
if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null");
|
||||
if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null");
|
||||
|
||||
this.errorTracker = errorTracker;
|
||||
this.reduce = reduce;
|
||||
this.reduceTimer = reduceTimer;
|
||||
this.sum = initialSum;
|
||||
}
|
||||
|
||||
|
|
@ -125,10 +119,7 @@ class Reducer<MapType, ReduceType> {
|
|||
nReducesNow++;
|
||||
|
||||
// apply reduce, keeping track of sum
|
||||
reduceTimer.restart();
|
||||
sum = reduce.apply(result.getValue(), sum);
|
||||
reduceTimer.stop();
|
||||
|
||||
}
|
||||
|
||||
numJobsReduced++;
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ package org.broadinstitute.sting.utils.progressmeter;
|
|||
|
||||
import com.google.java.contract.Ensures;
|
||||
import com.google.java.contract.Invariant;
|
||||
import com.google.java.contract.Requires;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.broadinstitute.sting.utils.*;
|
||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
|
|
@ -143,6 +144,12 @@ public class ProgressMeter {
|
|||
/** We use the SimpleTimer to time our run */
|
||||
private final SimpleTimer timer = new SimpleTimer();
|
||||
|
||||
private GenomeLoc maxGenomeLoc = null;
|
||||
private String positionMessage = "starting";
|
||||
private long nTotalRecordsProcessed = 0;
|
||||
|
||||
final ProgressMeterDaemon progressMeterDaemon;
|
||||
|
||||
/**
|
||||
* Create a new ProgressMeter
|
||||
*
|
||||
|
|
@ -177,21 +184,15 @@ public class ProgressMeter {
|
|||
targetSizeInBP = processingIntervals.coveredSize();
|
||||
|
||||
// start up the timer
|
||||
progressMeterDaemon = new ProgressMeterDaemon(this);
|
||||
start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Forward request to notifyOfProgress
|
||||
*
|
||||
* Assumes that one cycle has been completed
|
||||
*
|
||||
* @param loc our current location. Null means "in unmapped reads"
|
||||
* @param nTotalRecordsProcessed the total number of records we've processed
|
||||
* Start up the progress meter, printing initialization message and starting up the
|
||||
* daemon thread for periodic printing.
|
||||
*/
|
||||
public void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) {
|
||||
notifyOfProgress(loc, false, nTotalRecordsProcessed);
|
||||
}
|
||||
|
||||
@Requires("progressMeterDaemon != null")
|
||||
private synchronized void start() {
|
||||
timer.start();
|
||||
lastProgressPrintTime = timer.currentTime();
|
||||
|
|
@ -199,6 +200,8 @@ public class ProgressMeter {
|
|||
logger.info("[INITIALIZATION COMPLETE; STARTING PROCESSING]");
|
||||
logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
|
||||
"Location", processingUnitName, processingUnitName));
|
||||
|
||||
progressMeterDaemon.start();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -216,19 +219,41 @@ public class ProgressMeter {
|
|||
* Synchronized to ensure that even with multiple threads calling notifyOfProgress we still
|
||||
* get one clean stream of meter logs.
|
||||
*
|
||||
* Note this thread doesn't actually print progress, unless must print is true, but just registers
|
||||
* the progress itself. A separate printing daemon periodically polls the meter to print out
|
||||
* progress
|
||||
*
|
||||
* @param loc Current location, can be null if you are at the end of the processing unit
|
||||
* @param mustPrint If true, will print out info, regardless of time interval
|
||||
* @param nTotalRecordsProcessed the total number of records we've processed
|
||||
*/
|
||||
private synchronized void notifyOfProgress(final GenomeLoc loc, boolean mustPrint, final long nTotalRecordsProcessed) {
|
||||
public synchronized void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) {
|
||||
if ( nTotalRecordsProcessed < 0 ) throw new IllegalArgumentException("nTotalRecordsProcessed must be >= 0");
|
||||
|
||||
// weird comparison to ensure that loc == null (in unmapped reads) is kept before maxGenomeLoc == null (on startup)
|
||||
this.maxGenomeLoc = loc == null ? loc : (maxGenomeLoc == null ? loc : loc.max(maxGenomeLoc));
|
||||
this.nTotalRecordsProcessed = Math.max(this.nTotalRecordsProcessed, nTotalRecordsProcessed);
|
||||
|
||||
// a pretty name for our position
|
||||
this.positionMessage = maxGenomeLoc == null
|
||||
? "unmapped reads"
|
||||
: String.format("%s:%d", maxGenomeLoc.getContig(), maxGenomeLoc.getStart());
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually try to print out progress
|
||||
*
|
||||
* This function may print out if the progress print is due, but if not enough time has elapsed
|
||||
* since the last print we will not print out information.
|
||||
*
|
||||
* @param mustPrint if true, progress will be printed regardless of the last time we printed progress
|
||||
*/
|
||||
protected synchronized void printProgress(final boolean mustPrint) {
|
||||
final long curTime = timer.currentTime();
|
||||
final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency);
|
||||
final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);
|
||||
|
||||
if ( printProgress || printLog ) {
|
||||
final ProgressMeterData progressData = takeProgressSnapshot(loc, nTotalRecordsProcessed);
|
||||
final ProgressMeterData progressData = takeProgressSnapshot(maxGenomeLoc, nTotalRecordsProcessed);
|
||||
|
||||
final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.getElapsedSeconds());
|
||||
final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP());
|
||||
|
|
@ -241,13 +266,8 @@ public class ProgressMeter {
|
|||
lastProgressPrintTime = curTime;
|
||||
updateLoggerPrintFrequency(estTotalRuntime.getTimeInSeconds());
|
||||
|
||||
// a pretty name for our position
|
||||
final String posName = loc == null
|
||||
? (mustPrint ? "done" : "unmapped reads")
|
||||
: String.format("%s:%d", loc.getContig(), loc.getStart());
|
||||
|
||||
logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s",
|
||||
posName, progressData.getUnitsProcessed()*1.0, elapsed, unitRate,
|
||||
positionMessage, progressData.getUnitsProcessed()*1.0, elapsed, unitRate,
|
||||
100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));
|
||||
|
||||
}
|
||||
|
|
@ -296,13 +316,18 @@ public class ProgressMeter {
|
|||
*/
|
||||
public void notifyDone(final long nTotalRecordsProcessed) {
|
||||
// print out the progress meter
|
||||
notifyOfProgress(null, true, nTotalRecordsProcessed);
|
||||
this.nTotalRecordsProcessed = nTotalRecordsProcessed;
|
||||
this.positionMessage = "done";
|
||||
printProgress(true);
|
||||
|
||||
logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours",
|
||||
timer.getElapsedTime(), timer.getElapsedTime() / 60, timer.getElapsedTime() / 3600));
|
||||
|
||||
if ( performanceLog != null )
|
||||
performanceLog.close();
|
||||
|
||||
// shutdown our daemon thread
|
||||
progressMeterDaemon.done();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,60 @@
|
|||
package org.broadinstitute.sting.utils.progressmeter;
|
||||
|
||||
/**
|
||||
* Daemon thread that periodically prints the progress of the progress meter
|
||||
*
|
||||
* User: depristo
|
||||
* Date: 12/4/12
|
||||
* Time: 9:16 PM
|
||||
*/
|
||||
public final class ProgressMeterDaemon extends Thread {
|
||||
/**
|
||||
* How frequently should we poll and print progress?
|
||||
*/
|
||||
private final static long POLL_FREQUENCY_MILLISECONDS = 10 * 1000;
|
||||
|
||||
/**
|
||||
* Are we to continue periodically printing status, or should we shut down?
|
||||
*/
|
||||
boolean done = false;
|
||||
|
||||
/**
|
||||
* The meter we will call print on
|
||||
*/
|
||||
final ProgressMeter meter;
|
||||
|
||||
/**
|
||||
* Create a new ProgressMeterDaemon printing progress for meter
|
||||
* @param meter the progress meter to print progress of
|
||||
*/
|
||||
public ProgressMeterDaemon(final ProgressMeter meter) {
|
||||
if ( meter == null ) throw new IllegalArgumentException("meter cannot be null");
|
||||
this.meter = meter;
|
||||
setDaemon(true);
|
||||
setName("ProgressMeterDaemon");
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells this daemon thread to shutdown at the next opportunity, as the progress
|
||||
* metering is complete.
|
||||
*/
|
||||
public final void done() {
|
||||
this.done = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start up the ProgressMeterDaemon, polling every tens of seconds to print, if
|
||||
* necessary, the provided progress meter. Never exits until the JVM is complete,
|
||||
* or done() is called, as the thread is a daemon thread
|
||||
*/
|
||||
public void run() {
|
||||
while (! done) {
|
||||
meter.printProgress(false);
|
||||
try {
|
||||
Thread.sleep(POLL_FREQUENCY_MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2,7 +2,6 @@ package org.broadinstitute.sting.utils.nanoScheduler;
|
|||
|
||||
import org.broadinstitute.sting.BaseTest;
|
||||
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
|
||||
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Test;
|
||||
|
|
@ -46,7 +45,7 @@ public class InputProducerUnitTest extends BaseTest {
|
|||
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
|
||||
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize);
|
||||
|
||||
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue);
|
||||
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), readQueue);
|
||||
|
||||
final ExecutorService es = Executors.newSingleThreadExecutor();
|
||||
|
||||
|
|
@ -94,7 +93,7 @@ public class InputProducerUnitTest extends BaseTest {
|
|||
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
|
||||
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>();
|
||||
|
||||
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue);
|
||||
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), readQueue);
|
||||
|
||||
final ExecutorService es = Executors.newSingleThreadExecutor();
|
||||
es.submit(ip);
|
||||
|
|
|
|||
|
|
@ -188,17 +188,6 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
|
||||
Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks);
|
||||
nanoScheduler.shutdown();
|
||||
|
||||
// TODO -- need to enable only in the case where there's serious time spend in
|
||||
// TODO -- read /map / reduce, otherwise the "outside" timer doesn't add up
|
||||
final double myTimeEstimate = timer.getElapsedTime();
|
||||
final double tolerance = 0.1;
|
||||
if ( false && myTimeEstimate > 0.1 ) {
|
||||
Assert.assertTrue(nanoScheduler.getTotalRuntime() > myTimeEstimate * tolerance,
|
||||
"NanoScheduler said that the total runtime was " + nanoScheduler.getTotalRuntime()
|
||||
+ " but the overall test time was " + myTimeEstimate + ", beyond our tolerance factor of "
|
||||
+ tolerance);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(enabled = true && ! DEBUG, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME)
|
||||
|
|
@ -243,7 +232,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
for ( final int nThreads : Arrays.asList(8) ) {
|
||||
for ( final boolean addDelays : Arrays.asList(true, false) ) {
|
||||
final NanoSchedulerBasicTest test = new NanoSchedulerBasicTest(bufSize, nThreads, 1, 1000000, false);
|
||||
final int maxN = addDelays ? 10000 : 100000;
|
||||
final int maxN = addDelays ? 1000 : 10000;
|
||||
for ( int nElementsBeforeError = 0; nElementsBeforeError < maxN; nElementsBeforeError += Math.max(nElementsBeforeError / 10, 1) ) {
|
||||
tests.add(new Object[]{nElementsBeforeError, test, addDelays});
|
||||
}
|
||||
|
|
@ -259,17 +248,22 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
executeTestErrorThrowingInput(10, new NullPointerException(), exampleTest, false);
|
||||
}
|
||||
|
||||
@Test(enabled = true, expectedExceptions = ReviewedStingException.class, timeOut = 10000)
|
||||
@Test(enabled = true, expectedExceptions = ReviewedStingException.class, timeOut = 1000)
|
||||
public void testInputErrorIsThrown_RSE() throws InterruptedException {
|
||||
executeTestErrorThrowingInput(10, new ReviewedStingException("test"), exampleTest, false);
|
||||
}
|
||||
|
||||
@Test(enabled = true, expectedExceptions = NullPointerException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 10000, invocationCount = 1)
|
||||
public void testInputErrorDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException {
|
||||
@Test(enabled = true, expectedExceptions = NullPointerException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 1000, invocationCount = 1)
|
||||
public void testInputRuntimeExceptionDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException {
|
||||
executeTestErrorThrowingInput(nElementsBeforeError, new NullPointerException(), test, addDelays);
|
||||
}
|
||||
|
||||
private void executeTestErrorThrowingInput(final int nElementsBeforeError, final RuntimeException ex, final NanoSchedulerBasicTest test, final boolean addDelays) {
|
||||
@Test(enabled = true, expectedExceptions = ReviewedStingException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 1000, invocationCount = 1)
|
||||
public void testInputErrorDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException {
|
||||
executeTestErrorThrowingInput(nElementsBeforeError, new Error(), test, addDelays);
|
||||
}
|
||||
|
||||
private void executeTestErrorThrowingInput(final int nElementsBeforeError, final Throwable ex, final NanoSchedulerBasicTest test, final boolean addDelays) {
|
||||
logger.warn("executeTestErrorThrowingInput " + nElementsBeforeError + " ex=" + ex + " test=" + test + " addInputDelays=" + addDelays);
|
||||
final NanoScheduler<Integer, Integer, Integer> nanoScheduler = test.makeScheduler();
|
||||
nanoScheduler.execute(new ErrorThrowingIterator(nElementsBeforeError, ex, addDelays), test.makeMap(), test.initReduce(), test.makeReduce());
|
||||
|
|
@ -279,9 +273,9 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
final int nElementsBeforeError;
|
||||
final boolean addDelays;
|
||||
int i = 0;
|
||||
final RuntimeException ex;
|
||||
final Throwable ex;
|
||||
|
||||
private ErrorThrowingIterator(final int nElementsBeforeError, RuntimeException ex, boolean addDelays) {
|
||||
private ErrorThrowingIterator(final int nElementsBeforeError, Throwable ex, boolean addDelays) {
|
||||
this.nElementsBeforeError = nElementsBeforeError;
|
||||
this.ex = ex;
|
||||
this.addDelays = addDelays;
|
||||
|
|
@ -290,7 +284,12 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
@Override public boolean hasNext() { return true; }
|
||||
@Override public Integer next() {
|
||||
if ( i++ > nElementsBeforeError ) {
|
||||
throw ex;
|
||||
if ( ex instanceof Error )
|
||||
throw (Error)ex;
|
||||
else if ( ex instanceof RuntimeException )
|
||||
throw (RuntimeException)ex;
|
||||
else
|
||||
throw new RuntimeException("Bad exception " + ex);
|
||||
} else if ( addDelays ) {
|
||||
maybeDelayMe(i);
|
||||
return i;
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package org.broadinstitute.sting.utils.nanoScheduler;
|
|||
|
||||
import org.broadinstitute.sting.BaseTest;
|
||||
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
|
||||
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||
import org.broadinstitute.sting.utils.Utils;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.DataProvider;
|
||||
|
|
@ -93,7 +92,7 @@ public class ReducerUnitTest extends BaseTest {
|
|||
|
||||
final List<List<MapResult<Integer>>> jobGroups = Utils.groupList(allJobs, groupSize);
|
||||
final ReduceSumTest reduce = new ReduceSumTest();
|
||||
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(reduce, new MultiThreadedErrorTracker(), new SimpleTimer(), 0);
|
||||
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(reduce, new MultiThreadedErrorTracker(), 0);
|
||||
|
||||
final TestWaitingForFinalReduce waitingThread = new TestWaitingForFinalReduce(reducer, expectedSum(allJobs));
|
||||
final ExecutorService es = Executors.newSingleThreadExecutor();
|
||||
|
|
@ -155,7 +154,7 @@ public class ReducerUnitTest extends BaseTest {
|
|||
private void runSettingJobIDTwice() throws Exception {
|
||||
final PriorityBlockingQueue<MapResult<Integer>> mapResultsQueue = new PriorityBlockingQueue<MapResult<Integer>>();
|
||||
|
||||
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), new SimpleTimer(), 0);
|
||||
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0);
|
||||
|
||||
reducer.setTotalJobCount(10);
|
||||
reducer.setTotalJobCount(15);
|
||||
|
|
|
|||
Loading…
Reference in New Issue