diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index b0043e68c..39e1bdc72 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -15,6 +15,7 @@ import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutorService; @@ -41,7 +42,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar */ private ThreadLocalOutputTracker outputTracker = new ThreadLocalOutputTracker(); - private final Queue traverseTasks = new LinkedList(); private final Queue reduceTasks = new LinkedList(); /** @@ -49,6 +49,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar */ private Throwable error = null; + /** + * Queue of incoming shards. + */ + private Iterator traversalTasks; + /** * Keep a queue of shard traversals, and constantly monitor it to see what output * merge tasks remain. @@ -56,9 +61,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar */ private final Queue outputMergeTasks = new LinkedList(); - /** How many total tasks were in the queue at the start of run. */ - private int totalTraversals = 0; - /** How many shard traversals have run to date? */ private int totalCompletedTraversals = 0; @@ -92,13 +94,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar if (!( walker instanceof TreeReducible )) throw new IllegalArgumentException("The GATK can currently run in parallel only with TreeReducible walkers"); + this.traversalTasks = shardStrategy.iterator(); + ReduceTree reduceTree = new ReduceTree(this); initializeWalker(walker); - for (Shard shard : shardStrategy) - traverseTasks.add(shard); - totalTraversals = traverseTasks.size(); - while (isShardTraversePending() || isTreeReducePending()) { // Check for errors during execution. if(hasTraversalErrorOccurred()) @@ -190,7 +190,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * @return true if a shard traversal is waiting; false otherwise. */ protected boolean isShardTraversePending() { - return traverseTasks.size() > 0; + return traversalTasks.hasNext(); } /** @@ -283,10 +283,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * @param reduceTree Tree of reduces to which to add this shard traverse. */ protected void queueNextShardTraverse( Walker walker, ReduceTree reduceTree ) { - if (traverseTasks.size() == 0) + if (!traversalTasks.hasNext()) throw new IllegalStateException("Cannot traverse; no pending traversals exist."); - Shard shard = traverseTasks.remove(); + Shard shard = traversalTasks.next(); // todo -- add ownership claim here @@ -398,16 +398,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar } - /** {@inheritDoc} */ - public int getTotalNumberOfShards() { - return totalTraversals; - } - - /** {@inheritDoc} */ - public int getRemainingNumberOfShards() { - return traverseTasks.size(); - } - /** {@inheritDoc} */ public int getNumberOfTasksInReduceQueue() { return reduceTasks.size(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java index 21a87963b..530285db0 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java @@ -17,18 +17,6 @@ package org.broadinstitute.sting.gatk.executive; * microscheduler is behaving. */ public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean { - /** - * What is the total number of shards assigned to this microscheduler? - * @return Total number of shards to process. - */ - public int getTotalNumberOfShards(); - - /** - * How many shards are remaining for this microscheduler to process? - * @return Remaining number of shards to process. - */ - public int getRemainingNumberOfShards(); - /** * How many tree reduces are waiting in the tree reduce queue? * @return Total number of reduces waiting in the tree reduce queue?