Final V3 version of NanoScheduler

-- Fixed basic bugs in tracking of input -> map -> reduce jobs
-- Simplified classes
-- Expanded unit tests
This commit is contained in:
Mark DePristo 2012-09-19 09:31:31 -04:00
parent e18bc4e7b1
commit 33fabb8180
6 changed files with 137 additions and 118 deletions

View File

@ -1,5 +1,7 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
@ -11,6 +13,8 @@ import java.util.concurrent.CountDownLatch;
* Producer Thread that reads input values from an inputReader and puts them into an output queue
*/
class InputProducer<InputType> implements Runnable {
private final static Logger logger = Logger.getLogger(InputProducer.class);
/**
* The iterator we are using to get data from
*/
@ -36,6 +40,7 @@ class InputProducer<InputType> implements Runnable {
boolean readLastValue = false;
int nRead = 0;
int inputID = -1;
/**
* A latch used to block threads that want to start up only when all of the values
@ -109,29 +114,27 @@ class InputProducer<InputType> implements Runnable {
public void run() {
try {
while ( true ) {
final InputValue inputValue = runOne();
outputQueue.put(inputValue);
if ( inputValue.isEOFMarker() )
final InputType value = readNextItem();
if ( value == null ) {
// add the EOF object so our consumer knows we are done with all inputs
// note that we do not increase inputID here, so that variable indicates the ID
// of the last real value read from the queue
outputQueue.put(new InputValue(inputID + 1));
break;
} else {
// add the actual value to the outputQueue
outputQueue.put(new InputValue(++inputID, value));
}
}
latch.countDown();
} catch (InterruptedException ex) {
} catch (Exception ex) {
logger.warn("Got exception " + ex);
throw new ReviewedStingException("got execution exception", ex);
}
}
protected InputValue runOne() throws InterruptedException {
final InputType value = readNextItem();
if ( value == null ) {
// add the EOF object so our consumer knows we are done in all inputs
return new InputValue();
} else {
// add the actual value
return new InputValue(value);
}
}
/**
* Block until all of the items have been read from inputReader.
*
@ -146,9 +149,49 @@ class InputProducer<InputType> implements Runnable {
/**
* Helper class that contains a read value suitable for EOF marking in a BlockingQueue
*
* This class also contains an ID, an integer incrementing from 0 to N, for N total
* values in the input stream. This ID indicates which element in the element stream this
* InputValue corresponds to. Necessary for tracking and ordering results by input position.
*
* Note that EOF markers have IDs > N, and ID values >> N can occur if many EOF markers
* are enqueued in the outputQueue.
*/
class InputValue extends EOFMarkedValue<InputType> {
private InputValue(InputType datum) { super(datum); }
private InputValue() { }
final int id;
private InputValue(final int id, InputType datum) {
super(datum);
if ( id < 0 ) throw new IllegalArgumentException("id must be >= 0");
this.id = id;
}
private InputValue(final int id) {
super();
if ( id < 0 ) throw new IllegalArgumentException("id must be >= 0");
this.id = id;
}
/**
* Returns the ID of this input marker
* @return id >= 0
*/
public int getId() {
return id;
}
/**
* Create another EOF marker with ID + 1 to this one.
*
* Useful in the case where we need to enqueue another EOF marker for future jobs and we
* want them to have a meaningful ID, one greater than the last one.
*
* @return a new EOF-marker InputValue whose id is getId() + 1
*/
@Ensures({"result.isEOFMarker()", "result.getId() == getId() + 1"})
public InputValue nextEOF() {
if ( ! isEOFMarker() )
throw new IllegalArgumentException("Cannot request next EOF marker for non-EOF marker InputValue");
return new InputValue(getId() + 1);
}
}
}

View File

@ -329,23 +329,23 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
try {
int nSubmittedJobs = 0;
int jobID = -1; // must be -1 as setLastJobID special cases -1 to indicate no jobs were enqueued
while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
// acquire a slot to run a map job. Blocks if too many jobs are enqueued
runningMapJobSlots.acquire();
jobID++;
mapExecutor.submit(new MapReduceJob(jobID, inputQueue, mapResultQueue, map, reducer));
mapExecutor.submit(new MapReduceJob(inputQueue, mapResultQueue, map, reducer));
nSubmittedJobs++;
}
// mark the last job id we've submitted so we know the id to wait for
reducer.setLastJobID(jobID);
//logger.warn("setting jobs submitted to " + nSubmittedJobs);
reducer.setTotalJobCount(nSubmittedJobs);
// wait for all of the input and map threads to finish
return waitForCompletion(inputProducer, reducer);
} catch (InterruptedException ex) {
} catch (Exception ex) {
logger.warn("Got exception " + ex);
throw new ReviewedStingException("got execution exception", ex);
}
}
@ -356,12 +356,15 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
private ReduceType waitForCompletion(final InputProducer<InputType> inputProducer,
final Reducer<MapType, ReduceType> reducer) throws InterruptedException {
// wait until we have a final reduce result
// logger.warn("waiting for final reduce");
final ReduceType finalSum = reducer.waitForFinalReduce();
// now wait for the input provider thread to terminate
// logger.warn("waiting on inputProducer");
inputProducer.waitForDone();
// wait for all the map threads to finish by acquiring and then releasing all map job semaphores
// logger.warn("waiting on map");
runningMapJobSlots.acquire(this.bufferSize);
runningMapJobSlots.release(this.bufferSize);
@ -388,18 +391,15 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
}
private class MapReduceJob implements Runnable {
final int jobID;
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue;
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue;
final NSMapFunction<InputType, MapType> map;
final Reducer<MapType, ReduceType> reducer;
private MapReduceJob(final int jobID,
BlockingQueue<InputProducer<InputType>.InputValue> inputQueue,
private MapReduceJob(BlockingQueue<InputProducer<InputType>.InputValue> inputQueue,
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue,
final NSMapFunction<InputType, MapType> map,
final Reducer<MapType, ReduceType> reducer) {
this.jobID = jobID;
this.inputQueue = inputQueue;
this.mapResultQueue = mapResultQueue;
this.map = map;
@ -411,6 +411,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
try {
//debugPrint("Running MapReduceJob " + jobID);
final InputProducer<InputType>.InputValue inputWrapper = inputQueue.take();
final int jobID = inputWrapper.getId();
final MapResult<MapType> result;
if ( ! inputWrapper.isEOFMarker() ) {
@ -433,7 +434,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
progressFunction.progress(input);
} else {
// push back the EOF marker so other waiting threads can read it
inputQueue.add(inputWrapper);
inputQueue.put(inputWrapper.nextEOF());
// if there's no input we push empty MapResults with jobIDs for synchronization with Reducer
result = new MapResult<MapType>(jobID);
}
@ -441,11 +443,12 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
mapResultQueue.put(result);
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue);
} catch (Exception ex) {
logger.warn("Got exception " + ex);
throw new ReviewedStingException("got execution exception", ex);
} finally {
// we finished a map job, release the job queue semaphore
runningMapJobSlots.release();
} catch (InterruptedException ex) {
throw new ReviewedStingException("got execution exception", ex);
}
}
}

View File

@ -2,6 +2,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.SimpleTimer;
import java.util.concurrent.CountDownLatch;
@ -16,10 +17,10 @@ import java.util.concurrent.PriorityBlockingQueue;
* The second thread, using the waitForFinalReduce, can block on this data structure
* until that all jobs have arrived and been reduced.
*
* The key function for communication here is setLastJobID(), which the thread that submits
* The key function for communication here is setTotalJobCount(), which the thread that submits
* jobs that enqueue MapResults into the blocking queue must call ONCE to tell the
* Reduce that ID of the last job that's been submitted. When a job arrives with that
* ID, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed.
* Reducer the total number of jobs that have been submitted for map. When numOfSubmittedJobs
* have been processed, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed.
*
* This thread reads from mapResultsQueue until the poison EOF object arrives. At each
* stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the
@ -27,7 +28,8 @@ import java.util.concurrent.PriorityBlockingQueue;
* until the map result Future has a value.
*/
class Reducer<MapType, ReduceType> {
private final static int UNSET_LAST_JOB_ID = -2;
private final static Logger logger = Logger.getLogger(Reducer.class);
private final static int UNSET_NUM_SUBMITTED_JOBS = -2;
final CountDownLatch countDownLatch = new CountDownLatch(1);
final NSReduceFunction<MapType, ReduceType> reduce;
@ -39,13 +41,18 @@ class Reducer<MapType, ReduceType> {
*/
ReduceType sum;
int lastJobID = UNSET_LAST_JOB_ID; // not yet set
int numSubmittedJobs = UNSET_NUM_SUBMITTED_JOBS; // not yet set
/**
* The jobID of the last job we've seen
*/
int prevJobID = -1; // no jobs observed
/**
* A counter keeping track of the number of jobs we're reduced
*/
int numJobsReduced = 0;
/**
* Create a new Reducer that will apply the reduce function with initialSum value
* to values via reduceAsMuchAsPossible, timing the reduce function call costs with
@ -69,21 +76,28 @@ class Reducer<MapType, ReduceType> {
/**
* Should we reduce the next value in the mapResultQueue?
*
*
* @param mapResultQueue the queue of map results
* @return true if we should reduce
*/
@Requires("mapResultQueue != null")
private synchronized boolean reduceNextValueInQueue(final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue) {
final MapResult<MapType> nextMapResult = mapResultQueue.peek();
return nextMapResult != null && nextMapResult.getJobID() == prevJobID + 1;
if ( nextMapResult == null ) {
return false;
} else if ( nextMapResult.getJobID() < prevJobID + 1 ) {
throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is < previous job id " + prevJobID);
} else if ( nextMapResult.getJobID() == prevJobID + 1 ) {
return true;
} else {
return false;
}
}
/**
* Reduce as much data as possible in mapResultQueue, returning the number of reduce calls completed
*
* As much as possible is defined as all of the MapResults in the queue are in order starting from the
* lastJobID we reduced previously, up to the either the queue being empty or where the next MapResult
* last jobID we reduced previously, up to either the queue being empty or where the next MapResult
* doesn't have JobID == prevJobID + 1.
*
* @param mapResultQueue a queue of MapResults in jobID order
@ -93,19 +107,17 @@ class Reducer<MapType, ReduceType> {
@Ensures("result >= 0")
public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue) throws InterruptedException {
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
int nReduces = 0;
int nReducesNow = 0;
// if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
while ( reduceNextValueInQueue(mapResultQueue) ) {
final MapResult<MapType> result = mapResultQueue.take();
if ( result.getJobID() < prevJobID )
// make sure the map results are coming in order
throw new IllegalStateException("BUG: last jobID " + prevJobID + " > current jobID " + result.getJobID());
prevJobID = result.getJobID();
if ( ! result.isEOFMarker() ) {
nReduces++;
nReducesNow++;
// apply reduce, keeping track of sum
reduceTimer.restart();
@ -114,10 +126,14 @@ class Reducer<MapType, ReduceType> {
}
numJobsReduced++;
maybeReleaseLatch();
}
return nReduces;
// if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
return nReducesNow;
}
/**
@ -126,43 +142,46 @@ class Reducer<MapType, ReduceType> {
* Appropriate means we've seen the last job, or there's only a single job id
*/
private synchronized void maybeReleaseLatch() {
if ( lastJobID != -2 && (prevJobID == lastJobID || lastJobID == -1) ) {
// either we've already seen the last one prevJobID == lastJobID or
if ( numJobsReduced == numSubmittedJobs ) {
// either we've reduced every submitted job (numJobsReduced == numSubmittedJobs) or
// numSubmittedJobs is 0, meaning that no jobs were ever submitted
countDownLatch.countDown();
}
}
/**
* For testing.
* @return
* For testing only
*
* @return true if latch is released
*/
protected synchronized boolean latchIsReleased() {
return countDownLatch.getCount() == 0;
}
/**
* Key function: tell this class the job ID of the last job that will provide data in the mapResultsQueue
* Key function: tell this class the total number of jobs will provide data in the mapResultsQueue
*
* The last job id controls when we free threads blocked on waitForFinalReduce. When we see the job
* with this last job id, those threads are released.
* The total job count controls when we free threads blocked on waitForFinalReduce. When we see numOfSubmittedJobs
* MapResults from the queue, those threads are released.
*
* Until this function is called, those thread will block forever. The last job id has a few constraints.
* First, it must be >= -1. -1 indicates that in fact no jobs will ever be submitted (i.e., there's no
* data coming) so the latch should be opened immediately. If it's >= 0, we will wait until
* a job with that id arrives.
* Until this function is called, those threads will block forever. The numOfSubmittedJobs has a few constraints.
* First, it must be >= 0. 0 indicates that in fact no jobs will ever be submitted (i.e., there's no
* data coming) so the latch should be opened immediately. If it's >= 1, we will wait until
* we see numOfSubmittedJobs jobs before freeing them.
*
* Note that we throw an IllegalStateException if this function is called twice.
*
* @param lastJobID int >= -1 indicating the MapResult job id of the last job that will enqueue results into our queue
* @param numOfSubmittedJobs int >= 0 indicating the total number of MapResults that will
* enqueue results into our queue
*/
public synchronized void setLastJobID(final int lastJobID) {
if ( lastJobID < -1 )
throw new IllegalArgumentException("lastJobID must be > -1, but saw " + lastJobID);
if ( this.lastJobID != UNSET_LAST_JOB_ID )
public synchronized void setTotalJobCount(final int numOfSubmittedJobs) {
if ( numOfSubmittedJobs < 0 )
throw new IllegalArgumentException("numOfSubmittedJobs must be >= 0, but saw " + numOfSubmittedJobs);
if ( this.numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS)
throw new IllegalStateException("setlastJobID called multiple times, but should only be called once");
this.lastJobID = lastJobID;
//logger.warn("setTotalJobCount " + numJobsReduced + " numSubmitted " + numOfSubmittedJobs);
this.numSubmittedJobs = numOfSubmittedJobs;
maybeReleaseLatch();
}
@ -174,7 +193,9 @@ class Reducer<MapType, ReduceType> {
* @throws InterruptedException
*/
public ReduceType waitForFinalReduce() throws InterruptedException {
//logger.warn("waitForFinalReduce() " + numJobsReduced + " " + numSubmittedJobs);
countDownLatch.await();
//logger.warn(" done waitForFinalReduce");
return sum;
}
}

View File

@ -143,45 +143,4 @@ public class InputProducerUnitTest extends BaseTest {
throw new UnsupportedOperationException("x");
}
}
// TODO -- this doesn't work because the synchronization in InputProvider...
// @Test(enabled = false, dataProvider = "InputProducerTest", dependsOnMethods = "testInputProducer", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME)
// public void testInputProducerSingleStepIterator(final int nElements, final int queueSize) throws InterruptedException {
//
// final List<Integer> elements = new ArrayList<Integer>(nElements);
// for ( int i = 0; i < nElements; i++ ) elements.add(i);
//
// //final BlockingIterator<Integer> myIterator = new BlockingIterator<Integer>(elements.iterator());
//
// final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
// new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize);
//
// final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new SimpleTimer(), readQueue);
//
// final ExecutorService es = Executors.newSingleThreadExecutor();
//
// Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I haven't started reading yet");
// Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet");
//
// //es.submit(ip);
//
// for ( int nCycles = 0; nCycles < nElements; nCycles++ ) {
// Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I'm not done reading yet");
// Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I'm not done reading yet");
//
//// final int observedQueueSize = readQueue.size();
//// Assert.assertEquals(observedQueueSize, nCycles, "Reader enqueued " + observedQueueSize + " elements but expected expected " + nCycles);
//
// //myIterator.allowNext();
// //myIterator.blockTillNext();
// ip.runOne();
// }
//
// //myIterator.allowNext();
// //Thread.sleep(100);
//
// Assert.assertTrue(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs haven't been read, but I read them all");
// Assert.assertEquals(ip.getNumInputValues(), nElements, "Wrong number of total elements getNumInputValues");
// es.shutdownNow();
// }
}

View File

@ -121,7 +121,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
// for ( final int nt : Arrays.asList(1, 2, 4) ) {
// for ( final int start : Arrays.asList(0) ) {
// for ( final int end : Arrays.asList(0, 1, 2) ) {
// exampleTest = new NanoSchedulerBasicTest(bufferSize, nt, start, end);
// exampleTest = new NanoSchedulerBasicTest(bufferSize, nt, start, end, false);
// }
// }
// }

View File

@ -88,11 +88,6 @@ public class ReducerUnitTest extends BaseTest {
if ( groupSize == -1 )
groupSize = allJobs.size();
int lastJobID = -1;
for ( final MapResult<Integer> job : allJobs ) {
lastJobID = Math.max(job.getJobID(), lastJobID);
}
final PriorityBlockingQueue<MapResult<Integer>> mapResultsQueue = new PriorityBlockingQueue<MapResult<Integer>>();
final List<List<MapResult<Integer>>> jobGroups = Utils.groupList(allJobs, groupSize);
@ -122,9 +117,9 @@ public class ReducerUnitTest extends BaseTest {
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed at the start");
if ( jobGroupCount == 0 && lastJobID != -1 && setJobIDAtStart ) {
if ( jobGroupCount == 0 && setJobIDAtStart ) {
// can only call setTotalJobCount here if jobs cannot be submitted out of order
reducer.setLastJobID(lastJobID);
reducer.setTotalJobCount(allJobs.size());
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything");
}
@ -146,10 +141,8 @@ public class ReducerUnitTest extends BaseTest {
Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set");
else {
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set");
if ( lastJobID != -1 ) {
reducer.setLastJobID(lastJobID);
Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id ");
}
reducer.setTotalJobCount(allJobs.size());
Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id ");
}
Assert.assertEquals(reduce.nRead, allJobs.size(), "number of read values not all of the values in the reducer queue");
@ -163,8 +156,8 @@ public class ReducerUnitTest extends BaseTest {
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new SimpleTimer(), 0);
reducer.setLastJobID(10);
reducer.setLastJobID(15);
reducer.setTotalJobCount(10);
reducer.setTotalJobCount(15);
}
public class ReduceSumTest implements NSReduceFunction<Integer, Integer> {