diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index ca727a22e..b3454ef50 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -3,10 +3,27 @@ package org.broadinstitute.sting.gatk.executive; import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.gatk.walkers.LocusWalker; +import org.broadinstitute.sting.gatk.walkers.TreeReducible; +import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; +import org.broadinstitute.sting.gatk.dataSources.shards.Shard; +import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; +import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; +import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider; +import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider; +import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2; import org.broadinstitute.sting.utils.GenomeLoc; import java.io.File; import java.util.List; +import java.util.Queue; +import java.util.LinkedList; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; /** * Created by IntelliJ IDEA. @@ -20,14 +37,18 @@ import java.util.List; * A microscheduler that schedules shards according to a tree-like structure. * Requires a special walker tagged with a 'TreeReducible' interface. */ -public class HierarchicalMicroScheduler extends MicroScheduler { - /** - * How many threads should the hierarchical scheduler try to keep busy. - */ - private int nThreadsToUse; - +public class HierarchicalMicroScheduler extends MicroScheduler implements ReduceTree.TreeReduceNotifier { private TraverseLociByReference traversalEngine = null; + /** + * Manage currently running threads. + */ + private ExecutorService threadPool; + + private Queue traverseTasks = new LinkedList(); + private Queue reduceTasks = new LinkedList(); + + /** * Create a new hierarchical microscheduler to process the given reads and reference. * @param reads Reads file(s) to process. @@ -36,7 +57,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler { */ protected HierarchicalMicroScheduler( List reads, File refFile, int nThreadsToUse ) { super( reads, refFile ); - this.nThreadsToUse = nThreadsToUse; + + + this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() ); } @@ -45,6 +69,242 @@ public class HierarchicalMicroScheduler extends MicroScheduler { } public void execute( Walker walker, List intervals ) { + // Fast fail for walkers not supporting TreeReducible interface. + if( !(walker instanceof TreeReducible) ) + throw new IllegalArgumentException("Hierarchical microscheduler only works with TreeReducible walkers"); + + ShardStrategy shardStrategy = getShardStrategy( reference, intervals ); + SAMDataSource dataSource = getReadsDataSource(); + + ReduceTree reduceTree = new ReduceTree( this ); + + walker.initialize(); + for(Shard shard: shardStrategy) + traverseTasks.add(shard); + + while( isShardTraversePending() || isTreeReducePending() ) { + waitForFreeQueueSlot(); + + if( isTreeReduceReady() ) + queueNextTreeReduce( walker ); + else { + Future traverseResult = queueNextShardTraverse( walker, dataSource ); + + // Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue. + reduceTree.addEntry( traverseResult ); + + // No more data? Let the reduce tree know so it can finish processing what it's got. + if( !isShardTraversePending() ) + reduceTree.complete(); + } + } + + Object result = reduceTree.getResult(); + + traversalEngine.printOnTraversalDone("loci", result); + walker.onTraversalDone(result); } + + /** + * Returns true if there are unscheduled shard traversal waiting to run. + * @return true if a shard traversal is waiting; false otherwise. + */ + protected boolean isShardTraversePending() { + return traverseTasks.size() > 0; + } + + /** + * Returns true if there are tree reduces that can be run without + * blocking. + * @return true if a tree reduce is ready; false otherwise. + */ + protected boolean isTreeReduceReady() { + if( reduceTasks.size() == 0 ) + return false; + return reduceTasks.peek().isReadyForReduce(); + } + + /** + * Returns true if there are tree reduces that need to be run before + * the computation is complete. Returns true if any entries are in the queue, + * blocked or otherwise. + * @return true if a tree reduce is pending; false otherwise. + */ + protected boolean isTreeReducePending() { + return reduceTasks.size() > 0; + } + + /** + * Queues the next traversal of a walker from the traversal tasks queue. + * @param walker Walker to apply to the dataset. + * @param dataSource Source of the reads + */ + protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource ) { + if( traverseTasks.size() == 0 ) + throw new IllegalStateException( "Cannot traverse; no pending traversals exist."); + ShardTraverser traverser = new ShardTraverser( walker, traverseTasks.remove(), dataSource ); + return threadPool.submit(traverser); + } + + /** + * Pulls the next reduce from the queue and runs it. + */ + protected void queueNextTreeReduce( Walker walker ) { + if( reduceTasks.size() == 0 ) + throw new IllegalStateException( "Cannot reduce; no pending reduces exist."); + TreeReduceTask reducer = reduceTasks.remove(); + reducer.setWalker( (TreeReducible)walker ); + + threadPool.submit( reducer ); + } + + /** + * Blocks until a free slot appears in the thread queue. + */ + protected void waitForFreeQueueSlot() { + ThreadPoolMonitor monitor = new ThreadPoolMonitor(); + synchronized(monitor) { + threadPool.submit( monitor ); + monitor.watch(); + } + } + + /** + * Callback for adding reduce tasks to the run queue. + * @return A new, composite future of the result of this reduce. + */ + public Future notifyReduce( Future lhs, Future rhs ) { + TreeReduceTask reducer = new TreeReduceTask( new TreeReducer( lhs, rhs ) ); + reduceTasks.add(reducer); + return reducer; + } + + /** + * Carries the walker over a given shard. + */ + private class ShardTraverser implements Callable { + private Walker walker; + private Shard shard; + private SAMDataSource dataSource; + + public ShardTraverser( Walker walker, Shard shard, SAMDataSource dataSource ) { + this.walker = walker; + this.shard = shard; + this.dataSource = dataSource; + } + + public Object call() { + GenomeLoc span = shard.getGenomeLoc(); + Object accumulator = ((LocusWalker)walker).reduceInit(); + + MergingSamRecordIterator2 readShard = null; + try { + readShard = dataSource.seek( span ); + } + catch( SimpleDataSourceLoadException ex ) { + throw new RuntimeException( ex ); + } + + ReferenceProvider referenceProvider = new ReferenceProvider( reference, span ); + LocusContextProvider locusProvider = new LocusContextProvider( readShard ); + + accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator ); + + readShard.close(); + + return accumulator; + } + } + + /** + * Waits for a signal to come through that the thread pool has run + * a given task and therefore has a free slot. + */ + private class ThreadPoolMonitor implements Runnable { + public synchronized void watch() { + try { + wait(); + } + catch( InterruptedException ex ) { + logger.error("ThreadPoolMonitor interrupted:" + ex.getStackTrace()); + throw new RuntimeException("ThreadPoolMonitor interrupted", ex); + } + } + + public synchronized void run() { + notify(); + } + } + + /** + * A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. + */ + private class TreeReduceTask extends FutureTask { + private TreeReducer treeReducer = null; + + public TreeReduceTask( TreeReducer treeReducer ) { + super(treeReducer); + this.treeReducer = treeReducer; + } + + public void setWalker( TreeReducible walker ) { + treeReducer.setWalker( walker ); + } + + public boolean isReadyForReduce() { + return treeReducer.isReadyForReduce(); + } + } + + + /** + * Represents a 'potential' reduce...a reduce that will be ready at some point in the future. + * Provides services for indicating when all data is prepared for the reduce and services to + * actually make that reduce happen. + */ + private class TreeReducer implements Callable { + private TreeReducible walker; + private final Future lhs; + private final Future rhs; + + public TreeReducer( Future lhs ) { + this( lhs, null ); + } + + public TreeReducer( Future lhs, Future rhs ) { + this.lhs = lhs; + this.rhs = rhs; + } + + public void setWalker( TreeReducible walker ) { + this.walker = walker; + } + + public boolean isReadyForReduce() { + if( lhs == null ) + throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) ); + + if( rhs == null ) + return lhs.isDone(); + + return lhs.isDone() && rhs.isDone(); + } + + public Object call() { + try { + if( lhs == null ) + return lhs.get(); + else + return walker.reduce( lhs.get(), rhs.get() ); + } + catch( InterruptedException ex ) { + throw new RuntimeException("Hierarchical reduce interrupted", ex); + } + catch( ExecutionException ex ) { + throw new RuntimeException("Hierarchical reduce failed", ex); + } + } + } + } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ReduceTree.java b/java/src/org/broadinstitute/sting/gatk/executive/ReduceTree.java new file mode 100755 index 000000000..151a1ba26 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/ReduceTree.java @@ -0,0 +1,162 @@ +package org.broadinstitute.sting.gatk.executive; + +import java.util.Queue; +import java.util.List; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.concurrent.Future; +/** + * User: hanna + * Date: Apr 28, 2009 + * Time: 11:09:29 AM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * A tree for organizing reduce results and detecting when enough dependencies + * are resolved for a reduce to be scheduled. The tree can trigger a callback + * whenever it believes a reduce operation is pending. + * + * Not thread-safe. All calls should be made sequentially from the same thread. + */ +public class ReduceTree { + /** + * Data structure for the tree. Each entry in the outer list represents a level + * of the tree, and each entry in the inner queues represent nodes in that level. + * + * Whenever a reduce can happen, the entries to be reduced are pulled out of + * their slots in level n of the tree and the composite entry is added to level n+1. + */ + private List> treeNodes = new ArrayList>(); + + /** + * The entire contents have been added to the tree. Completely schedule the reductions. + */ + private boolean treeComplete = false; + + /** + * Called to indicate that all data required to perform a given reduce has been scheduled. + */ + private TreeReduceNotifier treeReduceNotifier = null; + + /** + * Creates a ReduceTree. + * @param notifier A callback indicating that all data required to perform a given reduce has been scheduled. + */ + public ReduceTree( TreeReduceNotifier notifier ) { + this.treeReduceNotifier = notifier; + } + + /** + * A callback indicating that all computations have been scheduled to complete the given reduce. + */ + public interface TreeReduceNotifier { + /** + * Indicates that a reduce is ready to happen. + * @param lhs Left-hand side of the tree reduce. + * @param rhs Right-hand side of the tree reduce. + * @return The future result of the computation reduce(lhs,rhs) + */ + Future notifyReduce( Future lhs, Future rhs ); + } + + /** + * Add an entry to the list of data to be reduced. The results of entry.get() will + * be scheduled for reduction with neighboring elements. + * @param entry Entry to be paired with other elements. + */ + public void addEntry( Future entry ) { + addNodeAtLevel( entry, 0 ); + } + + /** + * Signal to the ReduceTree that all possible data has been added and it should reduce + * as much as is possible. + */ + public void complete() { + treeComplete = true; + reduce(); + } + + /** + * Gets the placeholder for the final result of the tree reduce. + * @return Future whose get() method will return the final result. Null if nothing has been added. + */ + public Future getResult() { + if( !treeComplete ) + throw new IllegalStateException( "Cannot get the final result for an incomplete tree."); + + // If nothing has been added to the tree, return null. + if( treeNodes.size() == 0 ) + return null; + + // Assert that there aren't any pending computations that were forgotten along the way. + for( int i = 0; i < treeNodes.size() - 2; i++ ) { + if( treeNodes.get(i).size() > 0 ) + throw new IllegalStateException( "Some inner reduces were missed along the way."); + } + + Queue lastLevel = treeNodes.get(treeNodes.size() - 1); + + // Assert that there's only one reduce left at the last level. + if( lastLevel.size() != 1 ) + throw new IllegalStateException( "Invalid number of entries at the tip of the tree: " + lastLevel.size() ); + + // Get the placeholder for the last result. + return lastLevel.element(); + } + + /** + * Recursively collapse the tree whereever possible. + */ + protected void reduce() { + reduce( 0 ); + } + + /** + * Recursively collapse the tree, starting at the specified level. + * @param level Level at which to start reducing. + */ + private void reduce( int level ) { + // base case for recursion. + if( treeNodes.size() <= level ) + return; + + Queue treeLevel = treeNodes.get(level); + + while( treeLevel.size() >= 2 ) { + addNodeAtLevel( treeReduceNotifier.notifyReduce( treeLevel.remove(), treeLevel.remove() ), level + 1 ); + } + + if( treeLevel.size() == 1 && treeComplete && !isDeepestLevel(level) ) { + Future element = treeLevel.remove(); + addNodeAtLevel( element, level + 1 ); + } + + reduce( level + 1 ); + } + + private boolean isDeepestLevel( int level ) { + return level == (treeNodes.size() - 1); + } + + /** + * Add the given node to the tree at the corresponding level. Create the level + * if it doesn't exist. + * @param node Node to add. Must not be null. + * @param level Level number at which to add. 0-based index into treeNodes list. + */ + protected void addNodeAtLevel( Future node, int level ) { + while( treeNodes.size() <= level ) + treeNodes.add( new LinkedList() ); + treeNodes.get(level).add(node); + reduce(level); + } + +} diff --git a/java/test/org/broadinstitute/sting/gatk/executive/ReduceTreeTest.java b/java/test/org/broadinstitute/sting/gatk/executive/ReduceTreeTest.java new file mode 100755 index 000000000..7e076250f --- /dev/null +++ b/java/test/org/broadinstitute/sting/gatk/executive/ReduceTreeTest.java @@ -0,0 +1,226 @@ +package org.broadinstitute.sting.gatk.executive; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ExecutionException; +import java.util.List; +import java.util.ArrayList; +/** + * User: hanna + * Date: Apr 29, 2009 + * Time: 10:40:49 AM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * Make sure the reduce tree organizes reduces in the correct way. + */ + +public class ReduceTreeTest implements ReduceTree.TreeReduceNotifier { + + /** + * The tree indicating reduce order. + */ + private ReduceTree reduceTree = null; + + /** + * + */ + private List> reduces = new ArrayList>(); + + @Before + public void createTree() { + reduceTree = new ReduceTree( this ); + } + + @After + public void destroyTree() { + reduceTree = null; + reduces.clear(); + } + + @Test + public void testNoValueReduce() + throws InterruptedException, ExecutionException { + reduceTree.complete(); + Assert.assertEquals("Single-value reduce failed", null, reduceTree.getResult()); + } + + @Test + public void testSingleValueReduce() + throws InterruptedException, ExecutionException { + reduceTree.addEntry( getReduceTestEntry(1) ); + reduceTree.complete(); + Assert.assertEquals("Single-value reduce failed", 1, reduceTree.getResult().get()); + } + + @Test(expected=IllegalStateException.class) + public void testIncompleteReduce() + throws InterruptedException, ExecutionException { + reduceTree.addEntry( getReduceTestEntry(1) ); + reduceTree.getResult().get(); + } + + @Test + public void testDualValueReduce() + throws InterruptedException, ExecutionException { + reduceTree.addEntry( getReduceTestEntry(1) ); + reduceTree.addEntry( getReduceTestEntry(2) ); + reduceTree.complete(); + + List expected = new ArrayList(); + expected.add( 1 ); + expected.add( 2 ); + + // Test the result + Assert.assertEquals("Dual-value reduce failed", expected, reduceTree.getResult().get()); + + // Test the intermediate steps + Assert.assertEquals("Size of incoming tree reduces incorrect", 1, reduces.size() ); + Assert.assertEquals("Incoming tree reduce incorrect", expected, reduces.get(0) ); + } + + @Test + public void testThreeValueReduce() + throws InterruptedException, ExecutionException { + List firstExpected = new ArrayList(); + firstExpected.add(1); + firstExpected.add(2); + + List finalExpected = new ArrayList(); + finalExpected.addAll( firstExpected ); + finalExpected.add(3); + + reduceTree.addEntry( getReduceTestEntry(1) ); + + Assert.assertEquals("Reduce queue should be empty after entering a single element", 0, reduces.size()); + + reduceTree.addEntry( getReduceTestEntry(2) ); + + Assert.assertEquals("Reduce queue should have one element after two entries", 1, reduces.size()); + Assert.assertEquals("Reduce queue element is incorrect after two entries", firstExpected, reduces.get(0)); + + reduceTree.addEntry( getReduceTestEntry(3) ); + + Assert.assertEquals("Reduce queue should have one element after three entries", 1, reduces.size()); + Assert.assertEquals("Reduce queue element is incorrect after three entries", firstExpected, reduces.get(0)); + + reduceTree.complete(); + + // Test the result + Assert.assertEquals("Three value reduce failed", finalExpected, reduceTree.getResult().get()); + + Assert.assertEquals("Reduce queue should have two elements after three entries (complete)", 2, reduces.size()); + Assert.assertEquals("Reduce queue element is incorrect after three entries", firstExpected, reduces.get(0)); + Assert.assertEquals("Reduce queue element is incorrect after three entries", finalExpected, reduces.get(1)); + } + + @Test + public void testFourValueReduce() + throws InterruptedException, ExecutionException { + List lhsExpected = new ArrayList(); + lhsExpected.add(1); + lhsExpected.add(2); + + List rhsExpected = new ArrayList(); + rhsExpected.add(3); + rhsExpected.add(4); + + List finalExpected = new ArrayList(); + finalExpected.addAll(lhsExpected); + finalExpected.addAll(rhsExpected); + + reduceTree.addEntry( getReduceTestEntry(1) ); + + Assert.assertEquals("Reduce queue should be empty after entering a single element", 0, reduces.size()); + + reduceTree.addEntry( getReduceTestEntry(2) ); + + Assert.assertEquals("Reduce queue should have one element after two entries", 1, reduces.size()); + Assert.assertEquals("Reduce queue element is incorrect after two entries", lhsExpected, reduces.get(0)); + + reduceTree.addEntry( getReduceTestEntry(3) ); + + Assert.assertEquals("Reduce queue should have one element after three entries", 1, reduces.size()); + Assert.assertEquals("Reduce queue element is incorrect after three entries", lhsExpected, reduces.get(0)); + + reduceTree.addEntry( getReduceTestEntry(4) ); + + Assert.assertEquals("Reduce queue should have three elements after four entries", 3, reduces.size()); + Assert.assertEquals("Reduce queue element 0 is incorrect after three entries", lhsExpected, reduces.get(0)); + Assert.assertEquals("Reduce queue element 1 is incorrect after three entries", rhsExpected, reduces.get(1)); + Assert.assertEquals("Reduce queue element 2 is incorrect after three entries", finalExpected, reduces.get(2)); + + reduceTree.complete(); + + // Test the result + Assert.assertEquals("Four-valued reduce failed",finalExpected,reduceTree.getResult().get()); + + // Test the working tree + Assert.assertEquals("Didn't see correct number of reduces", 3, reduces.size()); + Assert.assertEquals("lhs of four value reduce failed", lhsExpected, reduces.get(0)); + Assert.assertEquals("rhs of four value reduce failed", rhsExpected, reduces.get(1)); + Assert.assertEquals("final value four value reduce failed", finalExpected, reduces.get(2)); + } + + + private Future getReduceTestEntry( Object value ) { + // Create a task and run it, assuring that the tests won't block on a get. + FutureTask task = new FutureTask( new ReduceTestEntry( value ) ); + task.run(); + return task; + } + + public Future notifyReduce( Future lhs, Future rhs ) { + List reduce = new ArrayList(); + + try { + if( lhs == null && rhs == null ) + throw new IllegalStateException("lhs and rhs are null"); + + if( lhs.get() instanceof List ) + reduce.addAll((List)lhs.get()); + else + reduce.add((Integer)lhs.get()); + + if( rhs != null ) { + if( rhs.get() instanceof List ) + reduce.addAll((List)rhs.get()); + else + reduce.add((Integer)rhs.get()); + } + } + catch( Exception ex ) { + // just rethrow any exceptions + throw new RuntimeException(ex); + } + + reduces.add( reduce ); + + return getReduceTestEntry( reduce ); + } + + private class ReduceTestEntry implements Callable { + private Object data; + + public ReduceTestEntry( Object data ) { + this.data = data; + } + + public Object call() { + return data; + } + } +}