Reorganized NanoScheduler so that main thread does the reduces

-- Enables us to run -nt 2 -nct 2 and get meaningful output
-- Uses a sleep / poll mechanism.  Not ideal -- will look into wait / notify instead.
This commit is contained in:
Mark DePristo 2012-09-20 17:07:49 -04:00
parent 747694f7c2
commit 7425ab9637
2 changed files with 138 additions and 132 deletions

View File

@ -48,7 +48,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final int bufferSize;
final int nThreads;
final ExecutorService inputExecutor;
final ExecutorService masterExecutor;
final ExecutorService errorWatchingExecutor;
final ExecutorService mapExecutor;
final Semaphore runningMapJobSlots;
final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker();
@ -85,14 +85,14 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
this.nThreads = nThreads;
if ( nThreads == 1 ) {
this.mapExecutor = this.inputExecutor = this.masterExecutor = null;
this.mapExecutor = this.inputExecutor = this.errorWatchingExecutor = null;
runningMapJobSlots = null;
} else {
this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d"));
runningMapJobSlots = new Semaphore(this.bufferSize);
this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
this.errorWatchingExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
}
// start timing the time spent outside of the nanoScheduler
@ -131,7 +131,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
if ( nThreads > 1 ) {
shutdownExecutor("inputExecutor", inputExecutor);
shutdownExecutor("mapExecutor", mapExecutor);
shutdownExecutor("masterExecutor", masterExecutor);
shutdownExecutor("errorWatchingExecutor", errorWatchingExecutor);
}
shutdown = true;
@ -313,148 +313,154 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final NSReduceFunction<MapType, ReduceType> reduce) {
debugPrint("Executing nanoScheduler");
// start up the master job
final MasterJob masterJob = new MasterJob(inputReader, map, initialValue, reduce);
final Future<ReduceType> reduceResult = masterExecutor.submit(masterJob);
final ErrorWatcherThread errorWatcher = new ErrorWatcherThread();
errorWatchingExecutor.submit(errorWatcher);
while ( true ) {
// check that no errors occurred while we were waiting
handleErrors();
// a blocking queue that limits the number of input items to the requested buffer size
// note we need +1 because we continue to enqueue the lastObject
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue
= new LinkedBlockingDeque<InputProducer<InputType>.InputValue>(bufferSize+1);
try {
final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS);
// Create the input producer and start it running
final InputProducer<InputType> inputProducer =
new InputProducer<InputType>(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue);
inputExecutor.submit(inputProducer);
// in case an error occurred in the reduce
handleErrors();
// a priority queue that stores up to bufferSize elements
// produced by completed map jobs.
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue =
new PriorityBlockingQueue<MapResult<MapType>>();
// return our final reduce result
return result;
} catch (final TimeoutException ex ) {
// a normal case -- we just aren't done
} catch (final InterruptedException ex) {
errorTracker.notifyOfError(ex);
// will handle error in the next round of the for loop
} catch (final ExecutionException ex) {
errorTracker.notifyOfError(ex);
// will handle error in the next round of the for loop
final Reducer<MapType, ReduceType> reducer
= new Reducer<MapType, ReduceType>(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue);
try {
int nSubmittedJobs = 0;
while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
// acquire a slot to run a map job. Blocks if too many jobs are enqueued
runningMapJobSlots.acquire();
mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer));
nSubmittedJobs++;
}
// mark the last job id we've submitted so we know the id to wait for
//logger.warn("setting jobs submitted to " + nSubmittedJobs);
reducer.setTotalJobCount(nSubmittedJobs);
// wait for all of the input and map threads to finish
return waitForCompletion(inputProducer, reducer, errorWatcher);
} catch (Exception ex) {
// occurs in general because the error watching thread shut us down
throw errorTracker.notifyOfError(ex);
}
}
private void handleErrors() {
/**
* Wait until the input thread and all map threads have completed running, and return the final reduce result
*
* Blocks, in order, on (1) the reducer producing its final value, (2) the input
* producer thread finishing, and (3) all outstanding map jobs releasing their
* semaphore slots, and only then shuts down the error watcher thread.
*
* @param inputProducer the producer feeding input values to the map jobs
* @param reducer the reducer accumulating map results into the final value
* @param errorWatcher the error watcher thread to shut down once everything is done
* @return the final reduce result
* @throws InterruptedException if interrupted while waiting on the semaphore or the watcher
*/
private ReduceType waitForCompletion(final InputProducer<InputType> inputProducer,
final Reducer<MapType, ReduceType> reducer,
final ErrorWatcherThread errorWatcher) throws InterruptedException {
// wait until we have a final reduce result
// logger.warn("waiting for final reduce");
final ReduceType finalSum = reducer.waitForFinalReduce();
// now wait for the input provider thread to terminate
// logger.warn("waiting on inputProducer");
inputProducer.waitForDone();
// wait for all the map threads to finish by acquiring and then releasing all map job semaphores
// wait for map jobs: each running job holds one of the bufferSize permits,
// so acquiring all of them only succeeds once every map job has released its slot
// logger.warn("waiting on map");
runningMapJobSlots.acquire(bufferSize);
runningMapJobSlots.release(bufferSize);
// We are done with everything so shutdown the errorWatcher thread
errorWatcher.shutdown();
// everything is finally shutdown, return the final reduce value
return finalSum;
}
/**
* Decide whether the submission loop should enqueue another map job.
*
* We keep submitting while the input producer has not yet reached EOF (it reports
* -1 read items in that case), or while we have submitted fewer jobs than the
* number of items actually read. Because the producer runs concurrently, this may
* submit more jobs than there are input items, so map jobs must tolerate
* getNext() returning EOF.
*
* @param nJobsSubmitted number of map jobs submitted so far
* @param inputProducer the producer whose read count bounds the number of submissions
* @return true if another map job should be submitted
*/
private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer<InputType> inputProducer) {
final int itemsReadSoFar = inputProducer.getNumInputValues();
if ( itemsReadSoFar == -1 )
return true; // producer hasn't reached EOF yet, so keep submitting
return nJobsSubmitted < itemsReadSoFar;
}
/**
* A thread that periodically wakes up and checks to see if an error has occurred, and if
* so shuts down the NanoScheduler (via shutdownNow()), sending an InterruptedException to
* the main thread, which throws the error in the errorTracker.
*
* The main thread should call shutdown() when it's ready to return itself, which will cause
* the run() method of this thread to abort in the next iteration. Uses a local latch to
* cause the thread calling shutdown to block until the run() method exits.
*/
private class ErrorWatcherThread implements Runnable {
// volatile: written by the thread calling shutdown() and read by the watcher
// thread's polling loop; without volatile the Java memory model does not
// guarantee the watcher ever observes the update, so it could spin forever
volatile boolean done = false;
final CountDownLatch latch = new CountDownLatch(1);
private boolean isDone() {
return done;
}
/**
* Shutdown this ErrorWatcher, blocking until the run() method of this thread exits
*
* @throws InterruptedException if interrupted while waiting for run() to exit
*/
public void shutdown() throws InterruptedException {
this.done = true;
latch.await();
}
@Override
public void run() {
while ( ! isDone() ) {
try {
Thread.sleep(100);
handleErrorsIfOneOccurred();
} catch (final InterruptedException ex) {
// restore the interrupt status before exiting, per standard practice
Thread.currentThread().interrupt();
break; // just exit
}
}
// free the latch so the shutdown thread starts up
latch.countDown();
}
}
/**
* If an error has occurred in the tracker, shut down the executors and
* throw the error, otherwise do nothing.
*/
private void handleErrorsIfOneOccurred() {
if ( errorTracker.hasAnErrorOccurred() ) {
// NOTE(review): both masterExecutor and errorWatchingExecutor are shut down
// below -- masterExecutor looks like a pre-rename remnant of the same field;
// confirm only one of the two actually exists in this class
masterExecutor.shutdownNow();
mapExecutor.shutdownNow();
inputExecutor.shutdownNow();
errorWatchingExecutor.shutdownNow();
// rethrows the tracked error on the calling thread
errorTracker.throwErrorIfPending();
}
}
/**
* MasterJob has the task to enqueue Map jobs and wait for the final reduce
*
* It must be run in a separate thread in order to properly handle errors that may occur
* in the input, map, or reduce jobs without deadlocking.
*
* The result of this callable is the final reduce value for the input / map / reduce jobs
* Executes a single map job, reading the next element from the input inputQueue
* and after mapping runs reduce on as many elements as possible
*/
/**
* MasterJob enqueues map jobs fed by an input producer and waits for the final
* reduce value; its call() result is that final reduce value.
*
* NOTE(review): per the surrounding doc, it must run in a separate thread so
* errors in the input, map, or reduce jobs can be handled without deadlocking.
*/
private class MasterJob implements Callable<ReduceType> {
// source of input values, drained by the InputProducer
final Iterator<InputType> inputReader;
// user-supplied map function applied to each input value
final NSMapFunction<InputType, MapType> map;
// starting value for the reduction; also returned as a placeholder on error
final ReduceType initialValue;
// user-supplied reduce function folding map results together
final NSReduceFunction<MapType, ReduceType> reduce;
private MasterJob(Iterator<InputType> inputReader, NSMapFunction<InputType, MapType> map, ReduceType initialValue, NSReduceFunction<MapType, ReduceType> reduce) {
this.inputReader = inputReader;
this.map = map;
this.initialValue = initialValue;
this.reduce = reduce;
}
@Override
public ReduceType call() {
// a blocking queue that limits the number of input items to the requested buffer size
// note we need +1 because we continue to enqueue the lastObject
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue
= new LinkedBlockingDeque<InputProducer<InputType>.InputValue>(bufferSize+1);
// Create the input producer and start it running
final InputProducer<InputType> inputProducer =
new InputProducer<InputType>(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue);
inputExecutor.submit(inputProducer);
// a priority queue that stores up to bufferSize elements
// produced by completed map jobs.
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue =
new PriorityBlockingQueue<MapResult<MapType>>();
final Reducer<MapType, ReduceType> reducer
= new Reducer<MapType, ReduceType>(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue);
try {
int nSubmittedJobs = 0;
while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
// acquire a slot to run a map job. Blocks if too many jobs are enqueued
runningMapJobSlots.acquire();
mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer));
nSubmittedJobs++;
}
// mark the last job id we've submitted so we know the id to wait for
//logger.warn("setting jobs submitted to " + nSubmittedJobs);
reducer.setTotalJobCount(nSubmittedJobs);
// wait for all of the input and map threads to finish
return waitForCompletion(inputProducer, reducer);
} catch (Exception ex) {
// record the error in the tracker; initialValue is only a placeholder return,
// callers are expected to consult the tracker for failures
errorTracker.notifyOfError(ex);
return initialValue;
}
}
/**
* Wait until the input thread and all map threads have completed running, and return the final reduce result
*
* @param inputProducer the producer feeding input values to the map jobs
* @param reducer the reducer accumulating map results into the final value
* @return the final reduce result
* @throws InterruptedException if interrupted while waiting on the map job semaphore
*/
private ReduceType waitForCompletion(final InputProducer<InputType> inputProducer,
final Reducer<MapType, ReduceType> reducer) throws InterruptedException {
// wait until we have a final reduce result
// logger.warn("waiting for final reduce");
final ReduceType finalSum = reducer.waitForFinalReduce();
// now wait for the input provider thread to terminate
// logger.warn("waiting on inputProducer");
inputProducer.waitForDone();
// wait for all the map threads to finish by acquiring and then releasing all map job semaphores
// logger.warn("waiting on map");
runningMapJobSlots.acquire(bufferSize);
runningMapJobSlots.release(bufferSize);
// everything is finally shutdown, return the final reduce value
return finalSum;
}
/**
* Should we continue to submit jobs given the number of jobs already submitted and the
* number of read items in inputProducer?
*
* We continue to submit jobs while inputProducer hasn't reached EOF or the number
* of jobs we've enqueued isn't the number of read elements. This means that in
* some cases we submit more jobs than total read elements (cannot know because of
* multi-threading) so map jobs must handle the case where getNext() returns EOF.
*
* @param nJobsSubmitted number of map jobs submitted so far
* @param inputProducer the producer whose read count bounds the number of submissions
* @return true if another map job should be submitted
*/
private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer<InputType> inputProducer) {
final int nReadItems = inputProducer.getNumInputValues();
return nReadItems == -1 || nJobsSubmitted < nReadItems;
}
}
private class MapReduceJob implements Runnable {
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue;
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue;

View File

@ -24,7 +24,7 @@ import java.util.List;
*/
public class NanoSchedulerUnitTest extends BaseTest {
private final static boolean debug = false;
public static final int NANO_SCHEDULE_MAX_RUNTIME = 60000;
public static final int NANO_SCHEDULE_MAX_RUNTIME = 10000;
private static class Map2x implements NSMapFunction<Integer, Integer> {
@Override public Integer apply(Integer input) { return input * 2; }
@ -228,12 +228,12 @@ public class NanoSchedulerUnitTest extends BaseTest {
nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce());
}
@Test(expectedExceptions = NullPointerException.class, timeOut = 10000)
@Test(expectedExceptions = NullPointerException.class, timeOut = 10000, invocationCount = 50)
public void testInputErrorIsThrown_NPE() throws InterruptedException {
executeTestErrorThrowingInput(new NullPointerException());
}
@Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000)
@Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000, invocationCount = 50)
public void testInputErrorIsThrown_RSE() throws InterruptedException {
executeTestErrorThrowingInput(new ReviewedStingException("test"));
}