diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 05e1d94f4..7338e6369 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -9,11 +9,13 @@ import org.broadinstitute.sting.gatk.OutputTracker; import org.broadinstitute.sting.gatk.Reads; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; -import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.StingException; import org.broadinstitute.sting.utils.GenomeLocSortedSet; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.JMException; import java.io.File; import java.util.List; import java.util.Queue; @@ -22,6 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.lang.management.ManagementFactory; /** * Created by IntelliJ IDEA. @@ -35,7 +38,7 @@ import java.util.concurrent.FutureTask; * A microscheduler that schedules shards according to a tree-like structure. * Requires a special walker tagged with a 'TreeReducible' interface. */ -public class HierarchicalMicroScheduler extends MicroScheduler implements ReduceTree.TreeReduceNotifier { +public class HierarchicalMicroScheduler extends MicroScheduler implements HierarchicalMicroSchedulerMBean, ReduceTree.TreeReduceNotifier { /** * How many outstanding output merges are allowed before the scheduler stops * allowing new processes and starts merging flat-out. @@ -51,6 +54,36 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce private Queue reduceTasks = new LinkedList(); private Queue outputMergeTasks = new LinkedList(); + /** + * How many total tasks were in the queue at the start of run. + */ + private int totalTraversals = 0; + + /** + * How many shard traversals have run to date? + */ + private int totalCompletedTraversals = 0; + + /** + * What is the total time spent traversing shards? + */ + private long totalShardTraverseTime = 0; + + /** + * What is the total time spent tree reducing shard output? + */ + private long totalTreeReduceTime = 0; + + /** + * How many tree reduces have been completed? + */ + private long totalCompletedTreeReduces = 0; + + /** + * What is the total time spent merging output? + */ + private long totalOutputMergeTime = 0; + /** * Create a new hierarchical microscheduler to process the given reads and reference. * @param reads Reads file(s) to process. @@ -60,6 +93,15 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce protected HierarchicalMicroScheduler( Walker walker, Reads reads, File refFile, List> rods, int nThreadsToUse ) { super( walker, reads, refFile, rods ); this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName name = new ObjectName("org.broadinstitute.sting.gatk.executive:type=HierarchicalMicroScheduler"); + mbs.registerMBean(this,name); + } + catch( JMException ex ) { + throw new StingException("Unable to register microscheduler with JMX", ex); + } } public Object execute( Walker walker, GenomeLocSortedSet intervals ) { @@ -74,6 +116,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce for(Shard shard: shardStrategy) traverseTasks.add(shard); + totalTraversals = traverseTasks.size(); while( isShardTraversePending() || isTreeReducePending() ) { // Too many files sitting around taking up space? Merge them. @@ -178,15 +221,23 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce * the final data streams. */ protected void mergeExistingOutput() { + long startTime = System.currentTimeMillis(); + OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker(); while( isOutputMergeReady() ) outputMergeTasks.remove().mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() ); + + long endTime = System.currentTimeMillis(); + + totalOutputMergeTime += (endTime - startTime); } /** * Merge any output that hasn't yet been taken care of by the blocking thread. */ protected void mergeRemainingOutput() { + long startTime = System.currentTimeMillis(); + OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker(); while( outputMergeTasks.size() > 0 ) { OutputMerger outputMerger = outputMergeTasks.remove(); @@ -196,6 +247,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce } outputMerger.mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() ); } + + long endTime = System.currentTimeMillis(); + + totalOutputMergeTime += (endTime - startTime); } /** @@ -210,7 +265,8 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce Shard shard = traverseTasks.remove(); OutputMerger outputMerger = new OutputMerger(); - ShardTraverser traverser = new ShardTraverser( getTraversalEngine(), + ShardTraverser traverser = new ShardTraverser( this, + getTraversalEngine(), walker, shard, getShardDataProvider(shard), @@ -258,7 +314,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce * @return A new, composite future of the result of this reduce. */ public Future notifyReduce( Future lhs, Future rhs ) { - TreeReduceTask reducer = new TreeReduceTask( new TreeReducer( lhs, rhs ) ); + TreeReduceTask reducer = new TreeReduceTask( new TreeReducer( this, lhs, rhs ) ); reduceTasks.add(reducer); return reducer; } @@ -284,5 +340,89 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce } } + /** + * Used by the ShardTraverser to report time consumed traversing a given shard. + * @param shardTraversalTime Elapsed time traversing a given shard. + */ + synchronized void reportShardTraverseTime( long shardTraversalTime ) { + totalShardTraverseTime += shardTraversalTime; + totalCompletedTraversals++; + } + /** + * Used by the TreeReducer to report time consumed reducing two shards. + * @param treeReduceTime Elapsed time reducing two shards. + */ + synchronized void reportTreeReduceTime( long treeReduceTime ) { + totalTreeReduceTime += treeReduceTime; + totalCompletedTreeReduces++; + + } + + /** + * {@inheritDoc} + */ + public int getTotalNumberOfShards() { + return totalTraversals; + } + + /** + * {@inheritDoc} + */ + public int getRemainingNumberOfShards() { + return traverseTasks.size(); + } + + /** + * {@inheritDoc} + */ + public int getNumberOfTasksInReduceQueue() { + return reduceTasks.size(); + } + + /** + * {@inheritDoc} + */ + public int getNumberOfTasksInIOQueue() { + return outputMergeTasks.size(); + } + + /** + * {@inheritDoc} + */ + public long getTotalShardTraverseTimeMillis() { + return totalShardTraverseTime; + } + + /** + * {@inheritDoc} + */ + public long getAvgShardTraverseTimeMillis() { + if( totalCompletedTraversals == 0 ) + return 0; + return totalShardTraverseTime / totalCompletedTraversals; + } + + /** + * {@inheritDoc} + */ + public long getTotalTreeReduceTimeMillis() { + return totalTreeReduceTime; + } + + /** + * {@inheritDoc} + */ + public long getAvgTreeReduceTimeMillis() { + if( totalCompletedTreeReduces == 0 ) + return 0; + return totalTreeReduceTime / totalCompletedTreeReduces; + } + + /** + * {@inheritDoc} + */ + public long getTotalOutputMergeTimeMillis() { + return totalOutputMergeTime; + } } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java new file mode 100644 index 000000000..3bb86e737 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java @@ -0,0 +1,73 @@ +package org.broadinstitute.sting.gatk.executive; +/** + * User: hanna + * Date: May 29, 2009 + * Time: 4:05:27 PM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * An interface for retrieving runtime statistics about how the hierarchical + * microscheduler is behaving. + */ +public interface HierarchicalMicroSchedulerMBean { + /** + * What is the total number of shards assigned to this microscheduler? + * @return Total number of shards to process. + */ + public int getTotalNumberOfShards(); + + /** + * How many shards are remaining for this microscheduler to process? + * @return Remaining number of shards to process. + */ + public int getRemainingNumberOfShards(); + + /** + * How many tree reduces are waiting in the tree reduce queue? + * @return Total number of reduces waiting in the tree reduce queue? + */ + public int getNumberOfTasksInReduceQueue(); + + /** + * How many pending I/O combining tasks are waiting in the queue? + * @return Total number of I/O tasks waiting in the I/O queue. + */ + public int getNumberOfTasksInIOQueue(); + + /** + * What is the total time spent running traversals? + * @return Total time spent traversing shards; 0 if none have been traversed. + */ + public long getTotalShardTraverseTimeMillis(); + + /** + * What is the average time spent running traversals? + * @return Average time spent traversing shards; 0 if none have been traversed. + */ + public long getAvgShardTraverseTimeMillis(); + + /** + * What is the total time spent merging output? + */ + public long getTotalOutputMergeTimeMillis(); + + /** + * What is the total time spent running tree reduces? + * @return Total time spent running tree reduces; 0 if none have been run. + */ + public long getTotalTreeReduceTimeMillis(); + + /** + * What is the average time spent running tree reduces? + * @return Average time spent running tree reduces; 0 if none have been run. + */ + public long getAvgTreeReduceTimeMillis(); +} + diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index fe2a8e939..2d517513a 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -24,17 +24,20 @@ import java.util.concurrent.Callable; * Carries the walker over a given shard, in a callable interface. */ public class ShardTraverser implements Callable { + private HierarchicalMicroScheduler microScheduler; private Walker walker; private TraversalEngine traversalEngine; private Shard shard; private ShardDataProvider dataProvider; private OutputMerger output; - public ShardTraverser( TraversalEngine traversalEngine, + public ShardTraverser( HierarchicalMicroScheduler microScheduler, + TraversalEngine traversalEngine, Walker walker, Shard shard, ShardDataProvider dataProvider, OutputMerger output ) { + this.microScheduler = microScheduler; this.walker = walker; this.traversalEngine = traversalEngine; this.shard = shard; @@ -43,6 +46,8 @@ public class ShardTraverser implements Callable { } public Object call() { + long startTime = System.currentTimeMillis(); + Object accumulator = walker.reduceInit(); OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker(); outputTracker.setLocalStreams( output.getOutStream(), output.getErrStream() ); @@ -56,6 +61,10 @@ public class ShardTraverser implements Callable { outputTracker.cleanup(); } + long endTime = System.currentTimeMillis(); + + microScheduler.reportShardTraverseTime(endTime-startTime); + return accumulator; } } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java b/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java index 434c196db..4ea2ee447 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java @@ -24,24 +24,28 @@ import java.util.concurrent.ExecutionException; * interface to force the reduce. */ public class TreeReducer implements Callable { + private HierarchicalMicroScheduler microScheduler; private TreeReducible walker; private final Future lhs; private final Future rhs; /** * Create a one-sided reduce. Result will be a simple pass-through of the result. + * @param microScheduler The parent hierarchical microscheduler for this reducer. * @param lhs The one side of the reduce. */ - public TreeReducer( Future lhs ) { - this( lhs, null ); + public TreeReducer( HierarchicalMicroScheduler microScheduler, Future lhs ) { + this( microScheduler, lhs, null ); } /** * Create a full tree reduce. Combine this two results using an unspecified walker at some point in the future. + * @param microScheduler The parent hierarchical microscheduler for this reducer. * @param lhs Left-hand side of the reduce. * @param rhs Right-hand side of the reduce. */ - public TreeReducer( Future lhs, Future rhs ) { + public TreeReducer( HierarchicalMicroScheduler microScheduler, Future lhs, Future rhs ) { + this.microScheduler = microScheduler; this.lhs = lhs; this.rhs = rhs; } @@ -73,11 +77,15 @@ public class TreeReducer implements Callable { * @return Result of the reduce. */ public Object call() { + Object result = null; + + long startTime = System.currentTimeMillis(); + try { if( lhs == null ) - return lhs.get(); + result = lhs.get(); else - return walker.treeReduce( lhs.get(), rhs.get() ); + result = walker.treeReduce( lhs.get(), rhs.get() ); } catch( InterruptedException ex ) { throw new RuntimeException("Hierarchical reduce interrupted", ex); @@ -85,6 +93,12 @@ public class TreeReducer implements Callable { catch( ExecutionException ex ) { throw new RuntimeException("Hierarchical reduce failed", ex); } + + long endTime = System.currentTimeMillis(); + + microScheduler.reportTreeReduceTime( endTime - startTime ); + + return result; } }