Minor optimizations for NanoScheduler:
-- Reducer.maybeReleaseLatch is no longer synchronized
-- NanoScheduler only prints progress every 100 or so map calls
This commit is contained in:
parent
0f04485c24
commit
7796ba7601
|
|
@ -43,7 +43,7 @@ import java.util.concurrent.*;
|
|||
public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||
private final static Logger logger = Logger.getLogger(NanoScheduler.class);
|
||||
private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
|
||||
private final static boolean LOG_MAP_TIMES = false;
|
||||
protected final static int UPDATE_PROGRESS_FREQ = 100;
|
||||
|
||||
final int bufferSize;
|
||||
final int nThreads;
|
||||
|
|
@ -243,8 +243,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
// map
|
||||
final MapType mapValue = map.apply(input);
|
||||
|
||||
if ( progressFunction != null )
|
||||
progressFunction.progress(input);
|
||||
updateProgress(i++, input);
|
||||
|
||||
// reduce
|
||||
sum = reduce.apply(mapValue, sum);
|
||||
|
|
@ -254,6 +253,16 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
return sum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Maybe update the progress meter (maybe because we don't want to do so so often that it costs cpu time)
|
||||
* @param counter increasing counter to use to cut down on updates
|
||||
* @param input the input we're currently at
|
||||
*/
|
||||
private void updateProgress(final int counter, final InputType input) {
|
||||
if ( progressFunction != null && counter % UPDATE_PROGRESS_FREQ == 0 )
|
||||
progressFunction.progress(input);
|
||||
}
|
||||
|
||||
/**
|
||||
* Efficient parallel version of Map/Reduce
|
||||
*
|
||||
|
|
@ -453,8 +462,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
// enqueue the result into the mapResultQueue
|
||||
result = new MapResult<MapType>(mapValue, inputWrapper.getId());
|
||||
|
||||
if ( progressFunction != null )
|
||||
progressFunction.progress(input);
|
||||
updateProgress(inputWrapper.getId(), input);
|
||||
} else {
|
||||
// if there's no input we push empty MapResults with jobIDs for synchronization with Reducer
|
||||
result = new MapResult<MapType>(inputWrapper.getId());
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ class Reducer<MapType, ReduceType> {
|
|||
*
|
||||
* Appropriate means we've seen the last job, or there's only a single job id
|
||||
*/
|
||||
private synchronized void maybeReleaseLatch() {
|
||||
private void maybeReleaseLatch() {
|
||||
if ( numJobsReduced == numSubmittedJobs ) {
|
||||
// either we've already seen the last one prevJobID == numSubmittedJobs or
|
||||
// the last job ID is -1, meaning that no jobs were ever submitted
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
|
|||
|
||||
public int nExpectedCallbacks() {
|
||||
int nElements = Math.max(end - start, 0);
|
||||
return nElements / bufferSize;
|
||||
return nElements / bufferSize / NanoScheduler.UPDATE_PROGRESS_FREQ;
|
||||
}
|
||||
|
||||
public Map2x makeMap() { return addDelays ? new Map2xWithDelays() : new Map2x(); }
|
||||
|
|
|
|||
Loading…
Reference in New Issue