Added code to the schedulers, one step closer to turning on the new reads traversals

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@613 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
aaron 2009-05-06 22:36:25 +00:00
parent 9c0b81e946
commit 5136724884
4 changed files with 48 additions and 33 deletions

View File

@ -5,12 +5,12 @@ import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMFileReader; import net.sf.samtools.SAMFileReader;
import net.sf.samtools.SAMReadGroupRecord; import net.sf.samtools.SAMReadGroupRecord;
import net.sf.samtools.SAMRecord; import net.sf.samtools.SAMRecord;
import net.sf.samtools.util.CloseableIterator;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard; import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard;
import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator; import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator;
import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2; import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.StingException; import org.broadinstitute.sting.utils.StingException;
@ -145,7 +145,7 @@ public class SAMDataSource implements SimpleDataSource {
* @param shard the shard to get data for * @param shard the shard to get data for
* @return an iterator for that region * @return an iterator for that region
*/ */
public CloseableIterator<SAMRecord> seek(Shard shard) throws SimpleDataSourceLoadException { public StingSAMIterator seek(Shard shard) throws SimpleDataSourceLoadException {
if (shard.getShardType() == Shard.ShardType.READ) { if (shard.getShardType() == Shard.ShardType.READ) {
return seekRead((ReadShard) shard); return seekRead((ReadShard) shard);
} else if (shard.getShardType() == Shard.ShardType.LOCUS) { } else if (shard.getShardType() == Shard.ShardType.LOCUS) {

View File

@ -6,10 +6,11 @@ import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
import org.broadinstitute.sting.gatk.traversals.TraverseByReads;
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
import org.broadinstitute.sting.gatk.traversals.TraverseReads;
import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.gatk.walkers.LocusWalker;
import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.walkers.Walker;
@ -18,69 +19,84 @@ import org.broadinstitute.sting.utils.GenomeLoc;
import java.io.File; import java.io.File;
import java.util.List; import java.util.List;
/** /** A micro-scheduling manager for single-threaded execution of a traversal. */
* A micro-scheduling manager for single-threaded execution of a traversal.
*/
public class LinearMicroScheduler extends MicroScheduler { public class LinearMicroScheduler extends MicroScheduler {
private TraversalEngine traversalEngine = null; private TraversalEngine traversalEngine = null;
private boolean isAReadWalker = false;
/** get the traversal engine */
public TraversalEngine getTraversalEngine() { public TraversalEngine getTraversalEngine() {
return traversalEngine; return traversalEngine;
} }
/** /**
* Create a new linear microscheduler to process the given reads and reference. * Create a new linear microscheduler to process the given reads and reference.
* @param reads Reads file(s) to process. *
* @param reads Reads file(s) to process.
* @param refFile Reference for driving the traversal. * @param refFile Reference for driving the traversal.
*/ */
protected LinearMicroScheduler( List<File> reads, File refFile ) { protected LinearMicroScheduler(List<File> reads, File refFile, Walker walker) {
super( reads, refFile ); super(reads, refFile);
traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() );
// determine if we're a read walker: they get a slightly different, but not in any way worse execute methodology. I pinky swear...
isAReadWalker = (walker instanceof ReadWalker) ? true : false;
if (isAReadWalker) {
traversalEngine = new TraverseByReads(reads, refFile, new java.util.ArrayList());
} else {
traversalEngine = new TraverseLociByReference(reads, refFile, new java.util.ArrayList());
}
} }
/** /**
* Run this traversal over the specified subsection of the dataset. * Run this traversal over the specified subsection of the dataset.
* @param walker Computation to perform over dataset. *
* @param walker Computation to perform over dataset.
* @param locations Subset of the dataset over which to walk. * @param locations Subset of the dataset over which to walk.
*/ */
public void execute( Walker walker, List<GenomeLoc> locations ) { public void execute(Walker walker, List<GenomeLoc> locations) {
ShardStrategy shardStrategy = getShardStrategy( reference, locations ); ShardStrategy shardStrategy = getShardStrategy(reference, locations);
SAMDataSource dataSource = getReadsDataSource(); SAMDataSource dataSource = getReadsDataSource();
// determine if we're a read walker: they get a slightly different, but not in any way worse execute methodology
boolean readwalker = (walker instanceof ReadWalker) ? true : false;
boolean walkerInitialized = false; boolean walkerInitialized = false;
Object accumulator = null; Object accumulator = null;
for(Shard shard: shardStrategy) { for (Shard shard : shardStrategy) {
StingSAMIterator readShard = null; StingSAMIterator readShard = null;
try { try {
readShard = (MergingSamRecordIterator2)dataSource.seek( shard ); readShard = dataSource.seek(shard);
} }
catch( SimpleDataSourceLoadException ex ) { catch (SimpleDataSourceLoadException ex) {
throw new RuntimeException( ex ); throw new RuntimeException(ex);
} }
ReferenceProvider referenceProvider = new ReferenceProvider( reference, shard.getGenomeLoc() ); ReferenceProvider referenceProvider = new ReferenceProvider(reference, shard.getGenomeLoc());
LocusContextProvider locusProvider = new LocusContextProvider( readShard ); LocusContextProvider locusProvider = new LocusContextProvider(readShard);
// set the sam header of the traversal engine // set the sam header of the traversal engine
traversalEngine.setSAMHeader(readShard.getHeader()); traversalEngine.setSAMHeader(readShard.getHeader());
if (!walkerInitialized) { if (!walkerInitialized) {
walker.initialize(); walker.initialize();
accumulator = ((LocusWalker<?,?>)walker).reduceInit(); accumulator = ((LocusWalker<?, ?>) walker).reduceInit();
walkerInitialized = true; walkerInitialized = true;
} }
accumulator = ((TraverseLociByReference)traversalEngine).traverse( walker, shard, referenceProvider, locusProvider, accumulator ); if (!isAReadWalker) {
accumulator = ((TraverseLociByReference) traversalEngine).traverse(walker, shard, referenceProvider, locusProvider, accumulator);
} else {
accumulator = ((TraverseReads) traversalEngine).traverse(walker, shard, readShard, accumulator);
}
readShard.close(); readShard.close();
} }
traversalEngine.printOnTraversalDone("loci", accumulator); String type = (isAReadWalker) ? "read" : "loci";
traversalEngine.printOnTraversalDone(type, accumulator);
walker.onTraversalDone(accumulator); walker.onTraversalDone(accumulator);
} }

View File

@ -1,21 +1,20 @@
package org.broadinstitute.sting.gatk.executive; package org.broadinstitute.sting.gatk.executive;
import edu.mit.broad.picard.reference.ReferenceSequenceFile;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import java.util.List;
import java.io.FileNotFoundException;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import edu.mit.broad.picard.reference.ReferenceSequenceFile; import java.util.List;
/** /**
* Created by IntelliJ IDEA. * Created by IntelliJ IDEA.
@ -50,7 +49,7 @@ public abstract class MicroScheduler {
} }
else { else {
logger.info("Creating linear microscheduler"); logger.info("Creating linear microscheduler");
return new LinearMicroScheduler( reads, ref ); return new LinearMicroScheduler( reads, ref, walker );
} }
} }

View File

@ -5,7 +5,7 @@ import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.LocusContext; import org.broadinstitute.sting.gatk.LocusContext;
import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard; import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard;
import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator; import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.ReadWalker;
@ -74,7 +74,7 @@ public class TraverseReads extends TraversalEngine {
*/ */
public <M, T> T traverse(Walker<M, T> walker, public <M, T> T traverse(Walker<M, T> walker,
Shard shard, Shard shard,
BoundedReadIterator iter, StingSAMIterator iter,
T sum) { T sum) {
logger.debug(String.format("TraverseReads.traverse Genomic interval is %s", ((ReadShard) shard).getSize())); logger.debug(String.format("TraverseReads.traverse Genomic interval is %s", ((ReadShard) shard).getSize()));