From 9de8077eebe9f1ceef2caa8da8170db35acc6692 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 24 Aug 2012 15:34:23 -0400 Subject: [PATCH] Working (efficient?) implementation of NanoScheduler -- Groups inputs for each thread so that we don't have one thread execution per map() call -- Added shutdown function -- Documentation everywhere -- Code cleanup -- Extensive unittests -- At this point I'm ready to integrate it into the engine for CPU parallel read walkers --- .../org/broadinstitute/sting/utils/Utils.java | 21 ++ .../utils/nanoScheduler/MapFunction.java | 7 + .../sting/utils/nanoScheduler/MapResult.java | 31 --- .../utils/nanoScheduler/NanoScheduler.java | 206 ++++++++++++++---- .../utils/nanoScheduler/ReduceFunction.java | 9 +- .../nanoScheduler/NanoSchedulerUnitTest.java | 93 ++++++-- 6 files changed, 265 insertions(+), 102 deletions(-) delete mode 100644 public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java diff --git a/public/java/src/org/broadinstitute/sting/utils/Utils.java b/public/java/src/org/broadinstitute/sting/utils/Utils.java index a5b5eca6a..74b038032 100755 --- a/public/java/src/org/broadinstitute/sting/utils/Utils.java +++ b/public/java/src/org/broadinstitute/sting/utils/Utils.java @@ -810,4 +810,25 @@ public class Utils { return Collections.unmodifiableMap(map); } + /** + * Divides the input list into a list of sublists, which contains group size elements (except potentially the last one) + * + * list = [A, B, C, D, E] + * groupSize = 2 + * result = [[A, B], [C, D], [E]] + * + * @param list + * @param groupSize + * @return + */ + public static List> groupList(final List list, final int groupSize) { + if ( groupSize < 1 ) throw new IllegalArgumentException("groupSize >= 1"); + + final List> subLists = new LinkedList>(); + int n = list.size(); + for ( int i = 0; i < n; i += groupSize ) { + subLists.add(list.subList(i, Math.min(i + groupSize, n))); + } + return subLists; + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java index dd18e09a9..440c263b7 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java @@ -3,10 +3,17 @@ package org.broadinstitute.sting.utils.nanoScheduler; /** * A function that maps from InputType -> ResultType * + * For use with the NanoScheduler + * * User: depristo * Date: 8/24/12 * Time: 9:49 AM */ public interface MapFunction { + /** + * Return function on input, returning a value of ResultType + * @param input + * @return + */ public ResultType apply(final InputType input); } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java deleted file mode 100644 index 90e7c5908..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.broadinstitute.sting.utils.nanoScheduler; - -/** - * Created with IntelliJ IDEA. - * User: depristo - * Date: 8/24/12 - * Time: 9:57 AM - * To change this template use File | Settings | File Templates. - */ -public class MapResult implements Comparable> { - final Integer id; - final MapType value; - - public MapResult(final int id, final MapType value) { - this.id = id; - this.value = value; - } - - public Integer getId() { - return id; - } - - public MapType getValue() { - return value; - } - - @Override - public int compareTo(MapResult o) { - return getId().compareTo(o.getId()); - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 48a941515..fcc6a5723 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -2,6 +2,8 @@ package org.broadinstitute.sting.utils.nanoScheduler; import com.google.java.contract.Ensures; import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.Utils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.Iterator; @@ -13,45 +15,147 @@ import java.util.concurrent.*; /** * Framework for very fine grained MapReduce parallelism * + * The overall framework works like this + * + * nano <- new Nanoschedule(bufferSize, numberOfMapElementsToProcessTogether, nThreads) + * List[Input] outerData : outerDataLoop ) + * result = nano.execute(outerData.iterator(), map, reduce) + * + * bufferSize determines how many elements from the input stream are read in one go by the + * nanoscheduler. The scheduler may hold up to bufferSize in memory at one time, as well + * as up to inputBufferSize map results as well. + * + * numberOfMapElementsToProcessTogether determines how many input elements are processed + * together each thread cycle. For example, if this value is 10, then the input data + * is grouped together in units of 10 elements each, and map called on each in term. The more + * heavy-weight the map function is, in terms of CPU costs, the more it makes sense to + * have this number be small. The lighter the CPU cost per element, though, the more this + * parameter introduces overhead due to need to context switch among threads to process + * each input element. A value of -1 lets the nanoscheduler guess at a reasonable trade-off value. + * + * nThreads is a bit obvious yes? Note though that the nanoscheduler assumes that it gets 1 thread + * from its client during the execute call, as this call blocks until all work is done. The caller + * thread is put to work by execute to help with the processing of the data. So in reality the + * nanoScheduler only spawn nThreads - 1 additional workers (if this is > 1). + * * User: depristo * Date: 8/24/12 * Time: 9:47 AM */ public class NanoScheduler { - final int bufferSize; - final int nThreads; - final Iterator inputReader; - final MapFunction map; - final ReduceFunction reduce; + private static Logger logger = Logger.getLogger(NanoScheduler.class); + final int bufferSize; + final int mapGroupSize; + final int nThreads; + final ExecutorService executor; + boolean shutdown = false; + + /** + * Create a new nanoschedule with the desire characteristics requested by the argument + * + * @param bufferSize the number of input elements to read in each scheduling cycle. + * @param mapGroupSize How many inputs should be grouped together per map? If -1 we make a reasonable guess + * @param nThreads the number of threads to use to get work done, in addition to the thread calling execute + */ public NanoScheduler(final int bufferSize, - final int nThreads, - final Iterator inputReader, - final MapFunction map, - final ReduceFunction reduce) { + final int mapGroupSize, + final int nThreads) { if ( bufferSize < 1 ) throw new IllegalArgumentException("bufferSize must be >= 1, got " + bufferSize); if ( nThreads < 1 ) throw new IllegalArgumentException("nThreads must be >= 1, got " + nThreads); + if ( mapGroupSize > bufferSize ) throw new IllegalArgumentException("mapGroupSize " + mapGroupSize + " must be <= bufferSize " + bufferSize); + if ( mapGroupSize == 0 || mapGroupSize < -1 ) throw new IllegalArgumentException("mapGroupSize cannot be <= 0" + mapGroupSize); + this.bufferSize = bufferSize; - this.inputReader = inputReader; - this.map = map; - this.reduce = reduce; this.nThreads = nThreads; + + if ( mapGroupSize == -1 ) { + this.mapGroupSize = (int)Math.ceil(this.bufferSize / (10.0*this.nThreads)); + logger.info(String.format("Dynamically setting grouping size to %d based on buffer size %d and n threads %d", + this.mapGroupSize, this.bufferSize, this.nThreads)); + } else { + this.mapGroupSize = mapGroupSize; + } + + this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads - 1); } + /** + * The number of parallel map threads in use with this NanoScheduler + * @return + */ public int getnThreads() { return nThreads; } - private int getBufferSize() { + /** + * The input buffer size used by this NanoScheduler + * @return + */ + public int getBufferSize() { return bufferSize; } - public ReduceType execute() { + /** + * The grouping size used by this NanoScheduler + * @return + */ + public int getMapGroupSize() { + return mapGroupSize; + } + + /** + * Tells this nanoScheduler to shutdown immediately, releasing all its resources. + * + * After this call, execute cannot be invoked without throwing an error + */ + public void shutdown() { + if ( executor != null ) { + final List remaining = executor.shutdownNow(); + if ( ! remaining.isEmpty() ) + throw new IllegalStateException("Remaining tasks found in the executor, unexpected behavior!"); + } + shutdown = true; + } + + /** + * @return true if this nanoScheduler is shutdown, or false if its still open for business + */ + public boolean isShutdown() { + return shutdown; + } + + /** + * Execute a map/reduce job with this nanoScheduler + * + * Data comes from inputReader. Will be read until hasNext() == false. + * map is called on each element provided by inputReader. No order of operations is guarenteed + * reduce is called in order of the input data provided by inputReader on the result of map() applied + * to each element. + * + * Note that the caller thread is put to work with this function call. The call doesn't return + * until all elements have been processes. + * + * It is safe to call this function repeatedly on a single nanoScheduler, at least until the + * shutdown method is called. + * + * @param inputReader + * @param map + * @param reduce + * @return + */ + public ReduceType execute(final Iterator inputReader, + final MapFunction map, + final ReduceType initialValue, + final ReduceFunction reduce) { + if ( isShutdown() ) + throw new IllegalStateException("execute called on already shutdown NanoScheduler"); + if ( getnThreads() == 1 ) { - return executeSingleThreaded(); + return executeSingleThreaded(inputReader, map, initialValue, reduce); } else { - return executeMultiThreaded(); + return executeMultiThreaded(inputReader, map, initialValue, reduce); } } @@ -59,8 +163,11 @@ public class NanoScheduler { * Simple efficient reference implementation for single threaded execution * @return the reduce result of this map/reduce job */ - private ReduceType executeSingleThreaded() { - ReduceType sum = reduce.init(); + private ReduceType executeSingleThreaded(final Iterator inputReader, + final MapFunction map, + final ReduceType initialValue, + final ReduceFunction reduce) { + ReduceType sum = initialValue; while ( inputReader.hasNext() ) { final InputType input = inputReader.next(); final MapType mapValue = map.apply(input); @@ -74,20 +181,21 @@ public class NanoScheduler { * * @return the reduce result of this map/reduce job */ - private ReduceType executeMultiThreaded() { - final ExecutorService executor = Executors.newFixedThreadPool(getnThreads() - 1); - - ReduceType sum = reduce.init(); + private ReduceType executeMultiThreaded(final Iterator inputReader, + final MapFunction map, + final ReduceType initialValue, + final ReduceFunction reduce) { + ReduceType sum = initialValue; while ( inputReader.hasNext() ) { try { // read in our input values - final Queue inputs = readInputs(); + final List inputs = readInputs(inputReader); // send jobs for map - final Queue> mapQueue = submitMapJobs(executor, inputs); + final Queue>> mapQueue = submitMapJobs(map, executor, inputs); // send off the reduce job, and block until we get at least one reduce result - sum = reduceParallel(mapQueue, sum); + sum = reduceParallel(reduce, mapQueue, sum); } catch (InterruptedException ex) { throw new ReviewedStingException("got execution exception", ex); } catch (ExecutionException ex) { @@ -95,23 +203,20 @@ public class NanoScheduler { } } - final List remaining = executor.shutdownNow(); - if ( ! remaining.isEmpty() ) - throw new ReviewedStingException("Remaining tasks found in the executor, unexpected behavior!"); - return sum; } @Requires("! mapQueue.isEmpty()") - private ReduceType reduceParallel(final Queue> mapQueue, final ReduceType initSum) + private ReduceType reduceParallel(final ReduceFunction reduce, + final Queue>> mapQueue, + final ReduceType initSum) throws InterruptedException, ExecutionException { ReduceType sum = initSum; // while mapQueue has something in it to reduce - for ( final Future future : mapQueue ) { - // block until we get the value for this task - final MapType value = future.get(); - sum = reduce.apply(value, sum); + for ( final Future> future : mapQueue ) { + for ( final MapType value : future.get() ) // block until we get the values for this task + sum = reduce.apply(value, sum); } return sum; @@ -124,9 +229,9 @@ public class NanoScheduler { */ @Requires("inputReader.hasNext()") @Ensures("!result.isEmpty()") - private Queue readInputs() { + private List readInputs(final Iterator inputReader) { int n = 0; - final Queue inputs = new LinkedList(); + final List inputs = new LinkedList(); while ( inputReader.hasNext() && n < getBufferSize() ) { final InputType input = inputReader.next(); inputs.add(input); @@ -136,12 +241,14 @@ public class NanoScheduler { } @Ensures("result.size() == inputs.size()") - private Queue> submitMapJobs(final ExecutorService executor, final Queue inputs) { - final Queue> mapQueue = new LinkedList>(); + private Queue>> submitMapJobs(final MapFunction map, + final ExecutorService executor, + final List inputs) { + final Queue>> mapQueue = new LinkedList>>(); - for ( final InputType input : inputs ) { - final CallableMap doMap = new CallableMap(input); - final Future future = executor.submit(doMap); + for ( final List subinputs : Utils.groupList(inputs, getMapGroupSize()) ) { + final CallableMap doMap = new CallableMap(map, subinputs); + final Future> future = executor.submit(doMap); mapQueue.add(future); } @@ -151,15 +258,20 @@ public class NanoScheduler { /** * A simple callable version of the map function for use with the executor pool */ - private class CallableMap implements Callable { - final InputType input; + private class CallableMap implements Callable> { + final List inputs; + final MapFunction map; - private CallableMap(final InputType input) { - this.input = input; + private CallableMap(final MapFunction map, final List inputs) { + this.inputs = inputs; + this.map = map; } - @Override public MapType call() throws Exception { - return map.apply(input); + @Override public List call() throws Exception { + final List outputs = new LinkedList(); + for ( final InputType input : inputs ) + outputs.add(map.apply(input)); + return outputs; } } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java index 274e22aff..8f1b0eddd 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java @@ -1,13 +1,18 @@ package org.broadinstitute.sting.utils.nanoScheduler; /** - * A function that maps from InputType -> ResultType + * A function that combines a value of MapType with an existing ReduceValue into a new ResultType * * User: depristo * Date: 8/24/12 * Time: 9:49 AM */ public interface ReduceFunction { - public ReduceType init(); + /** + * Combine one with sum into a new ReduceType + * @param one the result of a map call on an input element + * @param sum the cumulative reduce result over all previous map calls + * @return + */ public ReduceType apply(MapType one, ReduceType sum); } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 18a9f3340..211e43dc1 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -21,7 +21,6 @@ public class NanoSchedulerUnitTest extends BaseTest { } private class ReduceSum implements ReduceFunction { - @Override public Integer init() { return 0; } @Override public Integer apply(Integer one, Integer sum) { return one + sum; } } @@ -33,17 +32,18 @@ public class NanoSchedulerUnitTest extends BaseTest { } private class NanoSchedulerBasicTest extends TestDataProvider { - final int bufferSize, nThreads, start, end, expectedResult; + final int bufferSize, mapGroupSize, nThreads, start, end, expectedResult; - public NanoSchedulerBasicTest(final int bufferSize, final int nThreads, final int start, final int end) { + public NanoSchedulerBasicTest(final int bufferSize, final int mapGroupSize, final int nThreads, final int start, final int end) { super(NanoSchedulerBasicTest.class); this.bufferSize = bufferSize; + this.mapGroupSize = mapGroupSize; this.nThreads = nThreads; this.start = start; this.end = end; this.expectedResult = sum2x(start, end); - setName(String.format("%s nt=%d buf=%d start=%d end=%d sum=%d", - getClass().getSimpleName(), nThreads, bufferSize, start, end, expectedResult)); + setName(String.format("%s nt=%d buf=%d mapGroupSize=%d start=%d end=%d sum=%d", + getClass().getSimpleName(), nThreads, bufferSize, mapGroupSize, start, end, expectedResult)); } public Iterator makeReader() { @@ -54,16 +54,22 @@ public class NanoSchedulerUnitTest extends BaseTest { } public Map2x makeMap() { return new Map2x(); } + public Integer initReduce() { return 0; } public ReduceSum makeReduce() { return new ReduceSum(); } } + static NanoSchedulerBasicTest exampleTest = null; @DataProvider(name = "NanoSchedulerBasicTest") public Object[][] createNanoSchedulerBasicTest() { - for ( final int bufferSize : Arrays.asList(1, 10, 10000, 1000000) ) { - for ( final int nt : Arrays.asList(1, 2, 4, 8, 16, 32) ) { - for ( final int start : Arrays.asList(0) ) { - for ( final int end : Arrays.asList(1, 2, 11, 1000000) ) { - new NanoSchedulerBasicTest(bufferSize, nt, start, end); + for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) { + for ( final int mapGroupSize : Arrays.asList(-1, 1, 10, 100, 1000) ) { + if ( mapGroupSize <= bufferSize ) { + for ( final int nt : Arrays.asList(1, 2, 4) ) { + for ( final int start : Arrays.asList(0) ) { + for ( final int end : Arrays.asList(1, 2, 11, 10000, 100000) ) { + exampleTest = new NanoSchedulerBasicTest(bufferSize, mapGroupSize, nt, start, end); + } + } } } } @@ -72,22 +78,65 @@ public class NanoSchedulerUnitTest extends BaseTest { return NanoSchedulerBasicTest.getTests(NanoSchedulerBasicTest.class); } - @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 2000) - public void testNanoSchedulerBasicTest(final NanoSchedulerBasicTest test) throws InterruptedException { + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest") + public void testSingleThreadedNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { logger.warn("Running " + test); - final NanoScheduler nanoScheduler = - new NanoScheduler(test.bufferSize, test.nThreads, - test.makeReader(), test.makeMap(), test.makeReduce()); - final Integer sum = nanoScheduler.execute(); - Assert.assertNotNull(sum); - Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + if ( test.nThreads == 1 ) + testNanoScheduler(test); } - @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 10000, dependsOnMethods = "testNanoSchedulerBasicTest") - public void testNanoSchedulerInLoop(final NanoSchedulerBasicTest test) throws InterruptedException { + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 10000, dependsOnMethods = "testSingleThreadedNanoScheduler") + public void testMultiThreadedNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { logger.warn("Running " + test); - for ( int i = 0; i < 10; i++ ) { - testNanoSchedulerBasicTest(test); + if ( test.nThreads >= 1 ) + testNanoScheduler(test); + } + + private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { + final NanoScheduler nanoScheduler = + new NanoScheduler(test.bufferSize, test.mapGroupSize, test.nThreads); + + Assert.assertEquals(nanoScheduler.getBufferSize(), test.bufferSize, "bufferSize argument"); + Assert.assertTrue(nanoScheduler.getMapGroupSize() >= test.mapGroupSize, "mapGroupSize argument"); + Assert.assertEquals(nanoScheduler.getnThreads(), test.nThreads, "nThreads argument"); + + final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce()); + Assert.assertNotNull(sum); + Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + nanoScheduler.shutdown(); + } + + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler") + public void testNanoSchedulerInLoop(final NanoSchedulerBasicTest test) throws InterruptedException { + if ( test.bufferSize > 1 && (test.mapGroupSize > 1 || test.mapGroupSize == -1)) { + logger.warn("Running " + test); + + final NanoScheduler nanoScheduler = + new NanoScheduler(test.bufferSize, test.mapGroupSize, test.nThreads); + + // test reusing the scheduler + for ( int i = 0; i < 10; i++ ) { + final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce()); + Assert.assertNotNull(sum); + Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + } + + nanoScheduler.shutdown(); } } + + @Test() + public void testShutdown() throws InterruptedException { + final NanoScheduler nanoScheduler = new NanoScheduler(1, 1, 2); + Assert.assertFalse(nanoScheduler.isShutdown(), "scheduler should be alive"); + nanoScheduler.shutdown(); + Assert.assertTrue(nanoScheduler.isShutdown(), "scheduler should be dead"); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void testShutdownExecuteFailure() throws InterruptedException { + final NanoScheduler nanoScheduler = new NanoScheduler(1, 1, 2); + nanoScheduler.shutdown(); + nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce()); + } }