diff --git a/public/java/src/org/broadinstitute/sting/utils/collections/ExpandingArrayList.java b/public/java/src/org/broadinstitute/sting/utils/collections/ExpandingArrayList.java index 04ef8ece3..abd4eeaba 100755 --- a/public/java/src/org/broadinstitute/sting/utils/collections/ExpandingArrayList.java +++ b/public/java/src/org/broadinstitute/sting/utils/collections/ExpandingArrayList.java @@ -54,6 +54,7 @@ public class ExpandingArrayList extends ArrayList { private void maybeExpand(int index, E value) { if ( index >= size() ) { + ensureCapacity(index+1); // make sure we have space to hold at least index + 1 elements // We need to add null items until we can safely set index to element for ( int i = size(); i <= index; i++ ) add(value); diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index 84bb8d45f..ee5c46642 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -37,12 +37,6 @@ class InputProducer { int nRead = 0; int inputID = -1; - /** - * A latch used to block threads that want to start up only when all of the values - * in inputReader have been read by the thread executing run() - */ - final CountDownLatch latch = new CountDownLatch(1); - public InputProducer(final Iterator inputReader) { if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); this.inputReader = inputReader; diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java index 83d671560..4544f376c 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java @@ -25,14 +25,6 @@ class MapResult extends EOFMarkedValue implements Comparable= 0"); } - /** - * Create the EOF marker version of MapResult - */ - MapResult() { - super(); - this.jobID = Integer.MAX_VALUE; - } - /** * @return the job ID of the map job that produced this MapResult */ diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResultsQueue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResultsQueue.java new file mode 100644 index 000000000..ef74d669d --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResultsQueue.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.utils.collections.ExpandingArrayList; + +/** + * Created with IntelliJ IDEA. + * User: depristo + * Date: 12/19/12 + * Time: 3:53 PM + * + * This class makes some critical assumptions. First is that the jobID of the first + * job is 0. If this isn't true the MapResultsQueue will certainly fail. + */ +public class MapResultsQueue { + //private final static boolean DEBUG = false; + //private final static Logger logger = Logger.getLogger(MapResultsQueue.class); + + /** + * Although naturally stored as priority blocking queue, this is actually quite expensive + * due to the O(n log n) sorting calculation. Since we know that the job ids start + * at 0 and increment by 1 in each successive job, we store an array instead. The + * array is indexed by jobID, and contains the MapResult for that job id. Because elements + * can be added to the queue in any order, we need to use an expanding array list to + * store the elements. + */ + final ExpandingArrayList> queue = new ExpandingArrayList>(10000); + + /** + * The jobID of the last job we've seen + */ + int prevJobID = -1; // no jobs observed + + /** + * Put mapResult into this MapResultsQueue, associated with its jobID + * @param mapResult a non-null map result + */ + public synchronized void put(final MapResult mapResult) { + if ( mapResult == null ) throw new IllegalArgumentException("mapResult cannot be null"); + + // make sure that nothing is at the job id for map + assert queue.size() < mapResult.getJobID() || queue.get(mapResult.getJobID()) == null; + + queue.set(mapResult.getJobID(), mapResult); + } + + /** + * Should we reduce the next value in the mapResultQueue? + * + * @return true if we should reduce + */ + public synchronized boolean nextValueIsAvailable() { + final MapResult nextMapResult = queue.get(nextJobID()); + + if ( nextMapResult == null ) { + // natural case -- the next job hasn't had a value added yet + return false; + } else if ( nextMapResult.getJobID() != nextJobID() ) { + // sanity check -- the job id at next isn't the one we expect + throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is not == previous job id " + prevJobID + " + 1"); + } else { + // there's a value at the next job id, so return true + return true; + } + } + + /** + * Get the next job ID'd be expect to see given our previous job id + * @return the next job id we'd fetch to reduce + */ + private int nextJobID() { + return prevJobID + 1; + } + + /** + * Can only be called when nextValueIsAvailable is true + * @return + * @throws InterruptedException + */ + // TODO -- does this have to be synchronized? -- I think the answer is no + public synchronized MapResult take() throws InterruptedException { + final MapResult result = queue.get(nextJobID()); + + // make sure the value we've fetched has the right id + assert result.getJobID() == nextJobID(); + + prevJobID = result.getJobID(); + queue.set(prevJobID, null); + + return result; + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 38a1d7b8f..6aabc2c99 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -351,8 +351,7 @@ public class NanoScheduler { // a priority queue that stores up to bufferSize elements // produced by completed map jobs. - final PriorityBlockingQueue> mapResultQueue = - new PriorityBlockingQueue>(); + final MapResultsQueue mapResultQueue = new MapResultsQueue(); final Reducer reducer = new Reducer(reduce, errorTracker, initialValue); @@ -420,12 +419,12 @@ public class NanoScheduler { private class ReadMapReduceJob implements Runnable { final InputProducer inputProducer; - final PriorityBlockingQueue> mapResultQueue; + final MapResultsQueue mapResultQueue; final NSMapFunction map; final Reducer reducer; private ReadMapReduceJob(final InputProducer inputProducer, - final PriorityBlockingQueue> mapResultQueue, + final MapResultsQueue mapResultQueue, final NSMapFunction map, final Reducer reducer) { this.inputProducer = inputProducer; diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java index a7b94e323..66927d073 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java @@ -43,11 +43,6 @@ class Reducer { int numSubmittedJobs = UNSET_NUM_SUBMITTED_JOBS; // not yet set - /** - * The jobID of the last job we've seen - */ - int prevJobID = -1; // no jobs observed - /** * A counter keeping track of the number of jobs we're reduced */ @@ -72,26 +67,6 @@ class Reducer { this.sum = initialSum; } - /** - * Should we reduce the next value in the mapResultQueue? - * - * @param mapResultQueue the queue of map results - * @return true if we should reduce - */ - @Requires("mapResultQueue != null") - private synchronized boolean reduceNextValueInQueue(final PriorityBlockingQueue> mapResultQueue) { - final MapResult nextMapResult = mapResultQueue.peek(); - if ( nextMapResult == null ) { - return false; - } else if ( nextMapResult.getJobID() < prevJobID + 1 ) { - throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is not < previous job id " + prevJobID); - } else if ( nextMapResult.getJobID() == prevJobID + 1 ) { - return true; - } else { - return false; - } - } - /** * Reduce as much data as possible in mapResultQueue, returning the number of reduce calls completed * @@ -104,16 +79,20 @@ class Reducer { * @throws InterruptedException */ @Ensures("result >= 0") - public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue> mapResultQueue) { + public synchronized int reduceAsMuchAsPossible(final MapResultsQueue mapResultQueue) { + // TODO -- have conditional lock acquistion. If the lock can be acquired, actually do some useful + // TODO -- work, but if it cannot just continue on your way. No sense in having all our map + // TODO -- threads block here just to fall through. The only question is if, with that locking scheme, + // TODO -- it's possible to leave some values in the map queue, as the thread owning the lock is + // TODO -- exiting and the only remaining thread to actually complete the reduce falls through. + // TODO -- all we really need to do is add a final call to reduceAsMuchAsPossible when exiting the nano scheduler + // TODO -- to make sure we've cleaned everything up if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); int nReducesNow = 0; -// if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS ) -// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size()); try { - while ( reduceNextValueInQueue(mapResultQueue) ) { + while ( mapResultQueue.nextValueIsAvailable() ) { final MapResult result = mapResultQueue.take(); - prevJobID = result.getJobID(); if ( ! result.isEOFMarker() ) { nReducesNow++; @@ -129,8 +108,6 @@ class Reducer { errorTracker.notifyOfError(ex); countDownLatch.countDown(); } -// if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS ) -// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size()); return nReducesNow; } @@ -176,10 +153,11 @@ class Reducer { public synchronized void setTotalJobCount(final int numOfSubmittedJobs) { if ( numOfSubmittedJobs < 0 ) throw new IllegalArgumentException("numOfSubmittedJobs must be >= 0, but saw " + numOfSubmittedJobs); + if ( numJobsReduced > numOfSubmittedJobs ) + throw new IllegalArgumentException("numOfSubmittedJobs " + numOfSubmittedJobs + " < numJobsReduced " + numJobsReduced); if ( this.numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS) throw new IllegalStateException("setlastJobID called multiple times, but should only be called once"); - //logger.warn("setTotalJobCount " + numJobsReduced + " numSubmitted " + numOfSubmittedJobs); this.numSubmittedJobs = numOfSubmittedJobs; maybeReleaseLatch(); } @@ -192,9 +170,7 @@ class Reducer { * @throws InterruptedException */ public ReduceType waitForFinalReduce() throws InterruptedException { - //logger.warn("waitForFinalReduce() " + numJobsReduced + " " + numSubmittedJobs); countDownLatch.await(); - //logger.warn(" done waitForFinalReduce"); return sum; } } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java index 6c17aa78d..6ea73f3cd 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerUnitTest.java @@ -88,7 +88,7 @@ public class ReducerUnitTest extends BaseTest { if ( groupSize == -1 ) groupSize = allJobs.size(); - final PriorityBlockingQueue> mapResultsQueue = new PriorityBlockingQueue>(); + final MapResultsQueue mapResultsQueue = new MapResultsQueue(); final List>> jobGroups = Utils.groupList(allJobs, groupSize); final ReduceSumTest reduce = new ReduceSumTest(); @@ -98,6 +98,7 @@ public class ReducerUnitTest extends BaseTest { final ExecutorService es = Executors.newSingleThreadExecutor(); es.submit(waitingThread); + int lastJobID = -1; int nJobsSubmitted = 0; int jobGroupCount = 0; final int lastJobGroupCount = jobGroups.size() - 1; @@ -106,12 +107,13 @@ public class ReducerUnitTest extends BaseTest { for ( final List> jobs : jobGroups ) { //logger.warn("Processing job group " + jobGroupCount + " with " + jobs.size() + " jobs"); for ( final MapResult job : jobs ) { - mapResultsQueue.add(job); + lastJobID = Math.max(lastJobID, job.getJobID()); + mapResultsQueue.put(job); nJobsSubmitted++; } if ( jobGroupCount == lastJobGroupCount ) { - mapResultsQueue.add(new MapResult()); + mapResultsQueue.put(new MapResult(lastJobID+1)); nJobsSubmitted++; } @@ -119,7 +121,7 @@ public class ReducerUnitTest extends BaseTest { if ( jobGroupCount == 0 && setJobIDAtStart ) { // only can do the setJobID if jobs cannot be submitted out of order - reducer.setTotalJobCount(allJobs.size()); + reducer.setTotalJobCount(allJobs.size()+1); Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything"); } @@ -141,7 +143,7 @@ public class ReducerUnitTest extends BaseTest { Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set"); else { Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set"); - reducer.setTotalJobCount(allJobs.size()); + reducer.setTotalJobCount(allJobs.size() + 1); Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id "); }