Test for progressFunction in NanoScheduler; bugfix for single threaded fast path
This commit is contained in:
parent
8cdeb51b78
commit
03dd470ec1
|
|
@ -210,9 +210,12 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
final ReduceType initialValue,
|
||||
final NanoSchedulerReduceFunction<MapType, ReduceType> reduce) {
|
||||
ReduceType sum = initialValue;
|
||||
int i = 0;
|
||||
while ( inputReader.hasNext() ) {
|
||||
final InputType input = inputReader.next();
|
||||
final MapType mapValue = map.apply(input);
|
||||
if ( i++ % bufferSize == 0 && progressFunction != null )
|
||||
progressFunction.progress(input);
|
||||
sum = reduce.apply(mapValue, sum);
|
||||
}
|
||||
return sum;
|
||||
|
|
|
|||
|
|
@ -34,6 +34,16 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
}
|
||||
}
|
||||
|
||||
private static class ProgressCallback implements NanoSchedulerProgressFunction<Integer> {
|
||||
int callBacks = 0;
|
||||
|
||||
@Override
|
||||
public void progress(Integer lastMapInput) {
|
||||
callBacks++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static int sum2x(final int start, final int end) {
|
||||
int sum = 0;
|
||||
for ( int i = start; i < end; i++ )
|
||||
|
|
@ -62,6 +72,11 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
return ints.iterator();
|
||||
}
|
||||
|
||||
public int nExpectedCallbacks() {
|
||||
int nElements = Math.max(end - start, 0);
|
||||
return nElements / bufferSize;
|
||||
}
|
||||
|
||||
public Map2x makeMap() { return new Map2x(); }
|
||||
public Integer initReduce() { return 0; }
|
||||
public ReduceSum makeReduce() { return new ReduceSum(); }
|
||||
|
|
@ -73,7 +88,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
for ( final int bufferSize : Arrays.asList(1, 10, 1000, 1000000) ) {
|
||||
for ( final int nt : Arrays.asList(1, 2, 4) ) {
|
||||
for ( final int start : Arrays.asList(0) ) {
|
||||
for ( final int end : Arrays.asList(1, 2, 11, 10000, 100000) ) {
|
||||
for ( final int end : Arrays.asList(0, 1, 2, 11, 10000, 100000) ) {
|
||||
exampleTest = new NanoSchedulerBasicTest(bufferSize, nt, start, end);
|
||||
}
|
||||
}
|
||||
|
|
@ -101,12 +116,17 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
final NanoScheduler<Integer, Integer, Integer> nanoScheduler =
|
||||
new NanoScheduler<Integer, Integer, Integer>(test.bufferSize, test.nThreads);
|
||||
|
||||
final ProgressCallback callback = new ProgressCallback();
|
||||
nanoScheduler.setProgressFunction(callback);
|
||||
|
||||
Assert.assertEquals(nanoScheduler.getBufferSize(), test.bufferSize, "bufferSize argument");
|
||||
Assert.assertEquals(nanoScheduler.getnThreads(), test.nThreads, "nThreads argument");
|
||||
|
||||
final Integer sum = nanoScheduler.execute(test.makeReader(), test.makeMap(), test.initReduce(), test.makeReduce());
|
||||
Assert.assertNotNull(sum);
|
||||
Assert.assertEquals((int)sum, test.expectedResult, "NanoScheduler sum not the same as calculated directly");
|
||||
|
||||
Assert.assertTrue(callback.callBacks >= test.nExpectedCallbacks(), "Not enough callbacks detected");
|
||||
nanoScheduler.shutdown();
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue