Documentation and cleanup work in preparation for parallelism.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@538 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2009-04-26 17:42:00 +00:00
parent 0c76a70313
commit 4036f24909
3 changed files with 127 additions and 61 deletions

View File

@ -10,7 +10,8 @@ import net.sf.samtools.util.RuntimeIOException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.executive.MicroManager;
import org.broadinstitute.sting.gatk.executive.LinearMicroManager;
import org.broadinstitute.sting.gatk.executive.MicroScheduler;
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
import org.broadinstitute.sting.gatk.traversals.*;
@ -245,7 +246,7 @@ public class GenomeAnalysisTK extends CommandLineProgram {
throw new RuntimeException( "Unable to access walker", ex );
}
MicroManager microManager = null;
MicroScheduler microScheduler = null;
// Get the walker specified
if ( my_walker instanceof LocusWalker ) {
@ -267,8 +268,8 @@ public class GenomeAnalysisTK extends CommandLineProgram {
// is not filtered.
if( !DISABLE_THREADING ) {
logger.warn("Preliminary threading support ENABLED");
microManager = new MicroManager( INPUT_FILES, REF_FILE_ARG, numThreads );
this.engine = microManager.getTraversalEngine();
microScheduler = new LinearMicroManager( INPUT_FILES, REF_FILE_ARG, numThreads );
this.engine = microScheduler.getTraversalEngine();
}
else {
logger.warn("Preliminary threading support DISABLED");
@ -331,15 +332,9 @@ public class GenomeAnalysisTK extends CommandLineProgram {
engine.setWalkOverAllSites(WALK_ALL_LOCI);
engine.initialize();
if( microManager != null ) {
List<GenomeLoc> locs;
if (INTERVALS_FILE != null) {
locs = GenomeLoc.IntervalFileToList(INTERVALS_FILE);
microManager.setIntervalList(locs);
} else {
locs = GenomeLoc.parseGenomeLocs( REGION_STR );
}
microManager.execute( my_walker, locs );
if( microScheduler != null ) {
List<GenomeLoc> locs = GenomeLoc.parseGenomeLocs( REGION_STR );
microScheduler.execute( my_walker, locs );
}
else
engine.traverse(my_walker);

View File

@ -1,11 +1,9 @@
package org.broadinstitute.sting.gatk.executive;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider;
import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider;
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2;
@ -17,72 +15,32 @@ import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;
/**
* A micro-scheduling manager for N-way threaded execution of a traversal
*
*/
public class MicroManager {
private static long SHARD_SIZE = 100000L;
private List<File> reads;
private IndexedFastaSequenceFile ref;
public class LinearMicroManager extends MicroScheduler {
private TraverseLociByReference traversalEngine = null;
protected static Logger logger = Logger.getLogger(MicroManager.class);
protected List<GenomeLoc> intervalList = null;
public TraversalEngine getTraversalEngine() {
return traversalEngine;
}
public MicroManager( List<File> reads, // the reads file(s)
public LinearMicroManager( List<File> reads, // the reads file(s)
File refFile, // the reference file driving the traversal
int nThreadsToUse ) { // maximum number of threads to use to do the work
this.reads = reads;
try {
ref = new IndexedFastaSequenceFile(refFile);
}
catch( FileNotFoundException ex ) {
throw new RuntimeException("File not found opening fasta file; please do this check before MicroManaging", ex);
}
GenomeLoc.setupRefContigOrdering(ref);
super( reads, refFile );
traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() );
}
public void setIntervalList(List<GenomeLoc> intervalList) {
this.intervalList = intervalList;
}
public void execute( Walker walker, // the analysis technique to use.
List<GenomeLoc> locations ) { // list of work to do
ShardStrategy shardStrategy = null;
if( locations != null )
shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
ref.getSequenceDictionary(),
SHARD_SIZE,
locations );
else
shardStrategy = ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
ref.getSequenceDictionary(),
SHARD_SIZE );
SAMDataSource dataSource = null;
try {
dataSource = new SAMDataSource( TraversalEngine.unpackReads(reads) );
}
catch( SimpleDataSourceLoadException ex ) {
throw new RuntimeException( ex );
}
catch( FileNotFoundException ex ) {
throw new RuntimeException( ex );
}
ShardStrategy shardStrategy = getShardStrategy( reference, locations );
SAMDataSource dataSource = getReadsDataSource();
boolean walkerInitialized = false;
Object accumulator = null;
@ -98,7 +56,7 @@ public class MicroManager {
throw new RuntimeException( ex );
}
ReferenceProvider referenceProvider = new ReferenceProvider( ref, span );
ReferenceProvider referenceProvider = new ReferenceProvider( reference, span );
LocusContextProvider locusProvider = new LocusContextProvider( readShard );
// set the sam header of the traversal engine

View File

@ -0,0 +1,113 @@
package org.broadinstitute.sting.gatk.executive;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import java.util.List;
import java.io.FileNotFoundException;
import java.io.File;
import edu.mit.broad.picard.reference.ReferenceSequenceFile;
/**
 * Base class for micro-schedulers.
 *
 * Holds the list of reads files and the opened reference sequence file, and
 * offers subclasses helpers for building a shard strategy and a reads data
 * source.  Subclasses supply the traversal engine and decide how a walker is
 * actually executed over the requested intervals.
 */
public abstract class MicroScheduler {
    /**
     * Shard size handed to the shard strategy factory.
     * NOTE(review): presumably measured in reference bases -- confirm against ShardStrategyFactory.
     */
    private static final long SHARD_SIZE = 100000L;

    /**
     * Logger shared with subclasses.  Named after this base class rather than
     * after one particular subclass, so the abstract class does not depend on
     * a concrete implementation.
     */
    protected static Logger logger = Logger.getLogger(MicroScheduler.class);

    /** The reads file(s) supplied at construction time. */
    private final List<File> reads;

    /** The reference sequence file opened from the file supplied at construction time. */
    protected IndexedFastaSequenceFile reference;

    /**
     * Create a microscheduler given the reads and reference.
     * Opening the reference also sets up the contig ordering in GenomeLoc.
     * @param reads The reads.
     * @param refFile File pointer to the reference.
     */
    protected MicroScheduler( List<File> reads, File refFile ) {
        this.reads = reads;
        this.reference = openReferenceSequenceFile( refFile );
    }

    /**
     * A temporary getter for the traversal engine.  In the future, clients
     * of the microscheduler shouldn't need to know anything about the traversal engine.
     * @return The traversal engine.
     */
    public abstract TraversalEngine getTraversalEngine();

    /**
     * Walks a walker over the given list of intervals.
     * @param walker Computation to perform over dataset.
     * @param intervals A list of intervals over which to walk.  Null for whole dataset.
     */
    public abstract void execute( Walker walker, List<GenomeLoc> intervals );

    /**
     * Get the sharding strategy given a driving data source.  A linear
     * shatter strategy is always requested; the intervals are passed to the
     * factory only when they are non-null.
     * @param drivingDataSource Data on which to shard.
     * @param intervals Intervals to use when limiting sharding.  Null to shard without an interval list.
     * @return Sharding strategy for this driving data source.
     */
    protected ShardStrategy getShardStrategy( ReferenceSequenceFile drivingDataSource, List<GenomeLoc> intervals ) {
        if( intervals != null ) {
            return ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
                                                 drivingDataSource.getSequenceDictionary(),
                                                 SHARD_SIZE,
                                                 intervals );
        }
        return ShardStrategyFactory.shatter( ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
                                             drivingDataSource.getSequenceDictionary(),
                                             SHARD_SIZE );
    }

    /**
     * Gets a data source for the set of reads given at construction time.
     * @return A data source for the given set of reads.
     * @throws RuntimeException wrapping the underlying exception if the data
     *         source cannot be loaded or a reads file cannot be found.
     */
    protected SAMDataSource getReadsDataSource() {
        try {
            return new SAMDataSource( TraversalEngine.unpackReads(reads) );
        }
        catch( SimpleDataSourceLoadException ex ) {
            throw new RuntimeException( ex );
        }
        catch( FileNotFoundException ex ) {
            throw new RuntimeException( ex );
        }
    }

    /**
     * Opens a reference sequence file paired with an index, then registers it
     * with GenomeLoc to set up the reference contig ordering.
     * @param refFile Handle to a reference sequence file.  Non-null.
     * @return The opened reference sequence file.
     * @throws RuntimeException if the fasta file cannot be found.
     */
    private IndexedFastaSequenceFile openReferenceSequenceFile( File refFile ) {
        IndexedFastaSequenceFile ref;
        try {
            ref = new IndexedFastaSequenceFile(refFile);
        }
        catch( FileNotFoundException ex ) {
            throw new RuntimeException("File not found opening fasta file; please do this check before MicroManaging", ex);
        }
        // Side effect visible outside this object: contig ordering is configured from this reference.
        GenomeLoc.setupRefContigOrdering(ref);
        return ref;
    }
}