Merge branch 'nanoScheduler'

Conflicts:
	private/scala/qscript/org/broadinstitute/sting/queue/qscripts/performance/GATKPerformanceOverTime.scala
This commit is contained in:
Mark DePristo 2012-08-24 15:59:36 -04:00
commit 649b82ce85
6 changed files with 480 additions and 4 deletions

View File

@ -28,8 +28,10 @@ package org.broadinstitute.sting.gatk.walkers.haplotypecaller;
import com.google.java.contract.Ensures;
import net.sf.picard.reference.IndexedFastaSequenceFile;
import org.broadinstitute.sting.gatk.arguments.StandardCallerArgumentCollection;
import org.broadinstitute.sting.gatk.walkers.*;
import org.broadinstitute.sting.gatk.walkers.genotyper.*;
import org.broadinstitute.sting.utils.activeregion.ActivityProfileResult;
import org.broadinstitute.sting.utils.baq.BAQ;
import org.broadinstitute.sting.utils.help.DocumentedGATKFeature;
import org.broadinstitute.sting.commandline.*;
import org.broadinstitute.sting.gatk.CommandLineGATK;
@ -40,10 +42,6 @@ import org.broadinstitute.sting.gatk.contexts.AlignmentContextUtils;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.filters.BadMateFilter;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker;
import org.broadinstitute.sting.gatk.walkers.PartitionBy;
import org.broadinstitute.sting.gatk.walkers.PartitionType;
import org.broadinstitute.sting.gatk.walkers.annotator.VariantAnnotatorEngine;
import org.broadinstitute.sting.gatk.walkers.annotator.interfaces.AnnotatorCompatible;
import org.broadinstitute.sting.utils.*;
@ -103,6 +101,7 @@ import java.util.*;
@DocumentedGATKFeature( groupName = "Variant Discovery Tools", extraDocs = {CommandLineGATK.class} )
@PartitionBy(PartitionType.LOCUS)
@BAQMode(ApplicationTime = BAQ.ApplicationTime.FORBIDDEN)
@ActiveRegionExtension(extension=65, maxRegion=300)
public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implements AnnotatorCompatible {

View File

@ -810,4 +810,25 @@ public class Utils {
return Collections.unmodifiableMap(map);
}
/**
 * Divides the input list into a list of sublists, each containing groupSize elements
 * (except potentially the last one, which may contain fewer)
 *
 * list = [A, B, C, D, E]
 * groupSize = 2
 * result = [[A, B], [C, D], [E]]
 *
 * Note that the returned sublists are views of the input list (per List.subList): they are
 * cheap to create, but they reflect later changes to the input list and must not be
 * structurally modified independently of it.
 *
 * @param list the list to partition; if empty, an empty list of groups is returned
 * @param groupSize the number of elements per group; must be >= 1
 * @return a list of consecutive sublists of list, each of size groupSize except possibly the last
 * @throws IllegalArgumentException if groupSize < 1
 */
public static <T> List<List<T>> groupList(final List<T> list, final int groupSize) {
    // fixed: the old message ("groupSize >= 1") gave no context and omitted the bad value
    if ( groupSize < 1 ) throw new IllegalArgumentException("groupSize must be >= 1, got " + groupSize);

    final List<List<T>> subLists = new LinkedList<List<T>>();
    final int n = list.size();
    for ( int i = 0; i < n; i += groupSize ) {
        // subList's end index is exclusive; clamp the final (possibly short) group at n
        subLists.add(list.subList(i, Math.min(i + groupSize, n)));
    }

    return subLists;
}
}

View File

@ -0,0 +1,19 @@
package org.broadinstitute.sting.utils.nanoScheduler;
/**
 * A function that maps from InputType -> ResultType
 *
 * For use with the NanoScheduler
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 9:49 AM
 */
public interface MapFunction<InputType, ResultType> {
    /**
     * Apply this function to input, returning a value of ResultType
     *
     * @param input the input element to transform
     * @return the result of mapping input
     */
    public ResultType apply(final InputType input);
}

View File

@ -0,0 +1,277 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.Utils;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
/**
 * Framework for very fine grained MapReduce parallelism
 *
 * The overall framework works like this
 *
 * nano <- new NanoScheduler(bufferSize, numberOfMapElementsToProcessTogether, nThreads)
 * for ( final List<Input> outerData : outerDataLoop )
 *     result = nano.execute(outerData.iterator(), map, reduce)
 *
 * bufferSize determines how many elements from the input stream are read in one go by the
 * nanoscheduler.  The scheduler may hold up to bufferSize inputs in memory at one time, as
 * well as up to bufferSize map results as well.
 *
 * numberOfMapElementsToProcessTogether determines how many input elements are processed
 * together each thread cycle.  For example, if this value is 10, then the input data
 * is grouped together in units of 10 elements each, and map called on each in turn.  The more
 * heavy-weight the map function is, in terms of CPU costs, the more it makes sense to
 * have this number be small.  The lighter the CPU cost per element, though, the more this
 * parameter introduces overhead due to the need to context switch among threads to process
 * each input element.  A value of -1 lets the nanoscheduler guess at a reasonable trade-off value.
 *
 * nThreads is a bit obvious yes?  Note though that the nanoscheduler assumes that it gets 1 thread
 * from its client during the execute call, as this call blocks until all work is done.  The caller
 * thread is put to work by execute to help with the processing of the data.  So in reality the
 * nanoScheduler only spawns nThreads - 1 additional workers (if this is > 1).
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 9:47 AM
 */
public class NanoScheduler<InputType, MapType, ReduceType> {
    private static Logger logger = Logger.getLogger(NanoScheduler.class);

    final int bufferSize;
    final int mapGroupSize;
    final int nThreads;
    final ExecutorService executor; // null when nThreads == 1: single-threaded execution needs no pool
    boolean shutdown = false;

    /**
     * Create a new nanoscheduler with the desired characteristics requested by the arguments
     *
     * @param bufferSize the number of input elements to read in each scheduling cycle.  Must be >= 1
     * @param mapGroupSize how many inputs should be grouped together per map?  If -1 we make a reasonable guess
     * @param nThreads the number of threads to use to get work done, including the thread calling execute.  Must be >= 1
     * @throws IllegalArgumentException if any argument is out of range
     */
    public NanoScheduler(final int bufferSize,
                         final int mapGroupSize,
                         final int nThreads) {
        if ( bufferSize < 1 ) throw new IllegalArgumentException("bufferSize must be >= 1, got " + bufferSize);
        if ( nThreads < 1 ) throw new IllegalArgumentException("nThreads must be >= 1, got " + nThreads);
        if ( mapGroupSize > bufferSize ) throw new IllegalArgumentException("mapGroupSize " + mapGroupSize + " must be <= bufferSize " + bufferSize);
        // fixed: message previously concatenated the value directly onto the text with no separator
        if ( mapGroupSize == 0 || mapGroupSize < -1 ) throw new IllegalArgumentException("mapGroupSize cannot be <= 0, got " + mapGroupSize);

        this.bufferSize = bufferSize;
        this.nThreads = nThreads;

        if ( mapGroupSize == -1 ) {
            // heuristic: aim for ~10 map groups per thread per buffer so no worker starves
            this.mapGroupSize = (int)Math.ceil(this.bufferSize / (10.0*this.nThreads));
            logger.info(String.format("Dynamically setting grouping size to %d based on buffer size %d and n threads %d",
                    this.mapGroupSize, this.bufferSize, this.nThreads));
        } else {
            this.mapGroupSize = mapGroupSize;
        }

        // the caller's thread does work too, so only nThreads - 1 pool workers are needed
        this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads - 1);
    }

    /**
     * The number of parallel map threads in use with this NanoScheduler
     * @return the total thread count, including the thread calling execute
     */
    public int getnThreads() {
        return nThreads;
    }

    /**
     * The input buffer size used by this NanoScheduler
     * @return the max number of input elements read per scheduling cycle
     */
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * The grouping size used by this NanoScheduler
     * @return the number of inputs handled per map job (the dynamically chosen value if -1 was requested)
     */
    public int getMapGroupSize() {
        return mapGroupSize;
    }

    /**
     * Tells this nanoScheduler to shutdown immediately, releasing all its resources.
     *
     * After this call, execute cannot be invoked without throwing an error
     */
    public void shutdown() {
        if ( executor != null ) {
            final List<Runnable> remaining = executor.shutdownNow();
            // execute blocks until all work is done, so nothing should ever be pending here
            if ( ! remaining.isEmpty() )
                throw new IllegalStateException("Remaining tasks found in the executor, unexpected behavior!");
        }
        shutdown = true;
    }

    /**
     * @return true if this nanoScheduler is shutdown, or false if its still open for business
     */
    public boolean isShutdown() {
        return shutdown;
    }

    /**
     * Execute a map/reduce job with this nanoScheduler
     *
     * Data comes from inputReader.  Will be read until hasNext() == false.
     * map is called on each element provided by inputReader.  No order of operations is guaranteed
     * reduce is called in order of the input data provided by inputReader on the result of map() applied
     * to each element.
     *
     * Note that the caller thread is put to work with this function call.  The call doesn't return
     * until all elements have been processed.
     *
     * It is safe to call this function repeatedly on a single nanoScheduler, at least until the
     * shutdown method is called.
     *
     * @param inputReader an iterator providing us with the input data to map/reduce
     * @param map the map function from InputType -> MapType
     * @param initialValue the starting value for the reduction
     * @param reduce the reduce function combining a MapType with the running ReduceType
     * @return the final reduce result over all mapped inputs
     * @throws IllegalStateException if this scheduler has been shutdown
     */
    public ReduceType execute(final Iterator<InputType> inputReader,
                              final MapFunction<InputType, MapType> map,
                              final ReduceType initialValue,
                              final ReduceFunction<MapType, ReduceType> reduce) {
        if ( isShutdown() )
            throw new IllegalStateException("execute called on already shutdown NanoScheduler");

        if ( getnThreads() == 1 ) {
            return executeSingleThreaded(inputReader, map, initialValue, reduce);
        } else {
            return executeMultiThreaded(inputReader, map, initialValue, reduce);
        }
    }

    /**
     * Simple efficient reference implementation for single threaded execution
     * @return the reduce result of this map/reduce job
     */
    private ReduceType executeSingleThreaded(final Iterator<InputType> inputReader,
                                             final MapFunction<InputType, MapType> map,
                                             final ReduceType initialValue,
                                             final ReduceFunction<MapType, ReduceType> reduce) {
        ReduceType sum = initialValue;

        while ( inputReader.hasNext() ) {
            final InputType input = inputReader.next();
            final MapType mapValue = map.apply(input);
            sum = reduce.apply(mapValue, sum);
        }

        return sum;
    }

    /**
     * Efficient parallel version of Map/Reduce
     *
     * @return the reduce result of this map/reduce job
     */
    private ReduceType executeMultiThreaded(final Iterator<InputType> inputReader,
                                            final MapFunction<InputType, MapType> map,
                                            final ReduceType initialValue,
                                            final ReduceFunction<MapType, ReduceType> reduce) {
        ReduceType sum = initialValue;

        while ( inputReader.hasNext() ) {
            try {
                // read in our input values
                final List<InputType> inputs = readInputs(inputReader);

                // send jobs for map
                final Queue<Future<List<MapType>>> mapQueue = submitMapJobs(map, executor, inputs);

                // reduce the map results in input order, blocking on each future as needed
                sum = reduceParallel(reduce, mapQueue, sum);
            } catch (InterruptedException ex) {
                // fixed: restore the interrupt status and report the correct failure kind
                // (was: swallowing interrupt status and claiming "got execution exception")
                Thread.currentThread().interrupt();
                throw new ReviewedStingException("NanoScheduler interrupted during execution", ex);
            } catch (ExecutionException ex) {
                throw new ReviewedStingException("got execution exception", ex);
            }
        }

        return sum;
    }

    /**
     * Reduce the map results held in mapQueue, in submission order, into a running sum
     *
     * Blocks on each future in turn, so reduction proceeds in input order even though
     * map jobs complete in arbitrary order.
     *
     * @return the reduce result starting from initSum over all values in mapQueue
     */
    @Requires("! mapQueue.isEmpty()")
    private ReduceType reduceParallel(final ReduceFunction<MapType, ReduceType> reduce,
                                      final Queue<Future<List<MapType>>> mapQueue,
                                      final ReduceType initSum)
            throws InterruptedException, ExecutionException {
        ReduceType sum = initSum;

        // while mapQueue has something in it to reduce
        for ( final Future<List<MapType>> future : mapQueue ) {
            for ( final MapType value : future.get() ) // block until we get the values for this task
                sum = reduce.apply(value, sum);
        }

        return sum;
    }

    /**
     * Read up to bufferSize elements from inputReader
     *
     * @return a list of inputs read in, containing one or more values of InputType
     */
    @Requires("inputReader.hasNext()")
    @Ensures("!result.isEmpty()")
    private List<InputType> readInputs(final Iterator<InputType> inputReader) {
        int n = 0;
        final List<InputType> inputs = new LinkedList<InputType>();

        while ( inputReader.hasNext() && n < getBufferSize() ) {
            final InputType input = inputReader.next();
            inputs.add(input);
            n++;
        }

        return inputs;
    }

    /**
     * Group inputs into chunks of getMapGroupSize() elements and submit one map job per chunk
     *
     * @return a queue of futures, one per submitted chunk, in input order
     */
    // fixed: the queue holds one future per *group* of inputs (ceil(n / mapGroupSize) of them),
    // so the old contract result.size() == inputs.size() was wrong for any group size > 1
    @Ensures("result.size() <= inputs.size()")
    private Queue<Future<List<MapType>>> submitMapJobs(final MapFunction<InputType, MapType> map,
                                                       final ExecutorService executor,
                                                       final List<InputType> inputs) {
        final Queue<Future<List<MapType>>> mapQueue = new LinkedList<Future<List<MapType>>>();

        for ( final List<InputType> subinputs : Utils.groupList(inputs, getMapGroupSize()) ) {
            final CallableMap doMap = new CallableMap(map, subinputs);
            final Future<List<MapType>> future = executor.submit(doMap);
            mapQueue.add(future);
        }

        return mapQueue;
    }

    /**
     * A simple callable version of the map function for use with the executor pool
     *
     * Applies map to each element of its input chunk and returns the results in order.
     */
    private class CallableMap implements Callable<List<MapType>> {
        final List<InputType> inputs;
        final MapFunction<InputType, MapType> map;

        private CallableMap(final MapFunction<InputType, MapType> map, final List<InputType> inputs) {
            this.inputs = inputs;
            this.map = map;
        }

        @Override public List<MapType> call() throws Exception {
            final List<MapType> outputs = new LinkedList<MapType>();
            for ( final InputType input : inputs )
                outputs.add(map.apply(input));
            return outputs;
        }
    }
}

View File

@ -0,0 +1,18 @@
package org.broadinstitute.sting.utils.nanoScheduler;
/**
 * A function that combines a value of MapType with an existing ReduceType value into a new ReduceType
 *
 * For use with the NanoScheduler
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 9:49 AM
 */
public interface ReduceFunction<MapType, ReduceType> {
    /**
     * Combine one with sum into a new ReduceType
     *
     * @param one the result of a map call on an input element
     * @param sum the cumulative reduce result over all previous map calls
     * @return the new cumulative reduce result incorporating one
     */
    public ReduceType apply(MapType one, ReduceType sum);
}

View File

@ -0,0 +1,142 @@
package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.BaseTest;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.*;
/**
 * UnitTests for the NanoScheduler
 *
 * Exercises single- and multi-threaded execution, scheduler reuse, and shutdown semantics
 * over a cross product of buffer sizes, map group sizes, thread counts and input ranges.
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 11:25 AM
 */
public class NanoSchedulerUnitTest extends BaseTest {
    /** Map function that doubles its input. */
    private class Map2x implements MapFunction<Integer, Integer> {
        @Override public Integer apply(Integer input) { return input * 2; }
    }

    /** Reduce function that sums the map results. */
    private class ReduceSum implements ReduceFunction<Integer, Integer> {
        @Override public Integer apply(Integer one, Integer sum) { return one + sum; }
    }

    /** Directly computes the expected map/reduce result: the sum of 2*i for all i in [start, end). */
    private static int sum2x(final int start, final int end) {
        int sum = 0;
        for ( int i = start; i < end; i++ )
            sum += 2 * i;
        return sum;
    }

    /** One test configuration: scheduler parameters plus the input range and precomputed expected sum. */
    private class NanoSchedulerBasicTest extends TestDataProvider {
        final int bufferSize, mapGroupSize, nThreads, start, end, expectedResult;

        public NanoSchedulerBasicTest(final int bufferSize, final int mapGroupSize, final int nThreads, final int start, final int end) {
            super(NanoSchedulerBasicTest.class);
            this.bufferSize = bufferSize;
            this.mapGroupSize = mapGroupSize;
            this.nThreads = nThreads;
            this.start = start;
            this.end = end;
            this.expectedResult = sum2x(start, end);
            setName(String.format("%s nt=%d buf=%d mapGroupSize=%d start=%d end=%d sum=%d",
                    getClass().getSimpleName(), nThreads, bufferSize, mapGroupSize, start, end, expectedResult));
        }

        /** Creates a fresh iterator over the integers [start, end); safe to call repeatedly. */
        public Iterator<Integer> makeReader() {
            final List<Integer> ints = new ArrayList<Integer>();
            for ( int i = start; i < end; i++ )
                ints.add(i);
            return ints.iterator();
        }

        public Map2x makeMap() { return new Map2x(); }
        public Integer initReduce() { return 0; }
        public ReduceSum makeReduce() { return new ReduceSum(); }
    }

    // NOTE(review): holds whichever test case the data provider constructed last; it is used by
    // testShutdownExecuteFailure, which only needs *some* valid execute arguments -- confirm the
    // data provider is guaranteed to run before that test
    static NanoSchedulerBasicTest exampleTest = null;

    @DataProvider(name = "NanoSchedulerBasicTest")
    public Object[][] createNanoSchedulerBasicTest() {
        // Enumerate the cross product of parameters. The constructed tests are not collected here;
        // presumably the TestDataProvider superclass constructor registers each instance, and
        // getTests() below retrieves them -- TODO confirm against TestDataProvider
        for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) {
            for ( final int mapGroupSize : Arrays.asList(-1, 1, 10, 100, 1000) ) {
                if ( mapGroupSize <= bufferSize ) { // NanoScheduler rejects mapGroupSize > bufferSize
                    for ( final int nt : Arrays.asList(1, 2, 4) ) {
                        for ( final int start : Arrays.asList(0) ) {
                            for ( final int end : Arrays.asList(1, 2, 11, 10000, 100000) ) {
                                exampleTest = new NanoSchedulerBasicTest(bufferSize, mapGroupSize, nt, start, end);
                            }
                        }
                    }
                }
            }
        }

        return NanoSchedulerBasicTest.getTests(NanoSchedulerBasicTest.class);
    }

    @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest")
    public void testSingleThreadedNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException {
        logger.warn("Running " + test);
        // only exercise the nt == 1 cases here; the multi-threaded cases depend on this test passing
        if ( test.nThreads == 1 )
            testNanoScheduler(test);
    }

    @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 10000, dependsOnMethods = "testSingleThreadedNanoScheduler")
    public void testMultiThreadedNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException {
        logger.warn("Running " + test);
        // NOTE(review): >= 1 re-runs the single-threaded cases as well -- confirm whether > 1 was intended
        if ( test.nThreads >= 1 )
            testNanoScheduler(test);
    }

    /** Shared driver: builds a scheduler from test's parameters, runs one execute, and checks the sum. */
    private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException {
        final NanoScheduler<Integer, Integer, Integer> nanoScheduler =
                new NanoScheduler<Integer, Integer, Integer>(test.bufferSize, test.mapGroupSize, test.nThreads);

        Assert.assertEquals(nanoScheduler.getBufferSize(), test.bufferSize, "bufferSize argument");
        // >= rather than == because mapGroupSize == -1 asks the scheduler to pick its own value
        Assert.assertTrue(nanoScheduler.getMapGroupSize() >= test.mapGroupSize, "mapGroupSize argument");
        Assert.assertEquals(nanoScheduler.getnThreads(), test.nThreads, "nThreads argument");

        final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce());
        Assert.assertNotNull(sum);
        Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly");
        nanoScheduler.shutdown();
    }

    @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler")
    public void testNanoSchedulerInLoop(final NanoSchedulerBasicTest test) throws InterruptedException {
        if ( test.bufferSize > 1 && (test.mapGroupSize > 1 || test.mapGroupSize == -1)) {
            logger.warn("Running " + test);
            final NanoScheduler<Integer, Integer, Integer> nanoScheduler =
                    new NanoScheduler<Integer, Integer, Integer>(test.bufferSize, test.mapGroupSize, test.nThreads);

            // test reusing the scheduler: execute must be callable repeatedly until shutdown
            for ( int i = 0; i < 10; i++ ) {
                final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce());
                Assert.assertNotNull(sum);
                Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly");
            }
            nanoScheduler.shutdown();
        }
    }

    @Test()
    public void testShutdown() throws InterruptedException {
        final NanoScheduler<Integer, Integer, Integer> nanoScheduler = new NanoScheduler<Integer, Integer, Integer>(1, 1, 2);
        Assert.assertFalse(nanoScheduler.isShutdown(), "scheduler should be alive");
        nanoScheduler.shutdown();
        Assert.assertTrue(nanoScheduler.isShutdown(), "scheduler should be dead");
    }

    @Test(expectedExceptions = IllegalStateException.class)
    public void testShutdownExecuteFailure() throws InterruptedException {
        // execute after shutdown must throw IllegalStateException
        final NanoScheduler<Integer, Integer, Integer> nanoScheduler = new NanoScheduler<Integer, Integer, Integer>(1, 1, 2);
        nanoScheduler.shutdown();
        nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce());
    }
}