More tests for NanoScheduler
-- Add more contracts -- Test in the UnitTest that the reduce is being called in the correct order
This commit is contained in:
parent
6db0988898
commit
275a5e5439
|
|
@ -85,6 +85,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
* The number of parallel map threads in use with this NanoScheduler
|
||||
* @return
|
||||
*/
|
||||
@Ensures("result > 0")
|
||||
public int getnThreads() {
|
||||
return nThreads;
|
||||
}
|
||||
|
|
@ -93,6 +94,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
* The input buffer size used by this NanoScheduler
|
||||
* @return
|
||||
*/
|
||||
@Ensures("result > 0")
|
||||
public int getBufferSize() {
|
||||
return bufferSize;
|
||||
}
|
||||
|
|
@ -101,6 +103,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
* The grouping size used by this NanoScheduler
|
||||
* @return
|
||||
*/
|
||||
@Ensures("result > 0")
|
||||
public int getMapGroupSize() {
|
||||
return mapGroupSize;
|
||||
}
|
||||
|
|
@ -149,8 +152,10 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
final MapFunction<InputType, MapType> map,
|
||||
final ReduceType initialValue,
|
||||
final ReduceFunction<MapType, ReduceType> reduce) {
|
||||
if ( isShutdown() )
|
||||
throw new IllegalStateException("execute called on already shutdown NanoScheduler");
|
||||
if ( isShutdown() ) throw new IllegalStateException("execute called on already shutdown NanoScheduler");
|
||||
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
|
||||
if ( map == null ) throw new IllegalArgumentException("map function cannot be null");
|
||||
if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null");
|
||||
|
||||
if ( getnThreads() == 1 ) {
|
||||
return executeSingleThreaded(inputReader, map, initialValue, reduce);
|
||||
|
|
@ -206,7 +211,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
return sum;
|
||||
}
|
||||
|
||||
@Requires("! mapQueue.isEmpty()")
|
||||
@Requires({"reduce != null", "! mapQueue.isEmpty()"})
|
||||
private ReduceType reduceParallel(final ReduceFunction<MapType, ReduceType> reduce,
|
||||
final Queue<Future<List<MapType>>> mapQueue,
|
||||
final ReduceType initSum)
|
||||
|
|
@ -240,7 +245,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
return inputs;
|
||||
}
|
||||
|
||||
@Ensures("result.size() == inputs.size()")
|
||||
@Requires({"map != null", "! inputs.isEmpty()"})
|
||||
private Queue<Future<List<MapType>>> submitMapJobs(final MapFunction<InputType, MapType> map,
|
||||
final ExecutorService executor,
|
||||
final List<InputType> inputs) {
|
||||
|
|
@ -262,11 +267,13 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
final List<InputType> inputs;
|
||||
final MapFunction<InputType, MapType> map;
|
||||
|
||||
@Requires({"map != null", "inputs.size() <= getMapGroupSize()"})
|
||||
private CallableMap(final MapFunction<InputType, MapType> map, final List<InputType> inputs) {
|
||||
this.inputs = inputs;
|
||||
this.map = map;
|
||||
}
|
||||
|
||||
@Ensures("result.size() == inputs.size()")
|
||||
@Override public List<MapType> call() throws Exception {
|
||||
final List<MapType> outputs = new LinkedList<MapType>();
|
||||
for ( final InputType input : inputs )
|
||||
|
|
|
|||
|
|
@ -21,7 +21,12 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
}
|
||||
|
||||
private class ReduceSum implements ReduceFunction<Integer, Integer> {
|
||||
@Override public Integer apply(Integer one, Integer sum) { return one + sum; }
|
||||
int prevOne = Integer.MIN_VALUE;
|
||||
|
||||
@Override public Integer apply(Integer one, Integer sum) {
|
||||
Assert.assertTrue(prevOne < one, "Reduce came in out of order. Prev " + prevOne + " cur " + one);
|
||||
return one + sum;
|
||||
}
|
||||
}
|
||||
|
||||
private static int sum2x(final int start, final int end) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue