package org.broadinstitute.sting.gatk.executive; import net.sf.picard.reference.ReferenceSequenceFile; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.traversals.TraverseReads; import org.broadinstitute.sting.gatk.traversals.TraverseLoci; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.Reads; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.StingException; import org.broadinstitute.sting.utils.GenomeLocSortedSet; import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; import java.io.File; import java.io.FileNotFoundException; import java.util.List; import java.util.Map; import java.util.HashMap; import java.util.ArrayList; /** * Created by IntelliJ IDEA. * User: mhanna * Date: Apr 26, 2009 * Time: 12:37:23 PM * To change this template use File | Settings | File Templates. */ /** Shards and schedules data in manageable chunks. */ public abstract class MicroScheduler { private static long SHARD_SIZE = 100000L; protected static Logger logger = Logger.getLogger(MicroScheduler.class); protected final TraversalEngine traversalEngine; protected final IndexedFastaSequenceFile reference; private final SAMDataSource reads; private final List rods; /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. * @param walker Which walker to use. * @param nThreadsToUse Number of threads to utilize. * @return The best-fit microscheduler. */ public static MicroScheduler create(Walker walker, Reads reads, File ref, List> rods, int nThreadsToUse) { if (walker instanceof TreeReducible && nThreadsToUse > 1) { logger.info("Creating hierarchical microscheduler"); return new HierarchicalMicroScheduler(walker, reads, ref, rods, nThreadsToUse); } else { logger.info("Creating linear microscheduler"); return new LinearMicroScheduler(walker, reads, ref, rods); } } /** * Create a microscheduler given the reads and reference. * @param reads The reads. * @param refFile File pointer to the reference. */ protected MicroScheduler(Walker walker, Reads reads, File refFile, List> rods) { if (walker instanceof ReadWalker) { traversalEngine = new TraverseReads(reads.getReadsFiles(), refFile, rods); } else { traversalEngine = new TraverseLoci(reads.getReadsFiles(), refFile, rods); } this.reads = getReadsDataSource(reads); this.reference = openReferenceSequenceFile(refFile); this.rods = getReferenceOrderedDataSources(rods); } /** * A temporary getter for the traversal engine. In the future, clients * of the microscheduler shouldn't need to know anything about the traversal engine. * @return The traversal engine. */ public TraversalEngine getTraversalEngine() { return traversalEngine; } /** * Walks a walker over the given list of intervals. * @param walker Computation to perform over dataset. * @param intervals A list of intervals over which to walk. Null for whole dataset. * @return the return type of the walker */ public abstract Object execute(Walker walker, GenomeLocSortedSet intervals); /** * Get the sharding strategy given a driving data source. * @param walker Walker for which to infer sharding strategy. * @param drivingDataSource Data on which to shard. * @param intervals Intervals to use when limiting sharding. * @return Sharding strategy for this driving data source. */ protected ShardStrategy getShardStrategy(Walker walker, ReferenceSequenceFile drivingDataSource, GenomeLocSortedSet intervals) { ShardStrategy shardStrategy = null; ShardStrategyFactory.SHATTER_STRATEGY shardType; if (walker instanceof LocusWalker) { if (intervals != null) { shardType = (walker.isReduceByInterval()) ? ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL : ShardStrategyFactory.SHATTER_STRATEGY.LINEAR; shardStrategy = ShardStrategyFactory.shatter(shardType, drivingDataSource.getSequenceDictionary(), SHARD_SIZE, intervals); } else shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, drivingDataSource.getSequenceDictionary(), SHARD_SIZE); } else if (walker instanceof ReadWalker) { shardType = ShardStrategyFactory.SHATTER_STRATEGY.READS; if (intervals != null) { shardStrategy = ShardStrategyFactory.shatter(shardType, drivingDataSource.getSequenceDictionary(), SHARD_SIZE, intervals); } else { shardStrategy = ShardStrategyFactory.shatter(shardType, drivingDataSource.getSequenceDictionary(), SHARD_SIZE); } } else throw new StingException("Unable to support walker of type" + walker.getClass().getName()); return shardStrategy; } /** * Gets an window into all the data that can be viewed as a single shard. * @param shard The section of data to view. * @return An accessor for all the data in this shard. */ protected ShardDataProvider getShardDataProvider(Shard shard) { return new ShardDataProvider(shard, reads, reference, rods); } /** * Gets a data source for the given set of reads. * @return A data source for the given set of reads. */ private SAMDataSource getReadsDataSource(Reads reads) { // By reference traversals are happy with no reads. Make sure that case is handled. if (reads.getReadsFiles().size() == 0) return null; SAMDataSource dataSource = new SAMDataSource(reads); // Side effect: initialize the traversal engine with reads data. // TODO: Give users a dedicated way of getting the header so that the MicroScheduler // doesn't have to bend over backward providing legacy getters and setters. traversalEngine.setSAMHeader(dataSource.getHeader()); return dataSource; } /** * Open the reference-ordered data sources. * @return A list of reference-ordered data sources. */ private List getReferenceOrderedDataSources(List> rods) { List dataSources = new ArrayList(); for (ReferenceOrderedData rod : rods) dataSources.add(new ReferenceOrderedDataSource(rod)); return dataSources; } /** * Opens a reference sequence file paired with an index. * @param refFile Handle to a reference sequence file. Non-null. * @return A thread-safe file wrapper. */ private IndexedFastaSequenceFile openReferenceSequenceFile(File refFile) { IndexedFastaSequenceFile ref = null; try { ref = new IndexedFastaSequenceFile(refFile); } catch (FileNotFoundException ex) { throw new RuntimeException("File not found opening fasta file; please do this check before MicroManaging", ex); } GenomeLoc.setupRefContigOrdering(ref); return ref; } }