Initial NanoScheduler with input producer thread

This commit is contained in:
Mark DePristo 2012-09-04 17:54:43 -04:00
parent 03dd470ec1
commit a997c99806
2 changed files with 86 additions and 26 deletions

View File

@ -5,6 +5,7 @@ import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.AutoFormattingTime;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.collections.Pair;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.Iterator;
@ -74,7 +75,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
this.bufferSize = bufferSize;
this.nThreads = nThreads;
this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads);
this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads-1);
// start timing the time spent outside of the nanoScheduler
outsideSchedulerTimer.start();
@ -232,20 +233,31 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final NanoSchedulerReduceFunction<MapType, ReduceType> reduce) {
debugPrint("Executing nanoScheduler");
ReduceType sum = initialValue;
while ( inputReader.hasNext() ) {
boolean done = false;
final BlockingQueue<InputDatum> inputQueue = new LinkedBlockingDeque<InputDatum>(bufferSize);
final InputProducer inputProducer = new InputProducer(inputReader, inputQueue);
inputProducer.start();
while ( ! done ) {
try {
// read in our input values
final List<InputType> inputs = readInputs(inputReader);
debugPrint("Enqueuing " + inputs.size() + " elements to map");
final Pair<List<InputType>, Boolean> readResults = readInputs(inputQueue);
final List<InputType> inputs = readResults.getFirst();
done = readResults.getSecond();
// send jobs for map
final Queue<Future<MapType>> mapQueue = submitMapJobs(map, executor, inputs);
if ( ! inputs.isEmpty() ) {
// send jobs for map
final Queue<Future<MapType>> mapQueue = submitMapJobs(map, executor, inputs);
// send off the reduce job, and block until we get at least one reduce result
sum = reduceSerial(reduce, mapQueue, sum);
debugPrint(" Done with cycle of map/reduce");
// send off the reduce job, and block until we get at least one reduce result
sum = reduceSerial(reduce, mapQueue, sum);
debugPrint(" Done with cycle of map/reduce");
if ( progressFunction != null ) progressFunction.progress(inputs.get(inputs.size()-1));
if ( progressFunction != null ) progressFunction.progress(inputs.get(inputs.size()-1));
} else {
// we must be done
if ( ! done ) throw new IllegalStateException("Inputs empty but not done");
}
} catch (InterruptedException ex) {
throw new ReviewedStingException("got execution exception", ex);
} catch (ExecutionException ex) {
@ -267,9 +279,9 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
for ( final Future<MapType> future : mapQueue ) {
final MapType value = future.get(); // block until we get the values for this task
if ( TIME_CALLS) reduceTimer.restart();
if ( TIME_CALLS ) reduceTimer.restart();
sum = reduce.apply(value, sum);
if ( TIME_CALLS) reduceTimer.stop();
if ( TIME_CALLS ) reduceTimer.stop();
}
return sum;
@ -280,21 +292,68 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
*
* @return a queue of input read in, containing one or more values of InputType read in
*/
@Requires("inputReader.hasNext()")
@Ensures("!result.isEmpty()")
private List<InputType> readInputs(final Iterator<InputType> inputReader) {
@Requires("inputReader != null")
@Ensures("result != null")
private Pair<List<InputType>, Boolean> readInputs(final BlockingQueue<InputDatum> inputReader) throws InterruptedException {
int n = 0;
final List<InputType> inputs = new LinkedList<InputType>();
boolean done = false;
if ( TIME_CALLS) inputTimer.restart();
while ( inputReader.hasNext() && n < getBufferSize() ) {
final InputType input = inputReader.next();
inputs.add(input);
n++;
while ( ! done && n < getBufferSize() ) {
final InputDatum input = inputReader.take();
done = input.isLast();
if ( ! done ) {
inputs.add(input.datum);
n++;
}
}
if ( TIME_CALLS) inputTimer.stop();
return inputs;
return new Pair<List<InputType>, Boolean>(inputs, done);
}
private class InputProducer extends Thread {
final Iterator<InputType> inputReader;
final BlockingQueue<InputDatum> outputQueue;
public InputProducer(final Iterator<InputType> inputReader, final BlockingQueue<InputDatum> outputQueue) {
this.inputReader = inputReader;
this.outputQueue = outputQueue;
}
public void run() {
try {
while ( inputReader.hasNext() ) {
if ( TIME_CALLS ) inputTimer.restart();
final InputType input = inputReader.next();
if ( TIME_CALLS ) inputTimer.stop();
outputQueue.put(new InputDatum(input));
}
// add the EOF object so we know we are done
outputQueue.put(new InputDatum());
} catch (InterruptedException ex) {
throw new ReviewedStingException("got execution exception", ex);
}
}
}
private class InputDatum {
final boolean isLast;
final InputType datum;
private InputDatum(final InputType datum) {
isLast = false;
this.datum = datum;
}
private InputDatum() {
isLast = true;
this.datum = null;
}
public boolean isLast() {
return isLast;
}
}
@Requires({"map != null", "! inputs.isEmpty()"})
@ -326,10 +385,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
}
@Override public MapType call() throws Exception {
if ( TIME_CALLS) mapTimer.restart();
if ( TIME_CALLS ) mapTimer.restart();
if ( debug ) debugPrint("\t\tmap " + input);
final MapType result = map.apply(input);
if ( TIME_CALLS) mapTimer.stop();
if ( TIME_CALLS ) mapTimer.stop();
return result;
}
}

View File

@ -126,7 +126,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
Assert.assertNotNull(sum);
Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly");
Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected");
Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks);
nanoScheduler.shutdown();
}
@ -168,6 +168,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
final NanoSchedulerBasicTest test = new NanoSchedulerBasicTest(1000, Integer.valueOf(args[0]), 0, Integer.valueOf(args[1]));
final NanoScheduler<Integer, Integer, Integer> nanoScheduler =
new NanoScheduler<Integer, Integer, Integer>(test.bufferSize, test.nThreads);
nanoScheduler.setDebug(true);
final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce());
System.out.printf("Sum = %d, expected =%d%n", sum, test.expectedResult);