From 33fabb8180391a66f50711792a18aedf18223765 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 19 Sep 2012 09:31:31 -0400 Subject: [PATCH] Final V3 version of NanoScheduler -- Fixed basic bugs in tracking of input -> map -> reduce jobs -- Simplified classes -- Expanded unit tests --- .../utils/nanoScheduler/InputProducer.java | 77 ++++++++++++---- .../utils/nanoScheduler/NanoScheduler.java | 29 ++++--- .../sting/utils/nanoScheduler/Reducer.java | 87 ++++++++++++------- .../nanoScheduler/InputProducerUnitTest.java | 41 --------- .../nanoScheduler/NanoSchedulerUnitTest.java | 2 +- .../utils/nanoScheduler/ReducerUnitTest.java | 19 ++-- 6 files changed, 137 insertions(+), 118 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index d0d25e886..adec98cff 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -1,5 +1,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; +import com.google.java.contract.Ensures; +import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; @@ -11,6 +13,8 @@ import java.util.concurrent.CountDownLatch; * Producer Thread that reads input values from an inputReads and puts them into an output queue */ class InputProducer implements Runnable { + private final static Logger logger = Logger.getLogger(InputProducer.class); + /** * The iterator we are using to get data from */ @@ -36,6 +40,7 @@ class InputProducer implements Runnable { boolean readLastValue = false; int nRead = 0; + int inputID = -1; /** * A latch used to block threads that want to start up only when all of the values @@ -109,29 +114,27 @@ class InputProducer implements Runnable { public void run() { try { while ( true ) { - final InputValue inputValue = runOne(); - outputQueue.put(inputValue); - if ( inputValue.isEOFMarker() ) + final InputType value = readNextItem(); + + if ( value == null ) { + // add the EOF object so our consumer knows we are done in all inputs + // note that we do not increase inputID here, so that variable indicates the ID + // of the last real value read from the queue + outputQueue.put(new InputValue(inputID + 1)); break; + } else { + // add the actual value to the outputQueue + outputQueue.put(new InputValue(++inputID, value)); + } } latch.countDown(); - } catch (InterruptedException ex) { + } catch (Exception ex) { + logger.warn("Got exception " + ex); throw new ReviewedStingException("got execution exception", ex); } } - protected InputValue runOne() throws InterruptedException { - final InputType value = readNextItem(); - if ( value == null ) { - // add the EOF object so our consumer knows we are done in all inputs - return new InputValue(); - } else { - // add the actual value - return new InputValue(value); - } - } - /** * Block until all of the items have been read from inputReader. * @@ -146,9 +149,49 @@ class InputProducer implements Runnable { /** * Helper class that contains a read value suitable for EOF marking in a BlockingQueue + * + * This class also contains an ID, an integer incrementing from 0 to N, for N total + * values in the input stream. This ID indicates which element in the element stream this + * InputValue corresponds to. Necessary for tracking and ordering results by input position. + * + * Note that EOF markers have IDs > N, and ID values >> N can occur if many EOF markers + * are enqueued in the outputQueue. */ class InputValue extends EOFMarkedValue { - private InputValue(InputType datum) { super(datum); } - private InputValue() { } + final int id; + + private InputValue(final int id, InputType datum) { + super(datum); + if ( id < 0 ) throw new IllegalArgumentException("id must be >= 0"); + this.id = id; + } + private InputValue(final int id) { + super(); + if ( id < 0 ) throw new IllegalArgumentException("id must be >= 0"); + this.id = id; + } + + /** + * Returns the ID of this input marker + * @return id >= 0 + */ + public int getId() { + return id; + } + + /** + * Create another EOF marker with ID + 1 to this one. + * + * Useful in the case where we need to enqueue another EOF marker for future jobs and we + * want them to have a meaningful ID, one greater than the last one. + * + * @return ID + */ + @Ensures({"result.isEOFMarker()", "result.getId() == getId() + 1"}) + public InputValue nextEOF() { + if ( ! isEOFMarker() ) + throw new IllegalArgumentException("Cannot request next EOF marker for non-EOF marker InputValue"); + return new InputValue(getId() + 1); + } } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 0aa27f662..31ce04074 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -329,23 +329,23 @@ public class NanoScheduler { try { int nSubmittedJobs = 0; - int jobID = -1; // must be -1 as setLastJobID special cases -1 to indicate no jobs were enqueued while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) { // acquire a slot to run a map job. Blocks if too many jobs are enqueued runningMapJobSlots.acquire(); - jobID++; - mapExecutor.submit(new MapReduceJob(jobID, inputQueue, mapResultQueue, map, reducer)); + mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer)); nSubmittedJobs++; } // mark the last job id we've submitted so we now the id to wait for - reducer.setLastJobID(jobID); + //logger.warn("setting jobs submitted to " + nSubmittedJobs); + reducer.setTotalJobCount(nSubmittedJobs); // wait for all of the input and map threads to finish return waitForCompletion(inputProducer, reducer); - } catch (InterruptedException ex) { + } catch (Exception ex) { + logger.warn("Got exception " + ex); throw new ReviewedStingException("got execution exception", ex); } } @@ -356,12 +356,15 @@ public class NanoScheduler { private ReduceType waitForCompletion(final InputProducer inputProducer, final Reducer reducer) throws InterruptedException { // wait until we have a final reduce result +// logger.warn("waiting for final reduce"); final ReduceType finalSum = reducer.waitForFinalReduce(); // now wait for the input provider thread to terminate +// logger.warn("waiting on inputProducer"); inputProducer.waitForDone(); // wait for all the map threads to finish by acquiring and then releasing all map job semaphores +// logger.warn("waiting on map"); runningMapJobSlots.acquire(this.bufferSize); runningMapJobSlots.release(this.bufferSize); @@ -388,18 +391,15 @@ public class NanoScheduler { } private class MapReduceJob implements Runnable { - final int jobID; final BlockingQueue.InputValue> inputQueue; final PriorityBlockingQueue> mapResultQueue; final NSMapFunction map; final Reducer reducer; - private MapReduceJob(final int jobID, - BlockingQueue.InputValue> inputQueue, + private MapReduceJob(BlockingQueue.InputValue> inputQueue, final PriorityBlockingQueue> mapResultQueue, final NSMapFunction map, final Reducer reducer) { - this.jobID = jobID; this.inputQueue = inputQueue; this.mapResultQueue = mapResultQueue; this.map = map; @@ -411,6 +411,7 @@ public class NanoScheduler { try { //debugPrint("Running MapReduceJob " + jobID); final InputProducer.InputValue inputWrapper = inputQueue.take(); + final int jobID = inputWrapper.getId(); final MapResult result; if ( ! inputWrapper.isEOFMarker() ) { @@ -433,7 +434,8 @@ public class NanoScheduler { progressFunction.progress(input); } else { // push back the EOF marker so other waiting threads can read it - inputQueue.add(inputWrapper); + inputQueue.put(inputWrapper.nextEOF()); + // if there's no input we push empty MapResults with jobIDs for synchronization with Reducer result = new MapResult(jobID); } @@ -441,11 +443,12 @@ public class NanoScheduler { mapResultQueue.put(result); final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue); - + } catch (Exception ex) { + logger.warn("Got exception " + ex); + throw new ReviewedStingException("got execution exception", ex); + } finally { // we finished a map job, release the job queue semaphore runningMapJobSlots.release(); - } catch (InterruptedException ex) { - throw new ReviewedStingException("got execution exception", ex); } } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index 4fc34e2c9..428ab37fd 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -2,6 +2,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import com.google.java.contract.Requires; +import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.SimpleTimer; import java.util.concurrent.CountDownLatch; @@ -16,10 +17,10 @@ import java.util.concurrent.PriorityBlockingQueue; * The second thread, using the waitForFinalReduce, can block on this data structure * until that all jobs have arrived and been reduced. * - * The key function for communication here is setLastJobID(), which the thread that submits + * The key function for communication here is setTotalJobCount(), which the thread that submits * jobs that enqueue MapResults into the blocking queue must call ONCE to tell the - * Reduce that ID of the last job that's been submitted. When a job arrives with that - * ID, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed. + * Reducer the total number of jobs that have been submitted for map. When numOfSubmittedJobs + * have been processed, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed. * * This thread reads from mapResultsQueue until the poison EOF object arrives. At each * stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the @@ -27,7 +28,8 @@ import java.util.concurrent.PriorityBlockingQueue; * until the map result Future has a value. */ class Reducer { - private final static int UNSET_LAST_JOB_ID = -2; + private final static Logger logger = Logger.getLogger(Reducer.class); + private final static int UNSET_NUM_SUBMITTED_JOBS = -2; final CountDownLatch countDownLatch = new CountDownLatch(1); final NSReduceFunction reduce; @@ -39,13 +41,18 @@ class Reducer { */ ReduceType sum; - int lastJobID = UNSET_LAST_JOB_ID; // not yet set + int numSubmittedJobs = UNSET_NUM_SUBMITTED_JOBS; // not yet set /** * The jobID of the last job we've seen */ int prevJobID = -1; // no jobs observed + /** + * A counter keeping track of the number of jobs we're reduced + */ + int numJobsReduced = 0; + /** * Create a new Reducer that will apply the reduce function with initialSum value * to values via reduceAsMuchAsPossible, timing the reduce function call costs with @@ -69,21 +76,28 @@ class Reducer { /** * Should we reduce the next value in the mapResultQueue? * - * * @param mapResultQueue the queue of map results * @return true if we should reduce */ @Requires("mapResultQueue != null") private synchronized boolean reduceNextValueInQueue(final PriorityBlockingQueue> mapResultQueue) { final MapResult nextMapResult = mapResultQueue.peek(); - return nextMapResult != null && nextMapResult.getJobID() == prevJobID + 1; + if ( nextMapResult == null ) { + return false; + } else if ( nextMapResult.getJobID() < prevJobID + 1 ) { + throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is < previous job id " + prevJobID); + } else if ( nextMapResult.getJobID() == prevJobID + 1 ) { + return true; + } else { + return false; + } } /** * Reduce as much data as possible in mapResultQueue, returning the number of reduce calls completed * * As much as possible is defined as all of the MapResults in the queue are in order starting from the - * lastJobID we reduced previously, up to the either the queue being empty or where the next MapResult + * numSubmittedJobs we reduced previously, up to the either the queue being empty or where the next MapResult * doesn't have JobID == prevJobID + 1. * * @param mapResultQueue a queue of MapResults in jobID order @@ -93,19 +107,17 @@ class Reducer { @Ensures("result >= 0") public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue> mapResultQueue) throws InterruptedException { if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); - int nReduces = 0; + int nReducesNow = 0; + +// if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS ) +// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size()); while ( reduceNextValueInQueue(mapResultQueue) ) { final MapResult result = mapResultQueue.take(); - - if ( result.getJobID() < prevJobID ) - // make sure the map results are coming in order - throw new IllegalStateException("BUG: last jobID " + prevJobID + " > current jobID " + result.getJobID()); - prevJobID = result.getJobID(); if ( ! result.isEOFMarker() ) { - nReduces++; + nReducesNow++; // apply reduce, keeping track of sum reduceTimer.restart(); @@ -114,10 +126,14 @@ class Reducer { } + numJobsReduced++; maybeReleaseLatch(); } - return nReduces; +// if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS ) +// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size()); + + return nReducesNow; } /** @@ -126,43 +142,46 @@ class Reducer { * Appropriate means we've seen the last job, or there's only a single job id */ private synchronized void maybeReleaseLatch() { - if ( lastJobID != -2 && (prevJobID == lastJobID || lastJobID == -1) ) { - // either we've already seen the last one prevJobID == lastJobID or + if ( numJobsReduced == numSubmittedJobs ) { + // either we've already seen the last one prevJobID == numSubmittedJobs or // the last job ID is -1, meaning that no jobs were ever submitted countDownLatch.countDown(); } } /** - * For testing. - * @return + * For testing only + * + * @return true if latch is released */ protected synchronized boolean latchIsReleased() { return countDownLatch.getCount() == 0; } /** - * Key function: tell this class the job ID of the last job that will provide data in the mapResultsQueue + * Key function: tell this class the total number of jobs will provide data in the mapResultsQueue * - * The last job id controls when we free threads blocked on waitForFinalReduce. When we see the job - * with this last job id, those threads are released. + * The total job count when we free threads blocked on waitForFinalReduce. When we see numOfSubmittedJobs + * MapResults from the queue, those threads are released. * - * Until this function is called, those thread will block forever. The last job id has a few constraints. - * First, it must be >= -1. -1 indicates that in fact no jobs will ever be submitted (i.e., there's no - * data coming) so the latch should be opened immediately. If it's >= 0, we will wait until - * a job with that id arrives. + * Until this function is called, those thread will block forever. The numOfSubmittedJobs has a few constraints. + * First, it must be >= 0. 0 indicates that in fact no jobs will ever be submitted (i.e., there's no + * data coming) so the latch should be opened immediately. If it's >= 1, we will wait until + * we see numOfSubmittedJobs jobs before freeing them. * * Note that we throw an IllegalStateException if this function is called twice. * - * @param lastJobID int >= -1 indicating the MapResult job id of the last job that will enqueue results into our queue + * @param numOfSubmittedJobs int >= 0 indicating the total number of MapResults that will + * enqueue results into our queue */ - public synchronized void setLastJobID(final int lastJobID) { - if ( lastJobID < -1 ) - throw new IllegalArgumentException("lastJobID must be > -1, but saw " + lastJobID); - if ( this.lastJobID != UNSET_LAST_JOB_ID ) + public synchronized void setTotalJobCount(final int numOfSubmittedJobs) { + if ( numOfSubmittedJobs < 0 ) + throw new IllegalArgumentException("numOfSubmittedJobs must be >= 0, but saw " + numOfSubmittedJobs); + if ( this.numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS) throw new IllegalStateException("setlastJobID called multiple times, but should only be called once"); - this.lastJobID = lastJobID; + //logger.warn("setTotalJobCount " + numJobsReduced + " numSubmitted " + numOfSubmittedJobs); + this.numSubmittedJobs = numOfSubmittedJobs; maybeReleaseLatch(); } @@ -174,7 +193,9 @@ class Reducer { * @throws InterruptedException */ public ReduceType waitForFinalReduce() throws InterruptedException { + //logger.warn("waitForFinalReduce() " + numJobsReduced + " " + numSubmittedJobs); countDownLatch.await(); + //logger.warn(" done waitForFinalReduce"); return sum; } } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index 3baca66ef..5f54303a9 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -143,45 +143,4 @@ public class InputProducerUnitTest extends BaseTest { throw new UnsupportedOperationException("x"); } } - - // TODO -- this doesn't work because the synchronization in InputProvider... -// @Test(enabled = false, dataProvider = "InputProducerTest", dependsOnMethods = "testInputProducer", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) -// public void testInputProducerSingleStepIterator(final int nElements, final int queueSize) throws InterruptedException { -// -// final List elements = new ArrayList(nElements); -// for ( int i = 0; i < nElements; i++ ) elements.add(i); -// -// //final BlockingIterator myIterator = new BlockingIterator(elements.iterator()); -// -// final LinkedBlockingDeque.InputValue> readQueue = -// new LinkedBlockingDeque.InputValue>(queueSize); -// -// final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); -// -// final ExecutorService es = Executors.newSingleThreadExecutor(); -// -// Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I haven't started reading yet"); -// Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet"); -// -// //es.submit(ip); -// -// for ( int nCycles = 0; nCycles < nElements; nCycles++ ) { -// Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I'm not down reading yet"); -// Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I'm not down reading yet"); -// -//// final int observedQueueSize = readQueue.size(); -//// Assert.assertEquals(observedQueueSize, nCycles, "Reader enqueued " + observedQueueSize + " elements but expected expected " + nCycles); -// -// //myIterator.allowNext(); -// //myIterator.blockTillNext(); -// ip.runOne(); -// } -// -// //myIterator.allowNext(); -// //Thread.sleep(100); -// -// Assert.assertTrue(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs haven't been read, but I read them all"); -// Assert.assertEquals(ip.getNumInputValues(), nElements, "Wrong number of total elements getNumInputValues"); -// es.shutdownNow(); -// } } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index eede30077..d9fe4ddd6 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -121,7 +121,7 @@ public class NanoSchedulerUnitTest extends BaseTest { // for ( final int nt : Arrays.asList(1, 2, 4) ) { // for ( final int start : Arrays.asList(0) ) { // for ( final int end : Arrays.asList(0, 1, 2) ) { -// exampleTest = new NanoSchedulerBasicTest(bufferSize, nt, start, end); +// exampleTest = new NanoSchedulerBasicTest(bufferSize, nt, start, end, false); // } // } // } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java index d5136abbe..2732d67d3 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java @@ -88,11 +88,6 @@ public class ReducerUnitTest extends BaseTest { if ( groupSize == -1 ) groupSize = allJobs.size(); - int lastJobID = -1; - for ( final MapResult job : allJobs ) { - lastJobID = Math.max(job.getJobID(), lastJobID); - } - final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>(); final List>> jobGroups = Utils.groupList(allJobs, groupSize); @@ -122,9 +117,9 @@ public class ReducerUnitTest extends BaseTest { Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed at the start"); - if ( jobGroupCount == 0 && lastJobID != -1 && setJobIDAtStart ) { + if ( jobGroupCount == 0 && setJobIDAtStart ) { // only can do the setJobID if jobs cannot be submitted out of order - reducer.setLastJobID(lastJobID); + reducer.setTotalJobCount(allJobs.size()); Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything"); } @@ -146,10 +141,8 @@ public class ReducerUnitTest extends BaseTest { Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set"); else { Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set"); - if ( lastJobID != -1 ) { - reducer.setLastJobID(lastJobID); - Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id "); - } + reducer.setTotalJobCount(allJobs.size()); + Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id "); } Assert.assertEquals(reduce.nRead, allJobs.size(), "number of read values not all of the values in the reducer queue"); @@ -163,8 +156,8 @@ public class ReducerUnitTest extends BaseTest { final Reducer reducer = new Reducer(new ReduceSumTest(), new SimpleTimer(), 0); - reducer.setLastJobID(10); - reducer.setLastJobID(15); + reducer.setTotalJobCount(10); + reducer.setTotalJobCount(15); } public class ReduceSumTest implements NSReduceFunction {