From b3fd74f0c4b02c13bdf9777ece3ac325960f7267 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 24 Aug 2012 13:25:05 -0400 Subject: [PATCH 4/6] HaplotypeCaller forbids BAQ --- .../gatk/walkers/haplotypecaller/HaplotypeCaller.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java index acb5c9ebe..845fc68a6 100755 --- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java +++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java @@ -28,8 +28,10 @@ package org.broadinstitute.sting.gatk.walkers.haplotypecaller; import com.google.java.contract.Ensures; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.broadinstitute.sting.gatk.arguments.StandardCallerArgumentCollection; +import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.gatk.walkers.genotyper.*; import org.broadinstitute.sting.utils.activeregion.ActivityProfileResult; +import org.broadinstitute.sting.utils.baq.BAQ; import org.broadinstitute.sting.utils.help.DocumentedGATKFeature; import org.broadinstitute.sting.commandline.*; import org.broadinstitute.sting.gatk.CommandLineGATK; @@ -40,10 +42,6 @@ import org.broadinstitute.sting.gatk.contexts.AlignmentContextUtils; import org.broadinstitute.sting.gatk.contexts.ReferenceContext; import org.broadinstitute.sting.gatk.filters.BadMateFilter; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; -import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension; -import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker; -import org.broadinstitute.sting.gatk.walkers.PartitionBy; -import org.broadinstitute.sting.gatk.walkers.PartitionType; import org.broadinstitute.sting.gatk.walkers.annotator.VariantAnnotatorEngine; import org.broadinstitute.sting.gatk.walkers.annotator.interfaces.AnnotatorCompatible; import org.broadinstitute.sting.utils.*; @@ -103,6 +101,7 @@ import java.util.*; @DocumentedGATKFeature( groupName = "Variant Discovery Tools", extraDocs = {CommandLineGATK.class} ) @PartitionBy(PartitionType.LOCUS) +@BAQMode(ApplicationTime = BAQ.ApplicationTime.FORBIDDEN) @ActiveRegionExtension(extension=65, maxRegion=300) public class HaplotypeCaller extends ActiveRegionWalker implements AnnotatorCompatible { From d6e6b30caf15d2f7f64fcc1f2b710b458507f7be Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 24 Aug 2012 14:07:44 -0400 Subject: [PATCH 5/6] Initial implementation of GSA-515: Nanoscheduler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – Write general NanoScheduler framework in utils.threading. Test with reading via iterator from list of integers, map is int * 2, reduce is sum. Should be efficiency using resources to do sum of 2 * (sum(1 - X)). Done! CPU parallelism is nano threads. Pfor across read / map / reduce. Use work queue to implement. Create general read map reduce framework in utils. Test parallelism independently before hooking up to Locus iterator Represent explicitly the dependency graph. Scheduler should choose the work units that are ready for computation, that are marked as "completing a computation", and then finally that maximize the number of sequent available work units. May be worth measuring expected cost for read read / map / reduce unit and use it to balance the compute As input is single threaded just need one thread to populate inputs, which runs as fast as possible on parallel pushing data to fixed size queue. Each push creates map job and links to upcoming reduce job. Note that there's at most one thread for IO tasks, and all of the threads can contribute to CPU tasks --- .../utils/nanoScheduler/MapFunction.java | 12 ++ .../sting/utils/nanoScheduler/MapResult.java | 31 ++++ .../utils/nanoScheduler/NanoScheduler.java | 165 ++++++++++++++++++ .../utils/nanoScheduler/ReduceFunction.java | 13 ++ .../nanoScheduler/NanoSchedulerUnitTest.java | 93 ++++++++++ 5 files changed, 314 insertions(+) create mode 100644 public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java create mode 100644 public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java create mode 100644 public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java create mode 100644 public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java create mode 100644 public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java new file mode 100644 index 000000000..dd18e09a9 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java @@ -0,0 +1,12 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +/** + * A function that maps from InputType -> ResultType + * + * User: depristo + * Date: 8/24/12 + * Time: 9:49 AM + */ +public interface MapFunction { + public ResultType apply(final InputType input); +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java new file mode 100644 index 000000000..90e7c5908 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java @@ -0,0 +1,31 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +/** + * Created with IntelliJ IDEA. + * User: depristo + * Date: 8/24/12 + * Time: 9:57 AM + * To change this template use File | Settings | File Templates. + */ +public class MapResult implements Comparable> { + final Integer id; + final MapType value; + + public MapResult(final int id, final MapType value) { + this.id = id; + this.value = value; + } + + public Integer getId() { + return id; + } + + public MapType getValue() { + return value; + } + + @Override + public int compareTo(MapResult o) { + return getId().compareTo(o.getId()); + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java new file mode 100644 index 000000000..48a941515 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -0,0 +1,165 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; + +/** + * Framework for very fine grained MapReduce parallelism + * + * User: depristo + * Date: 8/24/12 + * Time: 9:47 AM + */ +public class NanoScheduler { + final int bufferSize; + final int nThreads; + final Iterator inputReader; + final MapFunction map; + final ReduceFunction reduce; + + public NanoScheduler(final int bufferSize, + final int nThreads, + final Iterator inputReader, + final MapFunction map, + final ReduceFunction reduce) { + if ( bufferSize < 1 ) throw new IllegalArgumentException("bufferSize must be >= 1, got " + bufferSize); + if ( nThreads < 1 ) throw new IllegalArgumentException("nThreads must be >= 1, got " + nThreads); + + this.bufferSize = bufferSize; + this.inputReader = inputReader; + this.map = map; + this.reduce = reduce; + this.nThreads = nThreads; + } + + public int getnThreads() { + return nThreads; + } + + private int getBufferSize() { + return bufferSize; + } + + public ReduceType execute() { + if ( getnThreads() == 1 ) { + return executeSingleThreaded(); + } else { + return executeMultiThreaded(); + } + } + + /** + * Simple efficient reference implementation for single threaded execution + * @return the reduce result of this map/reduce job + */ + private ReduceType executeSingleThreaded() { + ReduceType sum = reduce.init(); + while ( inputReader.hasNext() ) { + final InputType input = inputReader.next(); + final MapType mapValue = map.apply(input); + sum = reduce.apply(mapValue, sum); + } + return sum; + } + + /** + * Efficient parallel version of Map/Reduce + * + * @return the reduce result of this map/reduce job + */ + private ReduceType executeMultiThreaded() { + final ExecutorService executor = Executors.newFixedThreadPool(getnThreads() - 1); + + ReduceType sum = reduce.init(); + while ( inputReader.hasNext() ) { + try { + // read in our input values + final Queue inputs = readInputs(); + + // send jobs for map + final Queue> mapQueue = submitMapJobs(executor, inputs); + + // send off the reduce job, and block until we get at least one reduce result + sum = reduceParallel(mapQueue, sum); + } catch (InterruptedException ex) { + throw new ReviewedStingException("got execution exception", ex); + } catch (ExecutionException ex) { + throw new ReviewedStingException("got execution exception", ex); + } + } + + final List remaining = executor.shutdownNow(); + if ( ! remaining.isEmpty() ) + throw new ReviewedStingException("Remaining tasks found in the executor, unexpected behavior!"); + + return sum; + } + + @Requires("! mapQueue.isEmpty()") + private ReduceType reduceParallel(final Queue> mapQueue, final ReduceType initSum) + throws InterruptedException, ExecutionException { + ReduceType sum = initSum; + + // while mapQueue has something in it to reduce + for ( final Future future : mapQueue ) { + // block until we get the value for this task + final MapType value = future.get(); + sum = reduce.apply(value, sum); + } + + return sum; + } + + /** + * Read up to inputBufferSize elements from inputReader + * + * @return a queue of inputs read in, containing one or more values of InputType read in + */ + @Requires("inputReader.hasNext()") + @Ensures("!result.isEmpty()") + private Queue readInputs() { + int n = 0; + final Queue inputs = new LinkedList(); + while ( inputReader.hasNext() && n < getBufferSize() ) { + final InputType input = inputReader.next(); + inputs.add(input); + n++; + } + return inputs; + } + + @Ensures("result.size() == inputs.size()") + private Queue> submitMapJobs(final ExecutorService executor, final Queue inputs) { + final Queue> mapQueue = new LinkedList>(); + + for ( final InputType input : inputs ) { + final CallableMap doMap = new CallableMap(input); + final Future future = executor.submit(doMap); + mapQueue.add(future); + } + + return mapQueue; + } + + /** + * A simple callable version of the map function for use with the executor pool + */ + private class CallableMap implements Callable { + final InputType input; + + private CallableMap(final InputType input) { + this.input = input; + } + + @Override public MapType call() throws Exception { + return map.apply(input); + } + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java new file mode 100644 index 000000000..274e22aff --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java @@ -0,0 +1,13 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +/** + * A function that maps from InputType -> ResultType + * + * User: depristo + * Date: 8/24/12 + * Time: 9:49 AM + */ +public interface ReduceFunction { + public ReduceType init(); + public ReduceType apply(MapType one, ReduceType sum); +} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java new file mode 100644 index 000000000..18a9f3340 --- /dev/null +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -0,0 +1,93 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.BaseTest; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.*; + +/** + * UnitTests for the NanoScheduler + * + * User: depristo + * Date: 8/24/12 + * Time: 11:25 AM + * To change this template use File | Settings | File Templates. + */ +public class NanoSchedulerUnitTest extends BaseTest { + private class Map2x implements MapFunction { + @Override public Integer apply(Integer input) { return input * 2; } + } + + private class ReduceSum implements ReduceFunction { + @Override public Integer init() { return 0; } + @Override public Integer apply(Integer one, Integer sum) { return one + sum; } + } + + private static int sum2x(final int start, final int end) { + int sum = 0; + for ( int i = start; i < end; i++ ) + sum += 2 * i; + return sum; + } + + private class NanoSchedulerBasicTest extends TestDataProvider { + final int bufferSize, nThreads, start, end, expectedResult; + + public NanoSchedulerBasicTest(final int bufferSize, final int nThreads, final int start, final int end) { + super(NanoSchedulerBasicTest.class); + this.bufferSize = bufferSize; + this.nThreads = nThreads; + this.start = start; + this.end = end; + this.expectedResult = sum2x(start, end); + setName(String.format("%s nt=%d buf=%d start=%d end=%d sum=%d", + getClass().getSimpleName(), nThreads, bufferSize, start, end, expectedResult)); + } + + public Iterator makeReader() { + final List ints = new ArrayList(); + for ( int i = start; i < end; i++ ) + ints.add(i); + return ints.iterator(); + } + + public Map2x makeMap() { return new Map2x(); } + public ReduceSum makeReduce() { return new ReduceSum(); } + } + + @DataProvider(name = "NanoSchedulerBasicTest") + public Object[][] createNanoSchedulerBasicTest() { + for ( final int bufferSize : Arrays.asList(1, 10, 10000, 1000000) ) { + for ( final int nt : Arrays.asList(1, 2, 4, 8, 16, 32) ) { + for ( final int start : Arrays.asList(0) ) { + for ( final int end : Arrays.asList(1, 2, 11, 1000000) ) { + new NanoSchedulerBasicTest(bufferSize, nt, start, end); + } + } + } + } + + return NanoSchedulerBasicTest.getTests(NanoSchedulerBasicTest.class); + } + + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 2000) + public void testNanoSchedulerBasicTest(final NanoSchedulerBasicTest test) throws InterruptedException { + logger.warn("Running " + test); + final NanoScheduler nanoScheduler = + new NanoScheduler(test.bufferSize, test.nThreads, + test.makeReader(), test.makeMap(), test.makeReduce()); + final Integer sum = nanoScheduler.execute(); + Assert.assertNotNull(sum); + Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + } + + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 10000, dependsOnMethods = "testNanoSchedulerBasicTest") + public void testNanoSchedulerInLoop(final NanoSchedulerBasicTest test) throws InterruptedException { + logger.warn("Running " + test); + for ( int i = 0; i < 10; i++ ) { + testNanoSchedulerBasicTest(test); + } + } +} From 9de8077eebe9f1ceef2caa8da8170db35acc6692 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 24 Aug 2012 15:34:23 -0400 Subject: [PATCH 6/6] Working (efficient?) implementation of NanoScheduler -- Groups inputs for each thread so that we don't have one thread execution per map() call -- Added shutdown function -- Documentation everywhere -- Code cleanup -- Extensive unittests -- At this point I'm ready to integrate it into the engine for CPU parallel read walkers --- .../org/broadinstitute/sting/utils/Utils.java | 21 ++ .../utils/nanoScheduler/MapFunction.java | 7 + .../sting/utils/nanoScheduler/MapResult.java | 31 --- .../utils/nanoScheduler/NanoScheduler.java | 206 ++++++++++++++---- .../utils/nanoScheduler/ReduceFunction.java | 9 +- .../nanoScheduler/NanoSchedulerUnitTest.java | 93 ++++++-- 6 files changed, 265 insertions(+), 102 deletions(-) delete mode 100644 public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java diff --git a/public/java/src/org/broadinstitute/sting/utils/Utils.java b/public/java/src/org/broadinstitute/sting/utils/Utils.java index a5b5eca6a..74b038032 100755 --- a/public/java/src/org/broadinstitute/sting/utils/Utils.java +++ b/public/java/src/org/broadinstitute/sting/utils/Utils.java @@ -810,4 +810,25 @@ public class Utils { return Collections.unmodifiableMap(map); } + /** + * Divides the input list into a list of sublists, which contains group size elements (except potentially the last one) + * + * list = [A, B, C, D, E] + * groupSize = 2 + * result = [[A, B], [C, D], [E]] + * + * @param list + * @param groupSize + * @return + */ + public static List> groupList(final List list, final int groupSize) { + if ( groupSize < 1 ) throw new IllegalArgumentException("groupSize >= 1"); + + final List> subLists = new LinkedList>(); + int n = list.size(); + for ( int i = 0; i < n; i += groupSize ) { + subLists.add(list.subList(i, Math.min(i + groupSize, n))); + } + return subLists; + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java index dd18e09a9..440c263b7 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java @@ -3,10 +3,17 @@ package org.broadinstitute.sting.utils.nanoScheduler; /** * A function that maps from InputType -> ResultType * + * For use with the NanoScheduler + * * User: depristo * Date: 8/24/12 * Time: 9:49 AM */ public interface MapFunction { + /** + * Return function on input, returning a value of ResultType + * @param input + * @return + */ public ResultType apply(final InputType input); } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java deleted file mode 100644 index 90e7c5908..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.broadinstitute.sting.utils.nanoScheduler; - -/** - * Created with IntelliJ IDEA. - * User: depristo - * Date: 8/24/12 - * Time: 9:57 AM - * To change this template use File | Settings | File Templates. - */ -public class MapResult implements Comparable> { - final Integer id; - final MapType value; - - public MapResult(final int id, final MapType value) { - this.id = id; - this.value = value; - } - - public Integer getId() { - return id; - } - - public MapType getValue() { - return value; - } - - @Override - public int compareTo(MapResult o) { - return getId().compareTo(o.getId()); - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 48a941515..fcc6a5723 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -2,6 +2,8 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.Utils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.Iterator; @@ -13,45 +15,147 @@ import java.util.concurrent.*; /** * Framework for very fine grained MapReduce parallelism * + * The overall framework works like this + * + * nano <- new Nanoschedule(bufferSize, numberOfMapElementsToProcessTogether, nThreads) + * List[Input] outerData : outerDataLoop ) + * result = nano.execute(outerData.iterator(), map, reduce) + * + * bufferSize determines how many elements from the input stream are read in one go by the + * nanoscheduler. The scheduler may hold up to bufferSize in memory at one time, as well + * as up to inputBufferSize map results as well. + * + * numberOfMapElementsToProcessTogether determines how many input elements are processed + * together each thread cycle. For example, if this value is 10, then the input data + * is grouped together in units of 10 elements each, and map called on each in term. The more + * heavy-weight the map function is, in terms of CPU costs, the more it makes sense to + * have this number be small. The lighter the CPU cost per element, though, the more this + * parameter introduces overhead due to need to context switch among threads to process + * each input element. A value of -1 lets the nanoscheduler guess at a reasonable trade-off value. + * + * nThreads is a bit obvious yes? Note though that the nanoscheduler assumes that it gets 1 thread + * from its client during the execute call, as this call blocks until all work is done. The caller + * thread is put to work by execute to help with the processing of the data. So in reality the + * nanoScheduler only spawn nThreads - 1 additional workers (if this is > 1). + * * User: depristo * Date: 8/24/12 * Time: 9:47 AM */ public class NanoScheduler { - final int bufferSize; - final int nThreads; - final Iterator inputReader; - final MapFunction map; - final ReduceFunction reduce; + private static Logger logger = Logger.getLogger(NanoScheduler.class); + final int bufferSize; + final int mapGroupSize; + final int nThreads; + final ExecutorService executor; + boolean shutdown = false; + + /** + * Create a new nanoschedule with the desire characteristics requested by the argument + * + * @param bufferSize the number of input elements to read in each scheduling cycle. + * @param mapGroupSize How many inputs should be grouped together per map? If -1 we make a reasonable guess + * @param nThreads the number of threads to use to get work done, in addition to the thread calling execute + */ public NanoScheduler(final int bufferSize, - final int nThreads, - final Iterator inputReader, - final MapFunction map, - final ReduceFunction reduce) { + final int mapGroupSize, + final int nThreads) { if ( bufferSize < 1 ) throw new IllegalArgumentException("bufferSize must be >= 1, got " + bufferSize); if ( nThreads < 1 ) throw new IllegalArgumentException("nThreads must be >= 1, got " + nThreads); + if ( mapGroupSize > bufferSize ) throw new IllegalArgumentException("mapGroupSize " + mapGroupSize + " must be <= bufferSize " + bufferSize); + if ( mapGroupSize == 0 || mapGroupSize < -1 ) throw new IllegalArgumentException("mapGroupSize cannot be <= 0" + mapGroupSize); + this.bufferSize = bufferSize; - this.inputReader = inputReader; - this.map = map; - this.reduce = reduce; this.nThreads = nThreads; + + if ( mapGroupSize == -1 ) { + this.mapGroupSize = (int)Math.ceil(this.bufferSize / (10.0*this.nThreads)); + logger.info(String.format("Dynamically setting grouping size to %d based on buffer size %d and n threads %d", + this.mapGroupSize, this.bufferSize, this.nThreads)); + } else { + this.mapGroupSize = mapGroupSize; + } + + this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads - 1); } + /** + * The number of parallel map threads in use with this NanoScheduler + * @return + */ public int getnThreads() { return nThreads; } - private int getBufferSize() { + /** + * The input buffer size used by this NanoScheduler + * @return + */ + public int getBufferSize() { return bufferSize; } - public ReduceType execute() { + /** + * The grouping size used by this NanoScheduler + * @return + */ + public int getMapGroupSize() { + return mapGroupSize; + } + + /** + * Tells this nanoScheduler to shutdown immediately, releasing all its resources. + * + * After this call, execute cannot be invoked without throwing an error + */ + public void shutdown() { + if ( executor != null ) { + final List remaining = executor.shutdownNow(); + if ( ! remaining.isEmpty() ) + throw new IllegalStateException("Remaining tasks found in the executor, unexpected behavior!"); + } + shutdown = true; + } + + /** + * @return true if this nanoScheduler is shutdown, or false if its still open for business + */ + public boolean isShutdown() { + return shutdown; + } + + /** + * Execute a map/reduce job with this nanoScheduler + * + * Data comes from inputReader. Will be read until hasNext() == false. + * map is called on each element provided by inputReader. No order of operations is guarenteed + * reduce is called in order of the input data provided by inputReader on the result of map() applied + * to each element. + * + * Note that the caller thread is put to work with this function call. The call doesn't return + * until all elements have been processes. + * + * It is safe to call this function repeatedly on a single nanoScheduler, at least until the + * shutdown method is called. + * + * @param inputReader + * @param map + * @param reduce + * @return + */ + public ReduceType execute(final Iterator inputReader, + final MapFunction map, + final ReduceType initialValue, + final ReduceFunction reduce) { + if ( isShutdown() ) + throw new IllegalStateException("execute called on already shutdown NanoScheduler"); + if ( getnThreads() == 1 ) { - return executeSingleThreaded(); + return executeSingleThreaded(inputReader, map, initialValue, reduce); } else { - return executeMultiThreaded(); + return executeMultiThreaded(inputReader, map, initialValue, reduce); } } @@ -59,8 +163,11 @@ public class NanoScheduler { * Simple efficient reference implementation for single threaded execution * @return the reduce result of this map/reduce job */ - private ReduceType executeSingleThreaded() { - ReduceType sum = reduce.init(); + private ReduceType executeSingleThreaded(final Iterator inputReader, + final MapFunction map, + final ReduceType initialValue, + final ReduceFunction reduce) { + ReduceType sum = initialValue; while ( inputReader.hasNext() ) { final InputType input = inputReader.next(); final MapType mapValue = map.apply(input); @@ -74,20 +181,21 @@ public class NanoScheduler { * * @return the reduce result of this map/reduce job */ - private ReduceType executeMultiThreaded() { - final ExecutorService executor = Executors.newFixedThreadPool(getnThreads() - 1); - - ReduceType sum = reduce.init(); + private ReduceType executeMultiThreaded(final Iterator inputReader, + final MapFunction map, + final ReduceType initialValue, + final ReduceFunction reduce) { + ReduceType sum = initialValue; while ( inputReader.hasNext() ) { try { // read in our input values - final Queue inputs = readInputs(); + final List inputs = readInputs(inputReader); // send jobs for map - final Queue> mapQueue = submitMapJobs(executor, inputs); + final Queue>> mapQueue = submitMapJobs(map, executor, inputs); // send off the reduce job, and block until we get at least one reduce result - sum = reduceParallel(mapQueue, sum); + sum = reduceParallel(reduce, mapQueue, sum); } catch (InterruptedException ex) { throw new ReviewedStingException("got execution exception", ex); } catch (ExecutionException ex) { @@ -95,23 +203,20 @@ public class NanoScheduler { } } - final List remaining = executor.shutdownNow(); - if ( ! remaining.isEmpty() ) - throw new ReviewedStingException("Remaining tasks found in the executor, unexpected behavior!"); - return sum; } @Requires("! mapQueue.isEmpty()") - private ReduceType reduceParallel(final Queue> mapQueue, final ReduceType initSum) + private ReduceType reduceParallel(final ReduceFunction reduce, + final Queue>> mapQueue, + final ReduceType initSum) throws InterruptedException, ExecutionException { ReduceType sum = initSum; // while mapQueue has something in it to reduce - for ( final Future future : mapQueue ) { - // block until we get the value for this task - final MapType value = future.get(); - sum = reduce.apply(value, sum); + for ( final Future> future : mapQueue ) { + for ( final MapType value : future.get() ) // block until we get the values for this task + sum = reduce.apply(value, sum); } return sum; @@ -124,9 +229,9 @@ public class NanoScheduler { */ @Requires("inputReader.hasNext()") @Ensures("!result.isEmpty()") - private Queue readInputs() { + private List readInputs(final Iterator inputReader) { int n = 0; - final Queue inputs = new LinkedList(); + final List inputs = new LinkedList(); while ( inputReader.hasNext() && n < getBufferSize() ) { final InputType input = inputReader.next(); inputs.add(input); @@ -136,12 +241,14 @@ public class NanoScheduler { } @Ensures("result.size() == inputs.size()") - private Queue> submitMapJobs(final ExecutorService executor, final Queue inputs) { - final Queue> mapQueue = new LinkedList>(); + private Queue>> submitMapJobs(final MapFunction map, + final ExecutorService executor, + final List inputs) { + final Queue>> mapQueue = new LinkedList>>(); - for ( final InputType input : inputs ) { - final CallableMap doMap = new CallableMap(input); - final Future future = executor.submit(doMap); + for ( final List subinputs : Utils.groupList(inputs, getMapGroupSize()) ) { + final CallableMap doMap = new CallableMap(map, subinputs); + final Future> future = executor.submit(doMap); mapQueue.add(future); } @@ -151,15 +258,20 @@ public class NanoScheduler { /** * A simple callable version of the map function for use with the executor pool */ - private class CallableMap implements Callable { - final InputType input; + private class CallableMap implements Callable> { + final List inputs; + final MapFunction map; - private CallableMap(final InputType input) { - this.input = input; + private CallableMap(final MapFunction map, final List inputs) { + this.inputs = inputs; + this.map = map; } - @Override public MapType call() throws Exception { - return map.apply(input); + @Override public List call() throws Exception { + final List outputs = new LinkedList(); + for ( final InputType input : inputs ) + outputs.add(map.apply(input)); + return outputs; } } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java index 274e22aff..8f1b0eddd 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java @@ -1,13 +1,18 @@ package org.broadinstitute.sting.utils.nanoScheduler; /** - * A function that maps from InputType -> ResultType + * A function that combines a value of MapType with an existing ReduceValue into a new ResultType * * User: depristo * Date: 8/24/12 * Time: 9:49 AM */ public interface ReduceFunction { - public ReduceType init(); + /** + * Combine one with sum into a new ReduceType + * @param one the result of a map call on an input element + * @param sum the cumulative reduce result over all previous map calls + * @return + */ public ReduceType apply(MapType one, ReduceType sum); } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 18a9f3340..211e43dc1 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -21,7 +21,6 @@ public class NanoSchedulerUnitTest extends BaseTest { } private class ReduceSum implements ReduceFunction { - @Override public Integer init() { return 0; } @Override public Integer apply(Integer one, Integer sum) { return one + sum; } } @@ -33,17 +32,18 @@ public class NanoSchedulerUnitTest extends BaseTest { } private class NanoSchedulerBasicTest extends TestDataProvider { - final int bufferSize, nThreads, start, end, expectedResult; + final int bufferSize, mapGroupSize, nThreads, start, end, expectedResult; - public NanoSchedulerBasicTest(final int bufferSize, final int nThreads, final int start, final int end) { + public NanoSchedulerBasicTest(final int bufferSize, final int mapGroupSize, final int nThreads, final int start, final int end) { super(NanoSchedulerBasicTest.class); this.bufferSize = bufferSize; + this.mapGroupSize = mapGroupSize; this.nThreads = nThreads; this.start = start; this.end = end; this.expectedResult = sum2x(start, end); - setName(String.format("%s nt=%d buf=%d start=%d end=%d sum=%d", - getClass().getSimpleName(), nThreads, bufferSize, start, end, expectedResult)); + setName(String.format("%s nt=%d buf=%d mapGroupSize=%d start=%d end=%d sum=%d", + getClass().getSimpleName(), nThreads, bufferSize, mapGroupSize, start, end, expectedResult)); } public Iterator makeReader() { @@ -54,16 +54,22 @@ public class NanoSchedulerUnitTest extends BaseTest { } public Map2x makeMap() { return new Map2x(); } + public Integer initReduce() { return 0; } public ReduceSum makeReduce() { return new ReduceSum(); } } + static NanoSchedulerBasicTest exampleTest = null; @DataProvider(name = "NanoSchedulerBasicTest") public Object[][] createNanoSchedulerBasicTest() { - for ( final int bufferSize : Arrays.asList(1, 10, 10000, 1000000) ) { - for ( final int nt : Arrays.asList(1, 2, 4, 8, 16, 32) ) { - for ( final int start : Arrays.asList(0) ) { - for ( final int end : Arrays.asList(1, 2, 11, 1000000) ) { - new NanoSchedulerBasicTest(bufferSize, nt, start, end); + for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) { + for ( final int mapGroupSize : Arrays.asList(-1, 1, 10, 100, 1000) ) { + if ( mapGroupSize <= bufferSize ) { + for ( final int nt : Arrays.asList(1, 2, 4) ) { + for ( final int start : Arrays.asList(0) ) { + for ( final int end : Arrays.asList(1, 2, 11, 10000, 100000) ) { + exampleTest = new NanoSchedulerBasicTest(bufferSize, mapGroupSize, nt, start, end); + } + } } } } @@ -72,22 +78,65 @@ public class NanoSchedulerUnitTest extends BaseTest { return NanoSchedulerBasicTest.getTests(NanoSchedulerBasicTest.class); } - @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 2000) - public void testNanoSchedulerBasicTest(final NanoSchedulerBasicTest test) throws InterruptedException { + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest") + public void testSingleThreadedNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { logger.warn("Running " + test); - final NanoScheduler nanoScheduler = - new NanoScheduler(test.bufferSize, test.nThreads, - test.makeReader(), test.makeMap(), test.makeReduce()); - final Integer sum = nanoScheduler.execute(); - Assert.assertNotNull(sum); - Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + if ( test.nThreads == 1 ) + testNanoScheduler(test); } - @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 10000, dependsOnMethods = "testNanoSchedulerBasicTest") - public void testNanoSchedulerInLoop(final NanoSchedulerBasicTest test) throws InterruptedException { + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 10000, dependsOnMethods = "testSingleThreadedNanoScheduler") + public void testMultiThreadedNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { logger.warn("Running " + test); - for ( int i = 0; i < 10; i++ ) { - testNanoSchedulerBasicTest(test); + if ( test.nThreads >= 1 ) + testNanoScheduler(test); + } + + private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { + final NanoScheduler nanoScheduler = + new NanoScheduler(test.bufferSize, test.mapGroupSize, test.nThreads); + + Assert.assertEquals(nanoScheduler.getBufferSize(), test.bufferSize, "bufferSize argument"); + Assert.assertTrue(nanoScheduler.getMapGroupSize() >= test.mapGroupSize, "mapGroupSize argument"); + Assert.assertEquals(nanoScheduler.getnThreads(), test.nThreads, "nThreads argument"); + + final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce()); + Assert.assertNotNull(sum); + Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + nanoScheduler.shutdown(); + } + + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler") + public void testNanoSchedulerInLoop(final NanoSchedulerBasicTest test) throws InterruptedException { + if ( test.bufferSize > 1 && (test.mapGroupSize > 1 || test.mapGroupSize == -1)) { + logger.warn("Running " + test); + + final NanoScheduler nanoScheduler = + new NanoScheduler(test.bufferSize, test.mapGroupSize, test.nThreads); + + // test reusing the scheduler + for ( int i = 0; i < 10; i++ ) { + final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce()); + Assert.assertNotNull(sum); + Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + } + + nanoScheduler.shutdown(); } } + + @Test() + public void testShutdown() throws InterruptedException { + final NanoScheduler nanoScheduler = new NanoScheduler(1, 1, 2); + Assert.assertFalse(nanoScheduler.isShutdown(), "scheduler should be alive"); + nanoScheduler.shutdown(); + Assert.assertTrue(nanoScheduler.isShutdown(), "scheduler should be dead"); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void testShutdownExecuteFailure() throws InterruptedException { + final NanoScheduler nanoScheduler = new NanoScheduler(1, 1, 2); + nanoScheduler.shutdown(); + nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce()); + } }