Break out some of the weird inner classes out of the HierachicalMicroScheduler.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@566 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2009-04-29 21:07:07 +00:00
parent 95d10ba314
commit ba9a0b5da8
5 changed files with 256 additions and 115 deletions

View File

@ -3,26 +3,20 @@ package org.broadinstitute.sting.gatk.executive;
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider;
import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider;
import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
import java.io.File;
import java.util.List;
import java.util.Queue;
import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
@ -60,7 +54,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() );
}
@ -143,7 +136,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource ) {
if( traverseTasks.size() == 0 )
throw new IllegalStateException( "Cannot traverse; no pending traversals exist.");
ShardTraverser traverser = new ShardTraverser( walker, traverseTasks.remove(), dataSource );
ShardTraverser traverser = new ShardTraverser( traversalEngine,
walker,
traverseTasks.remove(),
reference,
dataSource );
return threadPool.submit(traverser);
}
@ -180,62 +177,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
return reducer;
}
/**
* Carries the walker over a given shard.
*/
private class ShardTraverser implements Callable {
private Walker walker;
private Shard shard;
private SAMDataSource dataSource;
public ShardTraverser( Walker walker, Shard shard, SAMDataSource dataSource ) {
this.walker = walker;
this.shard = shard;
this.dataSource = dataSource;
}
public Object call() {
GenomeLoc span = shard.getGenomeLoc();
Object accumulator = ((LocusWalker<?,?>)walker).reduceInit();
MergingSamRecordIterator2 readShard = null;
try {
readShard = dataSource.seek( span );
}
catch( SimpleDataSourceLoadException ex ) {
throw new RuntimeException( ex );
}
ReferenceProvider referenceProvider = new ReferenceProvider( reference, span );
LocusContextProvider locusProvider = new LocusContextProvider( readShard );
accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator );
readShard.close();
return accumulator;
}
}
/**
* Waits for a signal to come through that the thread pool has run
* a given task and therefore has a free slot.
*/
private class ThreadPoolMonitor implements Runnable {
public synchronized void watch() {
try {
wait();
}
catch( InterruptedException ex ) {
logger.error("ThreadPoolMonitor interrupted:" + ex.getStackTrace());
throw new RuntimeException("ThreadPoolMonitor interrupted", ex);
}
}
public synchronized void run() {
notify();
}
}
/**
* A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics.
@ -257,54 +198,4 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
}
}
/**
* Represents a 'potential' reduce...a reduce that will be ready at some point in the future.
* Provides services for indicating when all data is prepared for the reduce and services to
* actually make that reduce happen.
*/
private class TreeReducer implements Callable {
private TreeReducible walker;
private final Future lhs;
private final Future rhs;
public TreeReducer( Future lhs ) {
this( lhs, null );
}
public TreeReducer( Future lhs, Future rhs ) {
this.lhs = lhs;
this.rhs = rhs;
}
public void setWalker( TreeReducible walker ) {
this.walker = walker;
}
public boolean isReadyForReduce() {
if( lhs == null )
throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) );
if( rhs == null )
return lhs.isDone();
return lhs.isDone() && rhs.isDone();
}
public Object call() {
try {
if( lhs == null )
return lhs.get();
else
return walker.reduce( lhs.get(), rhs.get() );
}
catch( InterruptedException ex ) {
throw new RuntimeException("Hierarchical reduce interrupted", ex);
}
catch( ExecutionException ex ) {
throw new RuntimeException("Hierarchical reduce failed", ex);
}
}
}
}

View File

@ -0,0 +1,71 @@
package org.broadinstitute.sting.gatk.executive;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider;
import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider;
import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2;
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import java.util.concurrent.Callable;
/**
* User: hanna
* Date: Apr 29, 2009
* Time: 4:40:38 PM
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
* Software and documentation are copyright 2005 by the Broad Institute.
* All rights are reserved.
*
* Users acknowledge that this software is supplied without any warranty or support.
* The Broad Institute is not responsible for its use, misuse, or
* functionality.
*/
/**
* Carries the walker over a given shard, in a callable interface.
*/
public class ShardTraverser implements Callable {
private Walker walker;
private TraverseLociByReference traversalEngine;
private Shard shard;
private IndexedFastaSequenceFile reference;
private SAMDataSource reads;
public ShardTraverser( TraverseLociByReference traversalEngine,
Walker walker,
Shard shard,
IndexedFastaSequenceFile reference,
SAMDataSource reads ) {
this.walker = walker;
this.traversalEngine = traversalEngine;
this.shard = shard;
this.reference = reference;
this.reads = reads;
}
public Object call() {
GenomeLoc span = shard.getGenomeLoc();
Object accumulator = ((LocusWalker<?,?>)walker).reduceInit();
MergingSamRecordIterator2 readShard = null;
try {
readShard = reads.seek( span );
}
catch( SimpleDataSourceLoadException ex ) {
throw new RuntimeException( ex );
}
ReferenceProvider referenceProvider = new ReferenceProvider( reference, span );
LocusContextProvider locusProvider = new LocusContextProvider( readShard );
accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator );
readShard.close();
return accumulator;
}
}

View File

@ -0,0 +1,90 @@
package org.broadinstitute.sting.gatk.executive;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
/**
* User: hanna
* Date: Apr 29, 2009
* Time: 4:47:35 PM
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
* Software and documentation are copyright 2005 by the Broad Institute.
* All rights are reserved.
*
* Users acknowledge that this software is supplied without any warranty or support.
* The Broad Institute is not responsible for its use, misuse, or
* functionality.
*/
/**
* Represents a future reduce...a reduce that will be ready at some point in the future.
* Provides services for indicating when all data is prepared for the reduce a callable
* interface to force the reduce.
*/
public class TreeReducer implements Callable {
private TreeReducible walker;
private final Future lhs;
private final Future rhs;
/**
* Create a one-sided reduce. Result will be a simple pass-through of the result.
* @param lhs The one side of the reduce.
*/
public TreeReducer( Future lhs ) {
this( lhs, null );
}
/**
* Create a full tree reduce. Combine this two results using an unspecified walker at some point in the future.
* @param lhs Left-hand side of the reduce.
* @param rhs Right-hand side of the reduce.
*/
public TreeReducer( Future lhs, Future rhs ) {
this.lhs = lhs;
this.rhs = rhs;
}
/**
* Provide a walker for the future reduce.
* @param walker walker to use when performing the reduce.
*/
public void setWalker( TreeReducible walker ) {
this.walker = walker;
}
/**
* Is the data ready for reduce? True if lhs and rhs have already been resolved.
* @return True if data is ready and waiting, false otherwise.
*/
public boolean isReadyForReduce() {
if( lhs == null )
throw new IllegalStateException(String.format("Insufficient data on which to reduce; lhs = %s, rhs = %s", lhs, rhs) );
if( rhs == null )
return lhs.isDone();
return lhs.isDone() && rhs.isDone();
}
/**
* Returns the value of the reduce. If not isReadyForReduce(), this call will until all entries become ready.
* @return Result of the reduce.
*/
public Object call() {
try {
if( lhs == null )
return lhs.get();
else
return walker.reduce( lhs.get(), rhs.get() );
}
catch( InterruptedException ex ) {
throw new RuntimeException("Hierarchical reduce interrupted", ex);
}
catch( ExecutionException ex ) {
throw new RuntimeException("Hierarchical reduce failed", ex);
}
}
}

View File

@ -0,0 +1,52 @@
package org.broadinstitute.sting.utils.threading;
import org.apache.log4j.Logger;
/**
* User: hanna
* Date: Apr 29, 2009
* Time: 4:27:58 PM
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
* Software and documentation are copyright 2005 by the Broad Institute.
* All rights are reserved.
*
* Users acknowledge that this software is supplied without any warranty or support.
* The Broad Institute is not responsible for its use, misuse, or
* functionality.
*/
/**
* Waits for a signal to come through that the thread pool has run
* a given task and therefore has a free slot.
*
* Make sure, that, when using, the submit and the run are both
* protected by the same synchronized(monitor) lock. See the test
* case for an example.
*/
public class ThreadPoolMonitor implements Runnable {
/**
* Logger for reporting interruptions, etc.
*/
private static Logger logger = Logger.getLogger(ThreadPoolMonitor.class);
/**
* Watch the monitor
*/
public synchronized void watch() {
try {
wait();
}
catch( InterruptedException ex ) {
logger.error("ThreadPoolMonitor interrupted:" + ex.getStackTrace());
throw new RuntimeException("ThreadPoolMonitor interrupted", ex);
}
}
/**
* Instruct the monitor that the thread pool has run for the class.
* Only the thread pool should execute this method.
*/
public synchronized void run() {
notify();
}
}

View File

@ -0,0 +1,37 @@
package org.broadinstitute.sting.utils.threading;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; /**
* User: hanna
* Date: Apr 29, 2009
* Time: 4:30:55 PM
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
* Software and documentation are copyright 2005 by the Broad Institute.
* All rights are reserved.
*
* Users acknowledge that this software is supplied without any warranty or support.
* The Broad Institute is not responsible for its use, misuse, or
* functionality.
*/
/**
* Tests for the thread pool monitor class.
*/
public class ThreadPoolMonitorTest {
private ExecutorService threadPool = Executors.newFixedThreadPool(1);
/**
* Test to make sure the thread pool wait works properly.
*/
@Test(timeout=2000)
public void testThreadPoolMonitor() {
ThreadPoolMonitor monitor = new ThreadPoolMonitor();
synchronized(monitor) {
threadPool.submit(monitor);
monitor.watch();
}
}
}