From 4036f2490913b9afa2c8a88ddbcdd68300df93b5 Mon Sep 17 00:00:00 2001 From: hanna Date: Sun, 26 Apr 2009 17:42:00 +0000 Subject: [PATCH] Documentation and cleanup work in preparation for parallelism. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@538 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/gatk/GenomeAnalysisTK.java | 21 ++-- ...roManager.java => LinearMicroManager.java} | 54 +-------- .../sting/gatk/executive/MicroScheduler.java | 113 ++++++++++++++++++ 3 files changed, 127 insertions(+), 61 deletions(-) rename java/src/org/broadinstitute/sting/gatk/executive/{MicroManager.java => LinearMicroManager.java} (59%) create mode 100755 java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java index dc69d12fe..542d5df2e 100644 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java @@ -10,7 +10,8 @@ import net.sf.samtools.util.RuntimeIOException; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.log4j.Logger; -import org.broadinstitute.sting.gatk.executive.MicroManager; +import org.broadinstitute.sting.gatk.executive.LinearMicroManager; +import org.broadinstitute.sting.gatk.executive.MicroScheduler; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.traversals.*; @@ -245,7 +246,7 @@ public class GenomeAnalysisTK extends CommandLineProgram { throw new RuntimeException( "Unable to access walker", ex ); } - MicroManager microManager = null; + MicroScheduler microScheduler = null; // Get the walker specified if ( my_walker instanceof LocusWalker ) { @@ -267,8 +268,8 @@ public class GenomeAnalysisTK extends CommandLineProgram { // is not filtered. if( !DISABLE_THREADING ) { logger.warn("Preliminary threading support ENABLED"); - microManager = new MicroManager( INPUT_FILES, REF_FILE_ARG, numThreads ); - this.engine = microManager.getTraversalEngine(); + microScheduler = new LinearMicroManager( INPUT_FILES, REF_FILE_ARG, numThreads ); + this.engine = microScheduler.getTraversalEngine(); } else { logger.warn("Preliminary threading support DISABLED"); @@ -331,15 +332,9 @@ public class GenomeAnalysisTK extends CommandLineProgram { engine.setWalkOverAllSites(WALK_ALL_LOCI); engine.initialize(); - if( microManager != null ) { - List locs; - if (INTERVALS_FILE != null) { - locs = GenomeLoc.IntervalFileToList(INTERVALS_FILE); - microManager.setIntervalList(locs); - } else { - locs = GenomeLoc.parseGenomeLocs( REGION_STR ); - } - microManager.execute( my_walker, locs ); + if( microScheduler != null ) { + List locs = GenomeLoc.parseGenomeLocs( REGION_STR ); + microScheduler.execute( my_walker, locs ); } else engine.traverse(my_walker); diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroManager.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroManager.java similarity index 59% rename from java/src/org/broadinstitute/sting/gatk/executive/MicroManager.java rename to java/src/org/broadinstitute/sting/gatk/executive/LinearMicroManager.java index 25f79b9a2..c5e899eb0 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroManager.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroManager.java @@ -1,11 +1,9 @@ package org.broadinstitute.sting.gatk.executive; -import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider; import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; -import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2; @@ -17,72 +15,32 @@ import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; import java.io.File; -import java.io.FileNotFoundException; import java.util.List; /** * A micro-scheduling manager for N-way threaded execution of a traversal * */ -public class MicroManager { - private static long SHARD_SIZE = 100000L; - - private List reads; - private IndexedFastaSequenceFile ref; +public class LinearMicroManager extends MicroScheduler { private TraverseLociByReference traversalEngine = null; - protected static Logger logger = Logger.getLogger(MicroManager.class); - - protected List intervalList = null; - public TraversalEngine getTraversalEngine() { return traversalEngine; } - public MicroManager( List reads, // the reads file(s) + public LinearMicroManager( List reads, // the reads file(s) File refFile, // the reference file driving the traversal int nThreadsToUse ) { // maximum number of threads to use to do the work - - this.reads = reads; - try { - ref = new IndexedFastaSequenceFile(refFile); - } - catch( FileNotFoundException ex ) { - throw new RuntimeException("File not found opening fasta file; please do this check before MicroManaging", ex); - } - GenomeLoc.setupRefContigOrdering(ref); - + super( reads, refFile ); traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() ); } - public void setIntervalList(List intervalList) { - this.intervalList = intervalList; - } - public void execute( Walker walker, // the analysis technique to use. List locations ) { // list of work to do - ShardStrategy shardStrategy = null; - if( locations != null ) - shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, - ref.getSequenceDictionary(), - SHARD_SIZE, - locations ); - else - shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, - ref.getSequenceDictionary(), - SHARD_SIZE ); - SAMDataSource dataSource = null; - try { - dataSource = new SAMDataSource( TraversalEngine.unpackReads(reads) ); - } - catch( SimpleDataSourceLoadException ex ) { - throw new RuntimeException( ex ); - } - catch( FileNotFoundException ex ) { - throw new RuntimeException( ex ); - } + ShardStrategy shardStrategy = getShardStrategy( reference, locations ); + SAMDataSource dataSource = getReadsDataSource(); boolean walkerInitialized = false; Object accumulator = null; @@ -98,7 +56,7 @@ public class MicroManager { throw new RuntimeException( ex ); } - ReferenceProvider referenceProvider = new ReferenceProvider( ref, span ); + ReferenceProvider referenceProvider = new ReferenceProvider( reference, span ); LocusContextProvider locusProvider = new LocusContextProvider( readShard ); // set the sam header of the traversal engine diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java new file mode 100755 index 000000000..a26218f92 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -0,0 +1,113 @@ +package org.broadinstitute.sting.gatk.executive; + +import org.apache.log4j.Logger; +import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; +import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory; +import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; +import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; +import org.broadinstitute.sting.gatk.traversals.TraversalEngine; +import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; + +import java.util.List; +import java.io.FileNotFoundException; +import java.io.File; + +import edu.mit.broad.picard.reference.ReferenceSequenceFile; + +/** + * Created by IntelliJ IDEA. + * User: mhanna + * Date: Apr 26, 2009 + * Time: 12:37:23 PM + * To change this template use File | Settings | File Templates. + */ +public abstract class MicroScheduler { + private List reads; + private static long SHARD_SIZE = 100000L; + + protected static Logger logger = Logger.getLogger(LinearMicroManager.class); + + protected IndexedFastaSequenceFile reference; + + /** + * Create a microscheduler given the reads and reference. + * @param reads The reads. + * @param refFile File pointer to the reference. + */ + protected MicroScheduler( List reads, File refFile ) { + this.reads = reads; + this.reference = openReferenceSequenceFile( refFile ); + } + + /** + * A temporary getter for the traversal engine. In the future, clients + * of the microscheduler shouldn't need to know anything about the traversal engine. + * @return The traversal engine. + */ + public abstract TraversalEngine getTraversalEngine(); + + /** + * Walks a walker over the given list of intervals. + * @param walker Computation to perform over dataset. + * @param intervals A list of intervals over which to walk. Null for whole dataset. + */ + public abstract void execute( Walker walker, List intervals); + + /** + * Get the sharding strategy given a driving data source. + * @param drivingDataSource Data on which to shard. + * @param intervals Intervals to use when limiting sharding. + * @return Sharding strategy for this driving data source. + */ + protected ShardStrategy getShardStrategy( ReferenceSequenceFile drivingDataSource, List intervals ) { + ShardStrategy shardStrategy = null; + if( intervals != null ) + shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, + drivingDataSource.getSequenceDictionary(), + SHARD_SIZE, + intervals ); + else + shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, + drivingDataSource.getSequenceDictionary(), + SHARD_SIZE ); + + return shardStrategy; + } + + /** + * Gets a data source for the given set of reads. + * @return A data source for the given set of reads. + */ + protected SAMDataSource getReadsDataSource() { + SAMDataSource dataSource = null; + try { + dataSource = new SAMDataSource( TraversalEngine.unpackReads(reads) ); + } + catch( SimpleDataSourceLoadException ex ) { + throw new RuntimeException( ex ); + } + catch( FileNotFoundException ex ) { + throw new RuntimeException( ex ); + } + return dataSource; + } + + /** + * Opens a reference sequence file paired with an index. + * @param refFile Handle to a reference sequence file. Non-null. + * @return A thread-safe file wrapper. + */ + private IndexedFastaSequenceFile openReferenceSequenceFile( File refFile ) { + IndexedFastaSequenceFile ref = null; + try { + ref = new IndexedFastaSequenceFile(refFile); + } + catch( FileNotFoundException ex ) { + throw new RuntimeException("File not found opening fasta file; please do this check before MicroManaging", ex); + } + GenomeLoc.setupRefContigOrdering(ref); + return ref; + } +}