Basic instrumentation support for the hierarchical microscheduler.x
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@862 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
c8347c3c94
commit
c04b67c969
|
|
@ -9,11 +9,13 @@ import org.broadinstitute.sting.gatk.OutputTracker;
|
||||||
import org.broadinstitute.sting.gatk.Reads;
|
import org.broadinstitute.sting.gatk.Reads;
|
||||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
||||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
|
||||||
import org.broadinstitute.sting.utils.StingException;
|
import org.broadinstitute.sting.utils.StingException;
|
||||||
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
|
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
|
||||||
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
||||||
|
|
||||||
|
import javax.management.MBeanServer;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
import javax.management.JMException;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
|
@ -22,6 +24,7 @@ import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.FutureTask;
|
import java.util.concurrent.FutureTask;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by IntelliJ IDEA.
|
* Created by IntelliJ IDEA.
|
||||||
|
|
@ -35,7 +38,7 @@ import java.util.concurrent.FutureTask;
|
||||||
* A microscheduler that schedules shards according to a tree-like structure.
|
* A microscheduler that schedules shards according to a tree-like structure.
|
||||||
* Requires a special walker tagged with a 'TreeReducible' interface.
|
* Requires a special walker tagged with a 'TreeReducible' interface.
|
||||||
*/
|
*/
|
||||||
public class HierarchicalMicroScheduler extends MicroScheduler implements ReduceTree.TreeReduceNotifier {
|
public class HierarchicalMicroScheduler extends MicroScheduler implements HierarchicalMicroSchedulerMBean, ReduceTree.TreeReduceNotifier {
|
||||||
/**
|
/**
|
||||||
* How many outstanding output merges are allowed before the scheduler stops
|
* How many outstanding output merges are allowed before the scheduler stops
|
||||||
* allowing new processes and starts merging flat-out.
|
* allowing new processes and starts merging flat-out.
|
||||||
|
|
@ -51,6 +54,36 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
private Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
private Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
||||||
private Queue<OutputMerger> outputMergeTasks = new LinkedList<OutputMerger>();
|
private Queue<OutputMerger> outputMergeTasks = new LinkedList<OutputMerger>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many total tasks were in the queue at the start of run.
|
||||||
|
*/
|
||||||
|
private int totalTraversals = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many shard traversals have run to date?
|
||||||
|
*/
|
||||||
|
private int totalCompletedTraversals = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* What is the total time spent traversing shards?
|
||||||
|
*/
|
||||||
|
private long totalShardTraverseTime = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* What is the total time spent tree reducing shard output?
|
||||||
|
*/
|
||||||
|
private long totalTreeReduceTime = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many tree reduces have been completed?
|
||||||
|
*/
|
||||||
|
private long totalCompletedTreeReduces = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* What is the total time spent merging output?
|
||||||
|
*/
|
||||||
|
private long totalOutputMergeTime = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new hierarchical microscheduler to process the given reads and reference.
|
* Create a new hierarchical microscheduler to process the given reads and reference.
|
||||||
* @param reads Reads file(s) to process.
|
* @param reads Reads file(s) to process.
|
||||||
|
|
@ -60,6 +93,15 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
protected HierarchicalMicroScheduler( Walker walker, Reads reads, File refFile, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods, int nThreadsToUse ) {
|
protected HierarchicalMicroScheduler( Walker walker, Reads reads, File refFile, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods, int nThreadsToUse ) {
|
||||||
super( walker, reads, refFile, rods );
|
super( walker, reads, refFile, rods );
|
||||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
||||||
|
|
||||||
|
try {
|
||||||
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
ObjectName name = new ObjectName("org.broadinstitute.sting.gatk.executive:type=HierarchicalMicroScheduler");
|
||||||
|
mbs.registerMBean(this,name);
|
||||||
|
}
|
||||||
|
catch( JMException ex ) {
|
||||||
|
throw new StingException("Unable to register microscheduler with JMX", ex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object execute( Walker walker, GenomeLocSortedSet intervals ) {
|
public Object execute( Walker walker, GenomeLocSortedSet intervals ) {
|
||||||
|
|
@ -74,6 +116,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
|
|
||||||
for(Shard shard: shardStrategy)
|
for(Shard shard: shardStrategy)
|
||||||
traverseTasks.add(shard);
|
traverseTasks.add(shard);
|
||||||
|
totalTraversals = traverseTasks.size();
|
||||||
|
|
||||||
while( isShardTraversePending() || isTreeReducePending() ) {
|
while( isShardTraversePending() || isTreeReducePending() ) {
|
||||||
// Too many files sitting around taking up space? Merge them.
|
// Too many files sitting around taking up space? Merge them.
|
||||||
|
|
@ -178,15 +221,23 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
* the final data streams.
|
* the final data streams.
|
||||||
*/
|
*/
|
||||||
protected void mergeExistingOutput() {
|
protected void mergeExistingOutput() {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
|
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
|
||||||
while( isOutputMergeReady() )
|
while( isOutputMergeReady() )
|
||||||
outputMergeTasks.remove().mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() );
|
outputMergeTasks.remove().mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() );
|
||||||
|
|
||||||
|
long endTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
totalOutputMergeTime += (endTime - startTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge any output that hasn't yet been taken care of by the blocking thread.
|
* Merge any output that hasn't yet been taken care of by the blocking thread.
|
||||||
*/
|
*/
|
||||||
protected void mergeRemainingOutput() {
|
protected void mergeRemainingOutput() {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
|
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
|
||||||
while( outputMergeTasks.size() > 0 ) {
|
while( outputMergeTasks.size() > 0 ) {
|
||||||
OutputMerger outputMerger = outputMergeTasks.remove();
|
OutputMerger outputMerger = outputMergeTasks.remove();
|
||||||
|
|
@ -196,6 +247,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
}
|
}
|
||||||
outputMerger.mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() );
|
outputMerger.mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long endTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
totalOutputMergeTime += (endTime - startTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -210,7 +265,8 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
Shard shard = traverseTasks.remove();
|
Shard shard = traverseTasks.remove();
|
||||||
OutputMerger outputMerger = new OutputMerger();
|
OutputMerger outputMerger = new OutputMerger();
|
||||||
|
|
||||||
ShardTraverser traverser = new ShardTraverser( getTraversalEngine(),
|
ShardTraverser traverser = new ShardTraverser( this,
|
||||||
|
getTraversalEngine(),
|
||||||
walker,
|
walker,
|
||||||
shard,
|
shard,
|
||||||
getShardDataProvider(shard),
|
getShardDataProvider(shard),
|
||||||
|
|
@ -258,7 +314,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
* @return A new, composite future of the result of this reduce.
|
* @return A new, composite future of the result of this reduce.
|
||||||
*/
|
*/
|
||||||
public Future notifyReduce( Future lhs, Future rhs ) {
|
public Future notifyReduce( Future lhs, Future rhs ) {
|
||||||
TreeReduceTask reducer = new TreeReduceTask( new TreeReducer( lhs, rhs ) );
|
TreeReduceTask reducer = new TreeReduceTask( new TreeReducer( this, lhs, rhs ) );
|
||||||
reduceTasks.add(reducer);
|
reduceTasks.add(reducer);
|
||||||
return reducer;
|
return reducer;
|
||||||
}
|
}
|
||||||
|
|
@ -284,5 +340,89 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by the ShardTraverser to report time consumed traversing a given shard.
|
||||||
|
* @param shardTraversalTime Elapsed time traversing a given shard.
|
||||||
|
*/
|
||||||
|
synchronized void reportShardTraverseTime( long shardTraversalTime ) {
|
||||||
|
totalShardTraverseTime += shardTraversalTime;
|
||||||
|
totalCompletedTraversals++;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by the TreeReducer to report time consumed reducing two shards.
|
||||||
|
* @param treeReduceTime Elapsed time reducing two shards.
|
||||||
|
*/
|
||||||
|
synchronized void reportTreeReduceTime( long treeReduceTime ) {
|
||||||
|
totalTreeReduceTime += treeReduceTime;
|
||||||
|
totalCompletedTreeReduces++;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public int getTotalNumberOfShards() {
|
||||||
|
return totalTraversals;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public int getRemainingNumberOfShards() {
|
||||||
|
return traverseTasks.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public int getNumberOfTasksInReduceQueue() {
|
||||||
|
return reduceTasks.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public int getNumberOfTasksInIOQueue() {
|
||||||
|
return outputMergeTasks.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public long getTotalShardTraverseTimeMillis() {
|
||||||
|
return totalShardTraverseTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public long getAvgShardTraverseTimeMillis() {
|
||||||
|
if( totalCompletedTraversals == 0 )
|
||||||
|
return 0;
|
||||||
|
return totalShardTraverseTime / totalCompletedTraversals;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public long getTotalTreeReduceTimeMillis() {
|
||||||
|
return totalTreeReduceTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public long getAvgTreeReduceTimeMillis() {
|
||||||
|
if( totalCompletedTreeReduces == 0 )
|
||||||
|
return 0;
|
||||||
|
return totalTreeReduceTime / totalCompletedTreeReduces;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
public long getTotalOutputMergeTimeMillis() {
|
||||||
|
return totalOutputMergeTime;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,73 @@
|
||||||
|
package org.broadinstitute.sting.gatk.executive;
|
||||||
|
/**
|
||||||
|
* User: hanna
|
||||||
|
* Date: May 29, 2009
|
||||||
|
* Time: 4:05:27 PM
|
||||||
|
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
|
||||||
|
* Software and documentation are copyright 2005 by the Broad Institute.
|
||||||
|
* All rights are reserved.
|
||||||
|
*
|
||||||
|
* Users acknowledge that this software is supplied without any warranty or support.
|
||||||
|
* The Broad Institute is not responsible for its use, misuse, or
|
||||||
|
* functionality.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An interface for retrieving runtime statistics about how the hierarchical
|
||||||
|
* microscheduler is behaving.
|
||||||
|
*/
|
||||||
|
public interface HierarchicalMicroSchedulerMBean {
|
||||||
|
/**
|
||||||
|
* What is the total number of shards assigned to this microscheduler?
|
||||||
|
* @return Total number of shards to process.
|
||||||
|
*/
|
||||||
|
public int getTotalNumberOfShards();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many shards are remaining for this microscheduler to process?
|
||||||
|
* @return Remaining number of shards to process.
|
||||||
|
*/
|
||||||
|
public int getRemainingNumberOfShards();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many tree reduces are waiting in the tree reduce queue?
|
||||||
|
* @return Total number of reduces waiting in the tree reduce queue?
|
||||||
|
*/
|
||||||
|
public int getNumberOfTasksInReduceQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many pending I/O combining tasks are waiting in the queue?
|
||||||
|
* @return Total number of I/O tasks waiting in the I/O queue.
|
||||||
|
*/
|
||||||
|
public int getNumberOfTasksInIOQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* What is the total time spent running traversals?
|
||||||
|
* @return Total time spent traversing shards; 0 if none have been traversed.
|
||||||
|
*/
|
||||||
|
public long getTotalShardTraverseTimeMillis();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* What is the average time spent running traversals?
|
||||||
|
* @return Average time spent traversing shards; 0 if none have been traversed.
|
||||||
|
*/
|
||||||
|
public long getAvgShardTraverseTimeMillis();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* What is the total time spent merging output?
|
||||||
|
*/
|
||||||
|
public long getTotalOutputMergeTimeMillis();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* What is the total time spent running tree reduces?
|
||||||
|
* @return Total time spent running tree reduces; 0 if none have been run.
|
||||||
|
*/
|
||||||
|
public long getTotalTreeReduceTimeMillis();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* What is the average time spent running tree reduces?
|
||||||
|
* @return Average time spent running tree reduces; 0 if none have been run.
|
||||||
|
*/
|
||||||
|
public long getAvgTreeReduceTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -24,17 +24,20 @@ import java.util.concurrent.Callable;
|
||||||
* Carries the walker over a given shard, in a callable interface.
|
* Carries the walker over a given shard, in a callable interface.
|
||||||
*/
|
*/
|
||||||
public class ShardTraverser implements Callable {
|
public class ShardTraverser implements Callable {
|
||||||
|
private HierarchicalMicroScheduler microScheduler;
|
||||||
private Walker walker;
|
private Walker walker;
|
||||||
private TraversalEngine traversalEngine;
|
private TraversalEngine traversalEngine;
|
||||||
private Shard shard;
|
private Shard shard;
|
||||||
private ShardDataProvider dataProvider;
|
private ShardDataProvider dataProvider;
|
||||||
private OutputMerger output;
|
private OutputMerger output;
|
||||||
|
|
||||||
public ShardTraverser( TraversalEngine traversalEngine,
|
public ShardTraverser( HierarchicalMicroScheduler microScheduler,
|
||||||
|
TraversalEngine traversalEngine,
|
||||||
Walker walker,
|
Walker walker,
|
||||||
Shard shard,
|
Shard shard,
|
||||||
ShardDataProvider dataProvider,
|
ShardDataProvider dataProvider,
|
||||||
OutputMerger output ) {
|
OutputMerger output ) {
|
||||||
|
this.microScheduler = microScheduler;
|
||||||
this.walker = walker;
|
this.walker = walker;
|
||||||
this.traversalEngine = traversalEngine;
|
this.traversalEngine = traversalEngine;
|
||||||
this.shard = shard;
|
this.shard = shard;
|
||||||
|
|
@ -43,6 +46,8 @@ public class ShardTraverser implements Callable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object call() {
|
public Object call() {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
Object accumulator = walker.reduceInit();
|
Object accumulator = walker.reduceInit();
|
||||||
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
|
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
|
||||||
outputTracker.setLocalStreams( output.getOutStream(), output.getErrStream() );
|
outputTracker.setLocalStreams( output.getOutStream(), output.getErrStream() );
|
||||||
|
|
@ -56,6 +61,10 @@ public class ShardTraverser implements Callable {
|
||||||
outputTracker.cleanup();
|
outputTracker.cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long endTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
microScheduler.reportShardTraverseTime(endTime-startTime);
|
||||||
|
|
||||||
return accumulator;
|
return accumulator;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,24 +24,28 @@ import java.util.concurrent.ExecutionException;
|
||||||
* interface to force the reduce.
|
* interface to force the reduce.
|
||||||
*/
|
*/
|
||||||
public class TreeReducer implements Callable {
|
public class TreeReducer implements Callable {
|
||||||
|
private HierarchicalMicroScheduler microScheduler;
|
||||||
private TreeReducible walker;
|
private TreeReducible walker;
|
||||||
private final Future lhs;
|
private final Future lhs;
|
||||||
private final Future rhs;
|
private final Future rhs;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a one-sided reduce. Result will be a simple pass-through of the result.
|
* Create a one-sided reduce. Result will be a simple pass-through of the result.
|
||||||
|
* @param microScheduler The parent hierarchical microscheduler for this reducer.
|
||||||
* @param lhs The one side of the reduce.
|
* @param lhs The one side of the reduce.
|
||||||
*/
|
*/
|
||||||
public TreeReducer( Future lhs ) {
|
public TreeReducer( HierarchicalMicroScheduler microScheduler, Future lhs ) {
|
||||||
this( lhs, null );
|
this( microScheduler, lhs, null );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a full tree reduce. Combine this two results using an unspecified walker at some point in the future.
|
* Create a full tree reduce. Combine this two results using an unspecified walker at some point in the future.
|
||||||
|
* @param microScheduler The parent hierarchical microscheduler for this reducer.
|
||||||
* @param lhs Left-hand side of the reduce.
|
* @param lhs Left-hand side of the reduce.
|
||||||
* @param rhs Right-hand side of the reduce.
|
* @param rhs Right-hand side of the reduce.
|
||||||
*/
|
*/
|
||||||
public TreeReducer( Future lhs, Future rhs ) {
|
public TreeReducer( HierarchicalMicroScheduler microScheduler, Future lhs, Future rhs ) {
|
||||||
|
this.microScheduler = microScheduler;
|
||||||
this.lhs = lhs;
|
this.lhs = lhs;
|
||||||
this.rhs = rhs;
|
this.rhs = rhs;
|
||||||
}
|
}
|
||||||
|
|
@ -73,11 +77,15 @@ public class TreeReducer implements Callable {
|
||||||
* @return Result of the reduce.
|
* @return Result of the reduce.
|
||||||
*/
|
*/
|
||||||
public Object call() {
|
public Object call() {
|
||||||
|
Object result = null;
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if( lhs == null )
|
if( lhs == null )
|
||||||
return lhs.get();
|
result = lhs.get();
|
||||||
else
|
else
|
||||||
return walker.treeReduce( lhs.get(), rhs.get() );
|
result = walker.treeReduce( lhs.get(), rhs.get() );
|
||||||
}
|
}
|
||||||
catch( InterruptedException ex ) {
|
catch( InterruptedException ex ) {
|
||||||
throw new RuntimeException("Hierarchical reduce interrupted", ex);
|
throw new RuntimeException("Hierarchical reduce interrupted", ex);
|
||||||
|
|
@ -85,6 +93,12 @@ public class TreeReducer implements Callable {
|
||||||
catch( ExecutionException ex ) {
|
catch( ExecutionException ex ) {
|
||||||
throw new RuntimeException("Hierarchical reduce failed", ex);
|
throw new RuntimeException("Hierarchical reduce failed", ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long endTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
microScheduler.reportTreeReduceTime( endTime - startTime );
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue