diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index bd99a9266..45c7c5096 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -103,6 +103,8 @@ class InputProducer implements Runnable { } else { // get the next value, and return it final InputType input = inputReader.next(); + if ( input == null ) + throw new IllegalStateException("inputReader.next() returned a null value, breaking our contract"); inputTimer.stop(); nRead++; return input; @@ -121,6 +123,9 @@ class InputProducer implements Runnable { final InputType value = readNextItem(); if ( value == null ) { + if ( ! readLastValue ) + throw new IllegalStateException("value == null but readLastValue is false!"); + // add the EOF object so our consumer knows we are done in all inputs // note that we do not increase inputID here, so that variable indicates the ID // of the last real value read from the queue @@ -133,8 +138,10 @@ class InputProducer implements Runnable { } latch.countDown(); - } catch (Exception ex) { + } catch (Throwable ex) { errorTracker.notifyOfError(ex); + } finally { +// logger.info("Exiting input thread readLastValue = " + readLastValue); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index d83a23c0f..6d769c2cf 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -320,6 +320,7 @@ public class NanoScheduler { while ( true ) { // check that no errors occurred while we were waiting handleErrors(); +// checkForDeadlocks(); try { final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS); @@ -341,6 +342,26 @@ public class NanoScheduler { } } +// private void checkForDeadlocks() { +// if ( deadLockCheckCounter++ % 100 == 0 ) { +// logger.info("Checking for deadlocks..."); +// final ThreadMXBean bean = ManagementFactory.getThreadMXBean(); +// final long[] threadIds = bean.findDeadlockedThreads(); // Returns null if no threads are deadlocked. +// +// if (threadIds != null) { +// final ThreadInfo[] infos = bean.getThreadInfo(threadIds); +// +// logger.error("!!! Deadlock detected !!!!"); +// for (final ThreadInfo info : infos) { +// logger.error("Thread " + info); +// for ( final StackTraceElement elt : info.getStackTrace() ) { +// logger.error("\t" + elt.toString()); +// } +// } +// } +// } +// } + private void handleErrors() { if ( errorTracker.hasAnErrorOccurred() ) { masterExecutor.shutdownNow(); @@ -408,7 +429,8 @@ public class NanoScheduler { // wait for all of the input and map threads to finish return waitForCompletion(inputProducer, reducer); - } catch (Exception ex) { + } catch (Throwable ex) { +// logger.warn("Reduce job got exception " + ex); errorTracker.notifyOfError(ex); return initialValue; } @@ -495,7 +517,7 @@ public class NanoScheduler { // enqueue the result into the mapResultQueue result = new MapResult(mapValue, jobID); - if ( jobID % bufferSize == 0 && progressFunction != null ) + if ( progressFunction != null ) progressFunction.progress(input); } else { // push back the EOF marker so other waiting threads can read it @@ -508,7 +530,8 @@ public class NanoScheduler { mapResultQueue.put(result); final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue); - } catch (Exception ex) { + } catch (Throwable ex) { +// logger.warn("Map job got exception " + ex); errorTracker.notifyOfError(ex); } finally { // we finished a map job, release the job queue semaphore diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index af2e18ad9..d415b8b4c 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -243,7 +243,7 @@ public class NanoSchedulerUnitTest extends BaseTest { for ( final int nThreads : Arrays.asList(8) ) { for ( final boolean addDelays : Arrays.asList(true, false) ) { final NanoSchedulerBasicTest test = new NanoSchedulerBasicTest(bufSize, nThreads, 1, 1000000, false); - final int maxN = addDelays ? 10000 : 100000; + final int maxN = addDelays ? 1000 : 10000; for ( int nElementsBeforeError = 0; nElementsBeforeError < maxN; nElementsBeforeError += Math.max(nElementsBeforeError / 10, 1) ) { tests.add(new Object[]{nElementsBeforeError, test, addDelays}); } @@ -259,17 +259,22 @@ public class NanoSchedulerUnitTest extends BaseTest { executeTestErrorThrowingInput(10, new NullPointerException(), exampleTest, false); } - @Test(enabled = true, expectedExceptions = ReviewedStingException.class, timeOut = 10000) + @Test(enabled = true, expectedExceptions = ReviewedStingException.class, timeOut = 1000) public void testInputErrorIsThrown_RSE() throws InterruptedException { executeTestErrorThrowingInput(10, new ReviewedStingException("test"), exampleTest, false); } - @Test(enabled = true, expectedExceptions = NullPointerException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 10000, invocationCount = 1) - public void testInputErrorDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException { + @Test(enabled = true, expectedExceptions = NullPointerException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 1000, invocationCount = 1) + public void testInputRuntimeExceptionDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException { executeTestErrorThrowingInput(nElementsBeforeError, new NullPointerException(), test, addDelays); } - private void executeTestErrorThrowingInput(final int nElementsBeforeError, final RuntimeException ex, final NanoSchedulerBasicTest test, final boolean addDelays) { + @Test(enabled = true, expectedExceptions = ReviewedStingException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 1000, invocationCount = 1) + public void testInputErrorDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException { + executeTestErrorThrowingInput(nElementsBeforeError, new Error(), test, addDelays); + } + + private void executeTestErrorThrowingInput(final int nElementsBeforeError, final Throwable ex, final NanoSchedulerBasicTest test, final boolean addDelays) { logger.warn("executeTestErrorThrowingInput " + nElementsBeforeError + " ex=" + ex + " test=" + test + " addInputDelays=" + addDelays); final NanoScheduler nanoScheduler = test.makeScheduler(); nanoScheduler.execute(new ErrorThrowingIterator(nElementsBeforeError, ex, addDelays), test.makeMap(), test.initReduce(), test.makeReduce()); @@ -279,9 +284,9 @@ public class NanoSchedulerUnitTest extends BaseTest { final int nElementsBeforeError; final boolean addDelays; int i = 0; - final RuntimeException ex; + final Throwable ex; - private ErrorThrowingIterator(final int nElementsBeforeError, RuntimeException ex, boolean addDelays) { + private ErrorThrowingIterator(final int nElementsBeforeError, Throwable ex, boolean addDelays) { this.nElementsBeforeError = nElementsBeforeError; this.ex = ex; this.addDelays = addDelays; @@ -290,7 +295,12 @@ public class NanoSchedulerUnitTest extends BaseTest { @Override public boolean hasNext() { return true; } @Override public Integer next() { if ( i++ > nElementsBeforeError ) { - throw ex; + if ( ex instanceof Error ) + throw (Error)ex; + else if ( ex instanceof RuntimeException ) + throw (RuntimeException)ex; + else + throw new RuntimeException("Bad exception " + ex); } else if ( addDelays ) { maybeDelayMe(i); return i;