diff --git a/protected/java/test/org/broadinstitute/sting/gatk/walkers/genotyper/UnifiedGenotyperIntegrationTest.java b/protected/java/test/org/broadinstitute/sting/gatk/walkers/genotyper/UnifiedGenotyperIntegrationTest.java index 9e9c7e37e..7459d131b 100755 --- a/protected/java/test/org/broadinstitute/sting/gatk/walkers/genotyper/UnifiedGenotyperIntegrationTest.java +++ b/protected/java/test/org/broadinstitute/sting/gatk/walkers/genotyper/UnifiedGenotyperIntegrationTest.java @@ -436,7 +436,7 @@ public class UnifiedGenotyperIntegrationTest extends WalkerTest { @Test public void testNsInCigar() { WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec( - "-T UnifiedGenotyper -R " + b37KGReference + " --no_cmdline_in_header -I " + validationDataLocation + "testWithNs.bam -o %s -L 8:141813600-141813700 -out_mode EMIT_ALL_SITES", 1, + "-T UnifiedGenotyper -R " + b37KGReference + " --no_cmdline_in_header -I " + privateTestDir + "testWithNs.bam -o %s -L 8:141813600-141813700 -out_mode EMIT_ALL_SITES", 1, Arrays.asList("32f18ba50406cd8c8069ba07f2f89558")); executeTest("test calling on reads with Ns in CIGAR", spec); } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 38170040a..8d0cefaa4 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -43,7 +43,6 @@ import org.broadinstitute.sting.utils.AutoFormattingTime; import org.broadinstitute.sting.utils.MathUtils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; -import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor; @@ -346,9 +345,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { for ( final TraversalEngine te : allCreatedTraversalEngines) te.shutdown(); - // horrible hack to print nano scheduling information across all nano schedulers, if any were used - NanoScheduler.printCombinedRuntimeProfile(); - allCreatedTraversalEngines.clear(); availableTraversalEngines.clear(); } diff --git a/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java b/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java index 4d2c26a79..ec82cdef2 100644 --- a/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java +++ b/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java @@ -495,4 +495,14 @@ public class GenomeLoc implements Comparable, Serializable, HasGenome public long sizeOfOverlap( final GenomeLoc that ) { return ( this.overlapsP(that) ? Math.min( getStop(), that.getStop() ) - Math.max( getStart(), that.getStart() ) + 1L : 0L ); } + + /** + * Returns the maximum GenomeLoc of this and other + * @param other another non-null genome loc + * @return the max of this and other + */ + public GenomeLoc max(final GenomeLoc other) { + final int cmp = this.compareTo(other); + return cmp == -1 ? other : this; + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index bd99a9266..0e0237412 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -2,7 +2,6 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; -import org.broadinstitute.sting.utils.SimpleTimer; import java.util.Iterator; import java.util.concurrent.BlockingQueue; @@ -19,11 +18,6 @@ class InputProducer implements Runnable { */ final Iterator inputReader; - /** - * Our timer (may be null) that we use to track our input costs - */ - final SimpleTimer inputTimer; - /** * Where we put our input values for consumption */ @@ -51,16 +45,13 @@ class InputProducer implements Runnable { public InputProducer(final Iterator inputReader, final MultiThreadedErrorTracker errorTracker, - final SimpleTimer inputTimer, final BlockingQueue outputQueue) { if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); if ( errorTracker == null ) throw new IllegalArgumentException("errorTracker cannot be null"); - if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null"); if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null"); this.inputReader = inputReader; this.errorTracker = errorTracker; - this.inputTimer = inputTimer; this.outputQueue = outputQueue; } @@ -94,16 +85,15 @@ class InputProducer implements Runnable { * @throws InterruptedException */ private synchronized InputType readNextItem() throws InterruptedException { - inputTimer.restart(); if ( ! inputReader.hasNext() ) { // we are done, mark ourselves as such and return null readLastValue = true; - inputTimer.stop(); return null; } else { // get the next value, and return it final InputType input = inputReader.next(); - inputTimer.stop(); + if ( input == null ) + throw new IllegalStateException("inputReader.next() returned a null value, breaking our contract"); nRead++; return input; } @@ -121,6 +111,9 @@ class InputProducer implements Runnable { final InputType value = readNextItem(); if ( value == null ) { + if ( ! readLastValue ) + throw new IllegalStateException("value == null but readLastValue is false!"); + // add the EOF object so our consumer knows we are done in all inputs // note that we do not increase inputID here, so that variable indicates the ID // of the last real value read from the queue @@ -133,8 +126,10 @@ class InputProducer implements Runnable { } latch.countDown(); - } catch (Exception ex) { + } catch (Throwable ex) { errorTracker.notifyOfError(ex); + } finally { +// logger.info("Exiting input thread readLastValue = " + readLastValue); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java deleted file mode 100644 index 0926b4c50..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSRuntimeProfile.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.broadinstitute.sting.utils.nanoScheduler; - -import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.AutoFormattingTime; -import org.broadinstitute.sting.utils.SimpleTimer; - -/** - * Holds runtime profile (input, read, map) times as tracked by NanoScheduler - * - * User: depristo - * Date: 9/10/12 - * Time: 8:31 PM - */ -public class NSRuntimeProfile { - final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside"); - final SimpleTimer inputTimer = new SimpleTimer("input"); - final SimpleTimer mapTimer = new SimpleTimer("map"); - final SimpleTimer reduceTimer = new SimpleTimer("reduce"); - - /** - * Combine the elapsed time information from other with this profile - * - * @param other a non-null profile - */ - public void combine(final NSRuntimeProfile other) { - outsideSchedulerTimer.addElapsed(other.outsideSchedulerTimer); - inputTimer.addElapsed(other.inputTimer); - mapTimer.addElapsed(other.mapTimer); - reduceTimer.addElapsed(other.reduceTimer); - } - - /** - * Print the runtime profiling to logger - * - * @param logger - */ - public void log(final Logger logger) { - log1(logger, "Input time", inputTimer); - log1(logger, "Map time", mapTimer); - log1(logger, "Reduce time", reduceTimer); - log1(logger, "Outside time", outsideSchedulerTimer); - } - - /** - * @return the total runtime for all functions of this nano scheduler - */ - //@Ensures("result >= 0.0") - public double totalRuntimeInSeconds() { - return inputTimer.getElapsedTime() - + mapTimer.getElapsedTime() - + reduceTimer.getElapsedTime() - + outsideSchedulerTimer.getElapsedTime(); - } - - /** - * Print to logger.info timing information from timer, with name label - * - * @param label the name of the timer to display. Should be human readable - * @param timer the timer whose elapsed time we will display - */ - //@Requires({"label != null", "timer != null"}) - private void log1(final Logger logger, final String label, final SimpleTimer timer) { - final double myTimeInSec = timer.getElapsedTime(); - final double myTimePercent = myTimeInSec / totalRuntimeInSeconds() * 100; - logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent)); - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index d83a23c0f..4cc91faa4 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -57,16 +57,6 @@ public class NanoScheduler { boolean debug = false; private NSProgressFunction progressFunction = null; - /** - * Tracks the combined runtime profiles across all created nano schedulers - */ - final static private NSRuntimeProfile combinedNSRuntimeProfiler = new NSRuntimeProfile(); - - /** - * The profile specific to this nano scheduler - */ - final private NSRuntimeProfile myNSRuntimeProfile = new NSRuntimeProfile(); - /** * Create a new nanoscheduler with the desire characteristics requested by the argument * @@ -94,9 +84,6 @@ public class NanoScheduler { this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d")); } - - // start timing the time spent outside of the nanoScheduler - myNSRuntimeProfile.outsideSchedulerTimer.start(); } /** @@ -123,11 +110,6 @@ public class NanoScheduler { * After this call, execute cannot be invoked without throwing an error */ public void shutdown() { - myNSRuntimeProfile.outsideSchedulerTimer.stop(); - - // add my timing information to the combined NS runtime profile - combinedNSRuntimeProfiler.combine(myNSRuntimeProfile); - if ( nThreads > 1 ) { shutdownExecutor("inputExecutor", inputExecutor); shutdownExecutor("mapExecutor", mapExecutor); @@ -137,19 +119,6 @@ public class NanoScheduler { shutdown = true; } - public void printRuntimeProfile() { - myNSRuntimeProfile.log(logger); - } - - public static void printCombinedRuntimeProfile() { - if ( combinedNSRuntimeProfiler.totalRuntimeInSeconds() > 0.1 ) - combinedNSRuntimeProfiler.log(logger); - } - - protected double getTotalRuntime() { - return myNSRuntimeProfile.totalRuntimeInSeconds(); - } - /** * Helper function to cleanly shutdown an execution service, checking that the execution * state is clean when it's done. @@ -245,8 +214,6 @@ public class NanoScheduler { if ( map == null ) throw new IllegalArgumentException("map function cannot be null"); if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null"); - myNSRuntimeProfile.outsideSchedulerTimer.stop(); - ReduceType result; if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) { result = executeSingleThreaded(inputReader, map, initialValue, reduce); @@ -254,7 +221,6 @@ public class NanoScheduler { result = executeMultiThreaded(inputReader, map, initialValue, reduce); } - myNSRuntimeProfile.outsideSchedulerTimer.restart(); return result; } @@ -273,28 +239,19 @@ public class NanoScheduler { while ( true ) { // start timer to ensure that both hasNext and next are caught by the timer - myNSRuntimeProfile.inputTimer.restart(); if ( ! inputReader.hasNext() ) { - myNSRuntimeProfile.inputTimer.stop(); break; } else { final InputType input = inputReader.next(); - myNSRuntimeProfile.inputTimer.stop(); // map - myNSRuntimeProfile.mapTimer.restart(); - final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano(); final MapType mapValue = map.apply(input); - if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime)); - myNSRuntimeProfile.mapTimer.stop(); - if ( i++ % this.bufferSize == 0 && progressFunction != null ) + if ( progressFunction != null ) progressFunction.progress(input); // reduce - myNSRuntimeProfile.reduceTimer.restart(); sum = reduce.apply(mapValue, sum); - myNSRuntimeProfile.reduceTimer.stop(); } } @@ -320,6 +277,7 @@ public class NanoScheduler { while ( true ) { // check that no errors occurred while we were waiting handleErrors(); +// checkForDeadlocks(); try { final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS); @@ -341,6 +299,26 @@ public class NanoScheduler { } } +// private void checkForDeadlocks() { +// if ( deadLockCheckCounter++ % 100 == 0 ) { +// logger.info("Checking for deadlocks..."); +// final ThreadMXBean bean = ManagementFactory.getThreadMXBean(); +// final long[] threadIds = bean.findDeadlockedThreads(); // Returns null if no threads are deadlocked. +// +// if (threadIds != null) { +// final ThreadInfo[] infos = bean.getThreadInfo(threadIds); +// +// logger.error("!!! Deadlock detected !!!!"); +// for (final ThreadInfo info : infos) { +// logger.error("Thread " + info); +// for ( final StackTraceElement elt : info.getStackTrace() ) { +// logger.error("\t" + elt.toString()); +// } +// } +// } +// } +// } + private void handleErrors() { if ( errorTracker.hasAnErrorOccurred() ) { masterExecutor.shutdownNow(); @@ -380,7 +358,7 @@ public class NanoScheduler { // Create the input producer and start it running final InputProducer inputProducer = - new InputProducer(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue); + new InputProducer(inputReader, errorTracker, inputQueue); inputExecutor.submit(inputProducer); // a priority queue that stores up to bufferSize elements @@ -389,7 +367,7 @@ public class NanoScheduler { new PriorityBlockingQueue>(); final Reducer reducer - = new Reducer(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue); + = new Reducer(reduce, errorTracker, initialValue); try { int nSubmittedJobs = 0; @@ -408,7 +386,8 @@ public class NanoScheduler { // wait for all of the input and map threads to finish return waitForCompletion(inputProducer, reducer); - } catch (Exception ex) { + } catch (Throwable ex) { +// logger.warn("Reduce job got exception " + ex); errorTracker.notifyOfError(ex); return initialValue; } @@ -486,16 +465,12 @@ public class NanoScheduler { final InputType input = inputWrapper.getValue(); // map - myNSRuntimeProfile.mapTimer.restart(); - final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano(); final MapType mapValue = map.apply(input); - if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime)); - myNSRuntimeProfile.mapTimer.stop(); // enqueue the result into the mapResultQueue result = new MapResult(mapValue, jobID); - if ( jobID % bufferSize == 0 && progressFunction != null ) + if ( progressFunction != null ) progressFunction.progress(input); } else { // push back the EOF marker so other waiting threads can read it @@ -508,7 +483,8 @@ public class NanoScheduler { mapResultQueue.put(result); final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue); - } catch (Exception ex) { + } catch (Throwable ex) { +// logger.warn("Map job got exception " + ex); errorTracker.notifyOfError(ex); } finally { // we finished a map job, release the job queue semaphore diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index 92c1018eb..5cae28187 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -4,7 +4,6 @@ import com.google.java.contract.Ensures; import com.google.java.contract.Requires; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; -import org.broadinstitute.sting.utils.SimpleTimer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; @@ -34,7 +33,6 @@ class Reducer { final CountDownLatch countDownLatch = new CountDownLatch(1); final NSReduceFunction reduce; - final SimpleTimer reduceTimer; final MultiThreadedErrorTracker errorTracker; /** @@ -61,20 +59,16 @@ class Reducer { * reduceTimer * * @param reduce the reduce function to apply - * @param reduceTimer the timer to time the reduce function call * @param initialSum the initial reduce sum */ public Reducer(final NSReduceFunction reduce, final MultiThreadedErrorTracker errorTracker, - final SimpleTimer reduceTimer, final ReduceType initialSum) { if ( errorTracker == null ) throw new IllegalArgumentException("Error tracker cannot be null"); if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null"); - if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null"); this.errorTracker = errorTracker; this.reduce = reduce; - this.reduceTimer = reduceTimer; this.sum = initialSum; } @@ -125,10 +119,7 @@ class Reducer { nReducesNow++; // apply reduce, keeping track of sum - reduceTimer.restart(); sum = reduce.apply(result.getValue(), sum); - reduceTimer.stop(); - } numJobsReduced++; diff --git a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java index a8715e242..b69283b9d 100755 --- a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeter.java @@ -26,6 +26,7 @@ package org.broadinstitute.sting.utils.progressmeter; import com.google.java.contract.Ensures; import com.google.java.contract.Invariant; +import com.google.java.contract.Requires; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.*; import org.broadinstitute.sting.utils.exceptions.UserException; @@ -143,6 +144,12 @@ public class ProgressMeter { /** We use the SimpleTimer to time our run */ private final SimpleTimer timer = new SimpleTimer(); + private GenomeLoc maxGenomeLoc = null; + private String positionMessage = "starting"; + private long nTotalRecordsProcessed = 0; + + final ProgressMeterDaemon progressMeterDaemon; + /** * Create a new ProgressMeter * @@ -177,21 +184,15 @@ public class ProgressMeter { targetSizeInBP = processingIntervals.coveredSize(); // start up the timer + progressMeterDaemon = new ProgressMeterDaemon(this); start(); } /** - * Forward request to notifyOfProgress - * - * Assumes that one cycle has been completed - * - * @param loc our current location. Null means "in unmapped reads" - * @param nTotalRecordsProcessed the total number of records we've processed + * Start up the progress meter, printing initialization message and starting up the + * daemon thread for periodic printing. */ - public void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) { - notifyOfProgress(loc, false, nTotalRecordsProcessed); - } - + @Requires("progressMeterDaemon != null") private synchronized void start() { timer.start(); lastProgressPrintTime = timer.currentTime(); @@ -199,6 +200,8 @@ public class ProgressMeter { logger.info("[INITIALIZATION COMPLETE; STARTING PROCESSING]"); logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining", "Location", processingUnitName, processingUnitName)); + + progressMeterDaemon.start(); } /** @@ -216,19 +219,41 @@ public class ProgressMeter { * Synchronized to ensure that even with multiple threads calling notifyOfProgress we still * get one clean stream of meter logs. * + * Note this thread doesn't actually print progress, unless must print is true, but just registers + * the progress itself. A separate printing daemon periodically polls the meter to print out + * progress + * * @param loc Current location, can be null if you are at the end of the processing unit - * @param mustPrint If true, will print out info, regardless of time interval * @param nTotalRecordsProcessed the total number of records we've processed */ - private synchronized void notifyOfProgress(final GenomeLoc loc, boolean mustPrint, final long nTotalRecordsProcessed) { + public synchronized void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) { if ( nTotalRecordsProcessed < 0 ) throw new IllegalArgumentException("nTotalRecordsProcessed must be >= 0"); + // weird comparison to ensure that loc == null (in unmapped reads) is keep before maxGenomeLoc == null (on startup) + this.maxGenomeLoc = loc == null ? loc : (maxGenomeLoc == null ? loc : loc.max(maxGenomeLoc)); + this.nTotalRecordsProcessed = Math.max(this.nTotalRecordsProcessed, nTotalRecordsProcessed); + + // a pretty name for our position + this.positionMessage = maxGenomeLoc == null + ? "unmapped reads" + : String.format("%s:%d", maxGenomeLoc.getContig(), maxGenomeLoc.getStart()); + } + + /** + * Actually try to print out progress + * + * This function may print out if the progress print is due, but if not enough time has elapsed + * since the last print we will not print out information. + * + * @param mustPrint if true, progress will be printed regardless of the last time we printed progress + */ + protected synchronized void printProgress(final boolean mustPrint) { final long curTime = timer.currentTime(); final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency); final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY); if ( printProgress || printLog ) { - final ProgressMeterData progressData = takeProgressSnapshot(loc, nTotalRecordsProcessed); + final ProgressMeterData progressData = takeProgressSnapshot(maxGenomeLoc, nTotalRecordsProcessed); final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.getElapsedSeconds()); final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP()); @@ -241,13 +266,8 @@ public class ProgressMeter { lastProgressPrintTime = curTime; updateLoggerPrintFrequency(estTotalRuntime.getTimeInSeconds()); - // a pretty name for our position - final String posName = loc == null - ? (mustPrint ? "done" : "unmapped reads") - : String.format("%s:%d", loc.getContig(), loc.getStart()); - logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s", - posName, progressData.getUnitsProcessed()*1.0, elapsed, unitRate, + positionMessage, progressData.getUnitsProcessed()*1.0, elapsed, unitRate, 100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion)); } @@ -296,13 +316,18 @@ public class ProgressMeter { */ public void notifyDone(final long nTotalRecordsProcessed) { // print out the progress meter - notifyOfProgress(null, true, nTotalRecordsProcessed); + this.nTotalRecordsProcessed = nTotalRecordsProcessed; + this.positionMessage = "done"; + printProgress(true); logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours", timer.getElapsedTime(), timer.getElapsedTime() / 60, timer.getElapsedTime() / 3600)); if ( performanceLog != null ) performanceLog.close(); + + // shutdown our daemon thread + progressMeterDaemon.done(); } /** diff --git a/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemon.java b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemon.java new file mode 100644 index 000000000..16887400a --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/progressmeter/ProgressMeterDaemon.java @@ -0,0 +1,60 @@ +package org.broadinstitute.sting.utils.progressmeter; + +/** + * Daemon thread that periodically prints the progress of the progress meter + * + * User: depristo + * Date: 12/4/12 + * Time: 9:16 PM + */ +public final class ProgressMeterDaemon extends Thread { + /** + * How frequently should we poll and print progress? + */ + private final static long POLL_FREQUENCY_MILLISECONDS = 10 * 1000; + + /** + * Are we to continue periodically printing status, or should we shut down? + */ + boolean done = false; + + /** + * The meter we will call print on + */ + final ProgressMeter meter; + + /** + * Create a new ProgressMeterDaemon printing progress for meter + * @param meter the progress meter to print progress of + */ + public ProgressMeterDaemon(final ProgressMeter meter) { + if ( meter == null ) throw new IllegalArgumentException("meter cannot be null"); + this.meter = meter; + setDaemon(true); + setName("ProgressMeterDaemon"); + } + + /** + * Tells this daemon thread to shutdown at the next opportunity, as the progress + * metering is complete. + */ + public final void done() { + this.done = true; + } + + /** + * Start up the ProgressMeterDaemon, polling every tens of seconds to print, if + * necessary, the provided progress meter. Never exits until the JVM is complete, + * or done() is called, as the thread is a daemon thread + */ + public void run() { + while (! done) { + meter.printProgress(false); + try { + Thread.sleep(POLL_FREQUENCY_MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index 6c59f1585..489adab6b 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -2,7 +2,6 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; -import org.broadinstitute.sting.utils.SimpleTimer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -46,7 +45,7 @@ public class InputProducerUnitTest extends BaseTest { final LinkedBlockingDeque.InputValue> readQueue = new LinkedBlockingDeque.InputValue>(queueSize); - final InputProducer ip = new InputProducer(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue); + final InputProducer ip = new InputProducer(elements.iterator(), new MultiThreadedErrorTracker(), readQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); @@ -94,7 +93,7 @@ public class InputProducerUnitTest extends BaseTest { final LinkedBlockingDeque.InputValue> readQueue = new LinkedBlockingDeque.InputValue>(); - final InputProducer ip = new InputProducer(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue); + final InputProducer ip = new InputProducer(elements.iterator(), new MultiThreadedErrorTracker(), readQueue); final ExecutorService es = Executors.newSingleThreadExecutor(); es.submit(ip); diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index af2e18ad9..61e8ec0a1 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -188,17 +188,6 @@ public class NanoSchedulerUnitTest extends BaseTest { Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks); nanoScheduler.shutdown(); - - // TODO -- need to enable only in the case where there's serious time spend in - // TODO -- read /map / reduce, otherwise the "outside" timer doesn't add up - final double myTimeEstimate = timer.getElapsedTime(); - final double tolerance = 0.1; - if ( false && myTimeEstimate > 0.1 ) { - Assert.assertTrue(nanoScheduler.getTotalRuntime() > myTimeEstimate * tolerance, - "NanoScheduler said that the total runtime was " + nanoScheduler.getTotalRuntime() - + " but the overall test time was " + myTimeEstimate + ", beyond our tolerance factor of " - + tolerance); - } } @Test(enabled = true && ! DEBUG, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME) @@ -243,7 +232,7 @@ public class NanoSchedulerUnitTest extends BaseTest { for ( final int nThreads : Arrays.asList(8) ) { for ( final boolean addDelays : Arrays.asList(true, false) ) { final NanoSchedulerBasicTest test = new NanoSchedulerBasicTest(bufSize, nThreads, 1, 1000000, false); - final int maxN = addDelays ? 10000 : 100000; + final int maxN = addDelays ? 1000 : 10000; for ( int nElementsBeforeError = 0; nElementsBeforeError < maxN; nElementsBeforeError += Math.max(nElementsBeforeError / 10, 1) ) { tests.add(new Object[]{nElementsBeforeError, test, addDelays}); } @@ -259,17 +248,22 @@ public class NanoSchedulerUnitTest extends BaseTest { executeTestErrorThrowingInput(10, new NullPointerException(), exampleTest, false); } - @Test(enabled = true, expectedExceptions = ReviewedStingException.class, timeOut = 10000) + @Test(enabled = true, expectedExceptions = ReviewedStingException.class, timeOut = 1000) public void testInputErrorIsThrown_RSE() throws InterruptedException { executeTestErrorThrowingInput(10, new ReviewedStingException("test"), exampleTest, false); } - @Test(enabled = true, expectedExceptions = NullPointerException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 10000, invocationCount = 1) - public void testInputErrorDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException { + @Test(enabled = true, expectedExceptions = NullPointerException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 1000, invocationCount = 1) + public void testInputRuntimeExceptionDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException { executeTestErrorThrowingInput(nElementsBeforeError, new NullPointerException(), test, addDelays); } - private void executeTestErrorThrowingInput(final int nElementsBeforeError, final RuntimeException ex, final NanoSchedulerBasicTest test, final boolean addDelays) { + @Test(enabled = true, expectedExceptions = ReviewedStingException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 1000, invocationCount = 1) + public void testInputErrorDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException { + executeTestErrorThrowingInput(nElementsBeforeError, new Error(), test, addDelays); + } + + private void executeTestErrorThrowingInput(final int nElementsBeforeError, final Throwable ex, final NanoSchedulerBasicTest test, final boolean addDelays) { logger.warn("executeTestErrorThrowingInput " + nElementsBeforeError + " ex=" + ex + " test=" + test + " addInputDelays=" + addDelays); final NanoScheduler nanoScheduler = test.makeScheduler(); nanoScheduler.execute(new ErrorThrowingIterator(nElementsBeforeError, ex, addDelays), test.makeMap(), test.initReduce(), test.makeReduce()); @@ -279,9 +273,9 @@ public class NanoSchedulerUnitTest extends BaseTest { final int nElementsBeforeError; final boolean addDelays; int i = 0; - final RuntimeException ex; + final Throwable ex; - private ErrorThrowingIterator(final int nElementsBeforeError, RuntimeException ex, boolean addDelays) { + private ErrorThrowingIterator(final int nElementsBeforeError, Throwable ex, boolean addDelays) { this.nElementsBeforeError = nElementsBeforeError; this.ex = ex; this.addDelays = addDelays; @@ -290,7 +284,12 @@ public class NanoSchedulerUnitTest extends BaseTest { @Override public boolean hasNext() { return true; } @Override public Integer next() { if ( i++ > nElementsBeforeError ) { - throw ex; + if ( ex instanceof Error ) + throw (Error)ex; + else if ( ex instanceof RuntimeException ) + throw (RuntimeException)ex; + else + throw new RuntimeException("Bad exception " + ex); } else if ( addDelays ) { maybeDelayMe(i); return i; diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java index 39133d1ed..6c17aa78d 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java @@ -2,7 +2,6 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; -import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.Utils; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -93,7 +92,7 @@ public class ReducerUnitTest extends BaseTest { final List>> jobGroups = Utils.groupList(allJobs, groupSize); final ReduceSumTest reduce = new ReduceSumTest(); - final Reducer reducer = new Reducer(reduce, new MultiThreadedErrorTracker(), new SimpleTimer(), 0); + final Reducer reducer = new Reducer(reduce, new MultiThreadedErrorTracker(), 0); final TestWaitingForFinalReduce waitingThread = new TestWaitingForFinalReduce(reducer, expectedSum(allJobs)); final ExecutorService es = Executors.newSingleThreadExecutor(); @@ -155,7 +154,7 @@ public class ReducerUnitTest extends BaseTest { private void runSettingJobIDTwice() throws Exception { final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>(); - final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), new SimpleTimer(), 0); + final Reducer reducer = new Reducer(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0); reducer.setTotalJobCount(10); reducer.setTotalJobCount(15);