Initial implementation of GSA-515: Nanoscheduler

– Write general NanoScheduler framework in utils.threading. Test with reading via iterator from list of integers, map is int * 2, reduce is sum. Should use resources efficiently to compute 2 * sum(1..X).

Done!

CPU parallelism is nano threads. Pfor across read / map / reduce. Use work queue to implement.
Create general read map reduce framework in utils. Test parallelism independently before hooking up to Locus iterator
Represent the dependency graph explicitly. The scheduler should choose the work units that are ready for computation, then those that are marked as "completing a computation", and finally those that maximize the number of subsequently available work units. It may be worth measuring the expected cost of each read / map / reduce unit and using it to balance the computation.
As input is single threaded, just one thread is needed to populate inputs; it runs as fast as possible, in parallel with the other work, pushing data to a fixed-size queue. Each push creates a map job and links it to the upcoming reduce job.
Note that there's at most one thread for IO tasks, and all of the threads can contribute to CPU tasks
This commit is contained in:
Mark DePristo 2012-08-24 14:07:44 -04:00
parent b3fd74f0c4
commit d6e6b30caf
5 changed files with 314 additions and 0 deletions

View File

@ -0,0 +1,12 @@
package org.broadinstitute.sting.utils.nanoScheduler;
/**
 * Transforms one value of InputType into one value of ResultType.
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 9:49 AM
 *
 * @param <InputType>  type of the value handed to the function
 * @param <ResultType> type of the value the function produces
 */
public interface MapFunction<InputType, ResultType> {
    /**
     * Map a single input value to its result.
     *
     * @param input the value to transform
     * @return the mapped value for {@code input}
     */
    ResultType apply(InputType input);
}

View File

@ -0,0 +1,31 @@
package org.broadinstitute.sting.utils.nanoScheduler;
/**
 * Holds a mapped value together with an integer identifier.
 *
 * MapResults compare by identifier only, so sorting them yields ascending id order;
 * the carried value plays no part in the ordering.
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 9:57 AM
 *
 * @param <MapType> type of the mapped value being carried
 */
public class MapResult<MapType> implements Comparable<MapResult<MapType>> {
    final Integer id;
    final MapType value;

    /**
     * @param id    identifier associated with this result
     * @param value the mapped value to carry
     */
    public MapResult(final int id, final MapType value) {
        this.value = value;
        this.id = id;
    }

    /** @return the mapped value held by this result */
    public MapType getValue() {
        return value;
    }

    /** @return the identifier of this result */
    public Integer getId() {
        return id;
    }

    /**
     * Order two results by their identifiers.
     *
     * @param o the result to compare against
     * @return negative, zero or positive as this id is less than, equal to or greater than o's id
     */
    @Override
    public int compareTo(MapResult<MapType> o) {
        final Integer mine = getId();
        final Integer theirs = o.getId();
        return mine.compareTo(theirs);
    }
}

View File

@ -0,0 +1,165 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
/**
 * Framework for very fine grained MapReduce parallelism
 *
 * Values are pulled from an input iterator, each value is passed through a map function,
 * and the mapped values are folded into a single result by a reduce function, in the
 * order in which the inputs were read.  With one thread everything runs in the calling
 * thread; with more threads the map calls run on a thread pool while the calling thread
 * reads the inputs and performs the reduce.
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 9:47 AM
 *
 * @param <InputType>  type of the values produced by the input iterator
 * @param <MapType>    type of the values produced by the map function
 * @param <ReduceType> type of the final reduce result
 */
public class NanoScheduler<InputType, MapType, ReduceType> {
    final int bufferSize;
    final int nThreads;
    final Iterator<InputType> inputReader;
    final MapFunction<InputType, MapType> map;
    final ReduceFunction<MapType, ReduceType> reduce;

    /**
     * Create a new scheduler for one map/reduce job
     *
     * @param bufferSize  maximum number of inputs read and mapped per batch, must be >= 1
     * @param nThreads    total number of threads to use, must be >= 1
     * @param inputReader iterator supplying the input values; it is consumed by execute()
     * @param map         function applied to every input value
     * @param reduce      function that supplies the initial result and folds each mapped value into it
     * @throws IllegalArgumentException if bufferSize or nThreads is less than 1
     */
    public NanoScheduler(final int bufferSize,
                         final int nThreads,
                         final Iterator<InputType> inputReader,
                         final MapFunction<InputType, MapType> map,
                         final ReduceFunction<MapType, ReduceType> reduce) {
        if ( bufferSize < 1 ) throw new IllegalArgumentException("bufferSize must be >= 1, got " + bufferSize);
        if ( nThreads < 1 ) throw new IllegalArgumentException("nThreads must be >= 1, got " + nThreads);

        this.bufferSize = bufferSize;
        this.inputReader = inputReader;
        this.map = map;
        this.reduce = reduce;
        this.nThreads = nThreads;
    }

    /**
     * @return the total number of threads this scheduler was configured with
     */
    public int getnThreads() {
        return nThreads;
    }

    /**
     * @return the maximum number of inputs read per batch
     */
    private int getBufferSize() {
        return bufferSize;
    }

    /**
     * Run the map/reduce job over everything remaining in the input iterator
     *
     * @return the reduce result of this map/reduce job
     */
    public ReduceType execute() {
        if ( getnThreads() == 1 ) {
            return executeSingleThreaded();
        } else {
            return executeMultiThreaded();
        }
    }

    /**
     * Simple efficient reference implementation for single threaded execution
     *
     * @return the reduce result of this map/reduce job
     */
    private ReduceType executeSingleThreaded() {
        ReduceType sum = reduce.init();
        while ( inputReader.hasNext() ) {
            final InputType input = inputReader.next();
            final MapType mapValue = map.apply(input);
            sum = reduce.apply(mapValue, sum);
        }
        return sum;
    }

    /**
     * Efficient parallel version of Map/Reduce
     *
     * Inputs are processed in batches of up to bufferSize values: a batch is read, one map
     * job per value is submitted to the pool, and the results are reduced in submission order
     * before the next batch is read.  The pool is shut down on every exit path, so no worker
     * threads are left behind if map, reduce or the input iterator throws.
     *
     * @return the reduce result of this map/reduce job
     * @throws ReviewedStingException if a map job fails, if the calling thread is interrupted
     *                                while waiting for a map result, or if tasks are unexpectedly
     *                                still queued once all inputs are processed
     */
    private ReduceType executeMultiThreaded() {
        // the calling thread does the reading and reducing, so the pool gets the other nThreads - 1
        final ExecutorService executor = Executors.newFixedThreadPool(getnThreads() - 1);

        try {
            ReduceType sum = reduce.init();
            while ( inputReader.hasNext() ) {
                // read in our input values
                final Queue<InputType> inputs = readInputs();

                // send jobs for map
                final Queue<Future<MapType>> mapQueue = submitMapJobs(executor, inputs);

                // reduce in the calling thread, blocking on each map result in turn
                sum = reduceParallel(mapQueue, sum);
            }

            // every future has been waited on, so nothing should still be queued
            final List<Runnable> remaining = executor.shutdownNow();
            if ( ! remaining.isEmpty() )
                throw new ReviewedStingException("Remaining tasks found in the executor, unexpected behavior!");

            return sum;
        } catch (InterruptedException ex) {
            // restore the interrupt flag so code further up the stack can still observe it
            Thread.currentThread().interrupt();
            throw new ReviewedStingException("interrupted while waiting for map results", ex);
        } catch (ExecutionException ex) {
            throw new ReviewedStingException("got execution exception", ex);
        } finally {
            // also reached on failure; a second shutdownNow after the one above is harmless
            executor.shutdownNow();
        }
    }

    /**
     * Fold the results of the submitted map jobs into the running reduce value
     *
     * The futures are visited in queue order and each one is waited on before the next,
     * so values are reduced in the order their jobs were submitted.
     *
     * @param mapQueue the futures of the submitted map jobs, must not be empty
     * @param initSum  the reduce value accumulated so far
     * @return the reduce value after all results in mapQueue have been applied
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if one of the map jobs threw an exception
     */
    @Requires("! mapQueue.isEmpty()")
    private ReduceType reduceParallel(final Queue<Future<MapType>> mapQueue, final ReduceType initSum)
            throws InterruptedException, ExecutionException {
        ReduceType sum = initSum;

        for ( final Future<MapType> future : mapQueue ) {
            // block until we get the value for this task
            final MapType value = future.get();
            sum = reduce.apply(value, sum);
        }

        return sum;
    }

    /**
     * Read up to bufferSize elements from inputReader
     *
     * @return a queue of inputs read in, containing one or more values of InputType read in
     */
    @Requires("inputReader.hasNext()")
    @Ensures("!result.isEmpty()")
    private Queue<InputType> readInputs() {
        int n = 0;
        final Queue<InputType> inputs = new LinkedList<InputType>();

        while ( inputReader.hasNext() && n < getBufferSize() ) {
            final InputType input = inputReader.next();
            inputs.add(input);
            n++;
        }

        return inputs;
    }

    /**
     * Submit one map job per input value to the executor
     *
     * @param executor the pool that will run the map jobs
     * @param inputs   the values to map
     * @return the futures of the submitted jobs, in the same order as inputs
     */
    @Ensures("result.size() == inputs.size()")
    private Queue<Future<MapType>> submitMapJobs(final ExecutorService executor, final Queue<InputType> inputs) {
        final Queue<Future<MapType>> mapQueue = new LinkedList<Future<MapType>>();

        for ( final InputType input : inputs ) {
            final CallableMap doMap = new CallableMap(input);
            final Future<MapType> future = executor.submit(doMap);
            mapQueue.add(future);
        }

        return mapQueue;
    }

    /**
     * A simple callable version of the map function for use with the executor pool
     */
    private class CallableMap implements Callable<MapType> {
        final InputType input;

        private CallableMap(final InputType input) {
            this.input = input;
        }

        /**
         * @return the result of applying the scheduler's map function to this job's input
         */
        @Override public MapType call() throws Exception {
            return map.apply(input);
        }
    }
}

View File

@ -0,0 +1,13 @@
package org.broadinstitute.sting.utils.nanoScheduler;
/**
 * A function that folds values of MapType, one at a time, into a running result of ReduceType
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 9:49 AM
 *
 * @param <MapType>    type of the individual values being folded in
 * @param <ReduceType> type of the accumulated result
 */
public interface ReduceFunction<MapType, ReduceType> {
    /**
     * @return the starting value of the accumulated result, before any value has been applied
     */
    ReduceType init();

    /**
     * Combine one value with the result accumulated so far
     *
     * @param one the value to fold in
     * @param sum the result accumulated so far
     * @return the new accumulated result
     */
    ReduceType apply(MapType one, ReduceType sum);
}

View File

@ -0,0 +1,93 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.BaseTest;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.*;
/**
 * UnitTests for the NanoScheduler
 *
 * Runs a doubling map and a summing reduce over a range of integers for a grid of
 * buffer sizes and thread counts, and checks the scheduler's answer against the same
 * sum computed directly.
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 11:25 AM
 */
public class NanoSchedulerUnitTest extends BaseTest {
    /** Map function used by the tests: doubles its input */
    private class Map2x implements MapFunction<Integer, Integer> {
        @Override
        public Integer apply(Integer input) {
            return 2 * input;
        }
    }

    /** Reduce function used by the tests: integer sum starting from zero */
    private class ReduceSum implements ReduceFunction<Integer, Integer> {
        @Override
        public Integer init() {
            return 0;
        }

        @Override
        public Integer apply(Integer one, Integer sum) {
            return sum + one;
        }
    }

    /**
     * Directly compute the value the scheduler is expected to produce
     *
     * @param start first integer of the range, inclusive
     * @param end   end of the range, exclusive
     * @return the sum of 2 * i over start <= i < end, using int arithmetic
     */
    private static int sum2x(final int start, final int end) {
        int total = 0;
        int i = start;
        while ( i < end ) {
            total += 2 * i;
            i++;
        }
        return total;
    }

    /** One test configuration: buffer size, thread count and the integer range to process */
    private class NanoSchedulerBasicTest extends TestDataProvider {
        final int bufferSize;
        final int nThreads;
        final int start;
        final int end;
        final int expectedResult;

        public NanoSchedulerBasicTest(final int bufferSize, final int nThreads, final int start, final int end) {
            super(NanoSchedulerBasicTest.class);
            this.start = start;
            this.end = end;
            this.nThreads = nThreads;
            this.bufferSize = bufferSize;
            this.expectedResult = sum2x(start, end);

            final String description = String.format("%s nt=%d buf=%d start=%d end=%d sum=%d",
                    getClass().getSimpleName(), nThreads, bufferSize, start, end, expectedResult);
            setName(description);
        }

        /** @return an iterator over the integers start (inclusive) to end (exclusive) */
        public Iterator<Integer> makeReader() {
            final List<Integer> values = new ArrayList<Integer>();
            for ( int value = start; value < end; value++ ) {
                values.add(value);
            }
            return values.iterator();
        }

        public Map2x makeMap() {
            return new Map2x();
        }

        public ReduceSum makeReduce() {
            return new ReduceSum();
        }
    }

    /**
     * Build the grid of configurations; each one registers itself on construction
     */
    @DataProvider(name = "NanoSchedulerBasicTest")
    public Object[][] createNanoSchedulerBasicTest() {
        final int[] bufferSizes = {1, 10, 10000, 1000000};
        final int[] threadCounts = {1, 2, 4, 8, 16, 32};
        final int[] starts = {0};
        final int[] ends = {1, 2, 11, 1000000};

        for ( final int bufferSize : bufferSizes ) {
            for ( final int nt : threadCounts ) {
                for ( final int start : starts ) {
                    for ( final int end : ends ) {
                        new NanoSchedulerBasicTest(bufferSize, nt, start, end);
                    }
                }
            }
        }

        return NanoSchedulerBasicTest.getTests(NanoSchedulerBasicTest.class);
    }

    /**
     * Run the scheduler once for the given configuration and compare against the expected sum
     */
    @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 2000)
    public void testNanoSchedulerBasicTest(final NanoSchedulerBasicTest test) throws InterruptedException {
        logger.warn("Running " + test);

        final Iterator<Integer> reader = test.makeReader();
        final Map2x mapper = test.makeMap();
        final ReduceSum reducer = test.makeReduce();
        final NanoScheduler<Integer, Integer, Integer> nanoScheduler =
                new NanoScheduler<Integer, Integer, Integer>(test.bufferSize, test.nThreads, reader, mapper, reducer);

        final Integer sum = nanoScheduler.execute();
        Assert.assertNotNull(sum);
        Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly");
    }

    /**
     * Repeat the basic test ten times for the given configuration
     */
    @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 10000, dependsOnMethods = "testNanoSchedulerBasicTest")
    public void testNanoSchedulerInLoop(final NanoSchedulerBasicTest test) throws InterruptedException {
        logger.warn("Running " + test);

        int remaining = 10;
        while ( remaining-- > 0 ) {
            testNanoSchedulerBasicTest(test);
        }
    }
}