Better error handling in NanoScheduler

-- The previous nanoscheduler would deadlock in the case where an Error, not an Exception, was thrown.  Errors, like out of memory, would cause the whole system to die.  This bugfix resolves that issue
This commit is contained in:
Mark DePristo 2012-12-04 20:59:07 -05:00
parent 51dbb562c9
commit 2b601571e7
3 changed files with 52 additions and 12 deletions

View File

@ -103,6 +103,8 @@ class InputProducer<InputType> implements Runnable {
} else {
// get the next value, and return it
final InputType input = inputReader.next();
if ( input == null )
throw new IllegalStateException("inputReader.next() returned a null value, breaking our contract");
inputTimer.stop();
nRead++;
return input;
@ -121,6 +123,9 @@ class InputProducer<InputType> implements Runnable {
final InputType value = readNextItem();
if ( value == null ) {
if ( ! readLastValue )
throw new IllegalStateException("value == null but readLastValue is false!");
// add the EOF object so our consumer knows we are done in all inputs
// note that we do not increase inputID here, so that variable indicates the ID
// of the last real value read from the queue
@ -133,8 +138,10 @@ class InputProducer<InputType> implements Runnable {
}
latch.countDown();
} catch (Exception ex) {
} catch (Throwable ex) {
errorTracker.notifyOfError(ex);
} finally {
// logger.info("Exiting input thread readLastValue = " + readLastValue);
}
}

View File

@ -320,6 +320,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
while ( true ) {
// check that no errors occurred while we were waiting
handleErrors();
// checkForDeadlocks();
try {
final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS);
@ -341,6 +342,26 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
}
}
// private void checkForDeadlocks() {
// if ( deadLockCheckCounter++ % 100 == 0 ) {
// logger.info("Checking for deadlocks...");
// final ThreadMXBean bean = ManagementFactory.getThreadMXBean();
// final long[] threadIds = bean.findDeadlockedThreads(); // Returns null if no threads are deadlocked.
//
// if (threadIds != null) {
// final ThreadInfo[] infos = bean.getThreadInfo(threadIds);
//
// logger.error("!!! Deadlock detected !!!!");
// for (final ThreadInfo info : infos) {
// logger.error("Thread " + info);
// for ( final StackTraceElement elt : info.getStackTrace() ) {
// logger.error("\t" + elt.toString());
// }
// }
// }
// }
// }
private void handleErrors() {
if ( errorTracker.hasAnErrorOccurred() ) {
masterExecutor.shutdownNow();
@ -408,7 +429,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// wait for all of the input and map threads to finish
return waitForCompletion(inputProducer, reducer);
} catch (Exception ex) {
} catch (Throwable ex) {
// logger.warn("Reduce job got exception " + ex);
errorTracker.notifyOfError(ex);
return initialValue;
}
@ -495,7 +517,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// enqueue the result into the mapResultQueue
result = new MapResult<MapType>(mapValue, jobID);
if ( jobID % bufferSize == 0 && progressFunction != null )
if ( progressFunction != null )
progressFunction.progress(input);
} else {
// push back the EOF marker so other waiting threads can read it
@ -508,7 +530,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
mapResultQueue.put(result);
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue);
} catch (Exception ex) {
} catch (Throwable ex) {
// logger.warn("Map job got exception " + ex);
errorTracker.notifyOfError(ex);
} finally {
// we finished a map job, release the job queue semaphore

View File

@ -243,7 +243,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
for ( final int nThreads : Arrays.asList(8) ) {
for ( final boolean addDelays : Arrays.asList(true, false) ) {
final NanoSchedulerBasicTest test = new NanoSchedulerBasicTest(bufSize, nThreads, 1, 1000000, false);
final int maxN = addDelays ? 10000 : 100000;
final int maxN = addDelays ? 1000 : 10000;
for ( int nElementsBeforeError = 0; nElementsBeforeError < maxN; nElementsBeforeError += Math.max(nElementsBeforeError / 10, 1) ) {
tests.add(new Object[]{nElementsBeforeError, test, addDelays});
}
@ -259,17 +259,22 @@ public class NanoSchedulerUnitTest extends BaseTest {
executeTestErrorThrowingInput(10, new NullPointerException(), exampleTest, false);
}
@Test(enabled = true, expectedExceptions = ReviewedStingException.class, timeOut = 10000)
@Test(enabled = true, expectedExceptions = ReviewedStingException.class, timeOut = 1000)
public void testInputErrorIsThrown_RSE() throws InterruptedException {
executeTestErrorThrowingInput(10, new ReviewedStingException("test"), exampleTest, false);
}
@Test(enabled = true, expectedExceptions = NullPointerException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 10000, invocationCount = 1)
public void testInputErrorDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException {
@Test(enabled = true, expectedExceptions = NullPointerException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 1000, invocationCount = 1)
public void testInputRuntimeExceptionDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException {
executeTestErrorThrowingInput(nElementsBeforeError, new NullPointerException(), test, addDelays);
}
private void executeTestErrorThrowingInput(final int nElementsBeforeError, final RuntimeException ex, final NanoSchedulerBasicTest test, final boolean addDelays) {
@Test(enabled = true, expectedExceptions = ReviewedStingException.class, dataProvider = "NanoSchedulerInputExceptionTest", timeOut = 1000, invocationCount = 1)
public void testInputErrorDoesntDeadlock(final int nElementsBeforeError, final NanoSchedulerBasicTest test, final boolean addDelays ) throws InterruptedException {
executeTestErrorThrowingInput(nElementsBeforeError, new Error(), test, addDelays);
}
private void executeTestErrorThrowingInput(final int nElementsBeforeError, final Throwable ex, final NanoSchedulerBasicTest test, final boolean addDelays) {
logger.warn("executeTestErrorThrowingInput " + nElementsBeforeError + " ex=" + ex + " test=" + test + " addInputDelays=" + addDelays);
final NanoScheduler<Integer, Integer, Integer> nanoScheduler = test.makeScheduler();
nanoScheduler.execute(new ErrorThrowingIterator(nElementsBeforeError, ex, addDelays), test.makeMap(), test.initReduce(), test.makeReduce());
@ -279,9 +284,9 @@ public class NanoSchedulerUnitTest extends BaseTest {
final int nElementsBeforeError;
final boolean addDelays;
int i = 0;
final RuntimeException ex;
final Throwable ex;
private ErrorThrowingIterator(final int nElementsBeforeError, RuntimeException ex, boolean addDelays) {
private ErrorThrowingIterator(final int nElementsBeforeError, Throwable ex, boolean addDelays) {
this.nElementsBeforeError = nElementsBeforeError;
this.ex = ex;
this.addDelays = addDelays;
@ -290,7 +295,12 @@ public class NanoSchedulerUnitTest extends BaseTest {
@Override public boolean hasNext() { return true; }
@Override public Integer next() {
if ( i++ > nElementsBeforeError ) {
throw ex;
if ( ex instanceof Error )
throw (Error)ex;
else if ( ex instanceof RuntimeException )
throw (RuntimeException)ex;
else
throw new RuntimeException("Bad exception " + ex);
} else if ( addDelays ) {
maybeDelayMe(i);
return i;