2009-04-27 07:08:12 +08:00
|
|
|
package org.broadinstitute.sting.gatk.executive;
|
|
|
|
|
|
2009-05-18 09:31:57 +08:00
|
|
|
import org.broadinstitute.sting.gatk.traversals.TraverseLoci;
|
2009-05-23 05:20:24 +08:00
|
|
|
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
2009-04-27 07:08:12 +08:00
|
|
|
import org.broadinstitute.sting.gatk.walkers.Walker;
|
2009-04-30 04:26:16 +08:00
|
|
|
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
|
|
|
|
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
|
|
|
|
|
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
2009-05-12 06:33:00 +08:00
|
|
|
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
|
2009-05-02 05:40:46 +08:00
|
|
|
import org.broadinstitute.sting.gatk.OutputTracker;
|
2009-05-16 05:02:12 +08:00
|
|
|
import org.broadinstitute.sting.gatk.Reads;
|
2009-05-07 07:26:21 +08:00
|
|
|
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
|
|
|
|
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
2009-04-27 07:08:12 +08:00
|
|
|
import org.broadinstitute.sting.utils.GenomeLoc;
|
2009-05-02 03:34:09 +08:00
|
|
|
import org.broadinstitute.sting.utils.StingException;
|
2009-04-30 05:07:07 +08:00
|
|
|
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
2009-04-27 07:08:12 +08:00
|
|
|
|
|
|
|
|
import java.io.File;
|
|
|
|
|
import java.util.List;
|
2009-04-30 04:26:16 +08:00
|
|
|
import java.util.Queue;
|
|
|
|
|
import java.util.LinkedList;
|
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
import java.util.concurrent.FutureTask;
|
2009-04-27 07:08:12 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Created by IntelliJ IDEA.
|
|
|
|
|
* User: mhanna
|
|
|
|
|
* Date: Apr 26, 2009
|
|
|
|
|
* Time: 5:41:04 PM
|
|
|
|
|
* To change this template use File | Settings | File Templates.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A microscheduler that schedules shards according to a tree-like structure.
|
|
|
|
|
* Requires a special walker tagged with a 'TreeReducible' interface.
|
|
|
|
|
*/
|
2009-04-30 04:26:16 +08:00
|
|
|
public class HierarchicalMicroScheduler extends MicroScheduler implements ReduceTree.TreeReduceNotifier {
|
2009-05-02 05:40:46 +08:00
|
|
|
/**
|
|
|
|
|
* How many outstanding output merges are allowed before the scheduler stops
|
|
|
|
|
* allowing new processes and starts merging flat-out.
|
|
|
|
|
*/
|
|
|
|
|
private static final int MAX_OUTSTANDING_OUTPUT_MERGES = 50;
|
|
|
|
|
|
2009-04-27 07:08:12 +08:00
|
|
|
/**
|
2009-04-30 04:26:16 +08:00
|
|
|
* Manage currently running threads.
|
2009-04-27 07:08:12 +08:00
|
|
|
*/
|
2009-04-30 04:26:16 +08:00
|
|
|
private ExecutorService threadPool;
|
|
|
|
|
|
|
|
|
|
private Queue<Shard> traverseTasks = new LinkedList<Shard>();
|
|
|
|
|
private Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
2009-05-02 06:01:04 +08:00
|
|
|
private Queue<OutputMerger> outputMergeTasks = new LinkedList<OutputMerger>();
|
2009-04-27 07:08:12 +08:00
|
|
|
|
|
|
|
|
/**
 * Builds a hierarchical microscheduler over the given reads and reference,
 * backed by a fixed-size thread pool.
 *
 * @param walker        Walker forwarded to the base microscheduler.
 * @param reads         Reads file(s) to process.
 * @param refFile       Reference for driving the traversal.
 * @param rods          Reference-ordered data forwarded to the base microscheduler.
 * @param nThreadsToUse Maximum number of threads to use to do the work.
 */
protected HierarchicalMicroScheduler( Walker walker,
                                      Reads reads,
                                      File refFile,
                                      List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods,
                                      int nThreadsToUse ) {
    super( walker, reads, refFile, rods );
    threadPool = Executors.newFixedThreadPool( nThreadsToUse );
}
|
|
|
|
|
|
2009-05-16 04:20:27 +08:00
|
|
|
public Object execute( Walker walker, List<GenomeLoc> intervals ) {
|
2009-04-30 04:26:16 +08:00
|
|
|
// Fast fail for walkers not supporting TreeReducible interface.
|
|
|
|
|
if( !(walker instanceof TreeReducible) )
|
|
|
|
|
throw new IllegalArgumentException("Hierarchical microscheduler only works with TreeReducible walkers");
|
|
|
|
|
|
2009-05-19 06:54:18 +08:00
|
|
|
ShardStrategy shardStrategy = getShardStrategy( walker, reference, intervals );
|
2009-04-30 04:26:16 +08:00
|
|
|
ReduceTree reduceTree = new ReduceTree( this );
|
|
|
|
|
|
|
|
|
|
walker.initialize();
|
2009-04-27 07:08:12 +08:00
|
|
|
|
2009-04-30 04:26:16 +08:00
|
|
|
for(Shard shard: shardStrategy)
|
|
|
|
|
traverseTasks.add(shard);
|
|
|
|
|
|
2009-05-02 05:40:46 +08:00
|
|
|
while( isShardTraversePending() || isTreeReducePending() ) {
|
|
|
|
|
// Too many files sitting around taking up space? Merge them.
|
|
|
|
|
if( isMergeLimitExceeded() )
|
|
|
|
|
mergeExistingOutput();
|
|
|
|
|
|
|
|
|
|
// Wait for the next slot in the queue to become free.
|
2009-04-30 04:26:16 +08:00
|
|
|
waitForFreeQueueSlot();
|
|
|
|
|
|
2009-05-02 05:40:46 +08:00
|
|
|
// Pick the next most appropriate task and run it. In the interest of
|
|
|
|
|
// memory conservation, hierarchical reduces always run before traversals.
|
2009-04-30 04:26:16 +08:00
|
|
|
if( isTreeReduceReady() )
|
|
|
|
|
queueNextTreeReduce( walker );
|
2009-05-02 05:40:46 +08:00
|
|
|
else if( isShardTraversePending() )
|
2009-05-09 05:27:54 +08:00
|
|
|
queueNextShardTraverse( walker, reduceTree );
|
2009-04-30 04:26:16 +08:00
|
|
|
}
|
|
|
|
|
|
2009-05-02 05:40:46 +08:00
|
|
|
// Merge any lingering output files. If these files aren't ready,
|
|
|
|
|
// sit around and wait for them, then merge them.
|
|
|
|
|
mergeRemainingOutput();
|
|
|
|
|
|
2009-05-02 03:34:09 +08:00
|
|
|
threadPool.shutdown();
|
|
|
|
|
|
|
|
|
|
Object result = null;
|
|
|
|
|
try {
|
|
|
|
|
result = reduceTree.getResult().get();
|
|
|
|
|
}
|
|
|
|
|
catch(Exception ex) {
|
|
|
|
|
throw new StingException("Unable to retrieve result", ex );
|
|
|
|
|
}
|
2009-04-30 04:26:16 +08:00
|
|
|
|
2009-05-08 22:12:45 +08:00
|
|
|
traversalEngine.printOnTraversalDone(result);
|
2009-04-30 04:26:16 +08:00
|
|
|
walker.onTraversalDone(result);
|
2009-05-16 04:20:27 +08:00
|
|
|
|
|
|
|
|
return result;
|
2009-04-30 04:26:16 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns true if there are unscheduled shard traversal waiting to run.
|
|
|
|
|
* @return true if a shard traversal is waiting; false otherwise.
|
|
|
|
|
*/
|
|
|
|
|
protected boolean isShardTraversePending() {
|
|
|
|
|
return traverseTasks.size() > 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns true if there are tree reduces that can be run without
|
|
|
|
|
* blocking.
|
|
|
|
|
* @return true if a tree reduce is ready; false otherwise.
|
|
|
|
|
*/
|
|
|
|
|
protected boolean isTreeReduceReady() {
|
|
|
|
|
if( reduceTasks.size() == 0 )
|
|
|
|
|
return false;
|
|
|
|
|
return reduceTasks.peek().isReadyForReduce();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns true if there are tree reduces that need to be run before
|
|
|
|
|
* the computation is complete. Returns true if any entries are in the queue,
|
|
|
|
|
* blocked or otherwise.
|
|
|
|
|
* @return true if a tree reduce is pending; false otherwise.
|
|
|
|
|
*/
|
|
|
|
|
protected boolean isTreeReducePending() {
|
|
|
|
|
return reduceTasks.size() > 0;
|
|
|
|
|
}
|
|
|
|
|
|
2009-05-02 05:40:46 +08:00
|
|
|
/**
|
|
|
|
|
* Returns whether the maximum number of files is sitting in the temp directory
|
|
|
|
|
* waiting to be merged back in.
|
|
|
|
|
* @return True if the merging needs to take priority. False otherwise.
|
|
|
|
|
*/
|
|
|
|
|
protected boolean isMergeLimitExceeded() {
|
|
|
|
|
if( outputMergeTasks.size() < MAX_OUTSTANDING_OUTPUT_MERGES )
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
// If any of the first MAX_OUTSTANDING merges aren't ready, the merge limit
|
|
|
|
|
// has not been exceeded.
|
2009-05-02 06:01:04 +08:00
|
|
|
OutputMerger[] outputMergers = outputMergeTasks.toArray( new OutputMerger[0] );
|
2009-05-02 05:40:46 +08:00
|
|
|
for( int i = 0; i < outputMergers.length; i++ ) {
|
|
|
|
|
if( !outputMergers[i].isComplete() )
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Everything's ready?
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
2009-05-02 03:34:09 +08:00
|
|
|
/**
|
|
|
|
|
* Returns whether there is output waiting to be merged into the global output
|
|
|
|
|
* streams right now.
|
|
|
|
|
* @return True if this output is ready to be merged. False otherwise.
|
|
|
|
|
*/
|
|
|
|
|
protected boolean isOutputMergeReady() {
|
2009-05-02 05:40:46 +08:00
|
|
|
return outputMergeTasks.peek().isComplete();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Merging all output that's sitting ready in the OutputMerger queue into
|
|
|
|
|
* the final data streams.
|
|
|
|
|
*/
|
|
|
|
|
protected void mergeExistingOutput() {
|
2009-05-12 06:33:00 +08:00
|
|
|
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
|
2009-05-02 05:40:46 +08:00
|
|
|
while( isOutputMergeReady() )
|
|
|
|
|
outputMergeTasks.remove().mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() );
|
2009-05-02 03:34:09 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2009-05-02 05:40:46 +08:00
|
|
|
* Merge any output that hasn't yet been taken care of by the blocking thread.
|
2009-05-02 03:34:09 +08:00
|
|
|
*/
|
2009-05-02 05:40:46 +08:00
|
|
|
protected void mergeRemainingOutput() {
|
2009-05-12 06:33:00 +08:00
|
|
|
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
|
2009-05-02 05:40:46 +08:00
|
|
|
while( outputMergeTasks.size() > 0 ) {
|
2009-05-02 06:01:04 +08:00
|
|
|
OutputMerger outputMerger = outputMergeTasks.remove();
|
|
|
|
|
synchronized(outputMerger) {
|
|
|
|
|
if( !outputMerger.isComplete() )
|
|
|
|
|
outputMerger.waitForOutputComplete();
|
2009-05-02 05:40:46 +08:00
|
|
|
}
|
2009-05-02 06:01:04 +08:00
|
|
|
outputMerger.mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() );
|
2009-05-02 05:40:46 +08:00
|
|
|
}
|
2009-05-02 03:34:09 +08:00
|
|
|
}
|
|
|
|
|
|
2009-04-30 04:26:16 +08:00
|
|
|
/**
|
|
|
|
|
* Queues the next traversal of a walker from the traversal tasks queue.
|
|
|
|
|
* @param walker Walker to apply to the dataset.
|
2009-05-09 05:27:54 +08:00
|
|
|
* @param reduceTree Tree of reduces to which to add this shard traverse.
|
2009-04-30 04:26:16 +08:00
|
|
|
*/
|
2009-05-09 05:27:54 +08:00
|
|
|
protected Future queueNextShardTraverse( Walker walker, ReduceTree reduceTree ) {
|
2009-04-30 04:26:16 +08:00
|
|
|
if( traverseTasks.size() == 0 )
|
|
|
|
|
throw new IllegalStateException( "Cannot traverse; no pending traversals exist.");
|
2009-05-02 03:34:09 +08:00
|
|
|
|
2009-05-09 05:27:54 +08:00
|
|
|
Shard shard = traverseTasks.remove();
|
2009-05-02 06:01:04 +08:00
|
|
|
OutputMerger outputMerger = new OutputMerger();
|
2009-05-02 03:34:09 +08:00
|
|
|
|
2009-05-23 05:20:24 +08:00
|
|
|
ShardTraverser traverser = new ShardTraverser( getTraversalEngine(),
|
2009-04-30 05:07:07 +08:00
|
|
|
walker,
|
2009-05-09 05:27:54 +08:00
|
|
|
shard,
|
|
|
|
|
getShardDataProvider(shard),
|
2009-05-02 06:01:04 +08:00
|
|
|
outputMerger );
|
2009-05-02 03:34:09 +08:00
|
|
|
|
2009-05-02 05:40:46 +08:00
|
|
|
Future traverseResult = threadPool.submit(traverser);
|
2009-05-02 03:34:09 +08:00
|
|
|
|
2009-05-02 05:40:46 +08:00
|
|
|
// Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue.
|
|
|
|
|
reduceTree.addEntry( traverseResult );
|
|
|
|
|
|
|
|
|
|
// No more data? Let the reduce tree know so it can finish processing what it's got.
|
|
|
|
|
if( !isShardTraversePending() )
|
|
|
|
|
reduceTree.complete();
|
|
|
|
|
|
2009-05-02 06:01:04 +08:00
|
|
|
outputMergeTasks.add(outputMerger);
|
2009-05-02 05:40:46 +08:00
|
|
|
|
|
|
|
|
return traverseResult;
|
2009-04-30 04:26:16 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Pulls the next reduce from the queue and runs it.
|
|
|
|
|
*/
|
|
|
|
|
protected void queueNextTreeReduce( Walker walker ) {
|
|
|
|
|
if( reduceTasks.size() == 0 )
|
|
|
|
|
throw new IllegalStateException( "Cannot reduce; no pending reduces exist.");
|
|
|
|
|
TreeReduceTask reducer = reduceTasks.remove();
|
|
|
|
|
reducer.setWalker( (TreeReducible)walker );
|
|
|
|
|
|
|
|
|
|
threadPool.submit( reducer );
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Blocks until a free slot appears in the thread queue.
|
|
|
|
|
*/
|
|
|
|
|
protected void waitForFreeQueueSlot() {
|
|
|
|
|
ThreadPoolMonitor monitor = new ThreadPoolMonitor();
|
|
|
|
|
synchronized(monitor) {
|
|
|
|
|
threadPool.submit( monitor );
|
|
|
|
|
monitor.watch();
|
|
|
|
|
}
|
2009-04-27 07:08:12 +08:00
|
|
|
}
|
2009-04-30 04:26:16 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Callback for adding reduce tasks to the run queue.
|
|
|
|
|
* @return A new, composite future of the result of this reduce.
|
|
|
|
|
*/
|
|
|
|
|
public Future notifyReduce( Future lhs, Future rhs ) {
|
|
|
|
|
TreeReduceTask reducer = new TreeReduceTask( new TreeReducer( lhs, rhs ) );
|
|
|
|
|
reduceTasks.add(reducer);
|
|
|
|
|
return reducer;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics.
|
|
|
|
|
*/
|
|
|
|
|
private class TreeReduceTask extends FutureTask {
|
|
|
|
|
private TreeReducer treeReducer = null;
|
|
|
|
|
|
|
|
|
|
public TreeReduceTask( TreeReducer treeReducer ) {
|
|
|
|
|
super(treeReducer);
|
|
|
|
|
this.treeReducer = treeReducer;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void setWalker( TreeReducible walker ) {
|
|
|
|
|
treeReducer.setWalker( walker );
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public boolean isReadyForReduce() {
|
|
|
|
|
return treeReducer.isReadyForReduce();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2009-05-02 03:34:09 +08:00
|
|
|
|
2009-04-27 07:08:12 +08:00
|
|
|
}
|