Placeholder for parallel MicroManager.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@542 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
1daa011387
commit
9a8902571c
|
|
@ -11,7 +11,6 @@ import org.apache.commons.cli.Option;
|
||||||
import org.apache.commons.cli.OptionBuilder;
|
import org.apache.commons.cli.OptionBuilder;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.broadinstitute.sting.gatk.executive.MicroScheduler;
|
import org.broadinstitute.sting.gatk.executive.MicroScheduler;
|
||||||
import org.broadinstitute.sting.gatk.executive.LinearMicroScheduler;
|
|
||||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
||||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||||
import org.broadinstitute.sting.gatk.traversals.*;
|
import org.broadinstitute.sting.gatk.traversals.*;
|
||||||
|
|
@ -268,7 +267,7 @@ public class GenomeAnalysisTK extends CommandLineProgram {
|
||||||
// is not filtered.
|
// is not filtered.
|
||||||
if( !DISABLE_THREADING ) {
|
if( !DISABLE_THREADING ) {
|
||||||
logger.warn("Preliminary threading support ENABLED");
|
logger.warn("Preliminary threading support ENABLED");
|
||||||
microScheduler = new LinearMicroScheduler( INPUT_FILES, REF_FILE_ARG, numThreads );
|
microScheduler = MicroScheduler.create( walker, INPUT_FILES, REF_FILE_ARG, numThreads );
|
||||||
this.engine = microScheduler.getTraversalEngine();
|
this.engine = microScheduler.getTraversalEngine();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,50 @@
|
||||||
|
package org.broadinstitute.sting.gatk.executive;
|
||||||
|
|
||||||
|
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
|
||||||
|
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
||||||
|
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||||
|
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by IntelliJ IDEA.
|
||||||
|
* User: mhanna
|
||||||
|
* Date: Apr 26, 2009
|
||||||
|
* Time: 5:41:04 PM
|
||||||
|
* To change this template use File | Settings | File Templates.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A microscheduler that schedules shards according to a tree-like structure.
|
||||||
|
* Requires a special walker tagged with a 'TreeReducible' interface.
|
||||||
|
*/
|
||||||
|
public class HierarchicalMicroScheduler extends MicroScheduler {
|
||||||
|
/**
|
||||||
|
* How many threads should the hierarchical scheduler try to keep busy.
|
||||||
|
*/
|
||||||
|
private int nThreadsToUse;
|
||||||
|
|
||||||
|
private TraverseLociByReference traversalEngine = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new hierarchical microscheduler to process the given reads and reference.
|
||||||
|
* @param reads Reads file(s) to process.
|
||||||
|
* @param refFile Reference for driving the traversal.
|
||||||
|
* @param nThreadsToUse maximum number of threads to use to do the work
|
||||||
|
*/
|
||||||
|
protected HierarchicalMicroScheduler( List<File> reads, File refFile, int nThreadsToUse ) {
|
||||||
|
super( reads, refFile );
|
||||||
|
this.nThreadsToUse = nThreadsToUse;
|
||||||
|
traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() );
|
||||||
|
}
|
||||||
|
|
||||||
|
public TraversalEngine getTraversalEngine() {
|
||||||
|
return traversalEngine;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute( Walker walker, List<GenomeLoc> intervals ) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -17,8 +17,7 @@ import java.io.File;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A micro-scheduling manager for N-way threaded execution of a traversal
|
* A micro-scheduling manager for single-threaded execution of a traversal.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public class LinearMicroScheduler extends MicroScheduler {
|
public class LinearMicroScheduler extends MicroScheduler {
|
||||||
|
|
||||||
|
|
@ -28,16 +27,22 @@ public class LinearMicroScheduler extends MicroScheduler {
|
||||||
return traversalEngine;
|
return traversalEngine;
|
||||||
}
|
}
|
||||||
|
|
||||||
public LinearMicroScheduler( List<File> reads, // the reads file(s)
|
/**
|
||||||
File refFile, // the reference file driving the traversal
|
* Create a new linear microscheduler to process the given reads and reference.
|
||||||
int nThreadsToUse ) { // maximum number of threads to use to do the work
|
* @param reads Reads file(s) to process.
|
||||||
|
* @param refFile Reference for driving the traversal.
|
||||||
|
*/
|
||||||
|
protected LinearMicroScheduler( List<File> reads, File refFile ) {
|
||||||
super( reads, refFile );
|
super( reads, refFile );
|
||||||
traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() );
|
traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() );
|
||||||
}
|
}
|
||||||
|
|
||||||
public void execute( Walker walker, // the analysis technique to use.
|
/**
|
||||||
List<GenomeLoc> locations ) { // list of work to do
|
* Run this traversal over the specified subsection of the dataset.
|
||||||
|
* @param walker Computation to perform over dataset.
|
||||||
|
* @param locations Subset of the dataset over which to walk.
|
||||||
|
*/
|
||||||
|
public void execute( Walker walker, List<GenomeLoc> locations ) {
|
||||||
ShardStrategy shardStrategy = getShardStrategy( reference, locations );
|
ShardStrategy shardStrategy = getShardStrategy( reference, locations );
|
||||||
SAMDataSource dataSource = getReadsDataSource();
|
SAMDataSource dataSource = getReadsDataSource();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource
|
||||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
|
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
|
||||||
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
||||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||||
|
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
||||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||||
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
|
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
|
||||||
|
|
||||||
|
|
@ -23,6 +24,10 @@ import edu.mit.broad.picard.reference.ReferenceSequenceFile;
|
||||||
* Time: 12:37:23 PM
|
* Time: 12:37:23 PM
|
||||||
* To change this template use File | Settings | File Templates.
|
* To change this template use File | Settings | File Templates.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shards and schedules data in manageable chunks.
|
||||||
|
*/
|
||||||
public abstract class MicroScheduler {
|
public abstract class MicroScheduler {
|
||||||
private List<File> reads;
|
private List<File> reads;
|
||||||
private static long SHARD_SIZE = 100000L;
|
private static long SHARD_SIZE = 100000L;
|
||||||
|
|
@ -31,6 +36,24 @@ public abstract class MicroScheduler {
|
||||||
|
|
||||||
protected IndexedFastaSequenceFile reference;
|
protected IndexedFastaSequenceFile reference;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
|
||||||
|
* selected walker.
|
||||||
|
* @param walker Which walker to use.
|
||||||
|
* @param nThreadsToUse Number of threads to utilize.
|
||||||
|
* @return The best-fit microscheduler.
|
||||||
|
*/
|
||||||
|
public static MicroScheduler create( Walker walker, List<File> reads, File ref, int nThreadsToUse ) {
|
||||||
|
if( walker instanceof TreeReducible && nThreadsToUse > 1 ) {
|
||||||
|
logger.info("Creating hierarchical microscheduler");
|
||||||
|
return new HierarchicalMicroScheduler( reads, ref, nThreadsToUse );
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
logger.info("Creating linear microscheduler");
|
||||||
|
return new LinearMicroScheduler( reads, ref );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a microscheduler given the reads and reference.
|
* Create a microscheduler given the reads and reference.
|
||||||
* @param reads The reads.
|
* @param reads The reads.
|
||||||
|
|
|
||||||
|
|
@ -13,13 +13,17 @@ import java.util.List;
|
||||||
* Time: 3:22:14 PM
|
* Time: 3:22:14 PM
|
||||||
* To change this template use File | Settings | File Templates.
|
* To change this template use File | Settings | File Templates.
|
||||||
*/
|
*/
|
||||||
public class CountLociWalker extends LocusWalker<Integer, Integer> {
|
public class CountLociWalker extends LocusWalker<Integer, Integer> implements TreeReducible<Integer> {
|
||||||
public Integer map(RefMetaDataTracker tracker, char ref, LocusContext context) {
|
public Integer map(RefMetaDataTracker tracker, char ref, LocusContext context) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer reduceInit() { return 0; }
|
public Integer reduceInit() { return 0; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reduces two subtrees together. In this case, the implementation of the tree reduce
|
||||||
|
* is exactly the same as the implementation of the single reduce.
|
||||||
|
*/
|
||||||
public Integer reduce(Integer value, Integer sum) {
|
public Integer reduce(Integer value, Integer sum) {
|
||||||
return value + sum;
|
return value + sum;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
package org.broadinstitute.sting.gatk.walkers;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by IntelliJ IDEA.
|
||||||
|
* User: mhanna
|
||||||
|
* Date: Apr 26, 2009
|
||||||
|
* Time: 5:34:11 PM
|
||||||
|
* To change this template use File | Settings | File Templates.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates that a class is tree reducible, aka that any two adjacent
|
||||||
|
* shards of the data can reduce with each other, and the composite result
|
||||||
|
* can be reduced with other composite results.
|
||||||
|
*/
|
||||||
|
public interface TreeReducible<ReduceType> {
|
||||||
|
/**
|
||||||
|
* A composite, 'reduce of reduces' function.
|
||||||
|
* @param lhs 'left-most' portion of data in the composite reduce.
|
||||||
|
* @param rhs 'right-most' portion of data in the composite reduce.
|
||||||
|
* @return The composite reduce type.
|
||||||
|
*/
|
||||||
|
ReduceType reduce(ReduceType lhs, ReduceType rhs);
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue