diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseLociNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseLociNano.java index 73b73c002..e4e2254d0 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseLociNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseLociNano.java @@ -8,10 +8,10 @@ import org.broadinstitute.sting.gatk.datasources.providers.ReferenceOrderedView; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.nanoScheduler.NSMapFunction; +import org.broadinstitute.sting.utils.nanoScheduler.NSProgressFunction; +import org.broadinstitute.sting.utils.nanoScheduler.NSReduceFunction; import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; -import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerMapFunction; -import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerProgressFunction; -import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerReduceFunction; import java.util.Iterator; @@ -153,7 +153,7 @@ public class TraverseLociNano extends TraverseLociBase { * * Applies walker.map to MapData, returning a MapResult object containing the result */ - private class TraverseLociMap implements NanoSchedulerMapFunction { + private class TraverseLociMap implements NSMapFunction { final LocusWalker walker; private TraverseLociMap(LocusWalker walker) { @@ -174,11 +174,11 @@ public class TraverseLociNano extends TraverseLociBase { } /** - * NanoSchedulerReduceFunction for TraverseReads meeting NanoScheduler interface requirements + * NSReduceFunction for TraverseReads meeting NanoScheduler interface requirements * * Takes a MapResult object and applies the walkers reduce function to each map result, when applicable */ - private class TraverseLociReduce implements NanoSchedulerReduceFunction { + private class TraverseLociReduce implements NSReduceFunction { final LocusWalker walker; private TraverseLociReduce(LocusWalker walker) { @@ -195,7 +195,7 @@ public class TraverseLociNano extends TraverseLociBase { } } - private class TraverseLociProgress implements NanoSchedulerProgressFunction { + private class TraverseLociProgress implements NSProgressFunction { @Override public void progress(MapData lastProcessedMap) { if (lastProcessedMap.alignmentContext != null) diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java index 5679747e1..b3a0a1390 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -35,9 +35,9 @@ import org.broadinstitute.sting.gatk.datasources.reads.ReadShard; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.nanoScheduler.NSMapFunction; +import org.broadinstitute.sting.utils.nanoScheduler.NSReduceFunction; import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; -import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerMapFunction; -import org.broadinstitute.sting.utils.nanoScheduler.NanoSchedulerReduceFunction; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; import java.util.LinkedList; @@ -191,7 +191,7 @@ public class TraverseReadsNano extends TraversalEngine, * * Applies walker.map to MapData, returning a MapResult object containing the result */ - private class TraverseReadsMap implements NanoSchedulerMapFunction { + private class TraverseReadsMap implements NSMapFunction { final ReadWalker walker; private TraverseReadsMap(ReadWalker walker) { @@ -211,11 +211,11 @@ public class TraverseReadsNano extends TraversalEngine, } /** - * NanoSchedulerReduceFunction for TraverseReads meeting NanoScheduler interface requirements + * NSReduceFunction for TraverseReads meeting NanoScheduler interface requirements * * Takes a MapResult object and applies the walkers reduce function to each map result, when applicable */ - private class TraverseReadsReduce implements NanoSchedulerReduceFunction { + private class TraverseReadsReduce implements NSReduceFunction { final ReadWalker walker; private TraverseReadsReduce(ReadWalker walker) { diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/BlockingQueueValue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/BlockingQueueValue.java new file mode 100644 index 000000000..2daa6c9eb --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/BlockingQueueValue.java @@ -0,0 +1,82 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import com.google.java.contract.Invariant; + +/** + * Wrapper to hold data for a blocking queue, distinguishing an EOF marker from a real object + * + * The only way to tell in a consumer thread that a blocking queue has no more data ever + * coming down the pipe is to pass in a "poison" or EOF object. This class provides + * a generic capacity for that... + * + * The use case looks like this: + * + * BlockingQueue q + * producer: + * while ( x has items ) + * q.put(new BlockingQueueValue(x)) + * q.put(new BlockingQueueValue()) + * + * Consumer: + * while ( true ) + * value = q.take() + * if ( value.isLast() ) + * break + * else + * do something useful with value + * + * + * User: depristo + * Date: 9/6/12 + * Time: 3:08 PM + */ +@Invariant("! isLast || value == null") +class BlockingQueueValue { + /** + * True if this is the EOF marker object + */ + final private boolean isLast; + + /** + * Our value, if we aren't the EOF marker + */ + final private T value; + + /** + * Create a new BlockingQueueValue containing a real value, where last is false + * @param value + */ + BlockingQueueValue(final T value) { + isLast = false; + this.value = value; + } + + /** + * Create a new BlockingQueueValue that is the last item + */ + BlockingQueueValue() { + isLast = true; + this.value = null; + } + + /** + * Is this the EOF marker? + * + * @return true if so, else false + */ + public boolean isLast() { + return isLast; + } + + /** + * Get the value held by this BlockingQueueValue + * + * @return the value + * @throws IllegalStateException if this is the last item + */ + public T getValue() { + if ( isLast() ) + throw new IllegalStateException("Cannot get value for last object"); + return value; + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java new file mode 100644 index 000000000..9508a15aa --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/FutureValue.java @@ -0,0 +1,45 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Create a future that simply returns a given value + * + * The only standard way to create a future in java is via the ExecutorService interface. + * If you have a data structure holding futures of value T, and you want to add a + * value to it for some reason (to add a EOF marker, for instance) you can use this + * class to create a dummy Future that simply returns a value. + * + * @author depristo + * @since 09/12 + */ +class FutureValue implements Future { + final V value; + + FutureValue(final V value) { + this.value = value; + } + + @Override public boolean cancel(boolean mayInterruptIfRunning) { + return true; + } + + @Override public boolean isCancelled() { + return false; + } + + @Override public boolean isDone() { + return true; + } + + @Override public V get() throws InterruptedException, ExecutionException { + return value; + } + + @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return get(); + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java new file mode 100644 index 000000000..29dddbc49 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -0,0 +1,62 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.utils.SimpleTimer; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; + +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; + +/** + * Producer Thread that reads input values from an inputReads and puts them into a BlockingQueue + */ +class InputProducer implements Runnable { + /** + * The iterator we are using to get data from + */ + final Iterator inputReader; + + /** + * Our timer (may be null) that we use to track our input costs + */ + final SimpleTimer inputTimer; + + /** + * Where we put our input values for consumption + */ + final BlockingQueue outputQueue; + + public InputProducer(final Iterator inputReader, + final SimpleTimer inputTimer, + final BlockingQueue outputQueue) { + if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); + if ( outputQueue == null ) throw new IllegalArgumentException("OutputQueue cannot be null"); + + this.inputReader = inputReader; + this.inputTimer = inputTimer; + this.outputQueue = outputQueue; + } + + public void run() { + try { + while ( inputReader.hasNext() ) { + if ( inputTimer != null ) inputTimer.restart(); + final InputType input = inputReader.next(); + if ( inputTimer != null ) inputTimer.stop(); + outputQueue.put(new InputValue(input)); + } + + // add the EOF object so our consumer knows we are done in all inputs + outputQueue.put(new InputValue()); + } catch (InterruptedException ex) { + throw new ReviewedStingException("got execution exception", ex); + } + } + + /** + * Helper class that contains a read value suitable for EOF marking in a BlockingQueue + */ + class InputValue extends BlockingQueueValue { + private InputValue(InputType datum) { super(datum); } + private InputValue() { } + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java new file mode 100644 index 000000000..3cc6fa786 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java @@ -0,0 +1,36 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +/** + * Holds the results of a map job suitable for producer/consumer threading + * via a BlockingQueue + */ +class MapResult extends BlockingQueueValue { + final int jobID; + + /** + * Create a new MapResult with value datum and jod jobID ID + * + * @param datum the value produced by the map job + * @param jobID the id of the map job (for correctness testing) + */ + MapResult(final MapType datum, final int jobID) { + super(datum); + this.jobID = jobID; + if ( jobID < 0 ) throw new IllegalArgumentException("JobID must be >= 0"); + } + + /** + * Create the EOF marker version of MapResult + */ + MapResult() { + super(); + this.jobID = Integer.MAX_VALUE; + } + + /** + * @return the job ID of the map job that produced this MapResult + */ + public int getJobID() { + return jobID; + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerMapFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java similarity index 84% rename from public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerMapFunction.java rename to public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java index ddf4421d2..cc5335051 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerMapFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java @@ -9,7 +9,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; * Date: 8/24/12 * Time: 9:49 AM */ -public interface NanoSchedulerMapFunction { +public interface NSMapFunction { /** * Return function on input, returning a value of ResultType * @param input diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerProgressFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java similarity index 81% rename from public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerProgressFunction.java rename to public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java index 8631196a3..8b12c62c4 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerProgressFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java @@ -7,6 +7,6 @@ package org.broadinstitute.sting.utils.nanoScheduler; * Time: 2:10 PM * To change this template use File | Settings | File Templates. */ -public interface NanoSchedulerProgressFunction { +public interface NSProgressFunction { public void progress(final InputType lastMapInput); } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerReduceFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java similarity index 87% rename from public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerReduceFunction.java rename to public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java index 7e58eeaf9..879a33a1d 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerReduceFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java @@ -7,7 +7,7 @@ package org.broadinstitute.sting.utils.nanoScheduler; * Date: 8/24/12 * Time: 9:49 AM */ -public interface NanoSchedulerReduceFunction { +public interface NSReduceFunction { /** * Combine one with sum into a new ReduceType * @param one the result of a map call on an input element diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index fe8731d3b..664fb7b9b 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -17,12 +17,12 @@ import java.util.concurrent.*; * * The overall framework works like this * - * nano <- new Nanoschedule(bufferSize, numberOfMapElementsToProcessTogether, nThreads) + * nano <- new Nanoschedule(inputBufferSize, numberOfMapElementsToProcessTogether, nThreads) * List[Input] outerData : outerDataLoop ) * result = nano.execute(outerData.iterator(), map, reduce) * - * bufferSize determines how many elements from the input stream are read in one go by the - * nanoscheduler. The scheduler may hold up to bufferSize in memory at one time, as well + * inputBufferSize determines how many elements from the input stream are read in one go by the + * nanoscheduler. The scheduler may hold up to inputBufferSize in memory at one time, as well * as up to inputBufferSize map results as well. * * numberOfMapElementsToProcessTogether determines how many input elements are processed @@ -48,40 +48,45 @@ public class NanoScheduler { private final static boolean LOG_MAP_TIMES = false; private final static boolean TIME_CALLS = true; - final int bufferSize; - final int nThreads; + private final static int MAP_BUFFER_SIZE_SCALE_FACTOR = 100; + final int inputBufferSize; + final int mapBufferSize; + final int nThreads; final ExecutorService inputExecutor; final ExecutorService reduceExecutor; - final ExecutorService mapExecutor; + final ThreadPoolExecutor mapExecutor; + boolean shutdown = false; boolean debug = false; + private NSProgressFunction progressFunction = null; - private NanoSchedulerProgressFunction progressFunction = null; - - final SimpleTimer outsideSchedulerTimer = new SimpleTimer("outside"); - final SimpleTimer inputTimer = new SimpleTimer("input"); - final SimpleTimer mapTimer = new SimpleTimer("map"); - final SimpleTimer reduceTimer = new SimpleTimer("reduce"); + final SimpleTimer outsideSchedulerTimer = TIME_CALLS ? new SimpleTimer("outside") : null; + final SimpleTimer inputTimer = TIME_CALLS ? new SimpleTimer("input") : null; + final SimpleTimer mapTimer = TIME_CALLS ? new SimpleTimer("map") : null; + final SimpleTimer reduceTimer = TIME_CALLS ? new SimpleTimer("reduce") : null; /** - * Create a new nanoschedule with the desire characteristics requested by the argument + * Create a new nanoscheduler with the desire characteristics requested by the argument * - * @param bufferSize the number of input elements to read in each scheduling cycle. - * @param nThreads the number of threads to use to get work done, in addition to the thread calling execute + * @param inputBufferSize the number of input elements to read in each scheduling cycle. + * @param nThreads the number of threads to use to get work done, in addition to the + * thread calling execute */ - public NanoScheduler(final int bufferSize, - final int nThreads) { - if ( bufferSize < 1 ) throw new IllegalArgumentException("bufferSize must be >= 1, got " + bufferSize); + public NanoScheduler(final int inputBufferSize, final int nThreads) { + if ( inputBufferSize < 1 ) throw new IllegalArgumentException("inputBufferSize must be >= 1, got " + inputBufferSize); if ( nThreads < 1 ) throw new IllegalArgumentException("nThreads must be >= 1, got " + nThreads); - this.bufferSize = bufferSize; + this.inputBufferSize = inputBufferSize; + this.mapBufferSize = inputBufferSize * MAP_BUFFER_SIZE_SCALE_FACTOR; this.nThreads = nThreads; if ( nThreads == 1 ) { - this.mapExecutor = this.inputExecutor = this.reduceExecutor = null; + this.mapExecutor = null; + this.inputExecutor = this.reduceExecutor = null; } else { - this.mapExecutor = Executors.newFixedThreadPool(nThreads-1, new NamedThreadFactory("NS-map-thread-%d")); + this.mapExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(nThreads-1, new NamedThreadFactory("NS-map-thread-%d")); + this.mapExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); this.reduceExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-reduce-thread-%d")); } @@ -104,8 +109,8 @@ public class NanoScheduler { * @return */ @Ensures("result > 0") - public int getBufferSize() { - return bufferSize; + public int getInputBufferSize() { + return inputBufferSize; } /** @@ -116,9 +121,11 @@ public class NanoScheduler { public void shutdown() { outsideSchedulerTimer.stop(); - shutdownExecutor("inputExecutor", inputExecutor); - shutdownExecutor("mapExecutor", mapExecutor); - shutdownExecutor("reduceExecutor", reduceExecutor); + if ( nThreads > 1 ) { + shutdownExecutor("inputExecutor", inputExecutor); + shutdownExecutor("mapExecutor", mapExecutor); + shutdownExecutor("reduceExecutor", reduceExecutor); + } shutdown = true; if (TIME_CALLS) { @@ -136,15 +143,15 @@ public class NanoScheduler { * @param name a string name for error messages for the executorService we are shutting down * @param executorService the executorService to shut down */ + @Requires({"name != null", "executorService != null"}) + @Ensures("executorService.isShutdown()") private void shutdownExecutor(final String name, final ExecutorService executorService) { - if ( executorService != null ) { - if ( executorService.isShutdown() || executorService.isTerminated() ) - throw new IllegalStateException("Executor service " + name + " is already shut down!"); + if ( executorService.isShutdown() || executorService.isTerminated() ) + throw new IllegalStateException("Executor service " + name + " is already shut down!"); - final List remaining = executorService.shutdownNow(); - if ( ! remaining.isEmpty() ) - throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!"); - } + final List remaining = executorService.shutdownNow(); + if ( ! remaining.isEmpty() ) + throw new IllegalStateException(remaining.size() + " remaining tasks found in an executor " + name + ", unexpected behavior!"); } /** @@ -204,7 +211,7 @@ public class NanoScheduler { * * @param progressFunction a progress function to call, or null if you don't want any progress callback */ - public void setProgressFunction(final NanoSchedulerProgressFunction progressFunction) { + public void setProgressFunction(final NSProgressFunction progressFunction) { this.progressFunction = progressFunction; } @@ -231,9 +238,9 @@ public class NanoScheduler { * @return the last reduce value */ public ReduceType execute(final Iterator inputReader, - final NanoSchedulerMapFunction map, + final NSMapFunction map, final ReduceType initialValue, - final NanoSchedulerReduceFunction reduce) { + final NSReduceFunction reduce) { if ( isShutdown() ) throw new IllegalStateException("execute called on already shutdown NanoScheduler"); if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); if ( map == null ) throw new IllegalArgumentException("map function cannot be null"); @@ -259,9 +266,9 @@ public class NanoScheduler { */ @Requires({"inputReader != null", "map != null", "reduce != null"}) private ReduceType executeSingleThreaded(final Iterator inputReader, - final NanoSchedulerMapFunction map, + final NSMapFunction map, final ReduceType initialValue, - final NanoSchedulerReduceFunction reduce) { + final NSReduceFunction reduce) { ReduceType sum = initialValue; int i = 0; @@ -278,7 +285,7 @@ public class NanoScheduler { if ( LOG_MAP_TIMES ) logger.info("MAP TIME " + (mapTimer.currentTimeNano() - preMapTime)); if ( TIME_CALLS ) mapTimer.stop(); - if ( i++ % bufferSize == 0 && progressFunction != null ) + if ( i++ % inputBufferSize == 0 && progressFunction != null ) progressFunction.progress(input); // reduce @@ -299,55 +306,53 @@ public class NanoScheduler { */ @Requires({"inputReader != null", "map != null", "reduce != null"}) private ReduceType executeMultiThreaded(final Iterator inputReader, - final NanoSchedulerMapFunction map, + final NSMapFunction map, final ReduceType initialValue, - final NanoSchedulerReduceFunction reduce) { + final NSReduceFunction reduce) { debugPrint("Executing nanoScheduler"); - // a completion service that tracks when jobs complete, so we can wait in this thread - // until all of the map jobs are completed, without having to shut down the executor itself - final ExecutorCompletionService mapJobCompletionService = - new ExecutorCompletionService(mapExecutor); - // a blocking queue that limits the number of input datum to the requested buffer size - final BlockingQueue inputQueue = new LinkedBlockingDeque(bufferSize); + final BlockingQueue.InputValue> inputQueue + = new LinkedBlockingDeque.InputValue>(inputBufferSize); - // a priority queue that stores up to bufferSize * MAP_QUEUE_SCALE_FACTOR elements + // a priority queue that stores up to mapBufferSize elements // produced by completed map jobs. - final PriorityBlockingQueue mapResultQueue = new PriorityBlockingQueue(bufferSize*100); + final BlockingQueue>> mapResultQueue = + new LinkedBlockingDeque>>(mapBufferSize); - // TODO -- the logic of this blocking queue is wrong! We need to wait for map jobs in order, not just - // -- in the order in which they are produced + // Start running the input reader thread + inputExecutor.submit(new InputProducer(inputReader, inputTimer, inputQueue)); - // TODO -- map executor must have fixed size map jobs queue - - inputExecutor.submit(new InputProducer(inputReader, inputQueue)); - final Future reduceResult = reduceExecutor.submit(new ReducerThread(reduce, initialValue, mapResultQueue)); + // Start running the reducer thread + final ReducerThread reducer + = new ReducerThread(reduce, reduceTimer, initialValue, mapResultQueue); + final Future reduceResult = reduceExecutor.submit(reducer); try { int numJobs = 0; + while ( true ) { // block on input - final InputDatum inputEnqueueWrapped = inputQueue.take(); + final InputProducer.InputValue inputEnqueueWrapped = inputQueue.take(); if ( ! inputEnqueueWrapped.isLast() ) { // get the object itself - final InputType input = inputEnqueueWrapped.datum; + final InputType input = inputEnqueueWrapped.getValue(); - // the next map call has id + 1 + // the next map call has jobID + 1 numJobs++; // send job for map via the completion service - final CallableMap doMap = new CallableMap(map, numJobs, input, mapResultQueue); - mapJobCompletionService.submit(doMap, numJobs); + final CallableMap doMap = new CallableMap(map, numJobs, input); + final Future> mapJob = mapExecutor.submit(doMap); + mapResultQueue.put(mapJob); debugPrint(" Done with cycle of map/reduce"); - if ( progressFunction != null ) // TODO -- don't cycle so often + if ( numJobs % inputBufferSize == 0 && progressFunction != null ) progressFunction.progress(input); } else { - waitForLastJob(mapJobCompletionService, numJobs); - mapResultQueue.add(new MapResult()); + mapResultQueue.put(new FutureValue>(new MapResult())); return reduceResult.get(); // wait for our result of reduce } } @@ -358,147 +363,30 @@ public class NanoScheduler { } } - /** - * Helper routine that will wait until the last map job finishes running - * by taking numJob values from the executor completion service, using - * the blocking take() call. - */ - private void waitForLastJob(final ExecutorCompletionService mapJobCompletionService, - final int numJobs ) throws InterruptedException { - for ( int i = 0; i < numJobs; i++ ) - mapJobCompletionService.take(); - } - - private class ReducerThread implements Callable { - final NanoSchedulerReduceFunction reduce; - ReduceType sum; - final PriorityBlockingQueue mapResultQueue; - - public ReducerThread(final NanoSchedulerReduceFunction reduce, - final ReduceType sum, - final PriorityBlockingQueue mapResultQueue) { - this.reduce = reduce; - this.sum = sum; - this.mapResultQueue = mapResultQueue; - } - - public ReduceType call() { - try { - while ( true ) { - final MapResult result = mapResultQueue.take(); - //System.out.println("Reduce of map result " + result.id + " with sum " + sum); - if ( result.isLast() ) { - //System.out.println("Saw last! " + result.id); - return sum; - } - else { - if ( TIME_CALLS ) reduceTimer.restart(); - sum = reduce.apply(result.datum, sum); - if ( TIME_CALLS ) reduceTimer.stop(); - } - } - } catch (InterruptedException ex) { - //System.out.println("Interrupted"); - throw new ReviewedStingException("got execution exception", ex); - } - } - } - - private class InputProducer implements Runnable { - final Iterator inputReader; - final BlockingQueue outputQueue; - - public InputProducer(final Iterator inputReader, final BlockingQueue outputQueue) { - this.inputReader = inputReader; - this.outputQueue = outputQueue; - } - - public void run() { - try { - while ( inputReader.hasNext() ) { - if ( TIME_CALLS ) inputTimer.restart(); - final InputType input = inputReader.next(); - if ( TIME_CALLS ) inputTimer.stop(); - outputQueue.put(new InputDatum(input)); - } - - // add the EOF object so we know we are done - outputQueue.put(new InputDatum()); - } catch (InterruptedException ex) { - throw new ReviewedStingException("got execution exception", ex); - } - } - } - - private class BlockingDatum { - final boolean isLast; - final T datum; - - private BlockingDatum(final T datum) { - isLast = false; - this.datum = datum; - } - - private BlockingDatum() { - isLast = true; - this.datum = null; - } - - public boolean isLast() { - return isLast; - } - } - - - private class InputDatum extends BlockingDatum { - private InputDatum(InputType datum) { super(datum); } - private InputDatum() { } - } - - private class MapResult extends BlockingDatum implements Comparable { - final Integer id; - - private MapResult(MapType datum, Integer id) { - super(datum); - this.id = id; - } - - private MapResult() { - this.id = Integer.MAX_VALUE; - } - - @Override - public int compareTo(MapResult o) { - return id.compareTo(o.id); - } - } - /** * A simple callable version of the map function for use with the executor pool */ - private class CallableMap implements Runnable { + private class CallableMap implements Callable> { final int id; final InputType input; - final NanoSchedulerMapFunction map; - final PriorityBlockingQueue mapResultQueue; + final NSMapFunction map; @Requires({"map != null"}) - private CallableMap(final NanoSchedulerMapFunction map, + private CallableMap(final NSMapFunction map, final int id, - final InputType input, - final PriorityBlockingQueue mapResultQueue) { + final InputType input) { this.id = id; this.input = input; this.map = map; - this.mapResultQueue = mapResultQueue; } - @Override public void run() { + @Override + public MapResult call() { if ( TIME_CALLS ) mapTimer.restart(); if ( debug ) debugPrint("\t\tmap " + input); final MapType result = map.apply(input); if ( TIME_CALLS ) mapTimer.stop(); - mapResultQueue.add(new MapResult(result, id)); + return new MapResult(result, id); } } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java new file mode 100644 index 000000000..bd29799b6 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java @@ -0,0 +1,64 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.utils.SimpleTimer; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Thread that runs the reduce of the map/reduce. + * + * This thread reads from mapResultsQueue until the poison EOF object arrives. At each + * stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the + * queue waits until the mapResultQueue has a value to take. Then, it gets and waits + * until the map result Future has a value. + */ +class ReducerThread implements Callable { + final NSReduceFunction reduce; + final SimpleTimer reduceTimer; + final BlockingQueue>> mapResultQueue; + + ReduceType sum; + int lastJobID = -1; + + public ReducerThread(final NSReduceFunction reduce, + final SimpleTimer reduceTimer, + final ReduceType sum, + final BlockingQueue>> mapResultQueue) { + if ( reduce == null ) throw new IllegalArgumentException("Reduce function cannot be null"); + if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null"); + + this.reduce = reduce; + this.reduceTimer = reduceTimer; + this.sum = sum; + this.mapResultQueue = mapResultQueue; + } + + public ReduceType call() { + try { + while ( true ) { + final MapResult result = mapResultQueue.take().get(); + if ( result.isLast() ) { + // we are done, just return sum + return sum; + } + else if ( result.getJobID() < lastJobID ) { + // make sure the map results are coming in order + throw new IllegalStateException("BUG: last jobID " + lastJobID + " > current jobID " + result.getJobID()); + } else { + // apply reduce, keeping track of sum + if ( reduceTimer != null ) reduceTimer.restart(); + sum = reduce.apply(result.getValue(), sum); + if ( reduceTimer != null ) reduceTimer.stop(); + } + } + } catch (ExecutionException ex) { + throw new ReviewedStingException("got execution exception", ex); + } catch (InterruptedException ex) { + throw new ReviewedStingException("got execution exception", ex); + } + } +} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 21ac6dcec..47dcc1d5e 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -22,11 +22,11 @@ import java.util.List; public class NanoSchedulerUnitTest extends BaseTest { public static final int NANO_SCHEDULE_MAX_RUNTIME = 60000; - private static class Map2x implements NanoSchedulerMapFunction { + private static class Map2x implements NSMapFunction { @Override public Integer apply(Integer input) { return input * 2; } } - private static class ReduceSum implements NanoSchedulerReduceFunction { + private static class ReduceSum implements NSReduceFunction { int prevOne = Integer.MIN_VALUE; @Override public Integer apply(Integer one, Integer sum) { @@ -35,7 +35,7 @@ public class NanoSchedulerUnitTest extends BaseTest { } } - private static class ProgressCallback implements NanoSchedulerProgressFunction { + private static class ProgressCallback implements NSProgressFunction { int callBacks = 0; @Override @@ -120,7 +120,7 @@ public class NanoSchedulerUnitTest extends BaseTest { final ProgressCallback callback = new ProgressCallback(); nanoScheduler.setProgressFunction(callback); - Assert.assertEquals(nanoScheduler.getBufferSize(), test.bufferSize, "bufferSize argument"); + Assert.assertEquals(nanoScheduler.getInputBufferSize(), test.bufferSize, "inputBufferSize argument"); Assert.assertEquals(nanoScheduler.getnThreads(), test.nThreads, "nThreads argument"); final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce());