diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index f0c2a6723..61d4fdd01 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -210,9 +210,12 @@ public class NanoScheduler { final ReduceType initialValue, final NanoSchedulerReduceFunction reduce) { ReduceType sum = initialValue; + int i = 0; while ( inputReader.hasNext() ) { final InputType input = inputReader.next(); final MapType mapValue = map.apply(input); + if ( i++ % bufferSize == 0 && progressFunction != null ) + progressFunction.progress(input); sum = reduce.apply(mapValue, sum); } return sum; diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java index 0ec3035e2..3bd006ffe 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerUnitTest.java @@ -34,6 +34,16 @@ public class NanoSchedulerUnitTest extends BaseTest { } } + private static class ProgressCallback implements NanoSchedulerProgressFunction { + int callBacks = 0; + + @Override + public void progress(Integer lastMapInput) { + callBacks++; + } + } + + private static int sum2x(final int start, final int end) { int sum = 0; for ( int i = start; i < end; i++ ) @@ -62,6 +72,11 @@ public class NanoSchedulerUnitTest extends BaseTest { return ints.iterator(); } + public int nExpectedCallbacks() { + int nElements = Math.max(end - start, 0); + return nElements / bufferSize; + } + public Map2x makeMap() { return new Map2x(); } public Integer initReduce() { return 0; } public ReduceSum makeReduce() { return new ReduceSum(); } @@ -73,7 +88,7 @@ public class NanoSchedulerUnitTest extends BaseTest { for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) { for ( final int nt : Arrays.asList(1, 2, 4) ) { for ( final int start : Arrays.asList(0) ) { - for ( final int end : Arrays.asList(1, 2, 11, 10000, 100000) ) { + for ( final int end : Arrays.asList(0, 1, 2, 11, 10000, 100000) ) { exampleTest = new NanoSchedulerBasicTest(bufferSize, nt, start, end); } } @@ -101,12 +116,17 @@ public class NanoSchedulerUnitTest extends BaseTest { final NanoScheduler nanoScheduler = new NanoScheduler(test.bufferSize, test.nThreads); + final ProgressCallback callback = new ProgressCallback(); + nanoScheduler.setProgressFunction(callback); + Assert.assertEquals(nanoScheduler.getBufferSize(), test.bufferSize, "bufferSize argument"); Assert.assertEquals(nanoScheduler.getnThreads(), test.nThreads, "nThreads argument"); final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce()); Assert.assertNotNull(sum); Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly"); + + Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected"); nanoScheduler.shutdown(); }