From a997c99806b49c1ca0efdd4cc9c834df465e7b22 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Tue, 4 Sep 2012 17:54:43 -0400 Subject: [PATCH] Initial NanoScheduler with input producer thread --- .../utils/nanoScheduler/NanoScheduler.java | 109 ++++++++++++++---- .../nanoScheduler/NanoSchedulerUnitTest.java | 3 +- 2 files changed, 86 insertions(+), 26 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 61d4fdd01..4f9fedce3 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -5,6 +5,7 @@ import com.google.java.contract.Requires; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.AutoFormattingTime; import org.broadinstitute.sting.utils.SimpleTimer; +import org.broadinstitute.sting.utils.collections.Pair; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.Iterator; @@ -74,7 +75,7 @@ public class NanoScheduler { this.bufferSize = bufferSize; this.nThreads = nThreads; - this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads); + this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads-1); // start timing the time spent outside of the nanoScheduler outsideSchedulerTimer.start(); @@ -232,20 +233,31 @@ public class NanoScheduler { final NanoSchedulerReduceFunction reduce) { debugPrint("Executing nanoScheduler"); ReduceType sum = initialValue; - while ( inputReader.hasNext() ) { + boolean done = false; + + final BlockingQueue inputQueue = new LinkedBlockingDeque(bufferSize); + final InputProducer inputProducer = new InputProducer(inputReader, inputQueue); + inputProducer.start(); + + while ( ! done ) { try { - // read in our input values - final List inputs = readInputs(inputReader); - debugPrint("Enqueuing " + inputs.size() + " elements to map"); + final Pair, Boolean> readResults = readInputs(inputQueue); + final List inputs = readResults.getFirst(); + done = readResults.getSecond(); - // send jobs for map - final Queue> mapQueue = submitMapJobs(map, executor, inputs); + if ( ! inputs.isEmpty() ) { + // send jobs for map + final Queue> mapQueue = submitMapJobs(map, executor, inputs); - // send off the reduce job, and block until we get at least one reduce result - sum = reduceSerial(reduce, mapQueue, sum); - debugPrint(" Done with cycle of map/reduce"); + // send off the reduce job, and block until we get at least one reduce result + sum = reduceSerial(reduce, mapQueue, sum); + debugPrint(" Done with cycle of map/reduce"); - if ( progressFunction != null ) progressFunction.progress(inputs.get(inputs.size()-1)); + if ( progressFunction != null ) progressFunction.progress(inputs.get(inputs.size()-1)); + } else { + // we must be done + if ( ! done ) throw new IllegalStateException("Inputs empty but not done"); + } } catch (InterruptedException ex) { throw new ReviewedStingException("got execution exception", ex); } catch (ExecutionException ex) { @@ -267,9 +279,9 @@ public class NanoScheduler { for ( final Future future : mapQueue ) { final MapType value = future.get(); // block until we get the values for this task - if ( TIME_CALLS) reduceTimer.restart(); + if ( TIME_CALLS ) reduceTimer.restart(); sum = reduce.apply(value, sum); - if ( TIME_CALLS) reduceTimer.stop(); + if ( TIME_CALLS ) reduceTimer.stop(); } return sum; @@ -280,21 +292,68 @@ public class NanoScheduler { * * @return a queue of input read in, containing one or more values of InputType read in */ - @Requires("inputReader.hasNext()") - @Ensures("!result.isEmpty()") - private List readInputs(final Iterator inputReader) { + @Requires("inputReader != null") + @Ensures("result != null") + private Pair, Boolean> readInputs(final BlockingQueue inputReader) throws InterruptedException { int n = 0; final List inputs = new LinkedList(); + boolean done = false; - if ( TIME_CALLS) inputTimer.restart(); - while ( inputReader.hasNext() && n < getBufferSize() ) { - final InputType input = inputReader.next(); - inputs.add(input); - n++; + while ( ! done && n < getBufferSize() ) { + final InputDatum input = inputReader.take(); + done = input.isLast(); + if ( ! done ) { + inputs.add(input.datum); + n++; + } } - if ( TIME_CALLS) inputTimer.stop(); - return inputs; + return new Pair, Boolean>(inputs, done); + } + + private class InputProducer extends Thread { + final Iterator inputReader; + final BlockingQueue outputQueue; + + public InputProducer(final Iterator inputReader, final BlockingQueue outputQueue) { + this.inputReader = inputReader; + this.outputQueue = outputQueue; + } + + public void run() { + try { + while ( inputReader.hasNext() ) { + if ( TIME_CALLS ) inputTimer.restart(); + final InputType input = inputReader.next(); + if ( TIME_CALLS ) inputTimer.stop(); + outputQueue.put(new InputDatum(input)); + } + + // add the EOF object so we know we are done + outputQueue.put(new InputDatum()); + } catch (InterruptedException ex) { + throw new ReviewedStingException("got execution exception", ex); + } + } + } + + private class InputDatum { + final boolean isLast; + final InputType datum; + + private InputDatum(final InputType datum) { + isLast = false; + this.datum = datum; + } + + private InputDatum() { + isLast = true; + this.datum = null; + } + + public boolean isLast() { + return isLast; + } } @Requires({"map != null", "! inputs.isEmpty()"}) @@ -326,10 +385,10 @@ public class NanoScheduler { } @Override public MapType call() throws Exception { - if ( TIME_CALLS) mapTimer.restart(); + if ( TIME_CALLS ) mapTimer.restart(); if ( debug ) debugPrint("\t\tmap " + input); final MapType result = map.apply(input); - if ( TIME_CALLS) mapTimer.stop(); + if ( TIME_CALLS ) mapTimer.stop(); return result; } } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 3bd006ffe..ddfc3cecd 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -126,7 +126,7 @@ public class NanoSchedulerUnitTest extends BaseTest { Assert.assertNotNull(sum); Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); - Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected"); + Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks); nanoScheduler.shutdown(); } @@ -168,6 +168,7 @@ public class NanoSchedulerUnitTest extends BaseTest { final NanoSchedulerBasicTest test = new NanoSchedulerBasicTest(1000, Integer.valueOf(args[0]), 0, Integer.valueOf(args[1])); final NanoScheduler nanoScheduler = new NanoScheduler(test.bufferSize, test.nThreads); + nanoScheduler.setDebug(true); final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce()); System.out.printf("Sum = %d, expected =%d%n", sum, test.expectedResult);