diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java index acb5c9ebe..845fc68a6 100755 --- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java +++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java @@ -28,8 +28,10 @@ package org.broadinstitute.sting.gatk.walkers.haplotypecaller; import com.google.java.contract.Ensures; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.broadinstitute.sting.gatk.arguments.StandardCallerArgumentCollection; +import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.gatk.walkers.genotyper.*; import org.broadinstitute.sting.utils.activeregion.ActivityProfileResult; +import org.broadinstitute.sting.utils.baq.BAQ; import org.broadinstitute.sting.utils.help.DocumentedGATKFeature; import org.broadinstitute.sting.commandline.*; import org.broadinstitute.sting.gatk.CommandLineGATK; @@ -40,10 +42,6 @@ import org.broadinstitute.sting.gatk.contexts.AlignmentContextUtils; import org.broadinstitute.sting.gatk.contexts.ReferenceContext; import org.broadinstitute.sting.gatk.filters.BadMateFilter; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; -import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension; -import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker; -import org.broadinstitute.sting.gatk.walkers.PartitionBy; -import org.broadinstitute.sting.gatk.walkers.PartitionType; import org.broadinstitute.sting.gatk.walkers.annotator.VariantAnnotatorEngine; import org.broadinstitute.sting.gatk.walkers.annotator.interfaces.AnnotatorCompatible; import org.broadinstitute.sting.utils.*; @@ -103,6 +101,7 @@ import java.util.*; @DocumentedGATKFeature( groupName = "Variant Discovery Tools", extraDocs = {CommandLineGATK.class} ) @PartitionBy(PartitionType.LOCUS) +@BAQMode(ApplicationTime = BAQ.ApplicationTime.FORBIDDEN) @ActiveRegionExtension(extension=65, maxRegion=300) public class HaplotypeCaller extends ActiveRegionWalker implements AnnotatorCompatible { diff --git a/public/java/src/org/broadinstitute/sting/utils/Utils.java b/public/java/src/org/broadinstitute/sting/utils/Utils.java index a5b5eca6a..74b038032 100755 --- a/public/java/src/org/broadinstitute/sting/utils/Utils.java +++ b/public/java/src/org/broadinstitute/sting/utils/Utils.java @@ -810,4 +810,25 @@ public class Utils { return Collections.unmodifiableMap(map); } + /** + * Divides the input list into a list of sublists, which contains group size elements (except potentially the last one) + * + * list = [A, B, C, D, E] + * groupSize = 2 + * result = [[A, B], [C, D], [E]] + * + * @param list + * @param groupSize + * @return + */ + public static List> groupList(final List list, final int groupSize) { + if ( groupSize < 1 ) throw new IllegalArgumentException("groupSize >= 1"); + + final List> subLists = new LinkedList>(); + int n = list.size(); + for ( int i = 0; i < n; i += groupSize ) { + subLists.add(list.subList(i, Math.min(i + groupSize, n))); + } + return subLists; + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java new file mode 100644 index 000000000..440c263b7 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapFunction.java @@ -0,0 +1,19 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +/** + * A function that maps from InputType -> ResultType + * + * For use with the NanoScheduler + * + * User: depristo + * Date: 8/24/12 + * Time: 9:49 AM + */ +public interface MapFunction { + /** + * Return function on input, returning a value of ResultType + * @param input + * @return + */ + public ResultType apply(final InputType input); +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java new file mode 100644 index 000000000..fcc6a5723 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -0,0 +1,277 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import com.google.java.contract.Ensures; +import com.google.java.contract.Requires; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.Utils; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; + +/** + * Framework for very fine grained MapReduce parallelism + * + * The overall framework works like this + * + * nano <- new Nanoschedule(bufferSize, numberOfMapElementsToProcessTogether, nThreads) + * List[Input] outerData : outerDataLoop ) + * result = nano.execute(outerData.iterator(), map, reduce) + * + * bufferSize determines how many elements from the input stream are read in one go by the + * nanoscheduler. The scheduler may hold up to bufferSize in memory at one time, as well + * as up to inputBufferSize map results as well. + * + * numberOfMapElementsToProcessTogether determines how many input elements are processed + * together each thread cycle. For example, if this value is 10, then the input data + * is grouped together in units of 10 elements each, and map called on each in term. The more + * heavy-weight the map function is, in terms of CPU costs, the more it makes sense to + * have this number be small. The lighter the CPU cost per element, though, the more this + * parameter introduces overhead due to need to context switch among threads to process + * each input element. A value of -1 lets the nanoscheduler guess at a reasonable trade-off value. + * + * nThreads is a bit obvious yes? Note though that the nanoscheduler assumes that it gets 1 thread + * from its client during the execute call, as this call blocks until all work is done. The caller + * thread is put to work by execute to help with the processing of the data. So in reality the + * nanoScheduler only spawn nThreads - 1 additional workers (if this is > 1). + * + * User: depristo + * Date: 8/24/12 + * Time: 9:47 AM + */ +public class NanoScheduler { + private static Logger logger = Logger.getLogger(NanoScheduler.class); + + final int bufferSize; + final int mapGroupSize; + final int nThreads; + final ExecutorService executor; + boolean shutdown = false; + + /** + * Create a new nanoschedule with the desire characteristics requested by the argument + * + * @param bufferSize the number of input elements to read in each scheduling cycle. + * @param mapGroupSize How many inputs should be grouped together per map? If -1 we make a reasonable guess + * @param nThreads the number of threads to use to get work done, in addition to the thread calling execute + */ + public NanoScheduler(final int bufferSize, + final int mapGroupSize, + final int nThreads) { + if ( bufferSize < 1 ) throw new IllegalArgumentException("bufferSize must be >= 1, got " + bufferSize); + if ( nThreads < 1 ) throw new IllegalArgumentException("nThreads must be >= 1, got " + nThreads); + + if ( mapGroupSize > bufferSize ) throw new IllegalArgumentException("mapGroupSize " + mapGroupSize + " must be <= bufferSize " + bufferSize); + if ( mapGroupSize == 0 || mapGroupSize < -1 ) throw new IllegalArgumentException("mapGroupSize cannot be <= 0" + mapGroupSize); + + this.bufferSize = bufferSize; + this.nThreads = nThreads; + + if ( mapGroupSize == -1 ) { + this.mapGroupSize = (int)Math.ceil(this.bufferSize / (10.0*this.nThreads)); + logger.info(String.format("Dynamically setting grouping size to %d based on buffer size %d and n threads %d", + this.mapGroupSize, this.bufferSize, this.nThreads)); + } else { + this.mapGroupSize = mapGroupSize; + } + + this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads - 1); + } + + /** + * The number of parallel map threads in use with this NanoScheduler + * @return + */ + public int getnThreads() { + return nThreads; + } + + /** + * The input buffer size used by this NanoScheduler + * @return + */ + public int getBufferSize() { + return bufferSize; + } + + /** + * The grouping size used by this NanoScheduler + * @return + */ + public int getMapGroupSize() { + return mapGroupSize; + } + + /** + * Tells this nanoScheduler to shutdown immediately, releasing all its resources. + * + * After this call, execute cannot be invoked without throwing an error + */ + public void shutdown() { + if ( executor != null ) { + final List remaining = executor.shutdownNow(); + if ( ! remaining.isEmpty() ) + throw new IllegalStateException("Remaining tasks found in the executor, unexpected behavior!"); + } + shutdown = true; + } + + /** + * @return true if this nanoScheduler is shutdown, or false if its still open for business + */ + public boolean isShutdown() { + return shutdown; + } + + /** + * Execute a map/reduce job with this nanoScheduler + * + * Data comes from inputReader. Will be read until hasNext() == false. + * map is called on each element provided by inputReader. No order of operations is guarenteed + * reduce is called in order of the input data provided by inputReader on the result of map() applied + * to each element. + * + * Note that the caller thread is put to work with this function call. The call doesn't return + * until all elements have been processes. + * + * It is safe to call this function repeatedly on a single nanoScheduler, at least until the + * shutdown method is called. + * + * @param inputReader + * @param map + * @param reduce + * @return + */ + public ReduceType execute(final Iterator inputReader, + final MapFunction map, + final ReduceType initialValue, + final ReduceFunction reduce) { + if ( isShutdown() ) + throw new IllegalStateException("execute called on already shutdown NanoScheduler"); + + if ( getnThreads() == 1 ) { + return executeSingleThreaded(inputReader, map, initialValue, reduce); + } else { + return executeMultiThreaded(inputReader, map, initialValue, reduce); + } + } + + /** + * Simple efficient reference implementation for single threaded execution + * @return the reduce result of this map/reduce job + */ + private ReduceType executeSingleThreaded(final Iterator inputReader, + final MapFunction map, + final ReduceType initialValue, + final ReduceFunction reduce) { + ReduceType sum = initialValue; + while ( inputReader.hasNext() ) { + final InputType input = inputReader.next(); + final MapType mapValue = map.apply(input); + sum = reduce.apply(mapValue, sum); + } + return sum; + } + + /** + * Efficient parallel version of Map/Reduce + * + * @return the reduce result of this map/reduce job + */ + private ReduceType executeMultiThreaded(final Iterator inputReader, + final MapFunction map, + final ReduceType initialValue, + final ReduceFunction reduce) { + ReduceType sum = initialValue; + while ( inputReader.hasNext() ) { + try { + // read in our input values + final List inputs = readInputs(inputReader); + + // send jobs for map + final Queue>> mapQueue = submitMapJobs(map, executor, inputs); + + // send off the reduce job, and block until we get at least one reduce result + sum = reduceParallel(reduce, mapQueue, sum); + } catch (InterruptedException ex) { + throw new ReviewedStingException("got execution exception", ex); + } catch (ExecutionException ex) { + throw new ReviewedStingException("got execution exception", ex); + } + } + + return sum; + } + + @Requires("! mapQueue.isEmpty()") + private ReduceType reduceParallel(final ReduceFunction reduce, + final Queue>> mapQueue, + final ReduceType initSum) + throws InterruptedException, ExecutionException { + ReduceType sum = initSum; + + // while mapQueue has something in it to reduce + for ( final Future> future : mapQueue ) { + for ( final MapType value : future.get() ) // block until we get the values for this task + sum = reduce.apply(value, sum); + } + + return sum; + } + + /** + * Read up to inputBufferSize elements from inputReader + * + * @return a queue of inputs read in, containing one or more values of InputType read in + */ + @Requires("inputReader.hasNext()") + @Ensures("!result.isEmpty()") + private List readInputs(final Iterator inputReader) { + int n = 0; + final List inputs = new LinkedList(); + while ( inputReader.hasNext() && n < getBufferSize() ) { + final InputType input = inputReader.next(); + inputs.add(input); + n++; + } + return inputs; + } + + @Ensures("result.size() == inputs.size()") + private Queue>> submitMapJobs(final MapFunction map, + final ExecutorService executor, + final List inputs) { + final Queue>> mapQueue = new LinkedList>>(); + + for ( final List subinputs : Utils.groupList(inputs, getMapGroupSize()) ) { + final CallableMap doMap = new CallableMap(map, subinputs); + final Future> future = executor.submit(doMap); + mapQueue.add(future); + } + + return mapQueue; + } + + /** + * A simple callable version of the map function for use with the executor pool + */ + private class CallableMap implements Callable> { + final List inputs; + final MapFunction map; + + private CallableMap(final MapFunction map, final List inputs) { + this.inputs = inputs; + this.map = map; + } + + @Override public List call() throws Exception { + final List outputs = new LinkedList(); + for ( final InputType input : inputs ) + outputs.add(map.apply(input)); + return outputs; + } + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java new file mode 100644 index 000000000..8f1b0eddd --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReduceFunction.java @@ -0,0 +1,18 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +/** + * A function that combines a value of MapType with an existing ReduceValue into a new ResultType + * + * User: depristo + * Date: 8/24/12 + * Time: 9:49 AM + */ +public interface ReduceFunction { + /** + * Combine one with sum into a new ReduceType + * @param one the result of a map call on an input element + * @param sum the cumulative reduce result over all previous map calls + * @return + */ + public ReduceType apply(MapType one, ReduceType sum); +} diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java new file mode 100644 index 000000000..211e43dc1 --- /dev/null +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -0,0 +1,142 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.BaseTest; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.*; + +/** + * UnitTests for the NanoScheduler + * + * User: depristo + * Date: 8/24/12 + * Time: 11:25 AM + * To change this template use File | Settings | File Templates. + */ +public class NanoSchedulerUnitTest extends BaseTest { + private class Map2x implements MapFunction { + @Override public Integer apply(Integer input) { return input * 2; } + } + + private class ReduceSum implements ReduceFunction { + @Override public Integer apply(Integer one, Integer sum) { return one + sum; } + } + + private static int sum2x(final int start, final int end) { + int sum = 0; + for ( int i = start; i < end; i++ ) + sum += 2 * i; + return sum; + } + + private class NanoSchedulerBasicTest extends TestDataProvider { + final int bufferSize, mapGroupSize, nThreads, start, end, expectedResult; + + public NanoSchedulerBasicTest(final int bufferSize, final int mapGroupSize, final int nThreads, final int start, final int end) { + super(NanoSchedulerBasicTest.class); + this.bufferSize = bufferSize; + this.mapGroupSize = mapGroupSize; + this.nThreads = nThreads; + this.start = start; + this.end = end; + this.expectedResult = sum2x(start, end); + setName(String.format("%s nt=%d buf=%d mapGroupSize=%d start=%d end=%d sum=%d", + getClass().getSimpleName(), nThreads, bufferSize, mapGroupSize, start, end, expectedResult)); + } + + public Iterator makeReader() { + final List ints = new ArrayList(); + for ( int i = start; i < end; i++ ) + ints.add(i); + return ints.iterator(); + } + + public Map2x makeMap() { return new Map2x(); } + public Integer initReduce() { return 0; } + public ReduceSum makeReduce() { return new ReduceSum(); } + } + + static NanoSchedulerBasicTest exampleTest = null; + @DataProvider(name = "NanoSchedulerBasicTest") + public Object[][] createNanoSchedulerBasicTest() { + for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) { + for ( final int mapGroupSize : Arrays.asList(-1, 1, 10, 100, 1000) ) { + if ( mapGroupSize <= bufferSize ) { + for ( final int nt : Arrays.asList(1, 2, 4) ) { + for ( final int start : Arrays.asList(0) ) { + for ( final int end : Arrays.asList(1, 2, 11, 10000, 100000) ) { + exampleTest = new NanoSchedulerBasicTest(bufferSize, mapGroupSize, nt, start, end); + } + } + } + } + } + } + + return NanoSchedulerBasicTest.getTests(NanoSchedulerBasicTest.class); + } + + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest") + public void testSingleThreadedNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { + logger.warn("Running " + test); + if ( test.nThreads == 1 ) + testNanoScheduler(test); + } + + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", timeOut = 10000, dependsOnMethods = "testSingleThreadedNanoScheduler") + public void testMultiThreadedNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { + logger.warn("Running " + test); + if ( test.nThreads >= 1 ) + testNanoScheduler(test); + } + + private void testNanoScheduler(final NanoSchedulerBasicTest test) throws InterruptedException { + final NanoScheduler nanoScheduler = + new NanoScheduler(test.bufferSize, test.mapGroupSize, test.nThreads); + + Assert.assertEquals(nanoScheduler.getBufferSize(), test.bufferSize, "bufferSize argument"); + Assert.assertTrue(nanoScheduler.getMapGroupSize() >= test.mapGroupSize, "mapGroupSize argument"); + Assert.assertEquals(nanoScheduler.getnThreads(), test.nThreads, "nThreads argument"); + + final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce()); + Assert.assertNotNull(sum); + Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + nanoScheduler.shutdown(); + } + + @Test(enabled = true, dataProvider = "NanoSchedulerBasicTest", dependsOnMethods = "testMultiThreadedNanoScheduler") + public void testNanoSchedulerInLoop(final NanoSchedulerBasicTest test) throws InterruptedException { + if ( test.bufferSize > 1 && (test.mapGroupSize > 1 || test.mapGroupSize == -1)) { + logger.warn("Running " + test); + + final NanoScheduler nanoScheduler = + new NanoScheduler(test.bufferSize, test.mapGroupSize, test.nThreads); + + // test reusing the scheduler + for ( int i = 0; i < 10; i++ ) { + final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce()); + Assert.assertNotNull(sum); + Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + } + + nanoScheduler.shutdown(); + } + } + + @Test() + public void testShutdown() throws InterruptedException { + final NanoScheduler nanoScheduler = new NanoScheduler(1, 1, 2); + Assert.assertFalse(nanoScheduler.isShutdown(), "scheduler should be alive"); + nanoScheduler.shutdown(); + Assert.assertTrue(nanoScheduler.isShutdown(), "scheduler should be dead"); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void testShutdownExecuteFailure() throws InterruptedException { + final NanoScheduler nanoScheduler = new NanoScheduler(1, 1, 2); + nanoScheduler.shutdown(); + nanoScheduler.execute(exampleTest.makeReader(), exampleTest.makeMap(), exampleTest.initReduce(), exampleTest.makeReduce()); + } +}