diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index b3454ef50..5f282bf63 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -3,26 +3,20 @@ package org.broadinstitute.sting.gatk.executive; import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.walkers.Walker; -import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; -import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; -import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider; -import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider; -import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.io.File; import java.util.List; import java.util.Queue; import java.util.LinkedList; -import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** @@ -60,7 +54,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); - traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() ); } @@ -143,7 +136,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource ) { if( traverseTasks.size() == 0 ) throw new IllegalStateException( "Cannot traverse; no pending traversals exist."); - ShardTraverser traverser = new ShardTraverser( walker, traverseTasks.remove(), dataSource ); + ShardTraverser traverser = new ShardTraverser( traversalEngine, + walker, + traverseTasks.remove(), + reference, + dataSource ); return threadPool.submit(traverser); } @@ -180,62 +177,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce return reducer; } - /** - * Carries the walker over a given shard. - */ - private class ShardTraverser implements Callable { - private Walker walker; - private Shard shard; - private SAMDataSource dataSource; - - public ShardTraverser( Walker walker, Shard shard, SAMDataSource dataSource ) { - this.walker = walker; - this.shard = shard; - this.dataSource = dataSource; - } - - public Object call() { - GenomeLoc span = shard.getGenomeLoc(); - Object accumulator = ((LocusWalker)walker).reduceInit(); - - MergingSamRecordIterator2 readShard = null; - try { - readShard = dataSource.seek( span ); - } - catch( SimpleDataSourceLoadException ex ) { - throw new RuntimeException( ex ); - } - - ReferenceProvider referenceProvider = new ReferenceProvider( reference, span ); - LocusContextProvider locusProvider = new LocusContextProvider( readShard ); - - accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator ); - - readShard.close(); - - return accumulator; - } - } - - /** - * Waits for a signal to come through that the thread pool has run - * a given task and therefore has a free slot. - */ - private class ThreadPoolMonitor implements Runnable { - public synchronized void watch() { - try { - wait(); - } - catch( InterruptedException ex ) { - logger.error("ThreadPoolMonitor interrupted:" + ex.getStackTrace()); - throw new RuntimeException("ThreadPoolMonitor interrupted", ex); - } - } - - public synchronized void run() { - notify(); - } - } /** * A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. @@ -257,54 +198,4 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce } } - - /** - * Represents a 'potential' reduce...a reduce that will be ready at some point in the future. - * Provides services for indicating when all data is prepared for the reduce and services to - * actually make that reduce happen. - */ - private class TreeReducer implements Callable { - private TreeReducible walker; - private final Future lhs; - private final Future rhs; - - public TreeReducer( Future lhs ) { - this( lhs, null ); - } - - public TreeReducer( Future lhs, Future rhs ) { - this.lhs = lhs; - this.rhs = rhs; - } - - public void setWalker( TreeReducible walker ) { - this.walker = walker; - } - - public boolean isReadyForReduce() { - if( lhs == null ) - throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) ); - - if( rhs == null ) - return lhs.isDone(); - - return lhs.isDone() && rhs.isDone(); - } - - public Object call() { - try { - if( lhs == null ) - return lhs.get(); - else - return walker.reduce( lhs.get(), rhs.get() ); - } - catch( InterruptedException ex ) { - throw new RuntimeException("Hierarchical reduce interrupted", ex); - } - catch( ExecutionException ex ) { - throw new RuntimeException("Hierarchical reduce failed", ex); - } - } - } - } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java new file mode 100755 index 000000000..774318c36 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -0,0 +1,71 @@ +package org.broadinstitute.sting.gatk.executive; + +import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.gatk.walkers.LocusWalker; +import org.broadinstitute.sting.gatk.dataSources.shards.Shard; +import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; +import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; +import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider; +import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider; +import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2; +import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; + +import java.util.concurrent.Callable; +/** + * User: hanna + * Date: Apr 29, 2009 + * Time: 4:40:38 PM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ +/** + * Carries the walker over a given shard, in a callable interface. + */ +public class ShardTraverser implements Callable { + private Walker walker; + private TraverseLociByReference traversalEngine; + private Shard shard; + private IndexedFastaSequenceFile reference; + private SAMDataSource reads; + + public ShardTraverser( TraverseLociByReference traversalEngine, + Walker walker, + Shard shard, + IndexedFastaSequenceFile reference, + SAMDataSource reads ) { + this.walker = walker; + this.traversalEngine = traversalEngine; + this.shard = shard; + this.reference = reference; + this.reads = reads; + } + + public Object call() { + GenomeLoc span = shard.getGenomeLoc(); + Object accumulator = ((LocusWalker)walker).reduceInit(); + + MergingSamRecordIterator2 readShard = null; + try { + readShard = reads.seek( span ); + } + catch( SimpleDataSourceLoadException ex ) { + throw new RuntimeException( ex ); + } + + ReferenceProvider referenceProvider = new ReferenceProvider( reference, span ); + LocusContextProvider locusProvider = new LocusContextProvider( readShard ); + + accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator ); + + readShard.close(); + + return accumulator; + } +} diff --git a/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java b/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java new file mode 100755 index 000000000..693157847 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java @@ -0,0 +1,90 @@ +package org.broadinstitute.sting.gatk.executive; + +import org.broadinstitute.sting.gatk.walkers.TreeReducible; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +/** + * User: hanna + * Date: Apr 29, 2009 + * Time: 4:47:35 PM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * Represents a future reduce...a reduce that will be ready at some point in the future. + * Provides services for indicating when all data is prepared for the reduce a callable + * interface to force the reduce. + */ +public class TreeReducer implements Callable { + private TreeReducible walker; + private final Future lhs; + private final Future rhs; + + /** + * Create a one-sided reduce. Result will be a simple pass-through of the result. + * @param lhs The one side of the reduce. + */ + public TreeReducer( Future lhs ) { + this( lhs, null ); + } + + /** + * Create a full tree reduce. Combine this two results using an unspecified walker at some point in the future. + * @param lhs Left-hand side of the reduce. + * @param rhs Right-hand side of the reduce. + */ + public TreeReducer( Future lhs, Future rhs ) { + this.lhs = lhs; + this.rhs = rhs; + } + + /** + * Provide a walker for the future reduce. + * @param walker walker to use when performing the reduce. + */ + public void setWalker( TreeReducible walker ) { + this.walker = walker; + } + + /** + * Is the data ready for reduce? True if lhs and rhs have already been resolved. + * @return True if data is ready and waiting, false otherwise. + */ + public boolean isReadyForReduce() { + if( lhs == null ) + throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) ); + + if( rhs == null ) + return lhs.isDone(); + + return lhs.isDone() && rhs.isDone(); + } + + /** + * Returns the value of the reduce. If not isReadyForReduce(), this call will until all entries become ready. + * @return Result of the reduce. + */ + public Object call() { + try { + if( lhs == null ) + return lhs.get(); + else + return walker.reduce( lhs.get(), rhs.get() ); + } + catch( InterruptedException ex ) { + throw new RuntimeException("Hierarchical reduce interrupted", ex); + } + catch( ExecutionException ex ) { + throw new RuntimeException("Hierarchical reduce failed", ex); + } + } +} + diff --git a/java/src/org/broadinstitute/sting/utils/threading/ThreadPoolMonitor.java b/java/src/org/broadinstitute/sting/utils/threading/ThreadPoolMonitor.java new file mode 100755 index 000000000..92240911a --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/ThreadPoolMonitor.java @@ -0,0 +1,52 @@ +package org.broadinstitute.sting.utils.threading; + +import org.apache.log4j.Logger; +/** + * User: hanna + * Date: Apr 29, 2009 + * Time: 4:27:58 PM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * Waits for a signal to come through that the thread pool has run + * a given task and therefore has a free slot. + * + * Make sure, that, when using, the submit and the run are both + * protected by the same synchronized(monitor) lock. See the test + * case for an example. + */ +public class ThreadPoolMonitor implements Runnable { + /** + * Logger for reporting interruptions, etc. + */ + private static Logger logger = Logger.getLogger(ThreadPoolMonitor.class); + + /** + * Watch the monitor + */ + public synchronized void watch() { + try { + wait(); + } + catch( InterruptedException ex ) { + logger.error("ThreadPoolMonitor interrupted:" + ex.getStackTrace()); + throw new RuntimeException("ThreadPoolMonitor interrupted", ex); + } + } + + /** + * Instruct the monitor that the thread pool has run for the class. + * Only the thread pool should execute this method. + */ + public synchronized void run() { + notify(); + } +} + diff --git a/java/test/org/broadinstitute/sting/utils/threading/ThreadPoolMonitorTest.java b/java/test/org/broadinstitute/sting/utils/threading/ThreadPoolMonitorTest.java new file mode 100755 index 000000000..551533fec --- /dev/null +++ b/java/test/org/broadinstitute/sting/utils/threading/ThreadPoolMonitorTest.java @@ -0,0 +1,37 @@ +package org.broadinstitute.sting.utils.threading; + +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** + * User: hanna + * Date: Apr 29, 2009 + * Time: 4:30:55 PM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * Tests for the thread pool monitor class. + */ + +public class ThreadPoolMonitorTest { + private ExecutorService threadPool = Executors.newFixedThreadPool(1); + + /** + * Test to make sure the thread pool wait works properly. + */ + @Test(timeout=2000) + public void testThreadPoolMonitor() { + ThreadPoolMonitor monitor = new ThreadPoolMonitor(); + synchronized(monitor) { + threadPool.submit(monitor); + monitor.watch(); + } + } +}