Revert "Reorganized NanoScheduler so that main thread does the reduces"
The reorganization doesn't actually fix the problem, and it adds an unnecessary delay when shutting down NanoScheduler, so it is being reverted. This reverts commit 66b820bf94ae755a8a0c71ea16f4cae56fd3e852.
This commit is contained in:
parent
7425ab9637
commit
ba9e95a8fe
|
|
@ -48,7 +48,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
final int bufferSize;
|
||||
final int nThreads;
|
||||
final ExecutorService inputExecutor;
|
||||
final ExecutorService errorWatchingExecutor;
|
||||
final ExecutorService masterExecutor;
|
||||
final ExecutorService mapExecutor;
|
||||
final Semaphore runningMapJobSlots;
|
||||
final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker();
|
||||
|
|
@ -85,14 +85,14 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
this.nThreads = nThreads;
|
||||
|
||||
if ( nThreads == 1 ) {
|
||||
this.mapExecutor = this.inputExecutor = this.errorWatchingExecutor = null;
|
||||
this.mapExecutor = this.inputExecutor = this.masterExecutor = null;
|
||||
runningMapJobSlots = null;
|
||||
} else {
|
||||
this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d"));
|
||||
runningMapJobSlots = new Semaphore(this.bufferSize);
|
||||
|
||||
this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
|
||||
this.errorWatchingExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
|
||||
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
|
||||
}
|
||||
|
||||
// start timing the time spent outside of the nanoScheduler
|
||||
|
|
@ -131,7 +131,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
if ( nThreads > 1 ) {
|
||||
shutdownExecutor("inputExecutor", inputExecutor);
|
||||
shutdownExecutor("mapExecutor", mapExecutor);
|
||||
shutdownExecutor("errorWatchingExecutor", errorWatchingExecutor);
|
||||
shutdownExecutor("masterExecutor", masterExecutor);
|
||||
}
|
||||
|
||||
shutdown = true;
|
||||
|
|
@ -313,154 +313,148 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
final NSReduceFunction<MapType, ReduceType> reduce) {
|
||||
debugPrint("Executing nanoScheduler");
|
||||
|
||||
final ErrorWatcherThread errorWatcher = new ErrorWatcherThread();
|
||||
errorWatchingExecutor.submit(errorWatcher);
|
||||
// start up the master job
|
||||
final MasterJob masterJob = new MasterJob(inputReader, map, initialValue, reduce);
|
||||
final Future<ReduceType> reduceResult = masterExecutor.submit(masterJob);
|
||||
|
||||
// a blocking queue that limits the number of input datum to the requested buffer size
|
||||
// note we need +1 because we continue to enqueue the lastObject
|
||||
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue
|
||||
= new LinkedBlockingDeque<InputProducer<InputType>.InputValue>(bufferSize+1);
|
||||
while ( true ) {
|
||||
// check that no errors occurred while we were waiting
|
||||
handleErrors();
|
||||
|
||||
// Create the input producer and start it running
|
||||
final InputProducer<InputType> inputProducer =
|
||||
new InputProducer<InputType>(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue);
|
||||
inputExecutor.submit(inputProducer);
|
||||
try {
|
||||
final ReduceType result = reduceResult.get(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
// a priority queue that stores up to bufferSize elements
|
||||
// produced by completed map jobs.
|
||||
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue =
|
||||
new PriorityBlockingQueue<MapResult<MapType>>();
|
||||
// in case an error occurred in the reduce
|
||||
handleErrors();
|
||||
|
||||
final Reducer<MapType, ReduceType> reducer
|
||||
= new Reducer<MapType, ReduceType>(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue);
|
||||
|
||||
try {
|
||||
int nSubmittedJobs = 0;
|
||||
|
||||
while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
|
||||
// acquire a slot to run a map job. Blocks if too many jobs are enqueued
|
||||
runningMapJobSlots.acquire();
|
||||
|
||||
mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer));
|
||||
nSubmittedJobs++;
|
||||
// return our final reduce result
|
||||
return result;
|
||||
} catch (final TimeoutException ex ) {
|
||||
// a normal case -- we just aren't done
|
||||
} catch (final InterruptedException ex) {
|
||||
errorTracker.notifyOfError(ex);
|
||||
// will handle the error in the next iteration of the while loop
|
||||
} catch (final ExecutionException ex) {
|
||||
errorTracker.notifyOfError(ex);
|
||||
// will handle the error in the next iteration of the while loop
|
||||
}
|
||||
|
||||
// mark the last job id we've submitted so we know the id to wait for
|
||||
//logger.warn("setting jobs submitted to " + nSubmittedJobs);
|
||||
reducer.setTotalJobCount(nSubmittedJobs);
|
||||
|
||||
// wait for all of the input and map threads to finish
|
||||
return waitForCompletion(inputProducer, reducer, errorWatcher);
|
||||
} catch (Exception ex) {
|
||||
// occurs in general because the error watching thread shut us down
|
||||
throw errorTracker.notifyOfError(ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until the input thread and all map threads have completed running, and return the final reduce result
|
||||
*/
|
||||
private ReduceType waitForCompletion(final InputProducer<InputType> inputProducer,
|
||||
final Reducer<MapType, ReduceType> reducer,
|
||||
final ErrorWatcherThread errorWatcher) throws InterruptedException {
|
||||
// wait until we have a final reduce result
|
||||
// logger.warn("waiting for final reduce");
|
||||
final ReduceType finalSum = reducer.waitForFinalReduce();
|
||||
|
||||
// now wait for the input provider thread to terminate
|
||||
// logger.warn("waiting on inputProducer");
|
||||
inputProducer.waitForDone();
|
||||
|
||||
// wait for all the map threads to finish by acquiring and then releasing all map job semaphores
|
||||
// logger.warn("waiting on map");
|
||||
runningMapJobSlots.acquire(bufferSize);
|
||||
runningMapJobSlots.release(bufferSize);
|
||||
|
||||
// We are done with everything so shutdown the errorWatcher thread
|
||||
errorWatcher.shutdown();
|
||||
|
||||
// everything is finally shutdown, return the final reduce value
|
||||
return finalSum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should we continue to submit jobs given the number of jobs already submitted and the
|
||||
* number of read items in inputProducer?
|
||||
*
|
||||
* We continue to submit jobs while inputProducer hasn't reached EOF or the number
|
||||
* of jobs we've enqueued isn't the number of read elements. This means that in
|
||||
* some cases we submit more jobs than total read elements (cannot know because of
|
||||
* multi-threading) so map jobs must handle the case where getNext() returns EOF.
|
||||
*
|
||||
* @param nJobsSubmitted
|
||||
* @param inputProducer
|
||||
* @return
|
||||
*/
|
||||
private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer<InputType> inputProducer) {
|
||||
final int nReadItems = inputProducer.getNumInputValues();
|
||||
return nReadItems == -1 || nJobsSubmitted < nReadItems;
|
||||
}
|
||||
|
||||
/**
|
||||
* A thread that periodically wakes up and checks to see if an error has occurred, and if
|
||||
* so shuts down the NanoScheduler (via shutdownNow()), sending an InterruptedException to
|
||||
* the main thread, which throws the error in the errorTracker.
|
||||
*
|
||||
* The main thread should call shutdown() when its ready to return itself, which will cause
|
||||
* the run() method of this thread to abort in the next iteration. Uses a local latch to
|
||||
* cause the thread calling shutdown to block until the run() method exits.
|
||||
*/
|
||||
private class ErrorWatcherThread implements Runnable {
|
||||
boolean done = false;
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
private boolean isDone() {
|
||||
return done;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown this ErrorWatcher, blocking until the run() method of this thread exits
|
||||
*
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void shutdown() throws InterruptedException {
|
||||
this.done = true;
|
||||
latch.await();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while ( ! isDone() ) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
handleErrorsIfOneOccurred();
|
||||
} catch (final InterruptedException ex) {
|
||||
break; // just exit
|
||||
}
|
||||
}
|
||||
|
||||
// free the latch so the shutdown thread starts up
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If an error has occurred in the tracker, shut down the executors and
|
||||
* throw the error, otherwise do nothing.
|
||||
*/
|
||||
private void handleErrorsIfOneOccurred() {
|
||||
private void handleErrors() {
|
||||
if ( errorTracker.hasAnErrorOccurred() ) {
|
||||
masterExecutor.shutdownNow();
|
||||
mapExecutor.shutdownNow();
|
||||
inputExecutor.shutdownNow();
|
||||
errorWatchingExecutor.shutdownNow();
|
||||
errorTracker.throwErrorIfPending();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a single map job, reading the next element from the input inputQueue
|
||||
* and after mapping runs reduce on as many elements as possible
|
||||
* MasterJob has the task to enqueue Map jobs and wait for the final reduce
|
||||
*
|
||||
* It must be run in a separate thread in order to properly handle errors that may occur
|
||||
* in the input, map, or reduce jobs without deadlocking.
|
||||
*
|
||||
* The result of this callable is the final reduce value for the input / map / reduce jobs
|
||||
*/
|
||||
private class MasterJob implements Callable<ReduceType> {
|
||||
final Iterator<InputType> inputReader;
|
||||
final NSMapFunction<InputType, MapType> map;
|
||||
final ReduceType initialValue;
|
||||
final NSReduceFunction<MapType, ReduceType> reduce;
|
||||
|
||||
private MasterJob(Iterator<InputType> inputReader, NSMapFunction<InputType, MapType> map, ReduceType initialValue, NSReduceFunction<MapType, ReduceType> reduce) {
|
||||
this.inputReader = inputReader;
|
||||
this.map = map;
|
||||
this.initialValue = initialValue;
|
||||
this.reduce = reduce;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReduceType call() {
|
||||
// a blocking queue that limits the number of input datum to the requested buffer size
|
||||
// note we need +1 because we continue to enqueue the lastObject
|
||||
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue
|
||||
= new LinkedBlockingDeque<InputProducer<InputType>.InputValue>(bufferSize+1);
|
||||
|
||||
// Create the input producer and start it running
|
||||
final InputProducer<InputType> inputProducer =
|
||||
new InputProducer<InputType>(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue);
|
||||
inputExecutor.submit(inputProducer);
|
||||
|
||||
// a priority queue that stores up to bufferSize elements
|
||||
// produced by completed map jobs.
|
||||
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue =
|
||||
new PriorityBlockingQueue<MapResult<MapType>>();
|
||||
|
||||
final Reducer<MapType, ReduceType> reducer
|
||||
= new Reducer<MapType, ReduceType>(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue);
|
||||
|
||||
try {
|
||||
int nSubmittedJobs = 0;
|
||||
|
||||
while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
|
||||
// acquire a slot to run a map job. Blocks if too many jobs are enqueued
|
||||
runningMapJobSlots.acquire();
|
||||
|
||||
mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer));
|
||||
nSubmittedJobs++;
|
||||
}
|
||||
|
||||
// mark the last job id we've submitted so we now the id to wait for
|
||||
//logger.warn("setting jobs submitted to " + nSubmittedJobs);
|
||||
reducer.setTotalJobCount(nSubmittedJobs);
|
||||
|
||||
// wait for all of the input and map threads to finish
|
||||
return waitForCompletion(inputProducer, reducer);
|
||||
} catch (Exception ex) {
|
||||
errorTracker.notifyOfError(ex);
|
||||
return initialValue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until the input thread and all map threads have completed running, and return the final reduce result
|
||||
*/
|
||||
private ReduceType waitForCompletion(final InputProducer<InputType> inputProducer,
|
||||
final Reducer<MapType, ReduceType> reducer) throws InterruptedException {
|
||||
// wait until we have a final reduce result
|
||||
// logger.warn("waiting for final reduce");
|
||||
final ReduceType finalSum = reducer.waitForFinalReduce();
|
||||
|
||||
// now wait for the input provider thread to terminate
|
||||
// logger.warn("waiting on inputProducer");
|
||||
inputProducer.waitForDone();
|
||||
|
||||
// wait for all the map threads to finish by acquiring and then releasing all map job semaphores
|
||||
// logger.warn("waiting on map");
|
||||
runningMapJobSlots.acquire(bufferSize);
|
||||
runningMapJobSlots.release(bufferSize);
|
||||
|
||||
// everything is finally shutdown, return the final reduce value
|
||||
return finalSum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should we continue to submit jobs given the number of jobs already submitted and the
|
||||
* number of read items in inputProducer?
|
||||
*
|
||||
* We continue to submit jobs while inputProducer hasn't reached EOF or the number
|
||||
* of jobs we've enqueued isn't the number of read elements. This means that in
|
||||
* some cases we submit more jobs than total read elements (cannot know because of
|
||||
* multi-threading) so map jobs must handle the case where getNext() returns EOF.
|
||||
*
|
||||
* @param nJobsSubmitted
|
||||
* @param inputProducer
|
||||
* @return
|
||||
*/
|
||||
private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer<InputType> inputProducer) {
|
||||
final int nReadItems = inputProducer.getNumInputValues();
|
||||
return nReadItems == -1 || nJobsSubmitted < nReadItems;
|
||||
}
|
||||
}
|
||||
|
||||
private class MapReduceJob implements Runnable {
|
||||
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue;
|
||||
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue;
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ import java.util.List;
|
|||
*/
|
||||
public class NanoSchedulerUnitTest extends BaseTest {
|
||||
private final static boolean debug = false;
|
||||
public static final int NANO_SCHEDULE_MAX_RUNTIME = 10000;
|
||||
public static final int NANO_SCHEDULE_MAX_RUNTIME = 60000;
|
||||
|
||||
private static class Map2x implements NSMapFunction<Integer, Integer> {
|
||||
@Override public Integer apply(Integer input) { return input * 2; }
|
||||
|
|
@ -228,12 +228,12 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce());
|
||||
}
|
||||
|
||||
@Test(expectedExceptions = NullPointerException.class, timeOut = 10000, invocationCount = 50)
|
||||
@Test(expectedExceptions = NullPointerException.class, timeOut = 10000)
|
||||
public void testInputErrorIsThrown_NPE() throws InterruptedException {
|
||||
executeTestErrorThrowingInput(new NullPointerException());
|
||||
}
|
||||
|
||||
@Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000, invocationCount = 50)
|
||||
@Test(expectedExceptions = ReviewedStingException.class, timeOut = 10000)
|
||||
public void testInputErrorIsThrown_RSE() throws InterruptedException {
|
||||
executeTestErrorThrowingInput(new ReviewedStingException("test"));
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue