Major performance improvement to the GATK engine

-- The NanoScheduler timing code (in NSRuntimeProfile) was crazy expensive, but never showed up in the profilers.  Removed all of the timing code from the NanoScheduler, the NSRuntimeProfile itself, and updated the unit tests.
-- For tools that largely pass through data quickly, this change reduces runtimes by as much as 10x.  For the RealignerTargetCreator example, the runtime before this commit was 3 hours, and after is 30 minutes (6x improvement).
-- Took this opportunity to improve the GATK ProgressMeter.  notifyOfProgress now just keeps track of the maximum position seen, and a separate daemon thread ProgressMeterDaemon periodically wakes up and prints the current progress.  This removes all inner loop calls to the GATK timers.
-- The history of the bug started here: http://gatkforums.broadinstitute.org/discussion/comment/2402#Comment_2402
This commit is contained in:
Mark DePristo 2012-12-04 22:08:01 -05:00
parent 2b601571e7
commit 465694078e
11 changed files with 122 additions and 179 deletions

View File

@ -43,7 +43,6 @@ import org.broadinstitute.sting.utils.AutoFormattingTime;
import org.broadinstitute.sting.utils.MathUtils;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
@ -346,9 +345,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
for ( final TraversalEngine te : allCreatedTraversalEngines)
te.shutdown();
// horrible hack to print nano scheduling information across all nano schedulers, if any were used
NanoScheduler.printCombinedRuntimeProfile();
allCreatedTraversalEngines.clear();
availableTraversalEngines.clear();
}

View File

@ -495,4 +495,14 @@ public class GenomeLoc implements Comparable<GenomeLoc>, Serializable, HasGenome
public long sizeOfOverlap( final GenomeLoc that ) {
return ( this.overlapsP(that) ? Math.min( getStop(), that.getStop() ) - Math.max( getStart(), that.getStart() ) + 1L : 0L );
}
/**
 * Returns the greater of this GenomeLoc and other, as ordered by compareTo.
 *
 * When the two locations compare as equal, this is returned.
 *
 * @param other another non-null genome loc
 * @return other if this compares strictly less than other, otherwise this
 */
public GenomeLoc max(final GenomeLoc other) {
    // The Comparable contract only guarantees a negative value for "less than",
    // not exactly -1, so test the sign rather than a specific value.
    final int cmp = this.compareTo(other);
    return cmp < 0 ? other : this;
}
}

View File

@ -2,7 +2,6 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
@ -19,11 +18,6 @@ class InputProducer<InputType> implements Runnable {
*/
final Iterator<InputType> inputReader;
/**
* Our timer (may be null) that we use to track our input costs
*/
final SimpleTimer inputTimer;
/**
* Where we put our input values for consumption
*/
@ -51,16 +45,13 @@ class InputProducer<InputType> implements Runnable {
public InputProducer(final Iterator<InputType> inputReader,
                     final MultiThreadedErrorTracker errorTracker,
                     final SimpleTimer inputTimer,
                     final BlockingQueue<InputValue> outputQueue) {
    // Validate every collaborator before touching any field. The checks run in
    // parameter order, so the first missing argument is the one named in the exception.
    if ( inputReader == null ) {
        throw new IllegalArgumentException("inputReader cannot be null");
    }
    if ( errorTracker == null ) {
        throw new IllegalArgumentException("errorTracker cannot be null");
    }
    if ( inputTimer == null ) {
        throw new IllegalArgumentException("inputTimer cannot be null");
    }
    if ( outputQueue == null ) {
        throw new IllegalArgumentException("OutputQueue cannot be null");
    }

    // All arguments are present: wire them into this producer.
    this.outputQueue = outputQueue;
    this.inputTimer = inputTimer;
    this.errorTracker = errorTracker;
    this.inputReader = inputReader;
}
@ -94,18 +85,15 @@ class InputProducer<InputType> implements Runnable {
* @throws InterruptedException
*/
private synchronized InputType readNextItem() throws InterruptedException {
inputTimer.restart();
if ( ! inputReader.hasNext() ) {
// we are done, mark ourselves as such and return null
readLastValue = true;
inputTimer.stop();
return null;
} else {
// get the next value, and return it
final InputType input = inputReader.next();
if ( input == null )
throw new IllegalStateException("inputReader.next() returned a null value, breaking our contract");
inputTimer.stop();
nRead++;
return input;
}

View File

@ -1,67 +0,0 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.AutoFormattingTime;
import org.broadinstitute.sting.utils.SimpleTimer;
/**
* Holds runtime profile (input, read, map) times as tracked by NanoScheduler
*
* User: depristo
* Date: 9/10/12
* Time: 8:31 PM
*/
/**
 * Holds the runtime profile (outside, input, map and reduce timers) tracked by NanoScheduler
 *
 * The four timers are package-visible so that code in this package can start and stop them directly.
 */
public class NSRuntimeProfile {
    final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside");
    final SimpleTimer inputTimer = new SimpleTimer("input");
    final SimpleTimer mapTimer = new SimpleTimer("map");
    final SimpleTimer reduceTimer = new SimpleTimer("reduce");

    /**
     * Combine the elapsed time information from other with this profile
     *
     * Each of this profile's four timers has the elapsed time of the matching timer in other added to it.
     *
     * @param other a non-null profile
     */
    public void combine(final NSRuntimeProfile other) {
        outsideSchedulerTimer.addElapsed(other.outsideSchedulerTimer);
        inputTimer.addElapsed(other.inputTimer);
        mapTimer.addElapsed(other.mapTimer);
        reduceTimer.addElapsed(other.reduceTimer);
    }

    /**
     * Print the runtime profiling to logger, one line per timer, in the order
     * input, map, reduce, outside
     *
     * @param logger the logger that receives the info-level timing lines
     */
    public void log(final Logger logger) {
        log1(logger, "Input time", inputTimer);
        log1(logger, "Map time", mapTimer);
        log1(logger, "Reduce time", reduceTimer);
        log1(logger, "Outside time", outsideSchedulerTimer);
    }

    /**
     * @return the total runtime for all functions of this nano scheduler, computed as the
     *         sum of the elapsed times of the input, map, reduce and outside timers
     */
    //@Ensures("result >= 0.0")
    public double totalRuntimeInSeconds() {
        return inputTimer.getElapsedTime()
                + mapTimer.getElapsedTime()
                + reduceTimer.getElapsedTime()
                + outsideSchedulerTimer.getElapsedTime();
    }

    /**
     * Print to logger.info timing information from timer, with name label
     *
     * The line shows the timer's elapsed time and its share of totalRuntimeInSeconds() as a
     * percentage. When the total is not positive the share is reported as 0 rather than NaN.
     *
     * @param logger the logger to write the line to
     * @param label the name of the timer to display.  Should be human readable
     * @param timer the timer whose elapsed time we will display
     */
    //@Requires({"label != null", "timer != null"})
    private void log1(final Logger logger, final String label, final SimpleTimer timer) {
        final double myTimeInSec = timer.getElapsedTime();
        final double totalTimeInSec = totalRuntimeInSeconds();
        // guard the division: with nothing timed yet the total is 0 and 0/0 would print "NaN%"
        final double myTimePercent = totalTimeInSec > 0.0 ? myTimeInSec / totalTimeInSec * 100 : 0.0;
        logger.info(String.format("%s: %s (%5.2f%%)", label, new AutoFormattingTime(myTimeInSec), myTimePercent));
    }
}

View File

@ -57,16 +57,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
boolean debug = false;
private NSProgressFunction<InputType> progressFunction = null;
/**
* Tracks the combined runtime profiles across all created nano schedulers
*/
final static private NSRuntimeProfile combinedNSRuntimeProfiler = new NSRuntimeProfile();
/**
* The profile specific to this nano scheduler
*/
final private NSRuntimeProfile myNSRuntimeProfile = new NSRuntimeProfile();
/**
* Create a new nanoscheduler with the desired characteristics requested by the argument
*
@ -94,9 +84,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d"));
}
// start timing the time spent outside of the nanoScheduler
myNSRuntimeProfile.outsideSchedulerTimer.start();
}
/**
@ -123,11 +110,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
* After this call, execute cannot be invoked without throwing an error
*/
public void shutdown() {
myNSRuntimeProfile.outsideSchedulerTimer.stop();
// add my timing information to the combined NS runtime profile
combinedNSRuntimeProfiler.combine(myNSRuntimeProfile);
if ( nThreads > 1 ) {
shutdownExecutor("inputExecutor", inputExecutor);
shutdownExecutor("mapExecutor", mapExecutor);
@ -137,19 +119,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
shutdown = true;
}
/**
 * Log the runtime profile of this nano scheduler to this class's logger
 */
public void printRuntimeProfile() {
myNSRuntimeProfile.log(logger);
}
/**
 * Log the combined runtime profile to this class's logger, but only when the
 * combined total runtime is greater than 0.1 seconds
 */
public static void printCombinedRuntimeProfile() {
    final double combinedSeconds = combinedNSRuntimeProfiler.totalRuntimeInSeconds();
    final boolean worthLogging = combinedSeconds > 0.1;
    if ( worthLogging ) {
        combinedNSRuntimeProfiler.log(logger);
    }
}
/**
 * @return the total runtime of this nano scheduler's own runtime profile, as reported by
 *         its totalRuntimeInSeconds()
 */
protected double getTotalRuntime() {
return myNSRuntimeProfile.totalRuntimeInSeconds();
}
/**
* Helper function to cleanly shutdown an execution service, checking that the execution
* state is clean when it's done.
@ -245,8 +214,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
if ( map == null ) throw new IllegalArgumentException("map function cannot be null");
if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null");
myNSRuntimeProfile.outsideSchedulerTimer.stop();
ReduceType result;
if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) {
result = executeSingleThreaded(inputReader, map, initialValue, reduce);
@ -254,7 +221,6 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
result = executeMultiThreaded(inputReader, map, initialValue, reduce);
}
myNSRuntimeProfile.outsideSchedulerTimer.restart();
return result;
}
@ -273,28 +239,19 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
while ( true ) {
// start timer to ensure that both hasNext and next are caught by the timer
myNSRuntimeProfile.inputTimer.restart();
if ( ! inputReader.hasNext() ) {
myNSRuntimeProfile.inputTimer.stop();
break;
} else {
final InputType input = inputReader.next();
myNSRuntimeProfile.inputTimer.stop();
// map
myNSRuntimeProfile.mapTimer.restart();
final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano();
final MapType mapValue = map.apply(input);
if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime));
myNSRuntimeProfile.mapTimer.stop();
if ( i++ % this.bufferSize == 0 && progressFunction != null )
if ( progressFunction != null )
progressFunction.progress(input);
// reduce
myNSRuntimeProfile.reduceTimer.restart();
sum = reduce.apply(mapValue, sum);
myNSRuntimeProfile.reduceTimer.stop();
}
}
@ -401,7 +358,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// Create the input producer and start it running
final InputProducer<InputType> inputProducer =
new InputProducer<InputType>(inputReader, errorTracker, myNSRuntimeProfile.inputTimer, inputQueue);
new InputProducer<InputType>(inputReader, errorTracker, inputQueue);
inputExecutor.submit(inputProducer);
// a priority queue that stores up to bufferSize elements
@ -410,7 +367,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
new PriorityBlockingQueue<MapResult<MapType>>();
final Reducer<MapType, ReduceType> reducer
= new Reducer<MapType, ReduceType>(reduce, errorTracker, myNSRuntimeProfile.reduceTimer, initialValue);
= new Reducer<MapType, ReduceType>(reduce, errorTracker, initialValue);
try {
int nSubmittedJobs = 0;
@ -508,11 +465,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final InputType input = inputWrapper.getValue();
// map
myNSRuntimeProfile.mapTimer.restart();
final long preMapTime = LOG_MAP_TIMES ? 0 : myNSRuntimeProfile.mapTimer.currentTimeNano();
final MapType mapValue = map.apply(input);
if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (myNSRuntimeProfile.mapTimer.currentTimeNano() - preMapTime));
myNSRuntimeProfile.mapTimer.stop();
// enqueue the result into the mapResultQueue
result = new MapResult<MapType>(mapValue, jobID);

View File

@ -4,7 +4,6 @@ import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
@ -34,7 +33,6 @@ class Reducer<MapType, ReduceType> {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final NSReduceFunction<MapType, ReduceType> reduce;
final SimpleTimer reduceTimer;
final MultiThreadedErrorTracker errorTracker;
/**
@ -61,20 +59,16 @@ class Reducer<MapType, ReduceType> {
* reduceTimer
*
* @param reduce the reduce function to apply
* @param reduceTimer the timer to time the reduce function call
* @param initialSum the initial reduce sum
*/
public Reducer(final NSReduceFunction<MapType, ReduceType> reduce,
               final MultiThreadedErrorTracker errorTracker,
               final SimpleTimer reduceTimer,
               final ReduceType initialSum) {
    // Reject missing collaborators first. The check order (error tracker, reduce
    // function, timer) decides which message is reported when several are null.
    if ( errorTracker == null ) {
        throw new IllegalArgumentException("Error tracker cannot be null");
    }
    if ( reduce == null ) {
        throw new IllegalArgumentException("Reduce function cannot be null");
    }
    if ( reduceTimer == null ) {
        throw new IllegalArgumentException("reduceTimer cannot be null");
    }

    // initialSum is deliberately not validated here; it seeds the running sum as given
    this.sum = initialSum;
    this.reduceTimer = reduceTimer;
    this.reduce = reduce;
    this.errorTracker = errorTracker;
}
@ -125,10 +119,7 @@ class Reducer<MapType, ReduceType> {
nReducesNow++;
// apply reduce, keeping track of sum
reduceTimer.restart();
sum = reduce.apply(result.getValue(), sum);
reduceTimer.stop();
}
numJobsReduced++;

View File

@ -26,6 +26,7 @@ package org.broadinstitute.sting.utils.progressmeter;
import com.google.java.contract.Ensures;
import com.google.java.contract.Invariant;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.*;
import org.broadinstitute.sting.utils.exceptions.UserException;
@ -143,6 +144,12 @@ public class ProgressMeter {
/** We use the SimpleTimer to time our run */
private final SimpleTimer timer = new SimpleTimer();
private GenomeLoc maxGenomeLoc = null;
private String positionMessage = "starting";
private long nTotalRecordsProcessed = 0;
final ProgressMeterDaemon progressMeterDaemon;
/**
* Create a new ProgressMeter
*
@ -177,21 +184,15 @@ public class ProgressMeter {
targetSizeInBP = processingIntervals.coveredSize();
// start up the timer
progressMeterDaemon = new ProgressMeterDaemon(this);
start();
}
/**
* Forward request to notifyOfProgress
*
* Assumes that one cycle has been completed
*
* @param loc our current location. Null means "in unmapped reads"
* @param nTotalRecordsProcessed the total number of records we've processed
* Start up the progress meter, printing initialization message and starting up the
* daemon thread for periodic printing.
*/
public void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) {
notifyOfProgress(loc, false, nTotalRecordsProcessed);
}
@Requires("progressMeterDaemon != null")
private synchronized void start() {
timer.start();
lastProgressPrintTime = timer.currentTime();
@ -199,6 +200,8 @@ public class ProgressMeter {
logger.info("[INITIALIZATION COMPLETE; STARTING PROCESSING]");
logger.info(String.format("%15s processed.%s runtime per.1M.%s completed total.runtime remaining",
"Location", processingUnitName, processingUnitName));
progressMeterDaemon.start();
}
/**
@ -216,19 +219,41 @@ public class ProgressMeter {
* Synchronized to ensure that even with multiple threads calling notifyOfProgress we still
* get one clean stream of meter logs.
*
* Note this thread doesn't actually print progress, unless must print is true, but just registers
* the progress itself. A separate printing daemon periodically polls the meter to print out
* progress
*
* @param loc Current location, can be null if you are at the end of the processing unit
* @param mustPrint If true, will print out info, regardless of time interval
* @param nTotalRecordsProcessed the total number of records we've processed
*/
private synchronized void notifyOfProgress(final GenomeLoc loc, boolean mustPrint, final long nTotalRecordsProcessed) {
public synchronized void notifyOfProgress(final GenomeLoc loc, final long nTotalRecordsProcessed) {
if ( nTotalRecordsProcessed < 0 ) throw new IllegalArgumentException("nTotalRecordsProcessed must be >= 0");
// weird comparison to ensure that loc == null (in unmapped reads) is keep before maxGenomeLoc == null (on startup)
this.maxGenomeLoc = loc == null ? loc : (maxGenomeLoc == null ? loc : loc.max(maxGenomeLoc));
this.nTotalRecordsProcessed = Math.max(this.nTotalRecordsProcessed, nTotalRecordsProcessed);
// a pretty name for our position
this.positionMessage = maxGenomeLoc == null
? "unmapped reads"
: String.format("%s:%d", maxGenomeLoc.getContig(), maxGenomeLoc.getStart());
}
/**
* Actually try to print out progress
*
* This function may print out if the progress print is due, but if not enough time has elapsed
* since the last print we will not print out information.
*
* @param mustPrint if true, progress will be printed regardless of the last time we printed progress
*/
protected synchronized void printProgress(final boolean mustPrint) {
final long curTime = timer.currentTime();
final boolean printProgress = mustPrint || maxElapsedIntervalForPrinting(curTime, lastProgressPrintTime, progressPrintFrequency);
final boolean printLog = performanceLog != null && maxElapsedIntervalForPrinting(curTime, lastPerformanceLogPrintTime, PERFORMANCE_LOG_PRINT_FREQUENCY);
if ( printProgress || printLog ) {
final ProgressMeterData progressData = takeProgressSnapshot(loc, nTotalRecordsProcessed);
final ProgressMeterData progressData = takeProgressSnapshot(maxGenomeLoc, nTotalRecordsProcessed);
final AutoFormattingTime elapsed = new AutoFormattingTime(progressData.getElapsedSeconds());
final AutoFormattingTime bpRate = new AutoFormattingTime(progressData.secondsPerMillionBP());
@ -241,13 +266,8 @@ public class ProgressMeter {
lastProgressPrintTime = curTime;
updateLoggerPrintFrequency(estTotalRuntime.getTimeInSeconds());
// a pretty name for our position
final String posName = loc == null
? (mustPrint ? "done" : "unmapped reads")
: String.format("%s:%d", loc.getContig(), loc.getStart());
logger.info(String.format("%15s %5.2e %s %s %5.1f%% %s %s",
posName, progressData.getUnitsProcessed()*1.0, elapsed, unitRate,
positionMessage, progressData.getUnitsProcessed()*1.0, elapsed, unitRate,
100*fractionGenomeTargetCompleted, estTotalRuntime, timeToCompletion));
}
@ -296,13 +316,18 @@ public class ProgressMeter {
*/
public void notifyDone(final long nTotalRecordsProcessed) {
// print out the progress meter
notifyOfProgress(null, true, nTotalRecordsProcessed);
this.nTotalRecordsProcessed = nTotalRecordsProcessed;
this.positionMessage = "done";
printProgress(true);
logger.info(String.format("Total runtime %.2f secs, %.2f min, %.2f hours",
timer.getElapsedTime(), timer.getElapsedTime() / 60, timer.getElapsedTime() / 3600));
if ( performanceLog != null )
performanceLog.close();
// shutdown our daemon thread
progressMeterDaemon.done();
}
/**

View File

@ -0,0 +1,60 @@
package org.broadinstitute.sting.utils.progressmeter;
/**
 * Daemon thread that periodically prints the progress of the progress meter
 *
 * The thread asks the meter to print, sleeps for POLL_FREQUENCY_MILLISECONDS, and repeats
 * until done() has been called or the thread is interrupted.
 *
 * User: depristo
 * Date: 12/4/12
 * Time: 9:16 PM
 */
public final class ProgressMeterDaemon extends Thread {
    /**
     * How frequently should we poll and print progress?
     */
    private final static long POLL_FREQUENCY_MILLISECONDS = 10 * 1000;

    /**
     * Are we to continue periodically printing status, or should we shut down?
     *
     * Volatile because it is set by whichever thread calls done() and read by the
     * daemon thread in run(); without it the daemon is not guaranteed to see the update.
     */
    volatile boolean done = false;

    /**
     * The meter we will call print on
     */
    final ProgressMeter meter;

    /**
     * Create a new ProgressMeterDaemon printing progress for meter
     *
     * The thread is marked as a daemon and named "ProgressMeterDaemon"; it is not started here.
     *
     * @param meter the progress meter to print progress of
     * @throws IllegalArgumentException if meter is null
     */
    public ProgressMeterDaemon(final ProgressMeter meter) {
        if ( meter == null ) throw new IllegalArgumentException("meter cannot be null");
        this.meter = meter;
        setDaemon(true);
        setName("ProgressMeterDaemon");
    }

    /**
     * Tells this daemon thread to shutdown at the next opportunity, as the progress
     * metering is complete.
     *
     * The flag is only checked between polls, so the thread may keep sleeping for up to
     * one poll interval before it exits.
     */
    public final void done() {
        this.done = true;
    }

    /**
     * Run the polling loop: every POLL_FREQUENCY_MILLISECONDS ask the progress meter to
     * print its progress if a print is due.  The loop ends when done() has been called or
     * when the thread is interrupted while sleeping.
     */
    @Override
    public void run() {
        while (! done) {
            meter.printProgress(false);
            try {
                Thread.sleep(POLL_FREQUENCY_MILLISECONDS);
            } catch (InterruptedException e) {
                // An interrupt is a request to stop: restore the flag for anyone
                // inspecting this thread and exit quietly instead of dying with an
                // uncaught RuntimeException.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

View File

@ -2,7 +2,6 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@ -46,7 +45,7 @@ public class InputProducerUnitTest extends BaseTest {
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize);
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue);
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), readQueue);
final ExecutorService es = Executors.newSingleThreadExecutor();
@ -94,7 +93,7 @@ public class InputProducerUnitTest extends BaseTest {
final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
new LinkedBlockingDeque<InputProducer<Integer>.InputValue>();
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), new SimpleTimer(), readQueue);
final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new MultiThreadedErrorTracker(), readQueue);
final ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(ip);

View File

@ -188,17 +188,6 @@ public class NanoSchedulerUnitTest extends BaseTest {
Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected. Expected at least " + test.nExpectedCallbacks() + " but saw only " + callback.callBacks);
nanoScheduler.shutdown();
// TODO -- need to enable only in the case where there's serious time spend in
// TODO -- read /map / reduce, otherwise the "outside" timer doesn't add up
final double myTimeEstimate = timer.getElapsedTime();
final double tolerance = 0.1;
if ( false && myTimeEstimate > 0.1 ) {
Assert.assertTrue(nanoScheduler.getTotalRuntime() > myTimeEstimate * tolerance,
"NanoScheduler said that the total runtime was " + nanoScheduler.getTotalRuntime()
+ " but the overall test time was " + myTimeEstimate + ", beyond our tolerance factor of "
+ tolerance);
}
}
@Test(enabled = true && ! DEBUG, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler", timeOut = NANO_SCHEDULE_MAX_RUNTIME)

View File

@ -2,7 +2,6 @@ package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.Utils;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
@ -93,7 +92,7 @@ public class ReducerUnitTest extends BaseTest {
final List<List<MapResult<Integer>>> jobGroups = Utils.groupList(allJobs, groupSize);
final ReduceSumTest reduce = new ReduceSumTest();
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(reduce, new MultiThreadedErrorTracker(), new SimpleTimer(), 0);
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(reduce, new MultiThreadedErrorTracker(), 0);
final TestWaitingForFinalReduce waitingThread = new TestWaitingForFinalReduce(reducer, expectedSum(allJobs));
final ExecutorService es = Executors.newSingleThreadExecutor();
@ -155,7 +154,7 @@ public class ReducerUnitTest extends BaseTest {
private void runSettingJobIDTwice() throws Exception {
final PriorityBlockingQueue<MapResult<Integer>> mapResultsQueue = new PriorityBlockingQueue<MapResult<Integer>>();
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), new SimpleTimer(), 0);
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0);
reducer.setTotalJobCount(10);
reducer.setTotalJobCount(15);