GSA-515 Nanoscheduler GSA-551 / Optimize nanoScheduling performance of UnifiedGenotyper

-- I've rewritten the entire NS framework to use a producer / consumer model for input -> map and from map -> reduce. This is allowing us to scale reasonably efficiently up to 4 threads (see figure). Future work on the nano scheduler will be itemized in a separate JIRA entry.
-- Restructured the NS code for clarity.  Docs everywhere.
-- This is considered version 1.0
This commit is contained in:
Mark DePristo 2012-09-07 08:57:35 -04:00
parent 9d12935986
commit c503884958
12 changed files with 383 additions and 206 deletions

View File

@ -8,10 +8,10 @@ import org.broadinstitute.sting.gatk.datasources.providers.ReferenceOrderedView;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.nanoScheduler.NSMapFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NSProgressFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NSReduceFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerMapFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerProgressFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerReduceFunction;
import java.util.Iterator;
@ -153,7 +153,7 @@ public class TraverseLociNano<M,T> extends TraverseLociBase<M,T> {
*
* Applies walker.map to MapData, returning a MapResult object containing the result
*/
private class TraverseLociMap implements NanoSchedulerMapFunction<MapData, MapResult> {
private class TraverseLociMap implements NSMapFunction<MapData, MapResult> {
final LocusWalker<M,T> walker;
private TraverseLociMap(LocusWalker<M, T> walker) {
@ -174,11 +174,11 @@ public class TraverseLociNano<M,T> extends TraverseLociBase<M,T> {
}
/**
* NanoSchedulerReduceFunction for TraverseReads meeting NanoScheduler interface requirements
* NSReduceFunction for TraverseReads meeting NanoScheduler interface requirements
*
* Takes a MapResult object and applies the walkers reduce function to each map result, when applicable
*/
private class TraverseLociReduce implements NanoSchedulerReduceFunction<MapResult, T> {
private class TraverseLociReduce implements NSReduceFunction<MapResult, T> {
final LocusWalker<M,T> walker;
private TraverseLociReduce(LocusWalker<M, T> walker) {
@ -195,7 +195,7 @@ public class TraverseLociNano<M,T> extends TraverseLociBase<M,T> {
}
}
private class TraverseLociProgress implements NanoSchedulerProgressFunction<MapData> {
private class TraverseLociProgress implements NSProgressFunction<MapData> {
@Override
public void progress(MapData lastProcessedMap) {
if (lastProcessedMap.alignmentContext != null)

View File

@ -35,9 +35,9 @@ import org.broadinstitute.sting.gatk.datasources.reads.ReadShard;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.nanoScheduler.NSMapFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NSReduceFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerMapFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerReduceFunction;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.LinkedList;
@ -191,7 +191,7 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
*
* Applies walker.map to MapData, returning a MapResult object containing the result
*/
private class TraverseReadsMap implements NanoSchedulerMapFunction<MapData, MapResult> {
private class TraverseReadsMap implements NSMapFunction<MapData, MapResult> {
final ReadWalker<M,T> walker;
private TraverseReadsMap(ReadWalker<M, T> walker) {
@ -211,11 +211,11 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
}
/**
* NanoSchedulerReduceFunction for TraverseReads meeting NanoScheduler interface requirements
* NSReduceFunction for TraverseReads meeting NanoScheduler interface requirements
*
* Takes a MapResult object and applies the walkers reduce function to each map result, when applicable
*/
private class TraverseReadsReduce implements NanoSchedulerReduceFunction<MapResult, T> {
private class TraverseReadsReduce implements NSReduceFunction<MapResult, T> {
final ReadWalker<M,T> walker;
private TraverseReadsReduce(ReadWalker<M, T> walker) {

View File

@ -0,0 +1,82 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Invariant;
/**
* Wrapper to hold data for a blocking queue, distinguishing an EOF marker from a real object
*
* The only way to tell in a consumer thread that a blocking queue has no more data ever
* coming down the pipe is to pass in a "poison" or EOF object. This class provides
* a generic capacity for that...
*
* The use case looks like this:
*
* BlockingQueue q
* producer:
* while ( x has items )
* q.put(new BlockingQueueValue(x))
* q.put(new BlockingQueueValue())
*
* Consumer:
* while ( true )
* value = q.take()
* if ( value.isLast() )
* break
* else
* do something useful with value
*
*
* User: depristo
* Date: 9/6/12
* Time: 3:08 PM
*/
@Invariant("! isLast || value == null")
class BlockingQueueValue<T> {
/**
* True if this is the EOF marker object
*/
final private boolean isLast;
/**
* Our value, if we aren't the EOF marker
*/
final private T value;
/**
* Create a new BlockingQueueValue containing a real value, where last is false
* @param value
*/
BlockingQueueValue(final T value) {
isLast = false;
this.value = value;
}
/**
* Create a new BlockingQueueValue that is the last item
*/
BlockingQueueValue() {
isLast = true;
this.value = null;
}
/**
* Is this the EOF marker?
*
* @return true if so, else false
*/
public boolean isLast() {
return isLast;
}
/**
* Get the value held by this BlockingQueueValue
*
* @return the value
* @throws IllegalStateException if this is the last item
*/
public T getValue() {
if ( isLast() )
throw new IllegalStateException("Cannot get value for last object");
return value;
}
}

View File

@ -0,0 +1,45 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Create a future that simply returns a given value
*
* The only standard way to create a future in java is via the ExecutorService interface.
* If you have a data structure holding futures of value T, and you want to add a
* value to it for some reason (to add a EOF marker, for instance) you can use this
* class to create a dummy Future<T> that simply returns a value.
*
* @author depristo
* @since 09/12
*/
class FutureValue<V> implements Future<V> {
final V value;
FutureValue(final V value) {
this.value = value;
}
@Override public boolean cancel(boolean mayInterruptIfRunning) {
return true;
}
@Override public boolean isCancelled() {
return false;
}
@Override public boolean isDone() {
return true;
}
@Override public V get() throws InterruptedException, ExecutionException {
return value;
}
@Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return get();
}
}

View File

@ -0,0 +1,62 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
/**
* Producer Thread that reads input values from an inputReads and puts them into a BlockingQueue
*/
class InputProducer<InputType> implements Runnable {
/**
* The iterator we are using to get data from
*/
final Iterator<InputType> inputReader;
/**
* Our timer (may be null) that we use to track our input costs
*/
final SimpleTimer inputTimer;
/**
* Where we put our input values for consumption
*/
final BlockingQueue<InputValue> outputQueue;
public InputProducer(final Iterator<InputType> inputReader,
final SimpleTimer inputTimer,
final BlockingQueue<InputValue> outputQueue) {
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null");
this.inputReader = inputReader;
this.inputTimer = inputTimer;
this.outputQueue = outputQueue;
}
public void run() {
try {
while ( inputReader.hasNext() ) {
if ( inputTimer != null ) inputTimer.restart();
final InputType input = inputReader.next();
if ( inputTimer != null ) inputTimer.stop();
outputQueue.put(new InputValue(input));
}
// add the EOF object so our consumer knows we are done in all inputs
outputQueue.put(new InputValue());
} catch (InterruptedException ex) {
throw new ReviewedStingException("got execution exception", ex);
}
}
/**
* Helper class that contains a read value suitable for EOF marking in a BlockingQueue
*/
class InputValue extends BlockingQueueValue<InputType> {
private InputValue(InputType datum) { super(datum); }
private InputValue() { }
}
}

View File

@ -0,0 +1,36 @@
package org.broadinstitute.sting.utils.nanoScheduler;
/**
* Holds the results of a map job suitable for producer/consumer threading
* via a BlockingQueue
*/
class MapResult<MapType> extends BlockingQueueValue<MapType> {
final int jobID;
/**
* Create a new MapResult with value datum and jod jobID ID
*
* @param datum the value produced by the map job
* @param jobID the id of the map job (for correctness testing)
*/
MapResult(final MapType datum, final int jobID) {
super(datum);
this.jobID = jobID;
if ( jobID < 0 ) throw new IllegalArgumentException("JobID must be >= 0");
}
/**
* Create the EOF marker version of MapResult
*/
MapResult() {
super();
this.jobID = Integer.MAX_VALUE;
}
/**
* @return the job ID of the map job that produced this MapResult
*/
public int getJobID() {
return jobID;
}
}

View File

@ -9,7 +9,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
* Date: 8/24/12
* Time: 9:49 AM
*/
public interface NanoSchedulerMapFunction<InputType, ResultType> {
public interface NSMapFunction<InputType, ResultType> {
/**
* Return function on input, returning a value of ResultType
* @param input

View File

@ -7,6 +7,6 @@ package org.broadinstitute.sting.utils.nanoScheduler;
* Time: 2:10 PM
* To change this template use File | Settings | File Templates.
*/
public interface NanoSchedulerProgressFunction<InputType> {
public interface NSProgressFunction<InputType> {
public void progress(final InputType lastMapInput);
}

View File

@ -7,7 +7,7 @@ package org.broadinstitute.sting.utils.nanoScheduler;
* Date: 8/24/12
* Time: 9:49 AM
*/
public interface NanoSchedulerReduceFunction<MapType, ReduceType> {
public interface NSReduceFunction<MapType, ReduceType> {
/**
* Combine one with sum into a new ReduceType
* @param one the result of a map call on an input element

View File

@ -17,12 +17,12 @@ import java.util.concurrent.*;
*
* The overall framework works like this
*
* nano <- new Nanoschedule(bufferSize, numberOfMapElementsToProcessTogether, nThreads)
* nano <- new Nanoschedule(inputBufferSize, numberOfMapElementsToProcessTogether, nThreads)
* List[Input] outerData : outerDataLoop )
* result = nano.execute(outerData.iterator(), map, reduce)
*
* bufferSize determines how many elements from the input stream are read in one go by the
* nanoscheduler. The scheduler may hold up to bufferSize in memory at one time, as well
* inputBufferSize determines how many elements from the input stream are read in one go by the
* nanoscheduler. The scheduler may hold up to inputBufferSize in memory at one time, as well
* as up to inputBufferSize map results as well.
*
* numberOfMapElementsToProcessTogether determines how many input elements are processed
@ -48,40 +48,45 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
private final static boolean LOG_MAP_TIMES = false;
private final static boolean TIME_CALLS = true;
final int bufferSize;
final int nThreads;
private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100;
final int inputBufferSize;
final int mapBufferSize;
final int nThreads;
final ExecutorService inputExecutor;
final ExecutorService reduceExecutor;
final ExecutorService mapExecutor;
final ThreadPoolExecutor mapExecutor;
boolean shutdown = false;
boolean debug = false;
private NSProgressFunction<InputType> progressFunction = null;
private NanoSchedulerProgressFunction<InputType> progressFunction = null;
final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside");
final SimpleTimer inputTimer = new SimpleTimer("input");
final SimpleTimer mapTimer = new SimpleTimer("map");
final SimpleTimer reduceTimer = new SimpleTimer("reduce");
final SimpleTimer outsideSchedulerTimer = TIME_CALLS ? new SimpleTimer("outside") : null;
final SimpleTimer inputTimer = TIME_CALLS ? new SimpleTimer("input") : null;
final SimpleTimer mapTimer = TIME_CALLS ? new SimpleTimer("map") : null;
final SimpleTimer reduceTimer = TIME_CALLS ? new SimpleTimer("reduce") : null;
/**
* Create a new nanoschedule with the desire characteristics requested by the argument
* Create a new nanoscheduler with the desire characteristics requested by the argument
*
* @param bufferSize the number of input elements to read in each scheduling cycle.
* @param nThreads the number of threads to use to get work done, in addition to the thread calling execute
* @param inputBufferSize the number of input elements to read in each scheduling cycle.
* @param nThreads the number of threads to use to get work done, in addition to the
* thread calling execute
*/
public NanoScheduler(final int bufferSize,
final int nThreads) {
if ( bufferSize < 1 ) throw new IllegalArgumentException("bufferSize must be >= 1, got " + bufferSize);
public NanoScheduler(final int inputBufferSize, final int nThreads) {
if ( inputBufferSize < 1 ) throw new IllegalArgumentException("inputBufferSize must be >= 1, got " + inputBufferSize);
if ( nThreads < 1 ) throw new IllegalArgumentException("nThreads must be >= 1, got " + nThreads);
this.bufferSize = bufferSize;
this.inputBufferSize = inputBufferSize;
this.mapBufferSize = inputBufferSize * MAP_BUFFER_SIZE_SCALE_FACTOR;
this.nThreads = nThreads;
if ( nThreads == 1 ) {
this.mapExecutor = this.inputExecutor = this.reduceExecutor = null;
this.mapExecutor = null;
this.inputExecutor = this.reduceExecutor = null;
} else {
this.mapExecutor = Executors.newFixedThreadPool(nThreads-1, new NamedThreadFactory("NS-map-thread-%d"));
this.mapExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(nThreads-1, new NamedThreadFactory("NS-map-thread-%d"));
this.mapExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
this.reduceExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-reduce-thread-%d"));
}
@ -104,8 +109,8 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
* @return
*/
@Ensures("result > 0")
public int getBufferSize() {
return bufferSize;
public int getInputBufferSize() {
return inputBufferSize;
}
/**
@ -116,9 +121,11 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
public void shutdown() {
outsideSchedulerTimer.stop();
shutdownExecutor("inputExecutor", inputExecutor);
shutdownExecutor("mapExecutor", mapExecutor);
shutdownExecutor("reduceExecutor", reduceExecutor);
if ( nThreads > 1 ) {
shutdownExecutor("inputExecutor", inputExecutor);
shutdownExecutor("mapExecutor", mapExecutor);
shutdownExecutor("reduceExecutor", reduceExecutor);
}
shutdown = true;
if (TIME_CALLS) {
@ -136,15 +143,15 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
* @param name a string name for error messages for the executorService we are shutting down
* @param executorService the executorService to shut down
*/
@Requires({"name != null", "executorService != null"})
@Ensures("executorService.isShutdown()")
private void shutdownExecutor(final String name, final ExecutorService executorService) {
if ( executorService != null ) {
if ( executorService.isShutdown() || executorService.isTerminated() )
throw new IllegalStateException("Executor service " + name + " is already shut down!");
if ( executorService.isShutdown() || executorService.isTerminated() )
throw new IllegalStateException("Executor service " + name + " is already shut down!");
final List<Runnable> remaining = executorService.shutdownNow();
if ( ! remaining.isEmpty() )
throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!");
}
final List<Runnable> remaining = executorService.shutdownNow();
if ( ! remaining.isEmpty() )
throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!");
}
/**
@ -204,7 +211,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
*
* @param progressFunction a progress function to call, or null if you don't want any progress callback
*/
public void setProgressFunction(final NanoSchedulerProgressFunction<InputType> progressFunction) {
public void setProgressFunction(final NSProgressFunction<InputType> progressFunction) {
this.progressFunction = progressFunction;
}
@ -231,9 +238,9 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
* @return the last reduce value
*/
public ReduceType execute(final Iterator<InputType> inputReader,
final NanoSchedulerMapFunction<InputType, MapType> map,
final NSMapFunction<InputType, MapType> map,
final ReduceType initialValue,
final NanoSchedulerReduceFunction<MapType, ReduceType> reduce) {
final NSReduceFunction<MapType, ReduceType> reduce) {
if ( isShutdown() ) throw new IllegalStateException("execute called on already shutdown NanoScheduler");
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
if ( map == null ) throw new IllegalArgumentException("map function cannot be null");
@ -259,9 +266,9 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
*/
@Requires({"inputReader != null", "map != null", "reduce != null"})
private ReduceType executeSingleThreaded(final Iterator<InputType> inputReader,
final NanoSchedulerMapFunction<InputType, MapType> map,
final NSMapFunction<InputType, MapType> map,
final ReduceType initialValue,
final NanoSchedulerReduceFunction<MapType, ReduceType> reduce) {
final NSReduceFunction<MapType, ReduceType> reduce) {
ReduceType sum = initialValue;
int i = 0;
@ -278,7 +285,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (mapTimer.currentTimeNano() - preMapTime));
if ( TIME_CALLS ) mapTimer.stop();
if ( i++ % bufferSize == 0 && progressFunction != null )
if ( i++ % inputBufferSize == 0 && progressFunction != null )
progressFunction.progress(input);
// reduce
@ -299,55 +306,53 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
*/
@Requires({"inputReader != null", "map != null", "reduce != null"})
private ReduceType executeMultiThreaded(final Iterator<InputType> inputReader,
final NanoSchedulerMapFunction<InputType, MapType> map,
final NSMapFunction<InputType, MapType> map,
final ReduceType initialValue,
final NanoSchedulerReduceFunction<MapType, ReduceType> reduce) {
final NSReduceFunction<MapType, ReduceType> reduce) {
debugPrint("Executing nanoScheduler");
// a completion service that tracks when jobs complete, so we can wait in this thread
// until all of the map jobs are completed, without having to shut down the executor itself
final ExecutorCompletionService<Integer> mapJobCompletionService =
new ExecutorCompletionService<Integer>(mapExecutor);
// a blocking queue that limits the number of input datum to the requested buffer size
final BlockingQueue<InputDatum> inputQueue = new LinkedBlockingDeque<InputDatum>(bufferSize);
final BlockingQueue<InputProducer<InputType>.InputValue> inputQueue
= new LinkedBlockingDeque<InputProducer<InputType>.InputValue>(inputBufferSize);
// a priority queue that stores up to bufferSize * MAP_QUEUE_SCALE_FACTOR elements
// a priority queue that stores up to mapBufferSize elements
// produced by completed map jobs.
final PriorityBlockingQueue<MapResult> mapResultQueue = new PriorityBlockingQueue<MapResult>(bufferSize*100);
final BlockingQueue<Future<MapResult<MapType>>> mapResultQueue =
new LinkedBlockingDeque<Future<MapResult<MapType>>>(mapBufferSize);
// TODO -- the logic of this blocking queue is wrong! We need to wait for map jobs in order, not just
// -- in the order in which they are produced
// Start running the input reader thread
inputExecutor.submit(new InputProducer<InputType>(inputReader, inputTimer, inputQueue));
// TODO -- map executor must have fixed size map jobs queue
inputExecutor.submit(new InputProducer(inputReader, inputQueue));
final Future<ReduceType> reduceResult = reduceExecutor.submit(new ReducerThread(reduce, initialValue, mapResultQueue));
// Start running the reducer thread
final ReducerThread<MapType, ReduceType> reducer
= new ReducerThread<MapType, ReduceType>(reduce, reduceTimer, initialValue, mapResultQueue);
final Future<ReduceType> reduceResult = reduceExecutor.submit(reducer);
try {
int numJobs = 0;
while ( true ) {
// block on input
final InputDatum inputEnqueueWrapped = inputQueue.take();
final InputProducer<InputType>.InputValue inputEnqueueWrapped = inputQueue.take();
if ( ! inputEnqueueWrapped.isLast() ) {
// get the object itself
final InputType input = inputEnqueueWrapped.datum;
final InputType input = inputEnqueueWrapped.getValue();
// the next map call has id + 1
// the next map call has jobID + 1
numJobs++;
// send job for map via the completion service
final CallableMap doMap = new CallableMap(map, numJobs, input, mapResultQueue);
mapJobCompletionService.submit(doMap, numJobs);
final CallableMap doMap = new CallableMap(map, numJobs, input);
final Future<MapResult<MapType>> mapJob = mapExecutor.submit(doMap);
mapResultQueue.put(mapJob);
debugPrint(" Done with cycle of map/reduce");
if ( progressFunction != null ) // TODO -- don't cycle so often
if ( numJobs % inputBufferSize == 0 && progressFunction != null )
progressFunction.progress(input);
} else {
waitForLastJob(mapJobCompletionService, numJobs);
mapResultQueue.add(new MapResult());
mapResultQueue.put(new FutureValue<MapResult<MapType>>(new MapResult<MapType>()));
return reduceResult.get(); // wait for our result of reduce
}
}
@ -358,147 +363,30 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
}
}
/**
* Helper routine that will wait until the last map job finishes running
* by taking numJob values from the executor completion service, using
* the blocking take() call.
*/
private void waitForLastJob(final ExecutorCompletionService<Integer> mapJobCompletionService,
final int numJobs ) throws InterruptedException {
for ( int i = 0; i < numJobs; i++ )
mapJobCompletionService.take();
}
private class ReducerThread implements Callable {
final NanoSchedulerReduceFunction<MapType, ReduceType> reduce;
ReduceType sum;
final PriorityBlockingQueue<MapResult> mapResultQueue;
public ReducerThread(final NanoSchedulerReduceFunction<MapType, ReduceType> reduce,
final ReduceType sum,
final PriorityBlockingQueue<MapResult> mapResultQueue) {
this.reduce = reduce;
this.sum = sum;
this.mapResultQueue = mapResultQueue;
}
public ReduceType call() {
try {
while ( true ) {
final MapResult result = mapResultQueue.take();
//System.out.println("Reduce of map result " + result.id + " with sum " + sum);
if ( result.isLast() ) {
//System.out.println("Saw last! " + result.id);
return sum;
}
else {
if ( TIME_CALLS ) reduceTimer.restart();
sum = reduce.apply(result.datum, sum);
if ( TIME_CALLS ) reduceTimer.stop();
}
}
} catch (InterruptedException ex) {
//System.out.println("Interrupted");
throw new ReviewedStingException("got execution exception", ex);
}
}
}
private class InputProducer implements Runnable {
final Iterator<InputType> inputReader;
final BlockingQueue<InputDatum> outputQueue;
public InputProducer(final Iterator<InputType> inputReader, final BlockingQueue<InputDatum> outputQueue) {
this.inputReader = inputReader;
this.outputQueue = outputQueue;
}
public void run() {
try {
while ( inputReader.hasNext() ) {
if ( TIME_CALLS ) inputTimer.restart();
final InputType input = inputReader.next();
if ( TIME_CALLS ) inputTimer.stop();
outputQueue.put(new InputDatum(input));
}
// add the EOF object so we know we are done
outputQueue.put(new InputDatum());
} catch (InterruptedException ex) {
throw new ReviewedStingException("got execution exception", ex);
}
}
}
private class BlockingDatum<T> {
final boolean isLast;
final T datum;
private BlockingDatum(final T datum) {
isLast = false;
this.datum = datum;
}
private BlockingDatum() {
isLast = true;
this.datum = null;
}
public boolean isLast() {
return isLast;
}
}
private class InputDatum extends BlockingDatum<InputType> {
private InputDatum(InputType datum) { super(datum); }
private InputDatum() { }
}
private class MapResult extends BlockingDatum<MapType> implements Comparable<MapResult> {
final Integer id;
private MapResult(MapType datum, Integer id) {
super(datum);
this.id = id;
}
private MapResult() {
this.id = Integer.MAX_VALUE;
}
@Override
public int compareTo(MapResult o) {
return id.compareTo(o.id);
}
}
/**
* A simple callable version of the map function for use with the executor pool
*/
private class CallableMap implements Runnable {
private class CallableMap implements Callable<MapResult<MapType>> {
final int id;
final InputType input;
final NanoSchedulerMapFunction<InputType, MapType> map;
final PriorityBlockingQueue<MapResult> mapResultQueue;
final NSMapFunction<InputType, MapType> map;
@Requires({"map != null"})
private CallableMap(final NanoSchedulerMapFunction<InputType, MapType> map,
private CallableMap(final NSMapFunction<InputType, MapType> map,
final int id,
final InputType input,
final PriorityBlockingQueue<MapResult> mapResultQueue) {
final InputType input) {
this.id = id;
this.input = input;
this.map = map;
this.mapResultQueue = mapResultQueue;
}
@Override public void run() {
@Override
public MapResult<MapType> call() {
if ( TIME_CALLS ) mapTimer.restart();
if ( debug ) debugPrint("\t\tmap " + input);
final MapType result = map.apply(input);
if ( TIME_CALLS ) mapTimer.stop();
mapResultQueue.add(new MapResult(result, id));
return new MapResult<MapType>(result, id);
}
}
}

View File

@ -0,0 +1,64 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Thread that runs the reduce of the map/reduce.
*
* This thread reads from mapResultsQueue until the poison EOF object arrives. At each
* stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the
* queue waits until the mapResultQueue has a value to take. Then, it gets and waits
* until the map result Future has a value.
*/
class ReducerThread<MapType, ReduceType> implements Callable<ReduceType> {
final NSReduceFunction<MapType, ReduceType> reduce;
final SimpleTimer reduceTimer;
final BlockingQueue<Future<MapResult<MapType>>> mapResultQueue;
ReduceType sum;
int lastJobID = -1;
public ReducerThread(final NSReduceFunction<MapType, ReduceType> reduce,
final SimpleTimer reduceTimer,
final ReduceType sum,
final BlockingQueue<Future<MapResult<MapType>>> mapResultQueue) {
if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null");
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
this.reduce = reduce;
this.reduceTimer = reduceTimer;
this.sum = sum;
this.mapResultQueue = mapResultQueue;
}
public ReduceType call() {
try {
while ( true ) {
final MapResult<MapType> result = mapResultQueue.take().get();
if ( result.isLast() ) {
// we are done, just return sum
return sum;
}
else if ( result.getJobID() < lastJobID ) {
// make sure the map results are coming in order
throw new IllegalStateException("BUG: last jobID " + lastJobID + " > current jobID " + result.getJobID());
} else {
// apply reduce, keeping track of sum
if ( reduceTimer != null ) reduceTimer.restart();
sum = reduce.apply(result.getValue(), sum);
if ( reduceTimer != null ) reduceTimer.stop();
}
}
} catch (ExecutionException ex) {
throw new ReviewedStingException("got execution exception", ex);
} catch (InterruptedException ex) {
throw new ReviewedStingException("got execution exception", ex);
}
}
}

View File

@ -22,11 +22,11 @@ import java.util.List;
public class NanoSchedulerUnitTest extends BaseTest {
public static final int NANO_SCHEDULE_MAX_RUNTIME = 60000;
private static class Map2x implements NanoSchedulerMapFunction<Integer, Integer> {
private static class Map2x implements NSMapFunction<Integer, Integer> {
@Override public Integer apply(Integer input) { return input * 2; }
}
private static class ReduceSum implements NanoSchedulerReduceFunction<Integer, Integer> {
private static class ReduceSum implements NSReduceFunction<Integer, Integer> {
int prevOne = Integer.MIN_VALUE;
@Override public Integer apply(Integer one, Integer sum) {
@ -35,7 +35,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
}
}
private static class ProgressCallback implements NanoSchedulerProgressFunction<Integer> {
private static class ProgressCallback implements NSProgressFunction<Integer> {
int callBacks = 0;
@Override
@ -120,7 +120,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
final ProgressCallback callback = new ProgressCallback();
nanoScheduler.setProgressFunction(callback);
Assert.assertEquals(nanoScheduler.getBufferSize(), test.bufferSize, "bufferSize argument");
Assert.assertEquals(nanoScheduler.getInputBufferSize(), test.bufferSize, "inputBufferSize argument");
Assert.assertEquals(nanoScheduler.getnThreads(), test.nThreads, "nThreads argument");
final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce());