package org.broadinstitute.sting.gatk.executive;

import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider;
import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider;
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2;
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;

/**
 * A micro-scheduling manager for N-way threaded execution of a traversal.
 *
 * <p>As written, the manager walks the shards produced by a linear shard strategy one after
 * another on the calling thread: for each shard it seeks the reads covering the shard's span,
 * builds reference and locus providers, and hands them to a {@link TraverseLociByReference}
 * engine, threading a single accumulator through all shards.
 */
public class MicroManager {
    /** Shard size handed to the shard strategy factory. */
    private static final long SHARD_SIZE = 100000L;

    /** The reads file(s), as supplied to the constructor. */
    private List reads;

    /** Indexed reference sequence opened from the reference file. */
    private IndexedFastaSequenceFile ref;

    /** Engine that performs the per-shard traversal. */
    private TraverseLociByReference traversalEngine = null;

    protected static Logger logger = Logger.getLogger(MicroManager.class);

    /** Interval list recorded by {@link #setIntervalList(List)}; not read elsewhere in this class. */
    protected List intervalList = null;

    /**
     * Returns the traversal engine created by the constructor.
     *
     * @return the engine used by {@link #execute(Walker, List)}
     */
    public TraversalEngine getTraversalEngine() {
        return traversalEngine;
    }

    /**
     * Opens the reference, sets up contig ordering from it, and creates the traversal engine.
     *
     * @param reads         the reads file(s)
     * @param refFile       the reference file driving the traversal
     * @param nThreadsToUse maximum number of threads to use to do the work; accepted for
     *                      interface compatibility but not read by this constructor
     * @throws RuntimeException if the reference file cannot be found
     */
    public MicroManager(List reads, File refFile, int nThreadsToUse) {
        this.reads = reads;

        try {
            ref = new IndexedFastaSequenceFile(refFile);
        } catch (FileNotFoundException ex) {
            throw new RuntimeException(
                    "File not found opening fasta file; please do this check before MicroManaging", ex);
        }

        GenomeLoc.setupRefContigOrdering(ref);
        traversalEngine = new TraverseLociByReference(reads, refFile, new java.util.ArrayList());
    }

    /**
     * Records the interval list on this manager.
     *
     * @param intervalList the intervals to remember
     */
    public void setIntervalList(List intervalList) {
        this.intervalList = intervalList;
    }

    /**
     * Runs the walker over every shard of the reference and reports the final result.
     *
     * <p>The walker is initialized lazily inside the shard loop, after the SAM header of the
     * first shard has been pushed into the traversal engine. Consequently, if the shard strategy
     * yields no shards, the walker is never initialized and {@code onTraversalDone} receives a
     * {@code null} accumulator.
     *
     * @param walker    the analysis technique to use; cast to {@link LocusWalker} when the
     *                  accumulator is first created
     * @param locations list of work to do; when {@code null} the shard strategy is built without
     *                  a location list
     * @throws RuntimeException if the reads data source cannot be opened or a shard cannot be
     *                          seeked to
     */
    public void execute(Walker walker, List locations) {
        ShardStrategy shardStrategy = null;
        if (locations != null) {
            shardStrategy = ShardStrategyFactory.shatter(
                    ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
                    ref.getSequenceDictionary(),
                    SHARD_SIZE,
                    locations);
        } else {
            shardStrategy = ShardStrategyFactory.shatter(
                    ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
                    ref.getSequenceDictionary(),
                    SHARD_SIZE);
        }

        SAMDataSource dataSource = null;
        try {
            dataSource = new SAMDataSource(TraversalEngine.unpackReads(reads));
        } catch (SimpleDataSourceLoadException ex) {
            throw new RuntimeException(ex);
        } catch (FileNotFoundException ex) {
            throw new RuntimeException(ex);
        }

        boolean walkerInitialized = false;
        Object accumulator = null;

        for (Shard shard : shardStrategy) {
            GenomeLoc span = shard.getGenomeLoc();

            MergingSamRecordIterator2 readShard = null;
            try {
                readShard = dataSource.seek(span);
            } catch (SimpleDataSourceLoadException ex) {
                throw new RuntimeException(ex);
            }

            // Everything that uses the shard's read iterator runs under try/finally so the
            // iterator is released even when initialization or traversal throws.
            try {
                ReferenceProvider referenceProvider = new ReferenceProvider(ref, span);
                LocusContextProvider locusProvider = new LocusContextProvider(readShard);

                // set the sam header of the traversal engine
                traversalEngine.setSAMHeader(readShard.getMergedHeader());

                if (!walkerInitialized) {
                    walker.initialize();
                    accumulator = ((LocusWalker) walker).reduceInit();
                    walkerInitialized = true;
                }

                accumulator = traversalEngine.traverse(
                        walker, shard, referenceProvider, locusProvider, accumulator);
            } finally {
                readShard.close();
            }
        }

        traversalEngine.printOnTraversalDone("loci", accumulator);
        walker.onTraversalDone(accumulator);
    }
}