NanoScheduler uses ExecutorService to run input reader thread

This commit is contained in:
Mark DePristo 2012-09-04 18:07:08 -04:00
parent 71d9ebcb0d
commit 1e55475adc
1 changed files with 11 additions and 9 deletions

View File

@ -51,7 +51,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final int bufferSize; final int bufferSize;
final int nThreads; final int nThreads;
final ExecutorService executor; final ExecutorService inputExecutor;
final ExecutorService mapExecutor;
boolean shutdown = false; boolean shutdown = false;
boolean debug = false; boolean debug = false;
@ -75,7 +76,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.nThreads = nThreads; this.nThreads = nThreads;
this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads-1); this.mapExecutor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads-1);
this.inputExecutor = Executors.newSingleThreadExecutor();
// start timing the time spent outside of the nanoScheduler // start timing the time spent outside of the nanoScheduler
outsideSchedulerTimer.start(); outsideSchedulerTimer.start();
@ -107,10 +109,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
public void shutdown() { public void shutdown() {
outsideSchedulerTimer.stop(); outsideSchedulerTimer.stop();
if ( executor != null ) { if ( mapExecutor != null ) {
final List<Runnable> remaining = executor.shutdownNow(); final List<Runnable> remaining = mapExecutor.shutdownNow();
if ( ! remaining.isEmpty() ) if ( ! remaining.isEmpty() )
throw new IllegalStateException("Remaining tasks found in the executor, unexpected behavior!"); throw new IllegalStateException("Remaining tasks found in the mapExecutor, unexpected behavior!");
} }
shutdown = true; shutdown = true;
@ -236,8 +238,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
boolean done = false; boolean done = false;
final BlockingQueue<InputDatum> inputQueue = new LinkedBlockingDeque<InputDatum>(bufferSize); final BlockingQueue<InputDatum> inputQueue = new LinkedBlockingDeque<InputDatum>(bufferSize);
final InputProducer inputProducer = new InputProducer(inputReader, inputQueue);
inputProducer.start(); inputExecutor.submit(new InputProducer(inputReader, inputQueue));
while ( ! done ) { while ( ! done ) {
try { try {
@ -247,7 +249,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
if ( ! inputs.isEmpty() ) { if ( ! inputs.isEmpty() ) {
// send jobs for map // send jobs for map
final Queue<Future<MapType>> mapQueue = submitMapJobs(map, executor, inputs); final Queue<Future<MapType>> mapQueue = submitMapJobs(map, mapExecutor, inputs);
// send off the reduce job, and block until we get at least one reduce result // send off the reduce job, and block until we get at least one reduce result
sum = reduceSerial(reduce, mapQueue, sum); sum = reduceSerial(reduce, mapQueue, sum);
@ -311,7 +313,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
return new Pair<List<InputType>, Boolean>(inputs, done); return new Pair<List<InputType>, Boolean>(inputs, done);
} }
private class InputProducer extends Thread { private class InputProducer implements Runnable {
final Iterator<InputType> inputReader; final Iterator<InputType> inputReader;
final BlockingQueue<InputDatum> outputQueue; final BlockingQueue<InputDatum> outputQueue;