diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java index 2b77f44b1..193b83773 100644 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java @@ -11,7 +11,6 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.executive.MicroScheduler; -import org.broadinstitute.sting.gatk.executive.LinearMicroScheduler; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.traversals.*; @@ -268,7 +267,7 @@ public class GenomeAnalysisTK extends CommandLineProgram { // is not filtered. if( !DISABLE_THREADING ) { logger.warn("Preliminary threading support ENABLED"); - microScheduler = new LinearMicroScheduler( INPUT_FILES, REF_FILE_ARG, numThreads ); + microScheduler = MicroScheduler.create( walker, INPUT_FILES, REF_FILE_ARG, numThreads ); this.engine = microScheduler.getTraversalEngine(); } else { diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java new file mode 100755 index 000000000..ca727a22e --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -0,0 +1,50 @@ +package org.broadinstitute.sting.gatk.executive; + +import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; +import org.broadinstitute.sting.gatk.traversals.TraversalEngine; +import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.utils.GenomeLoc; + +import java.io.File; +import java.util.List; + +/** + * Created by IntelliJ IDEA. 
+ * User: mhanna + * Date: Apr 26, 2009 + * Time: 5:41:04 PM + * To change this template use File | Settings | File Templates. + */ + +/** + * A microscheduler that schedules shards according to a tree-like structure. + * Requires a special walker tagged with a 'TreeReducible' interface. In this revision execute() has an empty body, so calling it does nothing. + */ +public class HierarchicalMicroScheduler extends MicroScheduler { + /** + * How many threads should the hierarchical scheduler try to keep busy. Assigned in the constructor and not read anywhere else in this class. + */ + private int nThreadsToUse; + + private TraverseLociByReference traversalEngine = null; + + /** + * Create a new hierarchical microscheduler to process the given reads and reference. Also constructs the TraverseLociByReference engine that getTraversalEngine() returns. + * @param reads Reads file(s) to process. + * @param refFile Reference for driving the traversal. + * @param nThreadsToUse maximum number of threads to use to do the work + */ + protected HierarchicalMicroScheduler( List reads, File refFile, int nThreadsToUse ) { + super( reads, refFile ); + this.nThreadsToUse = nThreadsToUse; + traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() ); + } + + public TraversalEngine getTraversalEngine() { + return traversalEngine; + } + + public void execute( Walker walker, List intervals ) { + /* Empty body in this revision: the walker and intervals arguments are not used. */ + } +} diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 39ae1c6b6..f69942995 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -17,8 +17,7 @@ import java.io.File; import java.util.List; /** - * A micro-scheduling manager for N-way threaded execution of a traversal - * + * A micro-scheduling manager for single-threaded execution of a traversal. 
*/ public class LinearMicroScheduler extends MicroScheduler { @@ -28,16 +27,22 @@ public class LinearMicroScheduler extends MicroScheduler { return traversalEngine; } - public LinearMicroScheduler( List reads, // the reads file(s) - File refFile, // the reference file driving the traversal - int nThreadsToUse ) { // maximum number of threads to use to do the work + /** + * Create a new linear microscheduler to process the given reads and reference. Also constructs the TraverseLociByReference engine that getTraversalEngine() returns. + * @param reads Reads file(s) to process. + * @param refFile Reference for driving the traversal. + */ + protected LinearMicroScheduler( List reads, File refFile ) { super( reads, refFile ); traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() ); } - public void execute( Walker walker, // the analysis technique to use. - List locations ) { // list of work to do - + /** + * Run this traversal over the specified subsection of the dataset. + * @param walker Computation to perform over dataset. + * @param locations Subset of the dataset over which to walk. 
+ */ + public void execute( Walker walker, List locations ) { ShardStrategy shardStrategy = getShardStrategy( reference, locations ); SAMDataSource dataSource = getReadsDataSource(); diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index aab8836f5..5d1e4ec0d 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -7,6 +7,7 @@ import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; @@ -23,6 +24,10 @@ import edu.mit.broad.picard.reference.ReferenceSequenceFile; * Time: 12:37:23 PM * To change this template use File | Settings | File Templates. */ + +/** + * Shards and schedules data in manageable chunks. + */ public abstract class MicroScheduler { private List reads; private static long SHARD_SIZE = 100000L; @@ -31,6 +36,24 @@ public abstract class MicroScheduler { protected IndexedFastaSequenceFile reference; + /** + * MicroScheduler factory function. Create a microscheduler appropriate for reducing the + * selected walker. A HierarchicalMicroScheduler is returned only when the walker implements TreeReducible and nThreadsToUse is greater than 1; otherwise a LinearMicroScheduler is returned. The reads and ref arguments are forwarded unchanged to the constructor that is invoked. + * @param walker Which walker to use. + * @param nThreadsToUse Number of threads to utilize. + * @return The best-fit microscheduler. 
+ */ + public static MicroScheduler create( Walker walker, List reads, File ref, int nThreadsToUse ) { + if( walker instanceof TreeReducible && nThreadsToUse > 1 ) { + logger.info("Creating hierarchical microscheduler"); + return new HierarchicalMicroScheduler( reads, ref, nThreadsToUse ); + } + else { + logger.info("Creating linear microscheduler"); + return new LinearMicroScheduler( reads, ref ); + } + } + /** * Create a microscheduler given the reads and reference. * @param reads The reads. diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/CountLociWalker.java b/java/src/org/broadinstitute/sting/gatk/walkers/CountLociWalker.java index fc491790f..a37c3a347 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/CountLociWalker.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/CountLociWalker.java @@ -13,13 +13,17 @@ import java.util.List; * Time: 3:22:14 PM * To change this template use File | Settings | File Templates. */ -public class CountLociWalker extends LocusWalker { +public class CountLociWalker extends LocusWalker implements TreeReducible { public Integer map(RefMetaDataTracker tracker, char ref, LocusContext context) { return 1; } public Integer reduceInit() { return 0; } + /** + * Reduces two subtrees together. In this case, the implementation of the tree reduce + * is exactly the same as the implementation of the single reduce (it returns value + sum). + */ public Integer reduce(Integer value, Integer sum) { return value + sum; } diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java b/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java new file mode 100755 index 000000000..0b76f16a0 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java @@ -0,0 +1,24 @@ +package org.broadinstitute.sting.gatk.walkers; + +/** + * Created by IntelliJ IDEA. + * User: mhanna + * Date: Apr 26, 2009 + * Time: 5:34:11 PM + * To change this template use File | Settings | File Templates. 
+ */ + +/** + * Indicates that a class is tree reducible, aka that any two adjacent + * shards of the data can reduce with each other, and the composite result + * can be reduced with other composite results. NOTE(review): ReduceType is used by reduce() but no type parameter list is visible on the interface declaration here; presumably TreeReducible is generic in ReduceType -- confirm against the repository. + */ +public interface TreeReducible { + /** + * A composite, 'reduce of reduces' function. + * @param lhs 'left-most' portion of data in the composite reduce. + * @param rhs 'right-most' portion of data in the composite reduce. + * @return The composite result of reducing lhs with rhs. + */ + ReduceType reduce(ReduceType lhs, ReduceType rhs); +}