diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index fcc6a5723..63ae1958c 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -85,6 +85,7 @@ public class NanoScheduler { * The number of parallel map threads in use with this NanoScheduler * @return */ + @Ensures("result > 0") public int getnThreads() { return nThreads; } @@ -93,6 +94,7 @@ public class NanoScheduler { * The input buffer size used by this NanoScheduler * @return */ + @Ensures("result > 0") public int getBufferSize() { return bufferSize; } @@ -101,6 +103,7 @@ public class NanoScheduler { * The grouping size used by this NanoScheduler * @return */ + @Ensures("result > 0") public int getMapGroupSize() { return mapGroupSize; } @@ -149,8 +152,10 @@ public class NanoScheduler { final MapFunction map, final ReduceType initialValue, final ReduceFunction reduce) { - if ( isShutdown() ) - throw new IllegalStateException("execute called on already shutdown NanoScheduler"); + if ( isShutdown() ) throw new IllegalStateException("execute called on already shutdown NanoScheduler"); + if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null"); + if ( map == null ) throw new IllegalArgumentException("map function cannot be null"); + if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null"); if ( getnThreads() == 1 ) { return executeSingleThreaded(inputReader, map, initialValue, reduce); @@ -206,7 +211,7 @@ public class NanoScheduler { return sum; } - @Requires("! mapQueue.isEmpty()") + @Requires({"reduce != null", "! mapQueue.isEmpty()"}) private ReduceType reduceParallel(final ReduceFunction reduce, final Queue>> mapQueue, final ReduceType initSum) @@ -240,7 +245,7 @@ public class NanoScheduler { return inputs; } - @Ensures("result.size() == inputs.size()") + @Requires({"map != null", "! inputs.isEmpty()"}) private Queue>> submitMapJobs(final MapFunction map, final ExecutorService executor, final List inputs) { @@ -262,11 +267,13 @@ public class NanoScheduler { final List inputs; final MapFunction map; + @Requires({"map != null", "inputs.size() <= getMapGroupSize()"}) private CallableMap(final MapFunction map, final List inputs) { this.inputs = inputs; this.map = map; } + @Ensures("result.size() == inputs.size()") @Override public List call() throws Exception { final List outputs = new LinkedList(); for ( final InputType input : inputs ) diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 211e43dc1..454441240 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -21,7 +21,12 @@ public class NanoSchedulerUnitTest extends BaseTest { } private class ReduceSum implements ReduceFunction { - @Override public Integer apply(Integer one, Integer sum) { return one + sum; } + int prevOne = Integer.MIN_VALUE; + + @Override public Integer apply(Integer one, Integer sum) { + Assert.assertTrue(prevOne < one, "Reduce came in out of order. Prev " + prevOne + " cur " + one); + return one + sum; + } } private static int sum2x(final int start, final int end) {