From 7605c6bcc418512b267764927d56ee66f7631144 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 13 Sep 2012 12:50:03 -0400 Subject: [PATCH] Done GSA-515 Nanoscheduler / GSA-557 V3 nanoScheduler algorithm -- V3 + V4 algorithm for NanoScheduler. The newer version uses 1 dedicated input thread and n - 1 map/reduce threads. These MapReduceJobs perform map and a greedy reduce. The main thread's only job is to shuttle inputs from the input producer thread, enqueueing MapReduce jobs for each one. We manage the number of map jobs now via a Semaphore instead of a BlockingQueue of fixed size. -- This new algorithm should consume N00% CPU power for -nct N value. -- Also a cleaner implementation in general -- Vastly expanded unit tests -- Deleted FutureValue and ReduceThread --- .../sting/gatk/executive/MicroScheduler.java | 1 - ...ingQueueValue.java => EOFMarkedValue.java} | 26 +-- .../utils/nanoScheduler/FutureValue.java | 45 ---- .../utils/nanoScheduler/InputProducer.java | 113 +++++++-- .../sting/utils/nanoScheduler/MapResult.java | 13 +- .../utils/nanoScheduler/NanoScheduler.java | 214 +++++++----------- .../sting/utils/nanoScheduler/Reducer.java | 110 ++++++++- .../utils/nanoScheduler/ReducerThread.java | 66 ------ .../nanoScheduler/InputProducerUnitTest.java | 176 ++++++++------ .../nanoScheduler/NanoSchedulerUnitTest.java | 9 +- .../nanoScheduler/ReducerThreadUnitTest.java | 95 -------- .../utils/nanoScheduler/ReducerUnitTest.java | 206 +++++++++++++++++ 12 files changed, 629 insertions(+), 445 deletions(-) rename public/java/src/org/broadinstitute/sting/utils/nanoScheduler/{BlockingQueueValue.java => EOFMarkedValue.java} (69%) delete mode 100644 public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java delete mode 100644 public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java create mode 100644 public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index a78ab4375..73cde3d3c 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -190,7 +190,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { allCreatedTraversalEngines.add(traversalEngine); availableTraversalEngines.add(traversalEngine); } - logger.info("Creating " + threadAllocation.getNumDataThreads() + " traversal engines"); // Create our progress meter this.progressMeter = new ProgressMeter(progressLogFile, diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/BlockingQueueValue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java similarity index 69% rename from public/java/src/org/broadinstitute/sting/utils/nanoScheduler/BlockingQueueValue.java rename to public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java index 2daa6c9eb..eddf5de3c 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/BlockingQueueValue.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java @@ -3,7 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Invariant; /** - * Wrapper to hold data for a blocking queue, distinguishing an EOF marker from a real object + * Wrapper to hold data that distinguishing an special EOF marker from a real object * * The only way to tell in a consumer thread that a blocking queue has no more data ever * coming down the pipe is to pass in a "poison" or EOF object. This class provides @@ -14,13 +14,13 @@ import com.google.java.contract.Invariant; * BlockingQueue q * producer: * while ( x has items ) - * q.put(new BlockingQueueValue(x)) - * q.put(new BlockingQueueValue()) + * q.put(new EOFMarkedValue(x)) + * q.put(new EOFMarkedValue()) * * Consumer: * while ( true ) * value = q.take() - * if ( value.isLast() ) + * if ( value.isEOFMarker() ) * break * else * do something useful with value @@ -30,8 +30,8 @@ import com.google.java.contract.Invariant; * Date: 9/6/12 * Time: 3:08 PM */ -@Invariant("! isLast || value == null") -class BlockingQueueValue { +@Invariant("! isEOFMarker() || value == null") +class EOFMarkedValue { /** * True if this is the EOF marker object */ @@ -43,18 +43,18 @@ class BlockingQueueValue { final private T value; /** - * Create a new BlockingQueueValue containing a real value, where last is false + * Create a new EOFMarkedValue containing a real value, where last is false * @param value */ - BlockingQueueValue(final T value) { + EOFMarkedValue(final T value) { isLast = false; this.value = value; } /** - * Create a new BlockingQueueValue that is the last item + * Create a new EOFMarkedValue that is the last item */ - BlockingQueueValue() { + EOFMarkedValue() { isLast = true; this.value = null; } @@ -64,18 +64,18 @@ class BlockingQueueValue { * * @return true if so, else false */ - public boolean isLast() { + public boolean isEOFMarker() { return isLast; } /** - * Get the value held by this BlockingQueueValue + * Get the value held by this EOFMarkedValue * * @return the value * @throws IllegalStateException if this is the last item */ public T getValue() { - if ( isLast() ) + if ( isEOFMarker() ) throw new IllegalStateException("Cannot get value for last object"); return value; } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java deleted file mode 100644 index 9508a15aa..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.broadinstitute.sting.utils.nanoScheduler; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Create a future that simply returns a given value - * - * The only standard way to create a future in java is via the ExecutorService interface. - * If you have a data structure holding futures of value T, and you want to add a - * value to it for some reason (to add a EOF marker, for instance) you can use this - * class to create a dummy Future that simply returns a value. - * - * @author depristo - * @since 09/12 - */ -class FutureValue implements Future { - final V value; - - FutureValue(final V value) { - this.value = value; - } - - @Override public boolean cancel(boolean mayInterruptIfRunning) { - return true; - } - - @Override public boolean isCancelled() { - return false; - } - - @Override public boolean isDone() { - return true; - } - - @Override public V get() throws InterruptedException, ExecutionException { - return value; - } - - @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return get(); - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index d669603c4..2e5003ff0 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -1,13 +1,16 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.utils.SimpleTimer; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; /** - * Producer Thread that reads input values from an inputReads and puts them into a BlockingQueue + * Producer Thread that reads input values from an inputReads and puts them into an output queue */ -class InputProducer { +class InputProducer implements Runnable { /** * The iterator we are using to get data from */ @@ -18,38 +21,120 @@ class InputProducer { */ final SimpleTimer inputTimer; + /** + * Where we put our input values for consumption + */ + final BlockingQueue outputQueue; + + /** + * Have we read the last value from inputReader? + * + * Must be a local variable, as inputReader.hasNext() can actually end up doing a lot + * of work, and the method getNElementsInInputStream() is supposed to be called not in the + * thread executing the reading of values but in the thread enqueuing results + */ + boolean readLastValue = false; + + int nRead = 0; + + /** + * A latch used to block threads that want to start up only when all of the values + * in inputReader have been read by the thread executing run() + */ + final CountDownLatch latch = new CountDownLatch(1); + public InputProducer(final Iterator inputReader, - final SimpleTimer inputTimer) { + final SimpleTimer inputTimer, + final BlockingQueue outputQueue) { if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null"); + if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null"); this.inputReader = inputReader; this.inputTimer = inputTimer; + this.outputQueue = outputQueue; } - public synchronized boolean hasNextNow() { - return inputReader.hasNext(); + /** + * Returns the number of elements in the input stream, AFTER we've read all of the values. + * If we haven't read them all yet, returns -1 + * + * @return the total number of elements in input stream, or -1 if some are still to be read + */ + public synchronized int getNElementsInInputStream() { + return readLastValue ? nRead : -1; } - public synchronized InputValue next() { + /** + * Read the next item from the input stream, if possible + * + * If the inputReader has values, returns them, otherwise return null. + * + * This method is synchronized, as it manipulates local state accessed across multiple threads. + * + * @return the next input stream value, or null if the stream contains no more elements + * @throws InterruptedException + */ + private synchronized InputType readNextItem() throws InterruptedException { inputTimer.restart(); - - final InputValue v; - if ( inputReader.hasNext() ) { - v = new InputValue(inputReader.next()); + if ( ! inputReader.hasNext() ) { + // we are done, mark ourselves as such and return null + readLastValue = true; + inputTimer.stop(); + return null; } else { - v = new InputValue(); + // get the next value, and return it + final InputType input = inputReader.next(); + inputTimer.stop(); + nRead++; + return input; } + } - inputTimer.stop(); + /** + * Run this input producer, looping over all items in the input reader and + * enqueueing them as InputValues into the outputQueue. After the + * end of the stream has been encountered, any threads waiting because + * they called waitForDone() will be freed. + */ + public void run() { + try { + while ( true ) { + final InputType value = readNextItem(); + if ( value == null ) { + // add the EOF marker + // add the EOF object so our consumer knows we are done in all inputs + outputQueue.put(new InputValue()); - return v; + break; + } else { + // add the actual value + outputQueue.put(new InputValue(value)); + } + } + + latch.countDown(); + } catch (InterruptedException ex) { + throw new ReviewedStingException("got execution exception", ex); + } + } + + /** + * Block until all of the items have been read from inputReader. + * + * Note that this call doesn't actually read anything. You have to submit a thread + * to actually execute run() directly. + * + * @throws InterruptedException + */ + public void waitForDone() throws InterruptedException { + latch.await(); } /** * Helper class that contains a read value suitable for EOF marking in a BlockingQueue */ - class InputValue extends BlockingQueueValue { + class InputValue extends EOFMarkedValue { private InputValue(InputType datum) { super(datum); } private InputValue() { } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java index 10d1f2b2e..83d671560 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java @@ -4,7 +4,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; * Holds the results of a map job suitable for producer/consumer threading * via a BlockingQueue */ -class MapResult extends BlockingQueueValue implements Comparable> { +class MapResult extends EOFMarkedValue implements Comparable> { final int jobID; /** @@ -40,8 +40,19 @@ class MapResult extends BlockingQueueValue implements Comparab return jobID; } + /** + * Compare these MapResults in order of JobID. + * + * @param o + * @return + */ @Override public int compareTo(MapResult o) { return Integer.valueOf(jobID).compareTo(o.getJobID()); } + + @Override + public String toString() { + return "[MapResult id=" + jobID + "]"; + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 2676f567b..08f29d155 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -1,5 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.threading.NamedThreadFactory; @@ -43,14 +45,11 @@ public class NanoScheduler { private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; private final static boolean LOG_MAP_TIMES = false; - private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100; - final int bufferSize; final int nThreads; final ExecutorService inputExecutor; - final ExecutorService reduceExecutor; final ExecutorService mapExecutor; - final Semaphore mapQueueSizeManagingSemaphone; + final Semaphore runningMapJobSlots; boolean shutdown = false; boolean debug = false; @@ -84,14 +83,13 @@ public class NanoScheduler { this.nThreads = nThreads; if ( nThreads == 1 ) { - this.mapExecutor = this.inputExecutor = this.reduceExecutor = null; - mapQueueSizeManagingSemaphone = null; + this.mapExecutor = this.inputExecutor = null; + runningMapJobSlots = null; } else { - this.mapExecutor = Executors.newFixedThreadPool(nThreads, new NamedThreadFactory("NS-map-thread-%d")); - mapQueueSizeManagingSemaphone = new Semaphore(this.bufferSize); + this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d")); + runningMapJobSlots = new Semaphore(this.bufferSize); this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); - this.reduceExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-reduce-thread-%d")); } // start timing the time spent outside of the nanoScheduler @@ -102,7 +100,7 @@ public class NanoScheduler { * The number of parallel map threads in use with this NanoScheduler * @return */ -// @Ensures("result > 0") + @Ensures("result > 0") public int getnThreads() { return nThreads; } @@ -111,7 +109,7 @@ public class NanoScheduler { * The input buffer size used by this NanoScheduler * @return */ -// @Ensures("result > 0") + @Ensures("result > 0") public int getBufferSize() { return this.bufferSize; } @@ -130,7 +128,6 @@ public class NanoScheduler { if ( nThreads > 1 ) { shutdownExecutor("inputExecutor", inputExecutor); shutdownExecutor("mapExecutor", mapExecutor); - shutdownExecutor("reduceExecutor", reduceExecutor); } shutdown = true; @@ -156,8 +153,8 @@ public class NanoScheduler { * @param name a string name for error messages for the executorService we are shutting down * @param executorService the executorService to shut down */ -// @Requires({"name != null", "executorService != null"}) -// @Ensures("executorService.isShutdown()") + @Requires({"name != null", "executorService != null"}) + @Ensures("executorService.isShutdown()") private void shutdownExecutor(final String name, final ExecutorService executorService) { if ( executorService.isShutdown() || executorService.isTerminated() ) throw new IllegalStateException("Executor service " + name + " is already shut down!"); @@ -187,8 +184,8 @@ public class NanoScheduler { * @param format the format argument suitable for String.format * @param args the arguments for String.format */ -// @Requires("format != null") - private void debugPrint(final String format, Object ... args) { + @Requires("format != null") + protected void debugPrint(final String format, Object ... args) { if ( isDebug() ) logger.warn("Thread " + Thread.currentThread().getId() + ":" + String.format(format, args)); } @@ -262,7 +259,7 @@ public class NanoScheduler { * * @return the reduce result of this map/reduce job */ -// @Requires({"inputReader != null", "map != null", "reduce != null"}) + @Requires({"inputReader != null", "map != null", "reduce != null"}) private ReduceType executeSingleThreaded(final Iterator inputReader, final NSMapFunction map, final ReduceType initialValue, @@ -305,69 +302,22 @@ public class NanoScheduler { * * @return the reduce result of this map/reduce job */ -// @Requires({"inputReader != null", "map != null", "reduce != null"}) + @Requires({"inputReader != null", "map != null", "reduce != null"}) private ReduceType executeMultiThreaded(final Iterator inputReader, final NSMapFunction map, final ReduceType initialValue, final NSReduceFunction reduce) { -// debugPrint("Executing nanoScheduler"); -// -// // a blocking queue that limits the number of input datum to the requested buffer size -// final BlockingQueue.InputValue> inputQueue -// = new LinkedBlockingDeque.InputValue>(bufferSize); -// -// // a priority queue that stores up to bufferSize elements -// // produced by completed map jobs. -// final BlockingQueue>> mapResultQueue = -// new LinkedBlockingDeque>>(bufferSize); -// -// // Start running the input reader thread -// inputExecutor.submit(new InputProducer(inputReader, myNSRuntimeProfile.inputTimer, inputQueue)); -// -// // Start running the reducer thread -// final ReducerThread reducer -// = new ReducerThread(reduce, myNSRuntimeProfile.reduceTimer, initialValue, mapResultQueue); -// final Future reduceResult = reduceExecutor.submit(reducer); -// -// try { -// int numJobs = 0; -// -// while ( true ) { -// // block on input -// final InputProducer.InputValue inputEnqueueWrapped = inputQueue.take(); -// -// if ( ! inputEnqueueWrapped.isLast() ) { -// // get the object itself -// final InputType input = inputEnqueueWrapped.getValue(); -// -// // the next map call has jobID + 1 -// numJobs++; -// -// // send job for map via the completion service -// final CallableMap doMap = new CallableMap(map, numJobs, input); -// final Future> mapJob = mapExecutor.submit(doMap); -// mapResultQueue.put(mapJob); -// -// debugPrint(" Done with cycle of map/reduce"); -// -// if ( numJobs % bufferSize == 0 && progressFunction != null ) -// progressFunction.progress(input); -// } else { -// mapResultQueue.put(new FutureValue>(new MapResult())); -// return reduceResult.get(); // wait for our result of reduce -// } -// } -// } catch (InterruptedException ex) { -// throw new ReviewedStingException("got execution exception", ex); -// } catch (ExecutionException ex) { -// throw new ReviewedStingException("got execution exception", ex); -// } -// } - debugPrint("Executing nanoScheduler"); + // a blocking queue that limits the number of input datum to the requested buffer size + // note we need +1 because we continue to enqueue the lastObject + final BlockingQueue.InputValue> inputQueue + = new LinkedBlockingDeque.InputValue>(bufferSize+1); + + // Create the input producer and start it running final InputProducer inputProducer = - new InputProducer(inputReader, myNSRuntimeProfile.inputTimer); + new InputProducer(inputReader, myNSRuntimeProfile.inputTimer, inputQueue); + inputExecutor.submit(inputProducer); // a priority queue that stores up to bufferSize elements // produced by completed map jobs. @@ -378,40 +328,79 @@ public class NanoScheduler { = new Reducer(reduce, myNSRuntimeProfile.reduceTimer, initialValue); try { - int jobID = -1; + int nSubmittedJobs = 0; + int jobID = -1; // must be -1 as setLastJobID special cases -1 to indicate no jobs were enqueued + + while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { + // acquire a slot to run a map job. Blocks if too many jobs are enqueued + runningMapJobSlots.acquire(); - while ( inputProducer.hasNextNow() ) { - mapQueueSizeManagingSemaphone.acquire(); jobID++; - debugPrint("Submitting job with id %d", jobID); - mapExecutor.submit(new ReadMapReduceJob(jobID, inputProducer, mapResultQueue, map, reducer)); + mapExecutor.submit(new MapReduceJob(jobID, inputQueue, mapResultQueue, map, reducer)); + nSubmittedJobs++; } - debugPrint("Setting last job id %d", jobID); - reducer.setLastJobID(jobID); // the last actually submitted job id is jobID - 1 + // mark the last job id we've submitted so we now the id to wait for + reducer.setLastJobID(jobID); - return reducer.waitForFinalReduce(); + // wait for all of the input and map threads to finish + return waitForCompletion(inputProducer, reducer); } catch (InterruptedException ex) { throw new ReviewedStingException("got execution exception", ex); -// } catch (ExecutionException ex) { -// throw new ReviewedStingException("got execution exception", ex); } } - private class ReadMapReduceJob implements Runnable { + /** + * Wait until the input thread and all map threads have completed running, and return the final reduce result + */ + private ReduceType waitForCompletion(final InputProducer inputProducer, + final Reducer reducer) throws InterruptedException { + // wait until we have a final reduce result + final ReduceType finalSum = reducer.waitForFinalReduce(); + + // now wait for the input provider thread to terminate + inputProducer.waitForDone(); + + // wait for all the map threads to finish by acquiring and then releasing all map job semaphores + runningMapJobSlots.acquire(this.bufferSize); + runningMapJobSlots.release(this.bufferSize); + + // everything is finally shutdown, return the final reduce value + return finalSum; + } + + /** + * Should we continue to submit jobs given the number of jobs already submitted and the + * number of read items in inputProducer? + * + * We continue to submit jobs while inputProducer hasn't reached EOF or the number + * of jobs we've enqueued isn't the number of read elements. This means that in + * some cases we submit more jobs than total read elements (cannot know because of + * multi-threading) so map jobs must handle the case where getNext() returns EOF. + * + * @param nJobsSubmitted + * @param inputProducer + * @return + */ + private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { + final int nReadItems = inputProducer.getNElementsInInputStream(); + return nReadItems == -1 || nJobsSubmitted < nReadItems; + } + + private class MapReduceJob implements Runnable { final int jobID; - final InputProducer inputProducer; - final BlockingQueue> mapResultQueue; + final BlockingQueue.InputValue> inputQueue; + final PriorityBlockingQueue> mapResultQueue; final NSMapFunction map; final Reducer reducer; - private ReadMapReduceJob(final int jobID, - final InputProducer inputProducer, - final BlockingQueue> mapResultQueue, - final NSMapFunction map, - final Reducer reducer) { + private MapReduceJob(final int jobID, + BlockingQueue.InputValue> inputQueue, + final PriorityBlockingQueue> mapResultQueue, + final NSMapFunction map, + final Reducer reducer) { this.jobID = jobID; - this.inputProducer = inputProducer; + this.inputQueue = inputQueue; this.mapResultQueue = mapResultQueue; this.map = map; this.reducer = reducer; @@ -420,11 +409,11 @@ public class NanoScheduler { @Override public void run() { try { - debugPrint("Running ReadMapReduceJob " + jobID); - final InputProducer.InputValue inputWrapper = inputProducer.next(); + //debugPrint("Running MapReduceJob " + jobID); + final InputProducer.InputValue inputWrapper = inputQueue.take(); final MapResult result; - if ( ! inputWrapper.isLast() ) { + if ( ! inputWrapper.isEOFMarker() ) { // just skip doing anything if we don't have work to do, which is possible // because we don't necessarily know how much input there is when we queue // up our jobs @@ -443,50 +432,21 @@ public class NanoScheduler { if ( jobID % bufferSize == 0 && progressFunction != null ) progressFunction.progress(input); } else { + // push back the EOF marker so other waiting threads can read it + inputQueue.add(inputWrapper); // if there's no input we push empty MapResults with jobIDs for synchronization with Reducer result = new MapResult(jobID); } mapResultQueue.put(result); - debugPrint(" Pushed MapResult with job id %d", jobID); final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue); - debugPrint(" reduced %d values", nReduced); // we finished a map job, release the job queue semaphore - mapQueueSizeManagingSemaphone.release(); + runningMapJobSlots.release(); } catch (InterruptedException ex) { throw new ReviewedStingException("got execution exception", ex); -// } catch (ExecutionException ex) { -// throw new ReviewedStingException("got execution exception", ex); } } } - -// /** -// * A simple callable version of the map function for use with the executor pool -// */ -// private class CallableMap implements Callable> { -// final int id; -// final InputType input; -// final NSMapFunction map; -// -// @Requires({"map != null"}) -// private CallableMap(final NSMapFunction map, -// final int id, -// final InputType input) { -// this.id = id; -// this.input = input; -// this.map = map; -// } -// -// @Override -// public MapResult call() { -// if ( debug ) debugPrint("\t\tmap " + input); -// myNSRuntimeProfile.mapTimer.restart(); -// final MapType result = map.apply(input); -// myNSRuntimeProfile.mapTimer.stop(); -// return new MapResult(result, id); -// } -// } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index 0923b0952..4fc34e2c9 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -1,12 +1,25 @@ package org.broadinstitute.sting.utils.nanoScheduler; +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; import org.broadinstitute.sting.utils.SimpleTimer; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.PriorityBlockingQueue; /** - * Thread that runs the reduce of the map/reduce. + * Reducer supporting two-threaded reduce of the map/reduce. + * + * The first thread, using the reduceAsMuchAsPossible function, actually reduces the data + * as it arrives in the blockingQueue. + * + * The second thread, using the waitForFinalReduce, can block on this data structure + * until that all jobs have arrived and been reduced. + * + * The key function for communication here is setLastJobID(), which the thread that submits + * jobs that enqueue MapResults into the blocking queue must call ONCE to tell the + * Reduce that ID of the last job that's been submitted. When a job arrives with that + * ID, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed. * * This thread reads from mapResultsQueue until the poison EOF object arrives. At each * stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the @@ -14,15 +27,34 @@ import java.util.concurrent.CountDownLatch; * until the map result Future has a value. */ class Reducer { - final CountDownLatch countDownLatch = new CountDownLatch(1); + private final static int UNSET_LAST_JOB_ID = -2; + final CountDownLatch countDownLatch = new CountDownLatch(1); final NSReduceFunction reduce; final SimpleTimer reduceTimer; + /** + * The sum of the reduce function applied to all MapResults. After this Reducer + * is done sum contains the final reduce result. + */ ReduceType sum; - int lastJobID = -2; // not yet set + + int lastJobID = UNSET_LAST_JOB_ID; // not yet set + + /** + * The jobID of the last job we've seen + */ int prevJobID = -1; // no jobs observed + /** + * Create a new Reducer that will apply the reduce function with initialSum value + * to values via reduceAsMuchAsPossible, timing the reduce function call costs with + * reduceTimer + * + * @param reduce the reduce function to apply + * @param reduceTimer the timer to time the reduce function call + * @param initialSum the initial reduce sum + */ public Reducer(final NSReduceFunction reduce, final SimpleTimer reduceTimer, final ReduceType initialSum) { @@ -34,15 +66,36 @@ class Reducer { this.sum = initialSum; } - private synchronized boolean readyToReduce(final BlockingQueue> mapResultQueue) { + /** + * Should we reduce the next value in the mapResultQueue? + * + * + * @param mapResultQueue the queue of map results + * @return true if we should reduce + */ + @Requires("mapResultQueue != null") + private synchronized boolean reduceNextValueInQueue(final PriorityBlockingQueue> mapResultQueue) { final MapResult nextMapResult = mapResultQueue.peek(); return nextMapResult != null && nextMapResult.getJobID() == prevJobID + 1; } - public synchronized int reduceAsMuchAsPossible(final BlockingQueue> mapResultQueue) throws InterruptedException { + /** + * Reduce as much data as possible in mapResultQueue, returning the number of reduce calls completed + * + * As much as possible is defined as all of the MapResults in the queue are in order starting from the + * lastJobID we reduced previously, up to the either the queue being empty or where the next MapResult + * doesn't have JobID == prevJobID + 1. + * + * @param mapResultQueue a queue of MapResults in jobID order + * @return the number of reduces run, from 0 > + * @throws InterruptedException + */ + @Ensures("result >= 0") + public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue> mapResultQueue) throws InterruptedException { + if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); int nReduces = 0; - while ( readyToReduce(mapResultQueue) ) { + while ( reduceNextValueInQueue(mapResultQueue) ) { final MapResult result = mapResultQueue.take(); if ( result.getJobID() < prevJobID ) @@ -51,7 +104,7 @@ class Reducer { prevJobID = result.getJobID(); - if ( ! result.isLast() ) { // TODO -- rename to isEmpty + if ( ! result.isEOFMarker() ) { nReduces++; // apply reduce, keeping track of sum @@ -67,6 +120,11 @@ class Reducer { return nReduces; } + /** + * release the latch if appropriate + * + * Appropriate means we've seen the last job, or there's only a single job id + */ private synchronized void maybeReleaseLatch() { if ( lastJobID != -2 && (prevJobID == lastJobID || lastJobID == -1) ) { // either we've already seen the last one prevJobID == lastJobID or @@ -75,12 +133,46 @@ class Reducer { } } + /** + * For testing. + * @return + */ + protected synchronized boolean latchIsReleased() { + return countDownLatch.getCount() == 0; + } + + /** + * Key function: tell this class the job ID of the last job that will provide data in the mapResultsQueue + * + * The last job id controls when we free threads blocked on waitForFinalReduce. When we see the job + * with this last job id, those threads are released. + * + * Until this function is called, those thread will block forever. The last job id has a few constraints. + * First, it must be >= -1. -1 indicates that in fact no jobs will ever be submitted (i.e., there's no + * data coming) so the latch should be opened immediately. If it's >= 0, we will wait until + * a job with that id arrives. + * + * Note that we throw an IllegalStateException if this function is called twice. + * + * @param lastJobID int >= -1 indicating the MapResult job id of the last job that will enqueue results into our queue + */ public synchronized void setLastJobID(final int lastJobID) { - if ( lastJobID < -1 ) throw new IllegalArgumentException("lastJobID must be > -1, but saw " + lastJobID); + if ( lastJobID < -1 ) + throw new IllegalArgumentException("lastJobID must be > -1, but saw " + lastJobID); + if ( this.lastJobID != UNSET_LAST_JOB_ID ) + throw new IllegalStateException("setlastJobID called multiple times, but should only be called once"); + this.lastJobID = lastJobID; maybeReleaseLatch(); } + /** + * Block until the last job has submitted its MapResult to our queue, and we've reduced it, and + * return the reduce result resulting from applying reduce(...) to all MapResult elements. + * + * @return the total reduce result across all jobs + * @throws InterruptedException + */ public ReduceType waitForFinalReduce() throws InterruptedException { countDownLatch.await(); return sum; diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java deleted file mode 100644 index dcdba3490..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.broadinstitute.sting.utils.nanoScheduler; - -import org.broadinstitute.sting.utils.SimpleTimer; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -/** - * Thread that runs the reduce of the map/reduce. - * - * This thread reads from mapResultsQueue until the poison EOF object arrives. At each - * stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the - * queue waits until the mapResultQueue has a value to take. Then, it gets and waits - * until the map result Future has a value. - */ -class ReducerThread implements Callable { - final NSReduceFunction reduce; - final SimpleTimer reduceTimer; - final BlockingQueue>> mapResultQueue; - - ReduceType sum; - int lastJobID = -1; - - public ReducerThread(final NSReduceFunction reduce, - final SimpleTimer reduceTimer, - final ReduceType sum, - final BlockingQueue>> mapResultQueue) { - if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null"); - if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null"); - if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); - - this.reduce = reduce; - this.reduceTimer = reduceTimer; - this.sum = sum; - this.mapResultQueue = mapResultQueue; - } - - public ReduceType call() { - try { - while ( true ) { - final MapResult result = mapResultQueue.take().get(); - if ( result.isLast() ) { - // we are done, just return sum - return sum; - } - else if ( result.getJobID() < lastJobID ) { - // make sure the map results are coming in order - throw new IllegalStateException("BUG: last jobID " + lastJobID + " > current jobID " + result.getJobID()); - } else { - lastJobID = result.getJobID(); - // apply reduce, keeping track of sum - reduceTimer.restart(); - sum = reduce.apply(result.getValue(), sum); - reduceTimer.stop(); - } - } - } catch (ExecutionException ex) { - throw new ReviewedStingException("got execution exception", ex); - } catch (InterruptedException ex) { - throw new ReviewedStingException("got execution exception", ex); - } - } -} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index 2b90b582f..829fc2f12 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -1,72 +1,104 @@ -//package org.broadinstitute.sting.utils.nanoScheduler; -// -//import org.broadinstitute.sting.BaseTest; -//import org.broadinstitute.sting.utils.SimpleTimer; -//import org.testng.Assert; -//import org.testng.annotations.DataProvider; -//import org.testng.annotations.Test; -// -//import java.util.ArrayList; -//import java.util.Arrays; -//import java.util.List; -//import java.util.concurrent.ExecutorService; -//import java.util.concurrent.Executors; -//import java.util.concurrent.LinkedBlockingDeque; -// -///** -// * UnitTests for the InputProducer -// * -// * User: depristo -// * Date: 8/24/12 -// * Time: 11:25 AM -// * To change this template use File | Settings | File Templates. -// */ -//public class InputProducerUnitTest extends BaseTest { -// @DataProvider(name = "InputProducerTest") -// public Object[][] createInputProducerTest() { -// List tests = new ArrayList(); -// -// for ( final int nElements : Arrays.asList(0, 1, 10, 100, 1000, 10000, 100000) ) { -// for ( final int queueSize : Arrays.asList(1, 10, 100) ) { -// tests.add(new Object[]{ nElements, queueSize }); -// } -// } -// -// return tests.toArray(new Object[][]{}); -// } -// -// @Test(enabled = true, dataProvider = "InputProducerTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) -// public void testInputProducer(final int nElements, final int queueSize) throws InterruptedException { -// final List elements = new ArrayList(nElements); -// for ( int i = 0; i < nElements; i++ ) elements.add(i); -// -// final LinkedBlockingDeque.InputValue> readQueue = -// new LinkedBlockingDeque.InputValue>(queueSize); -// -// final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); -// -// final ExecutorService es = Executors.newSingleThreadExecutor(); -// es.submit(ip); -// -// int lastValue = -1; -// int nRead = 0; -// while ( true ) { -// final int observedQueueSize = readQueue.size(); -// Assert.assertTrue(observedQueueSize <= queueSize, -// "Reader is enqueuing more elements " + observedQueueSize + " than allowed " + queueSize); -// -// final InputProducer.InputValue value = readQueue.take(); -// if ( value.isLast() ) { -// Assert.assertEquals(nRead, nElements, "Number of input values " + nRead + " not all that are expected " + nElements); -// Assert.assertEquals(readQueue.size(), 0, "Last queue element found but queue contains more values!"); -// break; -// } else { -// Assert.assertTrue(lastValue < value.getValue(), "Read values coming out of order!"); -// final int expected = lastValue + 1; -// Assert.assertEquals((int)value.getValue(), expected, "Value observed " + value.getValue() + " not equal to the expected value " + expected); -// nRead++; -// lastValue = value.getValue(); -// } -// } -// } -//} +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; + +/** +* UnitTests for the InputProducer +* +* User: depristo +* Date: 8/24/12 +* Time: 11:25 AM +* To change this template use File | Settings | File Templates. +*/ +public class InputProducerUnitTest extends BaseTest { + @DataProvider(name = "InputProducerTest") + public Object[][] createInputProducerTest() { + List tests = new ArrayList(); + + for ( final int nElements : Arrays.asList(0, 1, 10, 100, 1000, 10000, 100000) ) { + for ( final int queueSize : Arrays.asList(1, 10, 100) ) { + tests.add(new Object[]{ nElements, queueSize }); + } + } + + return tests.toArray(new Object[][]{}); + } + + @Test(enabled = true, dataProvider = "InputProducerTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) + public void testInputProducer(final int nElements, final int queueSize) throws InterruptedException { + final List elements = new ArrayList(nElements); + for ( int i = 0; i < nElements; i++ ) elements.add(i); + + final LinkedBlockingDeque.InputValue> readQueue = + new LinkedBlockingDeque.InputValue>(queueSize); + + final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); + + final ExecutorService es = Executors.newSingleThreadExecutor(); + + Assert.assertEquals(ip.getNElementsInInputStream(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet"); + + es.submit(ip); + + int lastValue = -1; + int nRead = 0; + while ( true ) { + final int nTotalElements = ip.getNElementsInInputStream(); + final int observedQueueSize = readQueue.size(); + Assert.assertTrue(observedQueueSize <= queueSize, + "Reader is enqueuing more elements " + observedQueueSize + " than allowed " + queueSize); + + if ( nRead + observedQueueSize < nElements ) + Assert.assertEquals(nTotalElements, -1, "getNElementsInInputStream should have returned -1 with not all elements read"); + // note, cannot test else case because elements input could have emptied between calls + + final InputProducer.InputValue value = readQueue.take(); + if ( value.isEOFMarker() ) { + Assert.assertEquals(nRead, nElements, "Number of input values " + nRead + " not all that are expected " + nElements); + Assert.assertEquals(readQueue.size(), 0, "Last queue element found but queue contains more values!"); + break; + } else { + Assert.assertTrue(lastValue < value.getValue(), "Read values coming out of order!"); + final int expected = lastValue + 1; + Assert.assertEquals((int)value.getValue(), expected, "Value observed " + value.getValue() + " not equal to the expected value " + expected); + nRead++; + lastValue = value.getValue(); + } + } + + Assert.assertEquals(ip.getNElementsInInputStream(), nElements, "Wrong number of total elements getNElementsInInputStream"); + } + + @Test(enabled = true, dataProvider = "InputProducerTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) + public void testInputProducerLocking(final int nElements, final int queueSize) throws InterruptedException { + final List elements = new ArrayList(nElements); + for ( int i = 0; i < nElements; i++ ) elements.add(i); + + final LinkedBlockingDeque.InputValue> readQueue = + new LinkedBlockingDeque.InputValue>(); + + final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); + + final ExecutorService es = Executors.newSingleThreadExecutor(); + es.submit(ip); + + ip.waitForDone(); + + Assert.assertEquals(ip.getNElementsInInputStream(), nElements, "InputProvider told me that the queue was done, but I haven't started reading yet"); + Assert.assertEquals(readQueue.size(), nElements + 1, "readQueue should have had all elements read into it"); + } + + // TODO -- add a test that really tests ip.getNElementsInInputStream + // Create an iterator, containing a semaphore, that allows us to step through the reader +} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 008c11f0a..eede30077 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -21,6 +21,7 @@ import java.util.List; * To change this template use File | Settings | File Templates. */ public class NanoSchedulerUnitTest extends BaseTest { + private final static boolean debug = false; public static final int NANO_SCHEDULE_MAX_RUNTIME = 60000; private static class Map2x implements NSMapFunction { @@ -102,10 +103,14 @@ public class NanoSchedulerUnitTest extends BaseTest { public ReduceSum makeReduce() { return new ReduceSum(); } public NanoScheduler makeScheduler() { + final NanoScheduler nano; if ( bufferSize == -1 ) - return new NanoScheduler(nThreads); + nano = new NanoScheduler(nThreads); else - return new NanoScheduler(bufferSize, nThreads); + nano = new NanoScheduler(bufferSize, nThreads); + + nano.setDebug(debug); + return nano; } } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java deleted file mode 100644 index 08771e9ec..000000000 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java +++ /dev/null @@ -1,95 +0,0 @@ -package org.broadinstitute.sting.utils.nanoScheduler; - -import org.broadinstitute.sting.BaseTest; -import org.broadinstitute.sting.utils.SimpleTimer; -import org.testng.Assert; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.*; - -/** - * UnitTests for the InputProducer - * - * User: depristo - * Date: 8/24/12 - * Time: 11:25 AM - * To change this template use File | Settings | File Templates. - */ -public class ReducerThreadUnitTest extends BaseTest { - @DataProvider(name = "ReducerThreadTest") - public Object[][] createReducerThreadTest() { - List tests = new ArrayList(); - - for ( final int nElements : Arrays.asList(0, 1, 10, 100, 1000, 10000, 100000) ) { - tests.add(new Object[]{ nElements }); - } - - return tests.toArray(new Object[][]{}); - } - - @Test(enabled = true, dataProvider = "ReducerThreadTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) - public void testReducerThreadTest(final int nElements) throws Exception { - List values = new ArrayList(nElements); - List jobIDs = new ArrayList(nElements); - for ( int i = 0; i < nElements; i++ ) { - values.add(i); - jobIDs.add(i); - } - - runTests(values, jobIDs); - } - - @Test(enabled = true, timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME, expectedExceptions = ExecutionException.class) - public void testReducerThreadTestByJobOrder() throws Exception { - runTests(Arrays.asList(0, 1, 2), Arrays.asList(1, 3, 2)); - } - - private void runTests( final List mapValues, final List jobIDs) throws Exception { - final LinkedBlockingDeque>> mapResultsQueue = - new LinkedBlockingDeque>>(mapValues.size()+1); - - for ( int i = 0; i < mapValues.size(); i++ ) { - final int value = mapValues.get(i); - final int jobID = jobIDs.get(i); - final MapResult mapResult = new MapResult(value, jobID); - mapResultsQueue.add(new FutureValue>(mapResult)); - } - mapResultsQueue.add(new FutureValue>(new MapResult())); - - final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue); - final ReducerThread thread - = new ReducerThread(reduce, new SimpleTimer(), 0, mapResultsQueue); - - final ExecutorService es = Executors.newSingleThreadExecutor(); - final Future value = es.submit(thread); - value.get(); - - Assert.assertEquals(reduce.nRead, mapValues.size()); - } - - public class ReduceSumTest implements NSReduceFunction { - final LinkedBlockingDeque>> mapResultsQueue; - int nRead = 0; - int lastValue = -1; - - public ReduceSumTest(LinkedBlockingDeque>> mapResultsQueue) { - this.mapResultsQueue = mapResultsQueue; - } - - @Override public Integer apply(Integer one, Integer sum) { - Assert.assertTrue(lastValue < one, "Reduce came in out of order. Prev " + lastValue + " cur " + one); - - Assert.assertTrue(lastValue < one, "Read values coming out of order!"); - final int expected = lastValue + 1; - Assert.assertEquals((int)one, expected, "Value observed " + one + " not equal to the expected value " + expected); - nRead++; - lastValue = expected; - - return one + sum; - } - } -} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java new file mode 100644 index 000000000..d5136abbe --- /dev/null +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java @@ -0,0 +1,206 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; +import org.broadinstitute.sting.utils.Utils; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * UnitTests for Reducer + * + * User: depristo + * Date: 8/24/12 + * Time: 11:25 AM + * To change this template use File | Settings | File Templates. + */ +public class ReducerUnitTest extends BaseTest { + @DataProvider(name = "ReducerThreadTest") + public Object[][] createReducerThreadTest() { + List tests = new ArrayList(); + + for ( final int groupSize : Arrays.asList(-1, 1, 5, 50, 500, 5000, 50000) ) { + for ( final boolean setJobIDAtStart : Arrays.asList(true, false) ) { + for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) { + if ( groupSize < nElements ) { + for ( final List> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) { + tests.add(new Object[]{ new ListOfJobs(jobs), setJobIDAtStart, groupSize }); + } + } + } + + for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) { + if ( groupSize < nElements ) { + tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), setJobIDAtStart, groupSize }); + } + } + } + } + + return tests.toArray(new Object[][]{}); + } + + private static class ListOfJobs extends ArrayList> { + private ListOfJobs(Collection> c) { + super(c); + } + + @Override + public String toString() { + if ( size() < 10 ) + return super.toString(); + else + return "JobList of " + size(); + } + } + + private static List> makeJobs(final int nElements) { + List> jobs = new ArrayList>(nElements); + for ( int i = 0; i < nElements; i++ ) { + jobs.add(new MapResult(i, i)); + } + return jobs; + } + + private int expectedSum(final List> jobs) { + int sum = 0; + for ( final MapResult job : jobs ) + sum += job.getValue(); + return sum; + } + + @Test(enabled = true, dataProvider = "ReducerThreadTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) + public void testReducerThread(final List> jobs, final boolean setJobIDAtStart, final int groupSize) throws Exception { + runTests(jobs, setJobIDAtStart, groupSize); + } + + private void runTests( final List> allJobs, boolean setJobIDAtStart, int groupSize ) throws Exception { + if ( groupSize == -1 ) + groupSize = allJobs.size(); + + int lastJobID = -1; + for ( final MapResult job : allJobs ) { + lastJobID = Math.max(job.getJobID(), lastJobID); + } + + final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>(); + + final List>> jobGroups = Utils.groupList(allJobs, groupSize); + final ReduceSumTest reduce = new ReduceSumTest(); + final Reducer reducer = new Reducer(reduce, new SimpleTimer(), 0); + + final TestWaitingForFinalReduce waitingThread = new TestWaitingForFinalReduce(reducer, expectedSum(allJobs)); + final ExecutorService es = Executors.newSingleThreadExecutor(); + es.submit(waitingThread); + + int nJobsSubmitted = 0; + int jobGroupCount = 0; + final int lastJobGroupCount = jobGroups.size() - 1; + setJobIDAtStart = setJobIDAtStart && groupSize == 1; + + for ( final List> jobs : jobGroups ) { + //logger.warn("Processing job group " + jobGroupCount + " with " + jobs.size() + " jobs"); + for ( final MapResult job : jobs ) { + mapResultsQueue.add(job); + nJobsSubmitted++; + } + + if ( jobGroupCount == lastJobGroupCount ) { + mapResultsQueue.add(new MapResult()); + nJobsSubmitted++; + } + + Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed at the start"); + + if ( jobGroupCount == 0 && lastJobID != -1 && setJobIDAtStart ) { + // only can do the setJobID if jobs cannot be submitted out of order + reducer.setLastJobID(lastJobID); + Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything"); + } + + final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue); + Assert.assertTrue(nReduced <= nJobsSubmitted, "Somehow reduced more jobs than submitted"); + + if ( setJobIDAtStart ) { + final boolean submittedLastJob = jobGroupCount == lastJobGroupCount; + Assert.assertEquals(reducer.latchIsReleased(), submittedLastJob, + "When last job is set, latch should only be released if the last job has been submitted"); + } else { + Assert.assertEquals(reducer.latchIsReleased(), false, "When last job isn't set, latch should never be release"); + } + + jobGroupCount++; + } + + if ( setJobIDAtStart ) + Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set"); + else { + Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set"); + if ( lastJobID != -1 ) { + reducer.setLastJobID(lastJobID); + Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id "); + } + } + + Assert.assertEquals(reduce.nRead, allJobs.size(), "number of read values not all of the values in the reducer queue"); + es.shutdown(); + es.awaitTermination(1, TimeUnit.HOURS); + } + + @Test(expectedExceptions = IllegalStateException.class) + private void runSettingJobIDTwice() throws Exception { + final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>(); + + final Reducer reducer = new Reducer(new ReduceSumTest(), new SimpleTimer(), 0); + + reducer.setLastJobID(10); + reducer.setLastJobID(15); + } + + public class ReduceSumTest implements NSReduceFunction { + int nRead = 0; + int lastValue = -1; + + @Override public Integer apply(Integer one, Integer sum) { + Assert.assertTrue(lastValue < one, "Reduce came in out of order. Prev " + lastValue + " cur " + one); + + Assert.assertTrue(lastValue < one, "Read values coming out of order!"); + final int expected = lastValue + 1; + Assert.assertEquals((int)one, expected, "Value observed " + one + " not equal to the expected value " + expected); + nRead++; + lastValue = expected; + + return one + sum; + } + } + + final static class TestWaitingForFinalReduce implements Runnable { + final Reducer reducer; + final int expectedSum; + + TestWaitingForFinalReduce(Reducer reducer, final int expectedSum) { + this.reducer = reducer; + this.expectedSum = expectedSum; + } + + @Override + public void run() { + try { + final int observedSum = reducer.waitForFinalReduce(); + Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value"); + } catch ( InterruptedException ex ) { + Assert.fail("Got interrupted"); + } + } + } +} \ No newline at end of file