diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index a78ab4375..73cde3d3c 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -190,7 +190,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { allCreatedTraversalEngines.add(traversalEngine); availableTraversalEngines.add(traversalEngine); } - logger.info("Creating " + threadAllocation.getNumDataThreads() + " traversal engines"); // Create our progress meter this.progressMeter = new ProgressMeter(progressLogFile, diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/BlockingQueueValue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java similarity index 69% rename from public/java/src/org/broadinstitute/sting/utils/nanoScheduler/BlockingQueueValue.java rename to public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java index 2daa6c9eb..eddf5de3c 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/BlockingQueueValue.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java @@ -3,7 +3,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Invariant; /** - * Wrapper to hold data for a blocking queue, distinguishing an EOF marker from a real object + * Wrapper to hold data that distinguishing an special EOF marker from a real object * * The only way to tell in a consumer thread that a blocking queue has no more data ever * coming down the pipe is to pass in a "poison" or EOF object. This class provides @@ -14,13 +14,13 @@ import com.google.java.contract.Invariant; * BlockingQueue q * producer: * while ( x has items ) - * q.put(new BlockingQueueValue(x)) - * q.put(new BlockingQueueValue()) + * q.put(new EOFMarkedValue(x)) + * q.put(new EOFMarkedValue()) * * Consumer: * while ( true ) * value = q.take() - * if ( value.isLast() ) + * if ( value.isEOFMarker() ) * break * else * do something useful with value @@ -30,8 +30,8 @@ import com.google.java.contract.Invariant; * Date: 9/6/12 * Time: 3:08 PM */ -@Invariant("! isLast || value == null") -class BlockingQueueValue { +@Invariant("! isEOFMarker() || value == null") +class EOFMarkedValue { /** * True if this is the EOF marker object */ @@ -43,18 +43,18 @@ class BlockingQueueValue { final private T value; /** - * Create a new BlockingQueueValue containing a real value, where last is false + * Create a new EOFMarkedValue containing a real value, where last is false * @param value */ - BlockingQueueValue(final T value) { + EOFMarkedValue(final T value) { isLast = false; this.value = value; } /** - * Create a new BlockingQueueValue that is the last item + * Create a new EOFMarkedValue that is the last item */ - BlockingQueueValue() { + EOFMarkedValue() { isLast = true; this.value = null; } @@ -64,18 +64,18 @@ class BlockingQueueValue { * * @return true if so, else false */ - public boolean isLast() { + public boolean isEOFMarker() { return isLast; } /** - * Get the value held by this BlockingQueueValue + * Get the value held by this EOFMarkedValue * * @return the value * @throws IllegalStateException if this is the last item */ public T getValue() { - if ( isLast() ) + if ( isEOFMarker() ) throw new IllegalStateException("Cannot get value for last object"); return value; } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java deleted file mode 100644 index 9508a15aa..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.broadinstitute.sting.utils.nanoScheduler; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Create a future that simply returns a given value - * - * The only standard way to create a future in java is via the ExecutorService interface. - * If you have a data structure holding futures of value T, and you want to add a - * value to it for some reason (to add a EOF marker, for instance) you can use this - * class to create a dummy Future that simply returns a value. - * - * @author depristo - * @since 09/12 - */ -class FutureValue implements Future { - final V value; - - FutureValue(final V value) { - this.value = value; - } - - @Override public boolean cancel(boolean mayInterruptIfRunning) { - return true; - } - - @Override public boolean isCancelled() { - return false; - } - - @Override public boolean isDone() { - return true; - } - - @Override public V get() throws InterruptedException, ExecutionException { - return value; - } - - @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return get(); - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index d669603c4..2e5003ff0 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -1,13 +1,16 @@ package org.broadinstitute.sting.utils.nanoScheduler; import org.broadinstitute.sting.utils.SimpleTimer; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; /** - * Producer Thread that reads input values from an inputReads and puts them into a BlockingQueue + * Producer Thread that reads input values from an inputReads and puts them into an output queue */ -class InputProducer { +class InputProducer implements Runnable { /** * The iterator we are using to get data from */ @@ -18,38 +21,120 @@ class InputProducer { */ final SimpleTimer inputTimer; + /** + * Where we put our input values for consumption + */ + final BlockingQueue outputQueue; + + /** + * Have we read the last value from inputReader? + * + * Must be a local variable, as inputReader.hasNext() can actually end up doing a lot + * of work, and the method getNElementsInInputStream() is supposed to be called not in the + * thread executing the reading of values but in the thread enqueuing results + */ + boolean readLastValue = false; + + int nRead = 0; + + /** + * A latch used to block threads that want to start up only when all of the values + * in inputReader have been read by the thread executing run() + */ + final CountDownLatch latch = new CountDownLatch(1); + public InputProducer(final Iterator inputReader, - final SimpleTimer inputTimer) { + final SimpleTimer inputTimer, + final BlockingQueue outputQueue) { if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); if ( inputTimer == null ) throw new IllegalArgumentException("inputTimer cannot be null"); + if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null"); this.inputReader = inputReader; this.inputTimer = inputTimer; + this.outputQueue = outputQueue; } - public synchronized boolean hasNextNow() { - return inputReader.hasNext(); + /** + * Returns the number of elements in the input stream, AFTER we've read all of the values. + * If we haven't read them all yet, returns -1 + * + * @return the total number of elements in input stream, or -1 if some are still to be read + */ + public synchronized int getNElementsInInputStream() { + return readLastValue ? nRead : -1; } - public synchronized InputValue next() { + /** + * Read the next item from the input stream, if possible + * + * If the inputReader has values, returns them, otherwise return null. + * + * This method is synchronized, as it manipulates local state accessed across multiple threads. + * + * @return the next input stream value, or null if the stream contains no more elements + * @throws InterruptedException + */ + private synchronized InputType readNextItem() throws InterruptedException { inputTimer.restart(); - - final InputValue v; - if ( inputReader.hasNext() ) { - v = new InputValue(inputReader.next()); + if ( ! inputReader.hasNext() ) { + // we are done, mark ourselves as such and return null + readLastValue = true; + inputTimer.stop(); + return null; } else { - v = new InputValue(); + // get the next value, and return it + final InputType input = inputReader.next(); + inputTimer.stop(); + nRead++; + return input; } + } - inputTimer.stop(); + /** + * Run this input producer, looping over all items in the input reader and + * enqueueing them as InputValues into the outputQueue. After the + * end of the stream has been encountered, any threads waiting because + * they called waitForDone() will be freed. + */ + public void run() { + try { + while ( true ) { + final InputType value = readNextItem(); + if ( value == null ) { + // add the EOF marker + // add the EOF object so our consumer knows we are done in all inputs + outputQueue.put(new InputValue()); - return v; + break; + } else { + // add the actual value + outputQueue.put(new InputValue(value)); + } + } + + latch.countDown(); + } catch (InterruptedException ex) { + throw new ReviewedStingException("got execution exception", ex); + } + } + + /** + * Block until all of the items have been read from inputReader. + * + * Note that this call doesn't actually read anything. You have to submit a thread + * to actually execute run() directly. + * + * @throws InterruptedException + */ + public void waitForDone() throws InterruptedException { + latch.await(); } /** * Helper class that contains a read value suitable for EOF marking in a BlockingQueue */ - class InputValue extends BlockingQueueValue { + class InputValue extends EOFMarkedValue { private InputValue(InputType datum) { super(datum); } private InputValue() { } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java index 10d1f2b2e..83d671560 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java @@ -4,7 +4,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; * Holds the results of a map job suitable for producer/consumer threading * via a BlockingQueue */ -class MapResult extends BlockingQueueValue implements Comparable> { +class MapResult extends EOFMarkedValue implements Comparable> { final int jobID; /** @@ -40,8 +40,19 @@ class MapResult extends BlockingQueueValue implements Comparab return jobID; } + /** + * Compare these MapResults in order of JobID. + * + * @param o + * @return + */ @Override public int compareTo(MapResult o) { return Integer.valueOf(jobID).compareTo(o.getJobID()); } + + @Override + public String toString() { + return "[MapResult id=" + jobID + "]"; + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 2676f567b..08f29d155 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -1,5 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.threading.NamedThreadFactory; @@ -43,14 +45,11 @@ public class NanoScheduler { private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; private final static boolean LOG_MAP_TIMES = false; - private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100; - final int bufferSize; final int nThreads; final ExecutorService inputExecutor; - final ExecutorService reduceExecutor; final ExecutorService mapExecutor; - final Semaphore mapQueueSizeManagingSemaphone; + final Semaphore runningMapJobSlots; boolean shutdown = false; boolean debug = false; @@ -84,14 +83,13 @@ public class NanoScheduler { this.nThreads = nThreads; if ( nThreads == 1 ) { - this.mapExecutor = this.inputExecutor = this.reduceExecutor = null; - mapQueueSizeManagingSemaphone = null; + this.mapExecutor = this.inputExecutor = null; + runningMapJobSlots = null; } else { - this.mapExecutor = Executors.newFixedThreadPool(nThreads, new NamedThreadFactory("NS-map-thread-%d")); - mapQueueSizeManagingSemaphone = new Semaphore(this.bufferSize); + this.mapExecutor = Executors.newFixedThreadPool(nThreads - 1, new NamedThreadFactory("NS-map-thread-%d")); + runningMapJobSlots = new Semaphore(this.bufferSize); this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); - this.reduceExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-reduce-thread-%d")); } // start timing the time spent outside of the nanoScheduler @@ -102,7 +100,7 @@ public class NanoScheduler { * The number of parallel map threads in use with this NanoScheduler * @return */ -// @Ensures("result > 0") + @Ensures("result > 0") public int getnThreads() { return nThreads; } @@ -111,7 +109,7 @@ public class NanoScheduler { * The input buffer size used by this NanoScheduler * @return */ -// @Ensures("result > 0") + @Ensures("result > 0") public int getBufferSize() { return this.bufferSize; } @@ -130,7 +128,6 @@ public class NanoScheduler { if ( nThreads > 1 ) { shutdownExecutor("inputExecutor", inputExecutor); shutdownExecutor("mapExecutor", mapExecutor); - shutdownExecutor("reduceExecutor", reduceExecutor); } shutdown = true; @@ -156,8 +153,8 @@ public class NanoScheduler { * @param name a string name for error messages for the executorService we are shutting down * @param executorService the executorService to shut down */ -// @Requires({"name != null", "executorService != null"}) -// @Ensures("executorService.isShutdown()") + @Requires({"name != null", "executorService != null"}) + @Ensures("executorService.isShutdown()") private void shutdownExecutor(final String name, final ExecutorService executorService) { if ( executorService.isShutdown() || executorService.isTerminated() ) throw new IllegalStateException("Executor service " + name + " is already shut down!"); @@ -187,8 +184,8 @@ public class NanoScheduler { * @param format the format argument suitable for String.format * @param args the arguments for String.format */ -// @Requires("format != null") - private void debugPrint(final String format, Object ... args) { + @Requires("format != null") + protected void debugPrint(final String format, Object ... args) { if ( isDebug() ) logger.warn("Thread " + Thread.currentThread().getId() + ":" + String.format(format, args)); } @@ -262,7 +259,7 @@ public class NanoScheduler { * * @return the reduce result of this map/reduce job */ -// @Requires({"inputReader != null", "map != null", "reduce != null"}) + @Requires({"inputReader != null", "map != null", "reduce != null"}) private ReduceType executeSingleThreaded(final Iterator inputReader, final NSMapFunction map, final ReduceType initialValue, @@ -305,69 +302,22 @@ public class NanoScheduler { * * @return the reduce result of this map/reduce job */ -// @Requires({"inputReader != null", "map != null", "reduce != null"}) + @Requires({"inputReader != null", "map != null", "reduce != null"}) private ReduceType executeMultiThreaded(final Iterator inputReader, final NSMapFunction map, final ReduceType initialValue, final NSReduceFunction reduce) { -// debugPrint("Executing nanoScheduler"); -// -// // a blocking queue that limits the number of input datum to the requested buffer size -// final BlockingQueue.InputValue> inputQueue -// = new LinkedBlockingDeque.InputValue>(bufferSize); -// -// // a priority queue that stores up to bufferSize elements -// // produced by completed map jobs. -// final BlockingQueue>> mapResultQueue = -// new LinkedBlockingDeque>>(bufferSize); -// -// // Start running the input reader thread -// inputExecutor.submit(new InputProducer(inputReader, myNSRuntimeProfile.inputTimer, inputQueue)); -// -// // Start running the reducer thread -// final ReducerThread reducer -// = new ReducerThread(reduce, myNSRuntimeProfile.reduceTimer, initialValue, mapResultQueue); -// final Future reduceResult = reduceExecutor.submit(reducer); -// -// try { -// int numJobs = 0; -// -// while ( true ) { -// // block on input -// final InputProducer.InputValue inputEnqueueWrapped = inputQueue.take(); -// -// if ( ! inputEnqueueWrapped.isLast() ) { -// // get the object itself -// final InputType input = inputEnqueueWrapped.getValue(); -// -// // the next map call has jobID + 1 -// numJobs++; -// -// // send job for map via the completion service -// final CallableMap doMap = new CallableMap(map, numJobs, input); -// final Future> mapJob = mapExecutor.submit(doMap); -// mapResultQueue.put(mapJob); -// -// debugPrint(" Done with cycle of map/reduce"); -// -// if ( numJobs % bufferSize == 0 && progressFunction != null ) -// progressFunction.progress(input); -// } else { -// mapResultQueue.put(new FutureValue>(new MapResult())); -// return reduceResult.get(); // wait for our result of reduce -// } -// } -// } catch (InterruptedException ex) { -// throw new ReviewedStingException("got execution exception", ex); -// } catch (ExecutionException ex) { -// throw new ReviewedStingException("got execution exception", ex); -// } -// } - debugPrint("Executing nanoScheduler"); + // a blocking queue that limits the number of input datum to the requested buffer size + // note we need +1 because we continue to enqueue the lastObject + final BlockingQueue.InputValue> inputQueue + = new LinkedBlockingDeque.InputValue>(bufferSize+1); + + // Create the input producer and start it running final InputProducer inputProducer = - new InputProducer(inputReader, myNSRuntimeProfile.inputTimer); + new InputProducer(inputReader, myNSRuntimeProfile.inputTimer, inputQueue); + inputExecutor.submit(inputProducer); // a priority queue that stores up to bufferSize elements // produced by completed map jobs. @@ -378,40 +328,79 @@ public class NanoScheduler { = new Reducer(reduce, myNSRuntimeProfile.reduceTimer, initialValue); try { - int jobID = -1; + int nSubmittedJobs = 0; + int jobID = -1; // must be -1 as setLastJobID special cases -1 to indicate no jobs were enqueued + + while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { + // acquire a slot to run a map job. Blocks if too many jobs are enqueued + runningMapJobSlots.acquire(); - while ( inputProducer.hasNextNow() ) { - mapQueueSizeManagingSemaphone.acquire(); jobID++; - debugPrint("Submitting job with id %d", jobID); - mapExecutor.submit(new ReadMapReduceJob(jobID, inputProducer, mapResultQueue, map, reducer)); + mapExecutor.submit(new MapReduceJob(jobID, inputQueue, mapResultQueue, map, reducer)); + nSubmittedJobs++; } - debugPrint("Setting last job id %d", jobID); - reducer.setLastJobID(jobID); // the last actually submitted job id is jobID - 1 + // mark the last job id we've submitted so we now the id to wait for + reducer.setLastJobID(jobID); - return reducer.waitForFinalReduce(); + // wait for all of the input and map threads to finish + return waitForCompletion(inputProducer, reducer); } catch (InterruptedException ex) { throw new ReviewedStingException("got execution exception", ex); -// } catch (ExecutionException ex) { -// throw new ReviewedStingException("got execution exception", ex); } } - private class ReadMapReduceJob implements Runnable { + /** + * Wait until the input thread and all map threads have completed running, and return the final reduce result + */ + private ReduceType waitForCompletion(final InputProducer inputProducer, + final Reducer reducer) throws InterruptedException { + // wait until we have a final reduce result + final ReduceType finalSum = reducer.waitForFinalReduce(); + + // now wait for the input provider thread to terminate + inputProducer.waitForDone(); + + // wait for all the map threads to finish by acquiring and then releasing all map job semaphores + runningMapJobSlots.acquire(this.bufferSize); + runningMapJobSlots.release(this.bufferSize); + + // everything is finally shutdown, return the final reduce value + return finalSum; + } + + /** + * Should we continue to submit jobs given the number of jobs already submitted and the + * number of read items in inputProducer? + * + * We continue to submit jobs while inputProducer hasn't reached EOF or the number + * of jobs we've enqueued isn't the number of read elements. This means that in + * some cases we submit more jobs than total read elements (cannot know because of + * multi-threading) so map jobs must handle the case where getNext() returns EOF. + * + * @param nJobsSubmitted + * @param inputProducer + * @return + */ + private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { + final int nReadItems = inputProducer.getNElementsInInputStream(); + return nReadItems == -1 || nJobsSubmitted < nReadItems; + } + + private class MapReduceJob implements Runnable { final int jobID; - final InputProducer inputProducer; - final BlockingQueue> mapResultQueue; + final BlockingQueue.InputValue> inputQueue; + final PriorityBlockingQueue> mapResultQueue; final NSMapFunction map; final Reducer reducer; - private ReadMapReduceJob(final int jobID, - final InputProducer inputProducer, - final BlockingQueue> mapResultQueue, - final NSMapFunction map, - final Reducer reducer) { + private MapReduceJob(final int jobID, + BlockingQueue.InputValue> inputQueue, + final PriorityBlockingQueue> mapResultQueue, + final NSMapFunction map, + final Reducer reducer) { this.jobID = jobID; - this.inputProducer = inputProducer; + this.inputQueue = inputQueue; this.mapResultQueue = mapResultQueue; this.map = map; this.reducer = reducer; @@ -420,11 +409,11 @@ public class NanoScheduler { @Override public void run() { try { - debugPrint("Running ReadMapReduceJob " + jobID); - final InputProducer.InputValue inputWrapper = inputProducer.next(); + //debugPrint("Running MapReduceJob " + jobID); + final InputProducer.InputValue inputWrapper = inputQueue.take(); final MapResult result; - if ( ! inputWrapper.isLast() ) { + if ( ! inputWrapper.isEOFMarker() ) { // just skip doing anything if we don't have work to do, which is possible // because we don't necessarily know how much input there is when we queue // up our jobs @@ -443,50 +432,21 @@ public class NanoScheduler { if ( jobID % bufferSize == 0 && progressFunction != null ) progressFunction.progress(input); } else { + // push back the EOF marker so other waiting threads can read it + inputQueue.add(inputWrapper); // if there's no input we push empty MapResults with jobIDs for synchronization with Reducer result = new MapResult(jobID); } mapResultQueue.put(result); - debugPrint(" Pushed MapResult with job id %d", jobID); final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue); - debugPrint(" reduced %d values", nReduced); // we finished a map job, release the job queue semaphore - mapQueueSizeManagingSemaphone.release(); + runningMapJobSlots.release(); } catch (InterruptedException ex) { throw new ReviewedStingException("got execution exception", ex); -// } catch (ExecutionException ex) { -// throw new ReviewedStingException("got execution exception", ex); } } } - -// /** -// * A simple callable version of the map function for use with the executor pool -// */ -// private class CallableMap implements Callable> { -// final int id; -// final InputType input; -// final NSMapFunction map; -// -// @Requires({"map != null"}) -// private CallableMap(final NSMapFunction map, -// final int id, -// final InputType input) { -// this.id = id; -// this.input = input; -// this.map = map; -// } -// -// @Override -// public MapResult call() { -// if ( debug ) debugPrint("\t\tmap " + input); -// myNSRuntimeProfile.mapTimer.restart(); -// final MapType result = map.apply(input); -// myNSRuntimeProfile.mapTimer.stop(); -// return new MapResult(result, id); -// } -// } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index 0923b0952..4fc34e2c9 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -1,12 +1,25 @@ package org.broadinstitute.sting.utils.nanoScheduler; +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; import org.broadinstitute.sting.utils.SimpleTimer; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.PriorityBlockingQueue; /** - * Thread that runs the reduce of the map/reduce. + * Reducer supporting two-threaded reduce of the map/reduce. + * + * The first thread, using the reduceAsMuchAsPossible function, actually reduces the data + * as it arrives in the blockingQueue. + * + * The second thread, using the waitForFinalReduce, can block on this data structure + * until that all jobs have arrived and been reduced. + * + * The key function for communication here is setLastJobID(), which the thread that submits + * jobs that enqueue MapResults into the blocking queue must call ONCE to tell the + * Reduce that ID of the last job that's been submitted. When a job arrives with that + * ID, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed. * * This thread reads from mapResultsQueue until the poison EOF object arrives. At each * stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the @@ -14,15 +27,34 @@ import java.util.concurrent.CountDownLatch; * until the map result Future has a value. */ class Reducer { - final CountDownLatch countDownLatch = new CountDownLatch(1); + private final static int UNSET_LAST_JOB_ID = -2; + final CountDownLatch countDownLatch = new CountDownLatch(1); final NSReduceFunction reduce; final SimpleTimer reduceTimer; + /** + * The sum of the reduce function applied to all MapResults. After this Reducer + * is done sum contains the final reduce result. + */ ReduceType sum; - int lastJobID = -2; // not yet set + + int lastJobID = UNSET_LAST_JOB_ID; // not yet set + + /** + * The jobID of the last job we've seen + */ int prevJobID = -1; // no jobs observed + /** + * Create a new Reducer that will apply the reduce function with initialSum value + * to values via reduceAsMuchAsPossible, timing the reduce function call costs with + * reduceTimer + * + * @param reduce the reduce function to apply + * @param reduceTimer the timer to time the reduce function call + * @param initialSum the initial reduce sum + */ public Reducer(final NSReduceFunction reduce, final SimpleTimer reduceTimer, final ReduceType initialSum) { @@ -34,15 +66,36 @@ class Reducer { this.sum = initialSum; } - private synchronized boolean readyToReduce(final BlockingQueue> mapResultQueue) { + /** + * Should we reduce the next value in the mapResultQueue? + * + * + * @param mapResultQueue the queue of map results + * @return true if we should reduce + */ + @Requires("mapResultQueue != null") + private synchronized boolean reduceNextValueInQueue(final PriorityBlockingQueue> mapResultQueue) { final MapResult nextMapResult = mapResultQueue.peek(); return nextMapResult != null && nextMapResult.getJobID() == prevJobID + 1; } - public synchronized int reduceAsMuchAsPossible(final BlockingQueue> mapResultQueue) throws InterruptedException { + /** + * Reduce as much data as possible in mapResultQueue, returning the number of reduce calls completed + * + * As much as possible is defined as all of the MapResults in the queue are in order starting from the + * lastJobID we reduced previously, up to the either the queue being empty or where the next MapResult + * doesn't have JobID == prevJobID + 1. + * + * @param mapResultQueue a queue of MapResults in jobID order + * @return the number of reduces run, from 0 > + * @throws InterruptedException + */ + @Ensures("result >= 0") + public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue> mapResultQueue) throws InterruptedException { + if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); int nReduces = 0; - while ( readyToReduce(mapResultQueue) ) { + while ( reduceNextValueInQueue(mapResultQueue) ) { final MapResult result = mapResultQueue.take(); if ( result.getJobID() < prevJobID ) @@ -51,7 +104,7 @@ class Reducer { prevJobID = result.getJobID(); - if ( ! result.isLast() ) { // TODO -- rename to isEmpty + if ( ! result.isEOFMarker() ) { nReduces++; // apply reduce, keeping track of sum @@ -67,6 +120,11 @@ class Reducer { return nReduces; } + /** + * release the latch if appropriate + * + * Appropriate means we've seen the last job, or there's only a single job id + */ private synchronized void maybeReleaseLatch() { if ( lastJobID != -2 && (prevJobID == lastJobID || lastJobID == -1) ) { // either we've already seen the last one prevJobID == lastJobID or @@ -75,12 +133,46 @@ class Reducer { } } + /** + * For testing. + * @return + */ + protected synchronized boolean latchIsReleased() { + return countDownLatch.getCount() == 0; + } + + /** + * Key function: tell this class the job ID of the last job that will provide data in the mapResultsQueue + * + * The last job id controls when we free threads blocked on waitForFinalReduce. When we see the job + * with this last job id, those threads are released. + * + * Until this function is called, those thread will block forever. The last job id has a few constraints. + * First, it must be >= -1. -1 indicates that in fact no jobs will ever be submitted (i.e., there's no + * data coming) so the latch should be opened immediately. If it's >= 0, we will wait until + * a job with that id arrives. + * + * Note that we throw an IllegalStateException if this function is called twice. + * + * @param lastJobID int >= -1 indicating the MapResult job id of the last job that will enqueue results into our queue + */ public synchronized void setLastJobID(final int lastJobID) { - if ( lastJobID < -1 ) throw new IllegalArgumentException("lastJobID must be > -1, but saw " + lastJobID); + if ( lastJobID < -1 ) + throw new IllegalArgumentException("lastJobID must be > -1, but saw " + lastJobID); + if ( this.lastJobID != UNSET_LAST_JOB_ID ) + throw new IllegalStateException("setlastJobID called multiple times, but should only be called once"); + this.lastJobID = lastJobID; maybeReleaseLatch(); } + /** + * Block until the last job has submitted its MapResult to our queue, and we've reduced it, and + * return the reduce result resulting from applying reduce(...) to all MapResult elements. + * + * @return the total reduce result across all jobs + * @throws InterruptedException + */ public ReduceType waitForFinalReduce() throws InterruptedException { countDownLatch.await(); return sum; diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java deleted file mode 100644 index dcdba3490..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.broadinstitute.sting.utils.nanoScheduler; - -import org.broadinstitute.sting.utils.SimpleTimer; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -/** - * Thread that runs the reduce of the map/reduce. - * - * This thread reads from mapResultsQueue until the poison EOF object arrives. At each - * stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the - * queue waits until the mapResultQueue has a value to take. Then, it gets and waits - * until the map result Future has a value. - */ -class ReducerThread implements Callable { - final NSReduceFunction reduce; - final SimpleTimer reduceTimer; - final BlockingQueue>> mapResultQueue; - - ReduceType sum; - int lastJobID = -1; - - public ReducerThread(final NSReduceFunction reduce, - final SimpleTimer reduceTimer, - final ReduceType sum, - final BlockingQueue>> mapResultQueue) { - if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null"); - if ( reduceTimer == null ) throw new IllegalArgumentException("reduceTimer cannot be null"); - if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); - - this.reduce = reduce; - this.reduceTimer = reduceTimer; - this.sum = sum; - this.mapResultQueue = mapResultQueue; - } - - public ReduceType call() { - try { - while ( true ) { - final MapResult result = mapResultQueue.take().get(); - if ( result.isLast() ) { - // we are done, just return sum - return sum; - } - else if ( result.getJobID() < lastJobID ) { - // make sure the map results are coming in order - throw new IllegalStateException("BUG: last jobID " + lastJobID + " > current jobID " + result.getJobID()); - } else { - lastJobID = result.getJobID(); - // apply reduce, keeping track of sum - reduceTimer.restart(); - sum = reduce.apply(result.getValue(), sum); - reduceTimer.stop(); - } - } - } catch (ExecutionException ex) { - throw new ReviewedStingException("got execution exception", ex); - } catch (InterruptedException ex) { - throw new ReviewedStingException("got execution exception", ex); - } - } -} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index 2b90b582f..829fc2f12 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -1,72 +1,104 @@ -//package org.broadinstitute.sting.utils.nanoScheduler; -// -//import org.broadinstitute.sting.BaseTest; -//import org.broadinstitute.sting.utils.SimpleTimer; -//import org.testng.Assert; -//import org.testng.annotations.DataProvider; -//import org.testng.annotations.Test; -// -//import java.util.ArrayList; -//import java.util.Arrays; -//import java.util.List; -//import java.util.concurrent.ExecutorService; -//import java.util.concurrent.Executors; -//import java.util.concurrent.LinkedBlockingDeque; -// -///** -// * UnitTests for the InputProducer -// * -// * User: depristo -// * Date: 8/24/12 -// * Time: 11:25 AM -// * To change this template use File | Settings | File Templates. -// */ -//public class InputProducerUnitTest extends BaseTest { -// @DataProvider(name = "InputProducerTest") -// public Object[][] createInputProducerTest() { -// List tests = new ArrayList(); -// -// for ( final int nElements : Arrays.asList(0, 1, 10, 100, 1000, 10000, 100000) ) { -// for ( final int queueSize : Arrays.asList(1, 10, 100) ) { -// tests.add(new Object[]{ nElements, queueSize }); -// } -// } -// -// return tests.toArray(new Object[][]{}); -// } -// -// @Test(enabled = true, dataProvider = "InputProducerTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) -// public void testInputProducer(final int nElements, final int queueSize) throws InterruptedException { -// final List elements = new ArrayList(nElements); -// for ( int i = 0; i < nElements; i++ ) elements.add(i); -// -// final LinkedBlockingDeque.InputValue> readQueue = -// new LinkedBlockingDeque.InputValue>(queueSize); -// -// final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); -// -// final ExecutorService es = Executors.newSingleThreadExecutor(); -// es.submit(ip); -// -// int lastValue = -1; -// int nRead = 0; -// while ( true ) { -// final int observedQueueSize = readQueue.size(); -// Assert.assertTrue(observedQueueSize <= queueSize, -// "Reader is enqueuing more elements " + observedQueueSize + " than allowed " + queueSize); -// -// final InputProducer.InputValue value = readQueue.take(); -// if ( value.isLast() ) { -// Assert.assertEquals(nRead, nElements, "Number of input values " + nRead + " not all that are expected " + nElements); -// Assert.assertEquals(readQueue.size(), 0, "Last queue element found but queue contains more values!"); -// break; -// } else { -// Assert.assertTrue(lastValue < value.getValue(), "Read values coming out of order!"); -// final int expected = lastValue + 1; -// Assert.assertEquals((int)value.getValue(), expected, "Value observed " + value.getValue() + " not equal to the expected value " + expected); -// nRead++; -// lastValue = value.getValue(); -// } -// } -// } -//} +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; + +/** +* UnitTests for the InputProducer +* +* User: depristo +* Date: 8/24/12 +* Time: 11:25 AM +* To change this template use File | Settings | File Templates. +*/ +public class InputProducerUnitTest extends BaseTest { + @DataProvider(name = "InputProducerTest") + public Object[][] createInputProducerTest() { + List tests = new ArrayList(); + + for ( final int nElements : Arrays.asList(0, 1, 10, 100, 1000, 10000, 100000) ) { + for ( final int queueSize : Arrays.asList(1, 10, 100) ) { + tests.add(new Object[]{ nElements, queueSize }); + } + } + + return tests.toArray(new Object[][]{}); + } + + @Test(enabled = true, dataProvider = "InputProducerTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) + public void testInputProducer(final int nElements, final int queueSize) throws InterruptedException { + final List elements = new ArrayList(nElements); + for ( int i = 0; i < nElements; i++ ) elements.add(i); + + final LinkedBlockingDeque.InputValue> readQueue = + new LinkedBlockingDeque.InputValue>(queueSize); + + final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); + + final ExecutorService es = Executors.newSingleThreadExecutor(); + + Assert.assertEquals(ip.getNElementsInInputStream(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet"); + + es.submit(ip); + + int lastValue = -1; + int nRead = 0; + while ( true ) { + final int nTotalElements = ip.getNElementsInInputStream(); + final int observedQueueSize = readQueue.size(); + Assert.assertTrue(observedQueueSize <= queueSize, + "Reader is enqueuing more elements " + observedQueueSize + " than allowed " + queueSize); + + if ( nRead + observedQueueSize < nElements ) + Assert.assertEquals(nTotalElements, -1, "getNElementsInInputStream should have returned -1 with not all elements read"); + // note, cannot test else case because elements input could have emptied between calls + + final InputProducer.InputValue value = readQueue.take(); + if ( value.isEOFMarker() ) { + Assert.assertEquals(nRead, nElements, "Number of input values " + nRead + " not all that are expected " + nElements); + Assert.assertEquals(readQueue.size(), 0, "Last queue element found but queue contains more values!"); + break; + } else { + Assert.assertTrue(lastValue < value.getValue(), "Read values coming out of order!"); + final int expected = lastValue + 1; + Assert.assertEquals((int)value.getValue(), expected, "Value observed " + value.getValue() + " not equal to the expected value " + expected); + nRead++; + lastValue = value.getValue(); + } + } + + Assert.assertEquals(ip.getNElementsInInputStream(), nElements, "Wrong number of total elements getNElementsInInputStream"); + } + + @Test(enabled = true, dataProvider = "InputProducerTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) + public void testInputProducerLocking(final int nElements, final int queueSize) throws InterruptedException { + final List elements = new ArrayList(nElements); + for ( int i = 0; i < nElements; i++ ) elements.add(i); + + final LinkedBlockingDeque.InputValue> readQueue = + new LinkedBlockingDeque.InputValue>(); + + final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); + + final ExecutorService es = Executors.newSingleThreadExecutor(); + es.submit(ip); + + ip.waitForDone(); + + Assert.assertEquals(ip.getNElementsInInputStream(), nElements, "InputProvider told me that the queue was done, but I haven't started reading yet"); + Assert.assertEquals(readQueue.size(), nElements + 1, "readQueue should have had all elements read into it"); + } + + // TODO -- add a test that really tests ip.getNElementsInInputStream + // Create an iterator, containing a semaphore, that allows us to step through the reader +} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 008c11f0a..eede30077 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -21,6 +21,7 @@ import java.util.List; * To change this template use File | Settings | File Templates. */ public class NanoSchedulerUnitTest extends BaseTest { + private final static boolean debug = false; public static final int NANO_SCHEDULE_MAX_RUNTIME = 60000; private static class Map2x implements NSMapFunction { @@ -102,10 +103,14 @@ public class NanoSchedulerUnitTest extends BaseTest { public ReduceSum makeReduce() { return new ReduceSum(); } public NanoScheduler makeScheduler() { + final NanoScheduler nano; if ( bufferSize == -1 ) - return new NanoScheduler(nThreads); + nano = new NanoScheduler(nThreads); else - return new NanoScheduler(bufferSize, nThreads); + nano = new NanoScheduler(bufferSize, nThreads); + + nano.setDebug(debug); + return nano; } } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java deleted file mode 100644 index 08771e9ec..000000000 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java +++ /dev/null @@ -1,95 +0,0 @@ -package org.broadinstitute.sting.utils.nanoScheduler; - -import org.broadinstitute.sting.BaseTest; -import org.broadinstitute.sting.utils.SimpleTimer; -import org.testng.Assert; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.*; - -/** - * UnitTests for the InputProducer - * - * User: depristo - * Date: 8/24/12 - * Time: 11:25 AM - * To change this template use File | Settings | File Templates. - */ -public class ReducerThreadUnitTest extends BaseTest { - @DataProvider(name = "ReducerThreadTest") - public Object[][] createReducerThreadTest() { - List tests = new ArrayList(); - - for ( final int nElements : Arrays.asList(0, 1, 10, 100, 1000, 10000, 100000) ) { - tests.add(new Object[]{ nElements }); - } - - return tests.toArray(new Object[][]{}); - } - - @Test(enabled = true, dataProvider = "ReducerThreadTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) - public void testReducerThreadTest(final int nElements) throws Exception { - List values = new ArrayList(nElements); - List jobIDs = new ArrayList(nElements); - for ( int i = 0; i < nElements; i++ ) { - values.add(i); - jobIDs.add(i); - } - - runTests(values, jobIDs); - } - - @Test(enabled = true, timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME, expectedExceptions = ExecutionException.class) - public void testReducerThreadTestByJobOrder() throws Exception { - runTests(Arrays.asList(0, 1, 2), Arrays.asList(1, 3, 2)); - } - - private void runTests( final List mapValues, final List jobIDs) throws Exception { - final LinkedBlockingDeque>> mapResultsQueue = - new LinkedBlockingDeque>>(mapValues.size()+1); - - for ( int i = 0; i < mapValues.size(); i++ ) { - final int value = mapValues.get(i); - final int jobID = jobIDs.get(i); - final MapResult mapResult = new MapResult(value, jobID); - mapResultsQueue.add(new FutureValue>(mapResult)); - } - mapResultsQueue.add(new FutureValue>(new MapResult())); - - final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue); - final ReducerThread thread - = new ReducerThread(reduce, new SimpleTimer(), 0, mapResultsQueue); - - final ExecutorService es = Executors.newSingleThreadExecutor(); - final Future value = es.submit(thread); - value.get(); - - Assert.assertEquals(reduce.nRead, mapValues.size()); - } - - public class ReduceSumTest implements NSReduceFunction { - final LinkedBlockingDeque>> mapResultsQueue; - int nRead = 0; - int lastValue = -1; - - public ReduceSumTest(LinkedBlockingDeque>> mapResultsQueue) { - this.mapResultsQueue = mapResultsQueue; - } - - @Override public Integer apply(Integer one, Integer sum) { - Assert.assertTrue(lastValue < one, "Reduce came in out of order. Prev " + lastValue + " cur " + one); - - Assert.assertTrue(lastValue < one, "Read values coming out of order!"); - final int expected = lastValue + 1; - Assert.assertEquals((int)one, expected, "Value observed " + one + " not equal to the expected value " + expected); - nRead++; - lastValue = expected; - - return one + sum; - } - } -} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java new file mode 100644 index 000000000..d5136abbe --- /dev/null +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java @@ -0,0 +1,206 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.SimpleTimer; +import org.broadinstitute.sting.utils.Utils; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * UnitTests for Reducer + * + * User: depristo + * Date: 8/24/12 + * Time: 11:25 AM + * To change this template use File | Settings | File Templates. + */ +public class ReducerUnitTest extends BaseTest { + @DataProvider(name = "ReducerThreadTest") + public Object[][] createReducerThreadTest() { + List tests = new ArrayList(); + + for ( final int groupSize : Arrays.asList(-1, 1, 5, 50, 500, 5000, 50000) ) { + for ( final boolean setJobIDAtStart : Arrays.asList(true, false) ) { + for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) { + if ( groupSize < nElements ) { + for ( final List> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) { + tests.add(new Object[]{ new ListOfJobs(jobs), setJobIDAtStart, groupSize }); + } + } + } + + for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) { + if ( groupSize < nElements ) { + tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), setJobIDAtStart, groupSize }); + } + } + } + } + + return tests.toArray(new Object[][]{}); + } + + private static class ListOfJobs extends ArrayList> { + private ListOfJobs(Collection> c) { + super(c); + } + + @Override + public String toString() { + if ( size() < 10 ) + return super.toString(); + else + return "JobList of " + size(); + } + } + + private static List> makeJobs(final int nElements) { + List> jobs = new ArrayList>(nElements); + for ( int i = 0; i < nElements; i++ ) { + jobs.add(new MapResult(i, i)); + } + return jobs; + } + + private int expectedSum(final List> jobs) { + int sum = 0; + for ( final MapResult job : jobs ) + sum += job.getValue(); + return sum; + } + + @Test(enabled = true, dataProvider = "ReducerThreadTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) + public void testReducerThread(final List> jobs, final boolean setJobIDAtStart, final int groupSize) throws Exception { + runTests(jobs, setJobIDAtStart, groupSize); + } + + private void runTests( final List> allJobs, boolean setJobIDAtStart, int groupSize ) throws Exception { + if ( groupSize == -1 ) + groupSize = allJobs.size(); + + int lastJobID = -1; + for ( final MapResult job : allJobs ) { + lastJobID = Math.max(job.getJobID(), lastJobID); + } + + final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>(); + + final List>> jobGroups = Utils.groupList(allJobs, groupSize); + final ReduceSumTest reduce = new ReduceSumTest(); + final Reducer reducer = new Reducer(reduce, new SimpleTimer(), 0); + + final TestWaitingForFinalReduce waitingThread = new TestWaitingForFinalReduce(reducer, expectedSum(allJobs)); + final ExecutorService es = Executors.newSingleThreadExecutor(); + es.submit(waitingThread); + + int nJobsSubmitted = 0; + int jobGroupCount = 0; + final int lastJobGroupCount = jobGroups.size() - 1; + setJobIDAtStart = setJobIDAtStart && groupSize == 1; + + for ( final List> jobs : jobGroups ) { + //logger.warn("Processing job group " + jobGroupCount + " with " + jobs.size() + " jobs"); + for ( final MapResult job : jobs ) { + mapResultsQueue.add(job); + nJobsSubmitted++; + } + + if ( jobGroupCount == lastJobGroupCount ) { + mapResultsQueue.add(new MapResult()); + nJobsSubmitted++; + } + + Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed at the start"); + + if ( jobGroupCount == 0 && lastJobID != -1 && setJobIDAtStart ) { + // only can do the setJobID if jobs cannot be submitted out of order + reducer.setLastJobID(lastJobID); + Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything"); + } + + final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue); + Assert.assertTrue(nReduced <= nJobsSubmitted, "Somehow reduced more jobs than submitted"); + + if ( setJobIDAtStart ) { + final boolean submittedLastJob = jobGroupCount == lastJobGroupCount; + Assert.assertEquals(reducer.latchIsReleased(), submittedLastJob, + "When last job is set, latch should only be released if the last job has been submitted"); + } else { + Assert.assertEquals(reducer.latchIsReleased(), false, "When last job isn't set, latch should never be release"); + } + + jobGroupCount++; + } + + if ( setJobIDAtStart ) + Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set"); + else { + Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set"); + if ( lastJobID != -1 ) { + reducer.setLastJobID(lastJobID); + Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id "); + } + } + + Assert.assertEquals(reduce.nRead, allJobs.size(), "number of read values not all of the values in the reducer queue"); + es.shutdown(); + es.awaitTermination(1, TimeUnit.HOURS); + } + + @Test(expectedExceptions = IllegalStateException.class) + private void runSettingJobIDTwice() throws Exception { + final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>(); + + final Reducer reducer = new Reducer(new ReduceSumTest(), new SimpleTimer(), 0); + + reducer.setLastJobID(10); + reducer.setLastJobID(15); + } + + public class ReduceSumTest implements NSReduceFunction { + int nRead = 0; + int lastValue = -1; + + @Override public Integer apply(Integer one, Integer sum) { + Assert.assertTrue(lastValue < one, "Reduce came in out of order. Prev " + lastValue + " cur " + one); + + Assert.assertTrue(lastValue < one, "Read values coming out of order!"); + final int expected = lastValue + 1; + Assert.assertEquals((int)one, expected, "Value observed " + one + " not equal to the expected value " + expected); + nRead++; + lastValue = expected; + + return one + sum; + } + } + + final static class TestWaitingForFinalReduce implements Runnable { + final Reducer reducer; + final int expectedSum; + + TestWaitingForFinalReduce(Reducer reducer, final int expectedSum) { + this.reducer = reducer; + this.expectedSum = expectedSum; + } + + @Override + public void run() { + try { + final int observedSum = reducer.waitForFinalReduce(); + Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value"); + } catch ( InterruptedException ex ) { + Assert.fail("Got interrupted"); + } + } + } +} \ No newline at end of file