First commit addressing problems running RTC in parallel.

It turns out that, because the RTC is the first walker to 'correctly' tree reduce according to functional programming
standards, it has revealed a few problems with the tree reducer holding on to too much data.  This is the first
and smaller of two commits to reduce memory consumption.  The second commit will likely be pushed after GATK 1.4 is
released.
This commit is contained in:
Matt Hanna 2011-12-29 16:22:14 -05:00
parent e6e80e8d3f
commit a259bfefd4
2 changed files with 11 additions and 33 deletions

View File

@ -15,6 +15,7 @@ import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
@ -41,7 +42,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
*/
private ThreadLocalOutputTracker outputTracker = new ThreadLocalOutputTracker();
private final Queue<Shard> traverseTasks = new LinkedList<Shard>();
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
/**
@ -49,6 +49,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
*/
private Throwable error = null;
/**
* Iterator over the incoming shards that have not yet been handed off for traversal.
*/
private Iterator<Shard> traversalTasks;
/**
* Keep a queue of shard traversals, and constantly monitor it to see what output
* merge tasks remain.
@ -56,9 +61,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
*/
private final Queue<ShardTraverser> outputMergeTasks = new LinkedList<ShardTraverser>();
/** How many total tasks were in the queue at the start of run. */
private int totalTraversals = 0;
/** How many shard traversals have run to date? */
private int totalCompletedTraversals = 0;
@ -92,13 +94,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
if (!( walker instanceof TreeReducible ))
throw new IllegalArgumentException("The GATK can currently run in parallel only with TreeReducible walkers");
this.traversalTasks = shardStrategy.iterator();
ReduceTree reduceTree = new ReduceTree(this);
initializeWalker(walker);
for (Shard shard : shardStrategy)
traverseTasks.add(shard);
totalTraversals = traverseTasks.size();
while (isShardTraversePending() || isTreeReducePending()) {
// Check for errors during execution.
if(hasTraversalErrorOccurred())
@ -190,7 +190,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
* @return true if a shard traversal is waiting; false otherwise.
*/
protected boolean isShardTraversePending() {
return traverseTasks.size() > 0;
return traversalTasks.hasNext();
}
/**
@ -283,10 +283,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
* @param reduceTree Tree of reduces to which to add this shard traverse.
*/
protected void queueNextShardTraverse( Walker walker, ReduceTree reduceTree ) {
if (traverseTasks.size() == 0)
if (!traversalTasks.hasNext())
throw new IllegalStateException("Cannot traverse; no pending traversals exist.");
Shard shard = traverseTasks.remove();
Shard shard = traversalTasks.next();
// todo -- add ownership claim here
@ -398,16 +398,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
}
/** {@inheritDoc} */
public int getTotalNumberOfShards() {
return totalTraversals;
}
/** {@inheritDoc} */
public int getRemainingNumberOfShards() {
return traverseTasks.size();
}
/** {@inheritDoc} */
public int getNumberOfTasksInReduceQueue() {
return reduceTasks.size();

View File

@ -17,18 +17,6 @@ package org.broadinstitute.sting.gatk.executive;
* microscheduler is behaving.
*/
public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean {
/**
* What is the total number of shards assigned to this microscheduler?
* @return Total number of shards to process.
*/
public int getTotalNumberOfShards();
/**
* How many shards are remaining for this microscheduler to process?
* @return Remaining number of shards to process.
*/
public int getRemainingNumberOfShards();
/**
* How many tree reduces are waiting in the tree reduce queue?
* @return Total number of reduces waiting in the tree reduce queue.