NanoScheduler optimization: don't use a PriorityBlockingQueue for the MapResultsQueue

-- Created a separate MapResultsQueue class with a limited interface, replacing the PriorityBlockingQueue used previously.
-- The MapResultsQueue is now backed by a synchronized ExpandingArrayList, since job ids are integers incrementing from 0 to N.  This means we avoid the n log n sort in the priority queue which was generating a lot of cost in the reduce step
-- Had to update ReducerUnitTest because the test itself was brittle and broke when I changed the underlying code.
-- A few bits of minor code cleanup through the system (removing unused constructors, local variables, etc)
-- ExpandingArrayList now calls ensureCapacity so that we grow the ArrayList once to accommodate the upcoming size needs.
This commit is contained in:
Mark DePristo 2012-12-19 16:17:51 -05:00
parent b92f563d06
commit 0f04485c24
7 changed files with 138 additions and 58 deletions

View File

@ -54,6 +54,7 @@ public class ExpandingArrayList<E> extends ArrayList<E> {
private void maybeExpand(int index, E value) {
if ( index >= size() ) {
ensureCapacity(index+1); // make sure we have space to hold at least index + 1 elements
// We need to add null items until we can safely set index to element
for ( int i = size(); i <= index; i++ )
add(value);

View File

@ -37,12 +37,6 @@ class InputProducer<InputType> {
int nRead = 0;
int inputID = -1;
/**
* A latch used to block threads that want to start up only when all of the values
* in inputReader have been read by the thread executing run()
*/
final CountDownLatch latch = new CountDownLatch(1);
public InputProducer(final Iterator<InputType> inputReader) {
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
this.inputReader = inputReader;

View File

@ -25,14 +25,6 @@ class MapResult<MapType> extends EOFMarkedValue<MapType> implements Comparable<M
if ( jobID < 0 ) throw new IllegalArgumentException("JobID must be >= 0");
}
/**
* Create the EOF marker version of MapResult
*/
MapResult() {
super();
this.jobID = Integer.MAX_VALUE;
}
/**
* @return the job ID of the map job that produced this MapResult
*/

View File

@ -0,0 +1,116 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.utils.collections.ExpandingArrayList;
/**
* Created with IntelliJ IDEA.
* User: depristo
* Date: 12/19/12
* Time: 3:53 PM
*
* This class makes some critical assumptions. First is that the jobID of the first
* job is 0. If this isn't true the MapResultsQueue will certainly fail.
*/
public class MapResultsQueue<MapType> {
//private final static boolean DEBUG = false;
//private final static Logger logger = Logger.getLogger(MapResultsQueue.class);
/**
* Although naturally stored as priority blocking queue, this is actually quite expensive
* due to the O(n log n) sorting calculation. Since we know that the job ids start
* at 0 and increment by 1 in each successive job, we store an array instead. The
* array is indexed by jobID, and contains the MapResult for that job id. Because elements
* can be added to the queue in any order, we need to use an expanding array list to
* store the elements.
*/
final ExpandingArrayList<MapResult<MapType>> queue = new ExpandingArrayList<MapResult<MapType>>(10000);
/**
* The jobID of the last job we've seen
*/
int prevJobID = -1; // no jobs observed
/**
* Put mapResult into this MapResultsQueue, associated with its jobID
* @param mapResult a non-null map result
*/
public synchronized void put(final MapResult<MapType> mapResult) {
if ( mapResult == null ) throw new IllegalArgumentException("mapResult cannot be null");
// make sure that nothing is at the job id for map
assert queue.size() < mapResult.getJobID() || queue.get(mapResult.getJobID()) == null;
queue.set(mapResult.getJobID(), mapResult);
}
/**
* Should we reduce the next value in the mapResultQueue?
*
* @return true if we should reduce
*/
public synchronized boolean nextValueIsAvailable() {
final MapResult<MapType> nextMapResult = queue.get(nextJobID());
if ( nextMapResult == null ) {
// natural case -- the next job hasn't had a value added yet
return false;
} else if ( nextMapResult.getJobID() != nextJobID() ) {
// sanity check -- the job id at next isn't the one we expect
throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is not == previous job id " + prevJobID + " + 1");
} else {
// there's a value at the next job id, so return true
return true;
}
}
/**
* Get the next job ID'd be expect to see given our previous job id
* @return the next job id we'd fetch to reduce
*/
private int nextJobID() {
return prevJobID + 1;
}
/**
* Can only be called when nextValueIsAvailable is true
* @return
* @throws InterruptedException
*/
// TODO -- does this have to be synchronized? -- I think the answer is no
public synchronized MapResult<MapType> take() throws InterruptedException {
final MapResult<MapType> result = queue.get(nextJobID());
// make sure the value we've fetched has the right id
assert result.getJobID() == nextJobID();
prevJobID = result.getJobID();
queue.set(prevJobID, null);
return result;
}
}

View File

@ -351,8 +351,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// a priority queue that stores up to bufferSize elements
// produced by completed map jobs.
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue =
new PriorityBlockingQueue<MapResult<MapType>>();
final MapResultsQueue<MapType> mapResultQueue = new MapResultsQueue<MapType>();
final Reducer<MapType, ReduceType> reducer
= new Reducer<MapType, ReduceType>(reduce, errorTracker, initialValue);
@ -420,12 +419,12 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
private class ReadMapReduceJob implements Runnable {
final InputProducer<InputType> inputProducer;
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue;
final MapResultsQueue<MapType> mapResultQueue;
final NSMapFunction<InputType, MapType> map;
final Reducer<MapType, ReduceType> reducer;
private ReadMapReduceJob(final InputProducer<InputType> inputProducer,
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue,
final MapResultsQueue<MapType> mapResultQueue,
final NSMapFunction<InputType, MapType> map,
final Reducer<MapType, ReduceType> reducer) {
this.inputProducer = inputProducer;

View File

@ -43,11 +43,6 @@ class Reducer<MapType, ReduceType> {
int numSubmittedJobs = UNSET_NUM_SUBMITTED_JOBS; // not yet set
/**
* The jobID of the last job we've seen
*/
int prevJobID = -1; // no jobs observed
/**
* A counter keeping track of the number of jobs we're reduced
*/
@ -72,26 +67,6 @@ class Reducer<MapType, ReduceType> {
this.sum = initialSum;
}
/**
* Should we reduce the next value in the mapResultQueue?
*
* @param mapResultQueue the queue of map results
* @return true if we should reduce
*/
@Requires("mapResultQueue != null")
private synchronized boolean reduceNextValueInQueue(final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue) {
final MapResult<MapType> nextMapResult = mapResultQueue.peek();
if ( nextMapResult == null ) {
return false;
} else if ( nextMapResult.getJobID() < prevJobID + 1 ) {
throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is not < previous job id " + prevJobID);
} else if ( nextMapResult.getJobID() == prevJobID + 1 ) {
return true;
} else {
return false;
}
}
/**
* Reduce as much data as possible in mapResultQueue, returning the number of reduce calls completed
*
@ -104,16 +79,20 @@ class Reducer<MapType, ReduceType> {
* @throws InterruptedException
*/
@Ensures("result >= 0")
public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue) {
public synchronized int reduceAsMuchAsPossible(final MapResultsQueue<MapType> mapResultQueue) {
// TODO -- have conditional lock acquistion. If the lock can be acquired, actually do some useful
// TODO -- work, but if it cannot just continue on your way. No sense in having all our map
// TODO -- threads block here just to fall through. The only question is if, with that locking scheme,
// TODO -- it's possible to leave some values in the map queue, as the thread owning the lock is
// TODO -- exiting and the only remaining thread to actually complete the reduce falls through.
// TODO -- all we really need to do is add a final call to reduceAsMuchAsPossible when exiting the nano scheduler
// TODO -- to make sure we've cleaned everything up
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
int nReducesNow = 0;
// if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
try {
while ( reduceNextValueInQueue(mapResultQueue) ) {
while ( mapResultQueue.nextValueIsAvailable() ) {
final MapResult<MapType> result = mapResultQueue.take();
prevJobID = result.getJobID();
if ( ! result.isEOFMarker() ) {
nReducesNow++;
@ -129,8 +108,6 @@ class Reducer<MapType, ReduceType> {
errorTracker.notifyOfError(ex);
countDownLatch.countDown();
}
// if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
return nReducesNow;
}
@ -176,10 +153,11 @@ class Reducer<MapType, ReduceType> {
public synchronized void setTotalJobCount(final int numOfSubmittedJobs) {
if ( numOfSubmittedJobs < 0 )
throw new IllegalArgumentException("numOfSubmittedJobs must be >= 0, but saw " + numOfSubmittedJobs);
if ( numJobsReduced > numOfSubmittedJobs )
throw new IllegalArgumentException("numOfSubmittedJobs " + numOfSubmittedJobs + " < numJobsReduced " + numJobsReduced);
if ( this.numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS)
throw new IllegalStateException("setlastJobID called multiple times, but should only be called once");
//logger.warn("setTotalJobCount " + numJobsReduced + " numSubmitted " + numOfSubmittedJobs);
this.numSubmittedJobs = numOfSubmittedJobs;
maybeReleaseLatch();
}
@ -192,9 +170,7 @@ class Reducer<MapType, ReduceType> {
* @throws InterruptedException
*/
public ReduceType waitForFinalReduce() throws InterruptedException {
//logger.warn("waitForFinalReduce() " + numJobsReduced + " " + numSubmittedJobs);
countDownLatch.await();
//logger.warn(" done waitForFinalReduce");
return sum;
}
}

View File

@ -88,7 +88,7 @@ public class ReducerUnitTest extends BaseTest {
if ( groupSize == -1 )
groupSize = allJobs.size();
final PriorityBlockingQueue<MapResult<Integer>> mapResultsQueue = new PriorityBlockingQueue<MapResult<Integer>>();
final MapResultsQueue<Integer> mapResultsQueue = new MapResultsQueue<Integer>();
final List<List<MapResult<Integer>>> jobGroups = Utils.groupList(allJobs, groupSize);
final ReduceSumTest reduce = new ReduceSumTest();
@ -98,6 +98,7 @@ public class ReducerUnitTest extends BaseTest {
final ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(waitingThread);
int lastJobID = -1;
int nJobsSubmitted = 0;
int jobGroupCount = 0;
final int lastJobGroupCount = jobGroups.size() - 1;
@ -106,12 +107,13 @@ public class ReducerUnitTest extends BaseTest {
for ( final List<MapResult<Integer>> jobs : jobGroups ) {
//logger.warn("Processing job group " + jobGroupCount + " with " + jobs.size() + " jobs");
for ( final MapResult<Integer> job : jobs ) {
mapResultsQueue.add(job);
lastJobID = Math.max(lastJobID, job.getJobID());
mapResultsQueue.put(job);
nJobsSubmitted++;
}
if ( jobGroupCount == lastJobGroupCount ) {
mapResultsQueue.add(new MapResult<Integer>());
mapResultsQueue.put(new MapResult<Integer>(lastJobID+1));
nJobsSubmitted++;
}
@ -119,7 +121,7 @@ public class ReducerUnitTest extends BaseTest {
if ( jobGroupCount == 0 && setJobIDAtStart ) {
// only can do the setJobID if jobs cannot be submitted out of order
reducer.setTotalJobCount(allJobs.size());
reducer.setTotalJobCount(allJobs.size()+1);
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything");
}
@ -141,7 +143,7 @@ public class ReducerUnitTest extends BaseTest {
Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set");
else {
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set");
reducer.setTotalJobCount(allJobs.size());
reducer.setTotalJobCount(allJobs.size() + 1);
Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id ");
}