2009-03-27 23:40:45 +08:00
|
|
|
package org.broadinstitute.sting.gatk.executive;
|
|
|
|
|
|
2009-06-12 02:13:22 +08:00
|
|
|
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
|
2010-03-12 02:40:31 +08:00
|
|
|
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
|
|
|
|
|
import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider;
|
2009-06-12 02:13:22 +08:00
|
|
|
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
|
|
|
|
|
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy;
|
2009-07-30 00:11:45 +08:00
|
|
|
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource;
|
|
|
|
|
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
|
2009-04-16 02:29:38 +08:00
|
|
|
import org.broadinstitute.sting.gatk.walkers.Walker;
|
2010-05-31 02:00:12 +08:00
|
|
|
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
|
2009-08-23 08:56:02 +08:00
|
|
|
import org.broadinstitute.sting.gatk.io.DirectOutputTracker;
|
|
|
|
|
import org.broadinstitute.sting.gatk.io.OutputTracker;
|
2010-02-25 08:16:50 +08:00
|
|
|
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
|
2010-05-27 06:12:25 +08:00
|
|
|
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
2009-07-30 00:11:45 +08:00
|
|
|
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
|
2009-04-10 04:28:17 +08:00
|
|
|
|
2009-07-30 00:11:45 +08:00
|
|
|
import java.util.Collection;
|
2009-03-27 23:40:45 +08:00
|
|
|
|
2010-05-27 06:12:25 +08:00
|
|
|
import net.sf.samtools.SAMRecord;
|
|
|
|
|
import net.sf.samtools.util.CloseableIterator;
|
|
|
|
|
|
2010-03-12 02:40:31 +08:00
|
|
|
|
2009-05-07 06:36:25 +08:00
|
|
|
/** A micro-scheduling manager for single-threaded execution of a traversal. */
|
2009-04-27 01:46:52 +08:00
|
|
|
public class LinearMicroScheduler extends MicroScheduler {
|
2009-04-10 04:28:17 +08:00
|
|
|
|
2009-08-23 08:56:02 +08:00
|
|
|
/**
|
|
|
|
|
* A direct output tracker for directly managing output.
|
|
|
|
|
*/
|
|
|
|
|
private DirectOutputTracker outputTracker = new DirectOutputTracker();
|
|
|
|
|
|
2009-04-27 07:08:12 +08:00
|
|
|
/**
|
|
|
|
|
* Create a new linear microscheduler to process the given reads and reference.
|
2009-05-07 06:36:25 +08:00
|
|
|
*
|
2009-07-30 00:11:45 +08:00
|
|
|
* @param walker Walker for the traversal.
|
|
|
|
|
* @param reads Reads file(s) to process.
|
|
|
|
|
* @param reference Reference for driving the traversal.
|
|
|
|
|
* @param rods Reference-ordered data.
|
2009-04-27 07:08:12 +08:00
|
|
|
*/
|
2010-02-25 08:16:50 +08:00
|
|
|
protected LinearMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods ) {
|
|
|
|
|
super(engine, walker, reads, reference, rods);
|
2009-03-27 23:40:45 +08:00
|
|
|
}
|
2009-04-10 04:28:17 +08:00
|
|
|
|
2009-04-27 07:08:12 +08:00
|
|
|
/**
|
|
|
|
|
* Run this traversal over the specified subsection of the dataset.
|
2009-05-07 06:36:25 +08:00
|
|
|
*
|
|
|
|
|
* @param walker Computation to perform over dataset.
|
2009-07-30 00:11:45 +08:00
|
|
|
* @param shardStrategy A strategy for sharding the data.
|
2009-04-27 07:08:12 +08:00
|
|
|
*/
|
2009-07-30 07:00:15 +08:00
|
|
|
public Object execute(Walker walker, ShardStrategy shardStrategy, int maxIterations) {
|
|
|
|
|
// Having maxiterations in the execute method is a holdover from the old TraversalEngine days.
|
|
|
|
|
// Lets do something else with this.
|
|
|
|
|
traversalEngine.setMaximumIterations(maxIterations);
|
|
|
|
|
|
2009-05-08 08:58:37 +08:00
|
|
|
walker.initialize();
|
2010-02-25 08:16:50 +08:00
|
|
|
Accumulator accumulator = Accumulator.create(engine,walker);
|
2009-04-10 04:28:17 +08:00
|
|
|
|
2009-05-07 06:36:25 +08:00
|
|
|
for (Shard shard : shardStrategy) {
|
2010-02-25 08:16:50 +08:00
|
|
|
// New experimental code for managing locus intervals.
|
2010-03-12 02:40:31 +08:00
|
|
|
if(shard.getShardType() == Shard.ShardType.LOCUS || shard.getShardType() == Shard.ShardType.LOCUS_INTERVAL) {
|
2010-05-31 02:00:12 +08:00
|
|
|
LocusWalker lWalker = (LocusWalker)walker;
|
|
|
|
|
WindowMaker windowMaker = new WindowMaker(getReadIterator(shard), shard.getGenomeLocs(), walker.getMandatoryReadFilters(), lWalker.getDiscards());
|
2010-02-25 08:16:50 +08:00
|
|
|
for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
|
2010-03-12 02:40:31 +08:00
|
|
|
ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),iterator.getLocus(),iterator,reference,rods);
|
2010-02-25 08:16:50 +08:00
|
|
|
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
|
|
|
|
|
accumulator.accumulate(dataProvider,result);
|
|
|
|
|
dataProvider.close();
|
|
|
|
|
}
|
|
|
|
|
windowMaker.close();
|
|
|
|
|
}
|
|
|
|
|
else {
|
2010-03-12 02:40:31 +08:00
|
|
|
ShardDataProvider dataProvider = new ReadShardDataProvider(shard,getReadIterator(shard),reference,rods);
|
2010-02-25 08:16:50 +08:00
|
|
|
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
|
|
|
|
|
accumulator.accumulate(dataProvider,result);
|
|
|
|
|
dataProvider.close();
|
|
|
|
|
}
|
2009-04-10 04:28:17 +08:00
|
|
|
}
|
|
|
|
|
|
2009-05-19 06:54:18 +08:00
|
|
|
Object result = accumulator.finishTraversal();
|
2009-05-07 06:36:25 +08:00
|
|
|
|
2009-07-07 06:50:22 +08:00
|
|
|
printOnTraversalDone(result);
|
2009-05-16 04:20:27 +08:00
|
|
|
|
2009-10-27 04:18:31 +08:00
|
|
|
outputTracker.close();
|
2009-09-16 01:58:14 +08:00
|
|
|
|
2009-05-16 04:20:27 +08:00
|
|
|
return accumulator;
|
2009-04-10 04:28:17 +08:00
|
|
|
}
|
|
|
|
|
|
2009-08-23 08:56:02 +08:00
|
|
|
/**
|
|
|
|
|
* @{inheritDoc}
|
|
|
|
|
*/
|
|
|
|
|
public OutputTracker getOutputTracker() { return outputTracker; }
|
2009-03-27 23:40:45 +08:00
|
|
|
}
|