From 5136724884f95e8a865e9b43f33b4edb906ea1eb Mon Sep 17 00:00:00 2001 From: aaron Date: Wed, 6 May 2009 22:36:25 +0000 Subject: [PATCH] Added code to the schedulers, one step closer to turning on the new reads traversals git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@613 348d0f76-0448-11de-a6fe-93d51630548a --- .../simpleDataSources/SAMDataSource.java | 4 +- .../gatk/executive/LinearMicroScheduler.java | 62 ++++++++++++------- .../sting/gatk/executive/MicroScheduler.java | 11 ++-- .../sting/gatk/traversals/TraverseReads.java | 4 +- 4 files changed, 48 insertions(+), 33 deletions(-) diff --git a/java/src/org/broadinstitute/sting/gatk/dataSources/simpleDataSources/SAMDataSource.java b/java/src/org/broadinstitute/sting/gatk/dataSources/simpleDataSources/SAMDataSource.java index b2965ac14..3f572a69c 100755 --- a/java/src/org/broadinstitute/sting/gatk/dataSources/simpleDataSources/SAMDataSource.java +++ b/java/src/org/broadinstitute/sting/gatk/dataSources/simpleDataSources/SAMDataSource.java @@ -5,12 +5,12 @@ import net.sf.samtools.SAMFileHeader; import net.sf.samtools.SAMFileReader; import net.sf.samtools.SAMReadGroupRecord; import net.sf.samtools.SAMRecord; -import net.sf.samtools.util.CloseableIterator; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator; import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2; +import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.StingException; @@ -145,7 +145,7 @@ public class SAMDataSource implements SimpleDataSource { * @param shard the shard to get data for * @return an iterator for that region */ - public CloseableIterator seek(Shard shard) throws SimpleDataSourceLoadException { + public StingSAMIterator seek(Shard shard) throws SimpleDataSourceLoadException { if (shard.getShardType() == Shard.ShardType.READ) { return seekRead((ReadShard) shard); } else if (shard.getShardType() == Shard.ShardType.LOCUS) { diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index b56d1a81d..a11db3601 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -6,10 +6,11 @@ import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; -import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2; import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; +import org.broadinstitute.sting.gatk.traversals.TraverseByReads; import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; +import org.broadinstitute.sting.gatk.traversals.TraverseReads; import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.Walker; @@ -18,69 +19,84 @@ import org.broadinstitute.sting.utils.GenomeLoc; import java.io.File; import java.util.List; -/** - * A micro-scheduling manager for single-threaded execution of a traversal. - */ +/** A micro-scheduling manager for single-threaded execution of a traversal. */ public class LinearMicroScheduler extends MicroScheduler { private TraversalEngine traversalEngine = null; + private boolean isAReadWalker = false; + + /** get the traversal engine */ public TraversalEngine getTraversalEngine() { return traversalEngine; } /** * Create a new linear microscheduler to process the given reads and reference. - * @param reads Reads file(s) to process. + * + * @param reads Reads file(s) to process. * @param refFile Reference for driving the traversal. */ - protected LinearMicroScheduler( List reads, File refFile ) { - super( reads, refFile ); - traversalEngine = new TraverseLociByReference( reads, refFile, new java.util.ArrayList() ); + protected LinearMicroScheduler(List reads, File refFile, Walker walker) { + super(reads, refFile); + + // determine if we're a read walker: they get a slightly different, but not in any way worse execute methodology. I pinky swear... + isAReadWalker = (walker instanceof ReadWalker) ? true : false; + + if (isAReadWalker) { + traversalEngine = new TraverseByReads(reads, refFile, new java.util.ArrayList()); + } else { + traversalEngine = new TraverseLociByReference(reads, refFile, new java.util.ArrayList()); + } } /** * Run this traversal over the specified subsection of the dataset. - * @param walker Computation to perform over dataset. + * + * @param walker Computation to perform over dataset. * @param locations Subset of the dataset over which to walk. */ - public void execute( Walker walker, List locations ) { - ShardStrategy shardStrategy = getShardStrategy( reference, locations ); + public void execute(Walker walker, List locations) { + ShardStrategy shardStrategy = getShardStrategy(reference, locations); SAMDataSource dataSource = getReadsDataSource(); - // determine if we're a read walker: they get a slightly different, but not in any way worse execute methodology - boolean readwalker = (walker instanceof ReadWalker) ? true : false; - boolean walkerInitialized = false; Object accumulator = null; - for(Shard shard: shardStrategy) { + for (Shard shard : shardStrategy) { StingSAMIterator readShard = null; try { - readShard = (MergingSamRecordIterator2)dataSource.seek( shard ); + readShard = dataSource.seek(shard); } - catch( SimpleDataSourceLoadException ex ) { - throw new RuntimeException( ex ); + catch (SimpleDataSourceLoadException ex) { + throw new RuntimeException(ex); } - ReferenceProvider referenceProvider = new ReferenceProvider( reference, shard.getGenomeLoc() ); - LocusContextProvider locusProvider = new LocusContextProvider( readShard ); + ReferenceProvider referenceProvider = new ReferenceProvider(reference, shard.getGenomeLoc()); + LocusContextProvider locusProvider = new LocusContextProvider(readShard); // set the sam header of the traversal engine traversalEngine.setSAMHeader(readShard.getHeader()); if (!walkerInitialized) { walker.initialize(); - accumulator = ((LocusWalker)walker).reduceInit(); + accumulator = ((LocusWalker) walker).reduceInit(); walkerInitialized = true; } - accumulator = ((TraverseLociByReference)traversalEngine).traverse( walker, shard, referenceProvider, locusProvider, accumulator ); + if (!isAReadWalker) { + accumulator = ((TraverseLociByReference) traversalEngine).traverse(walker, shard, referenceProvider, locusProvider, accumulator); + } else { + accumulator = ((TraverseReads) traversalEngine).traverse(walker, shard, readShard, accumulator); + } + readShard.close(); } - traversalEngine.printOnTraversalDone("loci", accumulator); + String type = (isAReadWalker) ? "read" : "loci"; + traversalEngine.printOnTraversalDone(type, accumulator); + walker.onTraversalDone(accumulator); } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 5d1e4ec0d..94be1bb4c 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -1,21 +1,20 @@ package org.broadinstitute.sting.gatk.executive; +import edu.mit.broad.picard.reference.ReferenceSequenceFile; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; -import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; +import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; -import java.util.List; -import java.io.FileNotFoundException; import java.io.File; - -import edu.mit.broad.picard.reference.ReferenceSequenceFile; +import java.io.FileNotFoundException; +import java.util.List; /** * Created by IntelliJ IDEA. @@ -50,7 +49,7 @@ public abstract class MicroScheduler { } else { logger.info("Creating linear microscheduler"); - return new LinearMicroScheduler( reads, ref ); + return new LinearMicroScheduler( reads, ref, walker ); } } diff --git a/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReads.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReads.java index 9e730bd5e..ae2c0ef4e 100755 --- a/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReads.java +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReads.java @@ -5,7 +5,7 @@ import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.LocusContext; import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; -import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator; +import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.walkers.ReadWalker; @@ -74,7 +74,7 @@ public class TraverseReads extends TraversalEngine { */ public T traverse(Walker walker, Shard shard, - BoundedReadIterator iter, + StingSAMIterator iter, T sum) { logger.debug(String.format("TraverseReads.traverse Genomic interval is %s", ((ReadShard) shard).getSize()));