/* * Copyright (c) 2009 The Broad Institute * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation * files (the "Software"), to deal in the Software without * restriction, including without limitation the rights to use, * copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following * conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR * OTHER DEALINGS IN THE SOFTWARE. */ package org.broadinstitute.sting.gatk.executive; import net.sf.picard.reference.ReferenceSequenceFile; import net.sf.samtools.SAMFileHeader; import net.sf.samtools.SAMSequenceDictionary; import net.sf.samtools.SAMSequenceRecord; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.Reads; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.shards.Shard; import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategyFactory; import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.traversals.*; import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.GenomeLocSortedSet; import org.broadinstitute.sting.utils.StingException; import org.broadinstitute.sting.utils.Utils; import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; import java.io.File; import java.io.FileNotFoundException; import java.util.*; /** * Created by IntelliJ IDEA. * User: mhanna * Date: Apr 26, 2009 * Time: 12:37:23 PM * To change this template use File | Settings | File Templates. */ /** Shards and schedules data in manageable chunks. */ public abstract class MicroScheduler { private static long SHARD_SIZE = 100000L; protected static Logger logger = Logger.getLogger(MicroScheduler.class); protected final TraversalEngine traversalEngine; protected final IndexedFastaSequenceFile reference; private final SAMDataSource reads; private final List rods; /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. * * @param walker Which walker to use. * @param reads the informations associated with the reads * @param ref the reference file * @param rods the rods to include in the traversal * @param nThreadsToUse Number of threads to utilize. * * @return The best-fit microscheduler. */ public static MicroScheduler create(Walker walker, Reads reads, File ref, List> rods, int nThreadsToUse) { if (walker instanceof TreeReducible && nThreadsToUse > 1) { logger.info("Creating hierarchical microscheduler"); return new HierarchicalMicroScheduler(walker, reads, ref, rods, nThreadsToUse); } else { logger.info("Creating linear microscheduler"); return new LinearMicroScheduler(walker, reads, ref, rods); } } /** * Create a microscheduler given the reads and reference. * * @param walker the walker to execute with * @param reads The reads. * @param refFile File pointer to the reference. * @param rods the rods to include in the traversal */ protected MicroScheduler(Walker walker, Reads reads, File refFile, List> rods) { if (walker instanceof ReadWalker) { traversalEngine = new TraverseReads(); } else if (walker instanceof LocusWalker) { traversalEngine = new TraverseLoci(); } else if (walker instanceof LocusWindowWalker) { traversalEngine = new TraverseLocusWindows(); } else if (walker instanceof DuplicateWalker) { traversalEngine = new TraverseDuplicates(); } else { throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); } this.reads = getReadsDataSource(reads); this.reference = openReferenceSequenceFile(refFile); this.rods = getReferenceOrderedDataSources(rods); validate(this.reads,this.reference); } /** * A temporary getter for the traversal engine. In the future, clients * of the microscheduler shouldn't need to know anything about the traversal engine. * * @return The traversal engine. */ public TraversalEngine getTraversalEngine() { return traversalEngine; } /** * Walks a walker over the given list of intervals. * * @param walker Computation to perform over dataset. * @param intervals A list of intervals over which to walk. Null for whole dataset. * @param maxIterations the maximum number of iterations we're to perform * * @return the return type of the walker */ public abstract Object execute(Walker walker, GenomeLocSortedSet intervals, Integer maxIterations); /** * Get the sharding strategy given a driving data source. * * @param walker Walker for which to infer sharding strategy. * @param drivingDataSource Data on which to shard. * @param intervals Intervals to use when limiting sharding. * @param maxIterations the maximum number of iterations to run through * * @return Sharding strategy for this driving data source. */ protected ShardStrategy getShardStrategy(Walker walker, ReferenceSequenceFile drivingDataSource, GenomeLocSortedSet intervals, Integer maxIterations) { ShardStrategy shardStrategy = null; ShardStrategyFactory.SHATTER_STRATEGY shardType; if (walker instanceof LocusWalker) { if (intervals != null) { shardType = (walker.isReduceByInterval()) ? ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL : ShardStrategyFactory.SHATTER_STRATEGY.LINEAR; shardStrategy = ShardStrategyFactory.shatter(shardType, drivingDataSource.getSequenceDictionary(), SHARD_SIZE, intervals, maxIterations); } else shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, drivingDataSource.getSequenceDictionary(), SHARD_SIZE, maxIterations); } else if (walker instanceof ReadWalker || walker instanceof DuplicateWalker) { shardType = ShardStrategyFactory.SHATTER_STRATEGY.READS; if (intervals != null) { shardStrategy = ShardStrategyFactory.shatter(shardType, drivingDataSource.getSequenceDictionary(), SHARD_SIZE, intervals, maxIterations); } else { shardStrategy = ShardStrategyFactory.shatter(shardType, drivingDataSource.getSequenceDictionary(), SHARD_SIZE, maxIterations); } } else if (walker instanceof LocusWindowWalker) { if( intervals == null ) throw new StingException("Unable to shard: walker is of type LocusWindow, but no intervals were provided"); shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL, drivingDataSource.getSequenceDictionary(), SHARD_SIZE, intervals, maxIterations); } else throw new StingException("Unable to support walker of type" + walker.getClass().getName()); return shardStrategy; } /** * Gets an window into all the data that can be viewed as a single shard. * * @param shard The section of data to view. * * @return An accessor for all the data in this shard. */ protected ShardDataProvider getShardDataProvider(Shard shard) { return new ShardDataProvider(shard, reads, reference, rods); } /** * Print summary information for the analysis. * @param sum The final reduce output. */ protected void printOnTraversalDone(Object sum) { // HACK: The microscheduler should be too dumb to know anything about the data // it's actually processing; it should just funnel anything it receives // to the traversal engine. // TODO: Implement code to allow the datasources to print summary info of the // data they've seen. if( reads != null && reads.getViolationHistogram().getViolationCount() > 0 ) logger.warn(String.format("%n%s",reads.getViolationHistogram())); traversalEngine.printOnTraversalDone(sum); } /** * Gets a data source for the given set of reads. * * @param reads the read source information * * @return A data source for the given set of reads. */ private SAMDataSource getReadsDataSource(Reads reads) { // By reference traversals are happy with no reads. Make sure that case is handled. if (reads.getReadsFiles().size() == 0) return null; SAMDataSource dataSource = new SAMDataSource(reads); // Side effect: initialize the traversal engine with reads data. // TODO: Give users a dedicated way of getting the header so that the MicroScheduler // doesn't have to bend over backward providing legacy getters and setters. traversalEngine.setSAMHeader(dataSource.getHeader()); return dataSource; } /** * Open the reference-ordered data sources. * * @param rods the reference order data to execute using * * @return A list of reference-ordered data sources. */ private List getReferenceOrderedDataSources(List> rods) { List dataSources = new ArrayList(); for (ReferenceOrderedData rod : rods) dataSources.add(new ReferenceOrderedDataSource(rod)); return dataSources; } /** * Opens a reference sequence file paired with an index. * * @param refFile Handle to a reference sequence file. Non-null. * * @return A thread-safe file wrapper. */ private IndexedFastaSequenceFile openReferenceSequenceFile(File refFile) { IndexedFastaSequenceFile ref = null; try { ref = new IndexedFastaSequenceFile(refFile); } catch (FileNotFoundException ex) { throw new StingException("I/O error while opening fasta file: " + ex.getMessage(), ex); } GenomeLocParser.setupRefContigOrdering(ref); return ref; } /** * Now that all files are open, validate the sequence dictionaries of the reads vs. the reference. * TODO: Doing this in the MicroScheduler is a bit late, but this is where data sources are initialized. * TODO: Move the initialization of data sources back to the GenomeAnalysisEngine. * @param reads Reads data source. * @param reference Reference data source. */ private void validate( SAMDataSource reads, ReferenceSequenceFile reference ) { if( reads == null || reference == null ) return; // Compile a set of sequence names that exist in the BAM files. SAMSequenceDictionary readsDictionary = reads.getHeader().getSequenceDictionary(); Set readsSequenceNames = new TreeSet(); for( SAMSequenceRecord dictionaryEntry: readsDictionary.getSequences() ) readsSequenceNames.add(dictionaryEntry.getSequenceName()); // Compile a set of sequence names that exist in the reference file. SAMSequenceDictionary referenceDictionary = reference.getSequenceDictionary(); Set referenceSequenceNames = new TreeSet(); for( SAMSequenceRecord dictionaryEntry: referenceDictionary.getSequences() ) referenceSequenceNames.add(dictionaryEntry.getSequenceName()); if( readsSequenceNames.size() == 0 ) { logger.info("Reads file is unmapped. Skipping validation against reference."); return; } // If there's no overlap between reads and reference, data will be bogus. Throw an exception. Set intersectingSequenceNames = new HashSet(readsSequenceNames); intersectingSequenceNames.retainAll(referenceSequenceNames); if( intersectingSequenceNames.size() == 0 ) { StringBuilder error = new StringBuilder(); error.append("No overlap exists between sequence dictionary of the reads and the sequence dictionary of the reference. Perhaps you're using the wrong reference?\n"); error.append(System.getProperty("line.separator")); error.append(String.format("Reads contigs: %s%n", prettyPrintSequenceRecords(readsDictionary))); error.append(String.format("Reference contigs: %s%n", prettyPrintSequenceRecords(referenceDictionary))); logger.error(error.toString()); Utils.scareUser("No overlap exists between sequence dictionary of the reads and the sequence dictionary of the reference."); } // If one data source isn't a strict subset of the other or the match is of fewer than x% of the smaller, // issue a warning. if( !readsSequenceNames.equals(referenceSequenceNames) ) { StringBuilder warning = new StringBuilder(); warning.append("Limited overlap exists between sequence dictionary of the reads and the sequence dictionary of the reference. Perhaps you're using the wrong reference?\n"); warning.append(System.getProperty("line.separator")); warning.append(String.format("Reads contigs: %s%n", prettyPrintSequenceRecords(readsDictionary))); warning.append(String.format("Reference contigs: %s%n", prettyPrintSequenceRecords(referenceDictionary))); logger.warn(warning.toString()); } } private String prettyPrintSequenceRecords( SAMSequenceDictionary sequenceDictionary ) { String[] sequenceRecordNames = new String[ sequenceDictionary.size() ]; int sequenceRecordIndex = 0; for( SAMSequenceRecord sequenceRecord: sequenceDictionary.getSequences() ) sequenceRecordNames[sequenceRecordIndex++] = sequenceRecord.getSequenceName(); return Arrays.deepToString(sequenceRecordNames); } }