From a259bfefd4458638812751c09616e09761479622 Mon Sep 17 00:00:00 2001 From: Matt Hanna Date: Thu, 29 Dec 2011 16:22:14 -0500 Subject: [PATCH] First commit addressing problems running RTC in parallel. Turns out that because the RTC is the first walker to 'correctly' tree reduce according to functional programming standards, the RTC has revealed a few problems with the tree reducer holding on to too much data. This is the first and smaller of two commits to reduce memory consumption. The second commit will likely be pushed after GATK1.4 is released. --- .../executive/HierarchicalMicroScheduler.java | 32 +++++++------------ .../HierarchicalMicroSchedulerMBean.java | 12 ------- 2 files changed, 11 insertions(+), 33 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index b0043e68c..39e1bdc72 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -15,6 +15,7 @@ import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutorService; @@ -41,7 +42,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar */ private ThreadLocalOutputTracker outputTracker = new ThreadLocalOutputTracker(); - private final Queue traverseTasks = new LinkedList(); private final Queue reduceTasks = new LinkedList(); /** @@ -49,6 +49,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar */ private Throwable error = null; + /** + * Queue of incoming shards. + */ + private Iterator traversalTasks; + /** * Keep a queue of shard traversals, and constantly monitor it to see what output * merge tasks remain. @@ -56,9 +61,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar */ private final Queue outputMergeTasks = new LinkedList(); - /** How many total tasks were in the queue at the start of run. */ - private int totalTraversals = 0; - /** How many shard traversals have run to date? */ private int totalCompletedTraversals = 0; @@ -92,13 +94,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar if (!( walker instanceof TreeReducible )) throw new IllegalArgumentException("The GATK can currently run in parallel only with TreeReducible walkers"); + this.traversalTasks = shardStrategy.iterator(); + ReduceTree reduceTree = new ReduceTree(this); initializeWalker(walker); - for (Shard shard : shardStrategy) - traverseTasks.add(shard); - totalTraversals = traverseTasks.size(); - while (isShardTraversePending() || isTreeReducePending()) { // Check for errors during execution. if(hasTraversalErrorOccurred()) @@ -190,7 +190,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * @return true if a shard traversal is waiting; false otherwise. */ protected boolean isShardTraversePending() { - return traverseTasks.size() > 0; + return traversalTasks.hasNext(); } /** @@ -283,10 +283,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * @param reduceTree Tree of reduces to which to add this shard traverse. */ protected void queueNextShardTraverse( Walker walker, ReduceTree reduceTree ) { - if (traverseTasks.size() == 0) + if (!traversalTasks.hasNext()) throw new IllegalStateException("Cannot traverse; no pending traversals exist."); - Shard shard = traverseTasks.remove(); + Shard shard = traversalTasks.next(); // todo -- add ownership claim here @@ -398,16 +398,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar } - /** {@inheritDoc} */ - public int getTotalNumberOfShards() { - return totalTraversals; - } - - /** {@inheritDoc} */ - public int getRemainingNumberOfShards() { - return traverseTasks.size(); - } - /** {@inheritDoc} */ public int getNumberOfTasksInReduceQueue() { return reduceTasks.size(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java index 21a87963b..530285db0 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java @@ -17,18 +17,6 @@ package org.broadinstitute.sting.gatk.executive; * microscheduler is behaving. */ public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean { - /** - * What is the total number of shards assigned to this microscheduler? - * @return Total number of shards to process. - */ - public int getTotalNumberOfShards(); - - /** - * How many shards are remaining for this microscheduler to process? - * @return Remaining number of shards to process. - */ - public int getRemainingNumberOfShards(); - /** * How many tree reduces are waiting in the tree reduce queue? * @return Total number of reduces waiting in the tree reduce queue?