diff --git a/java/src/org/broadinstitute/sting/gatk/dataSources/providers/ShardDataProvider.java b/java/src/org/broadinstitute/sting/gatk/dataSources/providers/ShardDataProvider.java new file mode 100755 index 000000000..2c1b4d755 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/dataSources/providers/ShardDataProvider.java @@ -0,0 +1,131 @@ +package org.broadinstitute.sting.gatk.dataSources.providers; + +import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; +import org.broadinstitute.sting.gatk.dataSources.shards.Shard; +import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard; +import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; +import org.broadinstitute.sting.gatk.LocusContext; +import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; +import org.broadinstitute.sting.utils.GenomeLoc; +/** + * User: hanna + * Date: May 8, 2009 + * Time: 3:09:57 PM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * An umbrella class that examines the data passed to the microscheduler and + * tries to assemble as much as possible with it. + */ +public class ShardDataProvider { + /** + * The raw collection of reads. + */ + private final StingSAMIterator reads; + + /** + * Information about the locus. Can be accessed mutually exclusively + * with the reads iterator. + */ + private final LocusContextProvider locusContextProvider; + + /** + * Provider of reference data for this particular shard. + */ + private final ReferenceProvider referenceProvider; + + /** + * Users should only drive using the reads directly or using the locus. + * Perhaps in the future we can support direct simultaneous access to both. + */ + private enum LastAccessType { READS, LOCUS }; + private LastAccessType lastAccessed = null; + + /** + * Can this data source provide reads? + * @return True if reads are available, false otherwise. + */ + public boolean hasReads() { + return reads != null; + } + + /** + * Can this data source provide a locus context? + * @return True if possible, false otherwise. + */ + public boolean hasLocusContext() { + return locusContextProvider != null; + } + + /** + * Can this data source provide reference information? + * @return True if possible, false otherwise. + */ + public boolean hasReference() { + return referenceProvider != null; + } + + /** + * Gets an iterator over all the reads bound by this shard. + * WARNING: Right now, this cannot be concurrently accessed with getLocusContext(). + * @return An iterator over all reads in this shard. + */ + public StingSAMIterator getReadIterator() { + if( LastAccessType.LOCUS.equals(lastAccessed) ) + throw new UnsupportedOperationException("Cannot mix direct access to reads with access to locus context"); + lastAccessed = LastAccessType.READS; + return reads; + } + + /** + * Gets a locus context for a particular region on the genome. + * WARNING: Right now, this cannot be concurrently accessed with the read iterator. + * WARNING: Right now, accesses must be sequential along the genome. + * @param genomeLoc The location for which to determine the context. + * @return The context associated with this location. + */ + public LocusContext getLocusContext( GenomeLoc genomeLoc ) { + if( LastAccessType.READS.equals(lastAccessed) ) + throw new UnsupportedOperationException("Cannot mix direct access to reads with access to locus context"); + lastAccessed = LastAccessType.LOCUS; + return locusContextProvider.getLocusContext( genomeLoc ); + } + + /** + * Gets the reference base associated with this particular point on the genome. + * @param genomeLoc Region for which to retrieve the base. GenomeLoc must represent a 1-base region. + * @return The base at the position represented by this genomeLoc. + */ + public char getReferenceBase( GenomeLoc genomeLoc ) { + return referenceProvider.getReferenceBase(genomeLoc); + } + + /** + * Create a data provider for the shard given the reads and reference. + * @param shard The chunk of data over which traversals happen. + * @param reads A window into the reads for a given region. + * @param reference A getter for a section of the reference. + */ + public ShardDataProvider( Shard shard, SAMDataSource reads, IndexedFastaSequenceFile reference ) { + // Provide basic reads information. + this.reads = reads.seek( shard ); + // Watch out! the locus context provider will start prefetching data off the queue. Only create this + // if absolutely necessary. + this.locusContextProvider = !(shard instanceof ReadShard) ? new LocusContextProvider(this.reads) : null; + this.referenceProvider = (reference != null) ? new ReferenceProvider(reference,shard) : null; + } + + /** + * Retire this shard. + */ + public void close() { + reads.close(); + } +} diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 3ea4bce11..c98ad4f18 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -1,12 +1,10 @@ package org.broadinstitute.sting.gatk.executive; import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; -import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; -import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.GenomeAnalysisTK; import org.broadinstitute.sting.gatk.OutputTracker; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; @@ -60,16 +58,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce * @param refFile Reference for driving the traversal. * @param nThreadsToUse maximum number of threads to use to do the work */ - protected HierarchicalMicroScheduler( List reads, File refFile, List> rods, int nThreadsToUse ) { - super( reads, refFile ); - - - this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); - traversalEngine = new TraverseLociByReference( reads, refFile, rods ); - } - - public TraversalEngine getTraversalEngine() { - return traversalEngine; + protected HierarchicalMicroScheduler( Walker walker, List reads, File refFile, List> rods, int nThreadsToUse ) { + super( walker, reads, refFile, rods ); + this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + if( !(traversalEngine instanceof TraverseLociByReference) ) + throw new UnsupportedOperationException("Traversal engine supports only traverse loci by reference at this time."); } public void execute( Walker walker, List intervals ) { @@ -78,8 +71,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce throw new IllegalArgumentException("Hierarchical microscheduler only works with TreeReducible walkers"); ShardStrategy shardStrategy = getShardStrategy( reference, intervals ); - SAMDataSource dataSource = getReadsDataSource(); - ReduceTree reduceTree = new ReduceTree( this ); walker.initialize(); @@ -100,7 +91,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce if( isTreeReduceReady() ) queueNextTreeReduce( walker ); else if( isShardTraversePending() ) - queueNextShardTraverse( walker, dataSource, reduceTree ); + queueNextShardTraverse( walker, reduceTree ); } // Merge any lingering output files. If these files aren't ready, @@ -208,19 +199,19 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce /** * Queues the next traversal of a walker from the traversal tasks queue. * @param walker Walker to apply to the dataset. - * @param dataSource Source of the reads + * @param reduceTree Tree of reduces to which to add this shard traverse. */ - protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource, ReduceTree reduceTree ) { + protected Future queueNextShardTraverse( Walker walker, ReduceTree reduceTree ) { if( traverseTasks.size() == 0 ) throw new IllegalStateException( "Cannot traverse; no pending traversals exist."); + Shard shard = traverseTasks.remove(); OutputMerger outputMerger = new OutputMerger(); ShardTraverser traverser = new ShardTraverser( traversalEngine, walker, - traverseTasks.remove(), - reference, - dataSource, + shard, + getShardDataProvider(shard), outputMerger ); Future traverseResult = threadPool.submit(traverser); diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 0e246a0a7..3c12d2b57 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -1,15 +1,8 @@ package org.broadinstitute.sting.gatk.executive; -import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider; -import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider; +import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; -import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; -import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; -import org.broadinstitute.sting.gatk.traversals.TraverseByReads; -import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; -import org.broadinstitute.sting.gatk.traversals.TraverseReads; -import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; @@ -21,25 +14,14 @@ import java.util.List; /** A micro-scheduling manager for single-threaded execution of a traversal. */ public class LinearMicroScheduler extends MicroScheduler { - private boolean isAReadWalker = false; - /** * Create a new linear microscheduler to process the given reads and reference. * * @param reads Reads file(s) to process. * @param refFile Reference for driving the traversal. */ - protected LinearMicroScheduler(List reads, File refFile, List> rods, Walker walker) { - super(reads, refFile); - - // determine if we're a read walker: they get a slightly different, but not in any way worse execute methodology. I pinky swear... - isAReadWalker = (walker instanceof ReadWalker) ? true : false; - - if (isAReadWalker) { - traversalEngine = new TraverseByReads(reads, refFile, rods); - } else { - traversalEngine = new TraverseLociByReference(reads, refFile, rods); - } + protected LinearMicroScheduler( Walker walker, List reads, File refFile, List> rods ) { + super(walker, reads, refFile, rods); } /** @@ -50,25 +32,14 @@ public class LinearMicroScheduler extends MicroScheduler { */ public void execute(Walker walker, List locations) { ShardStrategy shardStrategy = getShardStrategy(reference, locations); - SAMDataSource dataSource = getReadsDataSource(); walker.initialize(); Object accumulator = walker.reduceInit(); for (Shard shard : shardStrategy) { - - StingSAMIterator readShard = dataSource.seek(shard); - - ReferenceProvider referenceProvider = new ReferenceProvider(reference, shard); - LocusContextProvider locusProvider = new LocusContextProvider(readShard); - - if (!isAReadWalker) { - accumulator = ((TraverseLociByReference) traversalEngine).traverse(walker, shard, referenceProvider, locusProvider, accumulator); - } else { - accumulator = ((TraverseReads) traversalEngine).traverse(walker, shard, readShard, accumulator); - } - - readShard.close(); + ShardDataProvider dataProvider = getShardDataProvider( shard ); + accumulator = traversalEngine.traverse(walker, shard, dataProvider, accumulator); + dataProvider.close(); } traversalEngine.printOnTraversalDone(accumulator); diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 7e2dca21b..2c4713e1d 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -4,14 +4,19 @@ import edu.mit.broad.picard.reference.ReferenceSequenceFile; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory; +import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; -import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; +import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; +import org.broadinstitute.sting.gatk.traversals.TraverseByReads; +import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.StingException; import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; import java.io.File; @@ -30,15 +35,14 @@ import java.util.List; * Shards and schedules data in manageable chunks. */ public abstract class MicroScheduler { - private List reads; private static long SHARD_SIZE = 100000L; protected static Logger logger = Logger.getLogger(MicroScheduler.class); - protected IndexedFastaSequenceFile reference; - - protected TraversalEngine traversalEngine = null; + protected final TraversalEngine traversalEngine; + protected final IndexedFastaSequenceFile reference; + private final SAMDataSource reads; /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the @@ -50,11 +54,11 @@ public abstract class MicroScheduler { public static MicroScheduler create( Walker walker, List reads, File ref, List> rods, int nThreadsToUse ) { if( walker instanceof TreeReducible && nThreadsToUse > 1 ) { logger.info("Creating hierarchical microscheduler"); - return new HierarchicalMicroScheduler( reads, ref, rods, nThreadsToUse ); + return new HierarchicalMicroScheduler( walker, reads, ref, rods, nThreadsToUse ); } else { logger.info("Creating linear microscheduler"); - return new LinearMicroScheduler( reads, ref, rods, walker ); + return new LinearMicroScheduler( walker, reads, ref, rods ); } } @@ -63,8 +67,14 @@ public abstract class MicroScheduler { * @param reads The reads. * @param refFile File pointer to the reference. */ - protected MicroScheduler( List reads, File refFile ) { - this.reads = reads; + protected MicroScheduler( Walker walker, List reads, File refFile, List> rods ) { + if (walker instanceof ReadWalker) { + traversalEngine = new TraverseByReads(reads, refFile, rods); + } else { + traversalEngine = new TraverseLociByReference(reads, refFile, rods); + } + + this.reads = getReadsDataSource( reads ); this.reference = openReferenceSequenceFile( refFile ); } @@ -105,22 +115,30 @@ public abstract class MicroScheduler { return shardStrategy; } + /** + * Gets an window into all the data that can be viewed as a single shard. + * @param shard The section of data to view. + * @return An accessor for all the data in this shard. + */ + protected ShardDataProvider getShardDataProvider( Shard shard ) { + return new ShardDataProvider( shard, reads, reference ); + } + /** * Gets a data source for the given set of reads. * @return A data source for the given set of reads. */ - protected SAMDataSource getReadsDataSource() { - SAMDataSource dataSource = null; + private SAMDataSource getReadsDataSource( List reads ) { + List unpackedReads = null; try { - dataSource = new SAMDataSource( TraversalEngine.unpackReads(reads) ); - } - catch( SimpleDataSourceLoadException ex ) { - throw new RuntimeException( ex ); + unpackedReads = TraversalEngine.unpackReads(reads); } catch( FileNotFoundException ex ) { - throw new RuntimeException( ex ); + throw new StingException( "Cannot unpack list of reads files", ex ); } + SAMDataSource dataSource = new SAMDataSource( unpackedReads ); + // Side effect: initialize the traversal engine with reads data. // TODO: Give users a dedicated way of getting the header so that the MicroScheduler // doesn't have to bend over backward providing legacy getters and setters. diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index 10330aefb..2d828f6ac 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -1,20 +1,13 @@ package org.broadinstitute.sting.gatk.executive; -import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider; -import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider; +import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; -import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; import org.broadinstitute.sting.gatk.GenomeAnalysisTK; import org.broadinstitute.sting.gatk.OutputTracker; -import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.gatk.walkers.Walker; -import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; import java.util.concurrent.Callable; - -import net.sf.samtools.SAMRecord; -import net.sf.samtools.util.CloseableIterator; /** * User: hanna * Date: Apr 29, 2009 @@ -34,42 +27,31 @@ public class ShardTraverser implements Callable { private Walker walker; private TraverseLociByReference traversalEngine; private Shard shard; - private IndexedFastaSequenceFile reference; - private SAMDataSource reads; + private ShardDataProvider dataProvider; private OutputMerger output; public ShardTraverser( TraverseLociByReference traversalEngine, Walker walker, Shard shard, - IndexedFastaSequenceFile reference, - SAMDataSource reads, + ShardDataProvider dataProvider, OutputMerger output ) { this.walker = walker; this.traversalEngine = traversalEngine; this.shard = shard; - this.reference = reference; - this.reads = reads; + this.dataProvider = dataProvider; this.output = output; } public Object call() { Object accumulator = walker.reduceInit(); - - CloseableIterator readShard = null; - readShard = reads.seek( shard ); - - ReferenceProvider referenceProvider = new ReferenceProvider( reference, shard ); - LocusContextProvider locusProvider = new LocusContextProvider( readShard ); - OutputTracker outputTracker = GenomeAnalysisTK.Instance.getOutputTracker(); - outputTracker.setLocalStreams( output.getOutStream(), output.getErrStream() ); + try { - accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator ); + accumulator = traversalEngine.traverse( walker, shard, dataProvider, accumulator ); } finally { - readShard.close(); - + dataProvider.close(); output.complete(); outputTracker.removeLocalStreams(); } diff --git a/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index a24e95684..a34b6b3b2 100755 --- a/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -14,6 +14,8 @@ import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.gatk.dataSources.shards.Shard; +import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider; import org.broadinstitute.sting.utils.*; import org.broadinstitute.sting.utils.fasta.FastaSequenceFile2; @@ -287,7 +289,7 @@ public abstract class TraversalEngine { * @param Type of the computation. */ public void printOnTraversalDone( T sum ) { - throw new UnsupportedOperationException( "This method should be overridden." ); + throw new UnsupportedOperationException( "This method is a required override for new traversal engines. Please port your traversal engine to the new style." ); } /** @@ -545,6 +547,14 @@ public abstract class TraversalEngine { return null; } + public T traverse( Walker walker, + Shard shard, + ShardDataProvider dataProvider, + T sum ) { + throw new UnsupportedOperationException( "This method is a required override for new traversal engines. Please port your traversal engine to the new style." ); + } + + // -------------------------------------------------------------------------------------------------------------- // // traversal by loci functions diff --git a/java/src/org/broadinstitute/sting/gatk/traversals/TraverseLociByReference.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseLociByReference.java index d5119f0a8..a6bf84203 100755 --- a/java/src/org/broadinstitute/sting/gatk/traversals/TraverseLociByReference.java +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseLociByReference.java @@ -7,6 +7,7 @@ import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider; import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider; import org.broadinstitute.sting.gatk.dataSources.providers.InvalidPositionException; +import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; @@ -45,10 +46,10 @@ public class TraverseLociByReference extends TraversalEngine { throw new UnsupportedOperationException("This traversal type not supported by TraverseLociByReference"); } + @Override public T traverse( Walker walker, Shard shard, - ReferenceProvider referenceProvider, - LocusContextProvider locusProvider, + ShardDataProvider dataProvider, T sum ) { logger.debug(String.format("TraverseLociByReference.traverse Genomic interval is %s", shard.getGenomeLoc())); @@ -67,9 +68,8 @@ public class TraverseLociByReference extends TraversalEngine { // Iterate forward to get all reference ordered data covering this locus final RefMetaDataTracker tracker = getReferenceOrderedDataAtLocus( site ); - LocusContext locus = locusProvider.getLocusContext( site ); - - char refBase = referenceProvider.getReferenceBase( site ); + LocusContext locus = dataProvider.getLocusContext( site ); + char refBase = dataProvider.getReferenceBase( site ); if ( DOWNSAMPLE_BY_COVERAGE ) locus.downsampleToCoverage(downsamplingCoverage); diff --git a/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReads.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReads.java index 763bf812f..83d1b95a4 100755 --- a/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReads.java +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReads.java @@ -5,7 +5,7 @@ import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.LocusContext; import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; -import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; +import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.walkers.ReadWalker; @@ -74,7 +74,7 @@ public class TraverseReads extends TraversalEngine { */ public T traverse(Walker walker, Shard shard, - StingSAMIterator iter, + ShardDataProvider dataProvider, T sum) { logger.debug(String.format("TraverseReads.traverse Genomic interval is %s", ((ReadShard) shard).getSize())); @@ -82,12 +82,15 @@ public class TraverseReads extends TraversalEngine { if (!(walker instanceof ReadWalker)) throw new IllegalArgumentException("Walker isn't a read walker!"); + if( !dataProvider.hasReads() ) + throw new IllegalArgumentException("Unable to traverse reads; no read data is available."); + ReadWalker readWalker = (ReadWalker) walker; int readCNT = 0; // while we still have more reads - for (SAMRecord read : iter) { + for (SAMRecord read : dataProvider.getReadIterator()) { // our locus context LocusContext locus = null; diff --git a/java/test/org/broadinstitute/sting/gatk/traversals/TraverseReadsTest.java b/java/test/org/broadinstitute/sting/gatk/traversals/TraverseReadsTest.java index 095ca50a3..9c4ea3d82 100755 --- a/java/test/org/broadinstitute/sting/gatk/traversals/TraverseReadsTest.java +++ b/java/test/org/broadinstitute/sting/gatk/traversals/TraverseReadsTest.java @@ -6,6 +6,7 @@ import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; +import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; @@ -127,51 +128,31 @@ public class TraverseReadsTest extends BaseTest { ShardStrategy shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS, ref.getSequenceDictionary(), readSize); - SAMDataSource dataSource = null; + + List unpackedReads = null; try { - dataSource = new SAMDataSource(TraversalEngine.unpackReads(bamList)); - dataSource.viewUnmappedReads(true); - //dataSource.viewUnmappedReads(false); - } - catch (SimpleDataSourceLoadException ex) { - throw new RuntimeException(ex); + unpackedReads = TraversalEngine.unpackReads(bamList); } catch (FileNotFoundException ex) { throw new RuntimeException(ex); } + SAMDataSource dataSource = new SAMDataSource(unpackedReads); dataSource.viewUnmappedReads(false); - boolean walkerInitialized = false; - Object accumulator = null; + countReadWalker.initialize(); + Object accumulator = countReadWalker.reduceInit(); + while (shardStrategy.hasNext()) { Shard shard = shardStrategy.next(); - BoundedReadIterator readIter = null; - try { - readIter = (BoundedReadIterator) dataSource.seek(shard); - } - catch (SimpleDataSourceLoadException ex) { - throw new RuntimeException(ex); - } - //LocusContextProvider locusProvider = new LocusContextProvider( readIter ); - - // set the sam header of the traversal engine - traversalEngine.setSAMHeader(readIter.getHeader()); - - if (!walkerInitialized) { - countReadWalker.initialize(); - accumulator = ((ReadWalker) countReadWalker).reduceInit(); - walkerInitialized = true; - - } if (shard == null) { fail("Shard == null"); } - - accumulator = traversalEngine.traverse(countReadWalker, shard, readIter, accumulator); - readIter.close(); + ShardDataProvider dataProvider = new ShardDataProvider(shard,dataSource,null); + accumulator = traversalEngine.traverse(countReadWalker, shard, dataProvider, accumulator); + dataProvider.close(); } @@ -208,52 +189,30 @@ public class TraverseReadsTest extends BaseTest { ShardStrategy shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS, ref.getSequenceDictionary(), readSize); - SAMDataSource dataSource = null; + List unpackedReads = null; try { - dataSource = new SAMDataSource(TraversalEngine.unpackReads(bamList)); - dataSource.viewUnmappedReads(true); - //dataSource.viewUnmappedReads(false); - } - catch (SimpleDataSourceLoadException ex) { - throw new RuntimeException(ex); + unpackedReads = TraversalEngine.unpackReads(bamList); } catch (FileNotFoundException ex) { throw new RuntimeException(ex); } + SAMDataSource dataSource = new SAMDataSource(unpackedReads); dataSource.viewUnmappedReads(true); - boolean walkerInitialized = false; - Object accumulator = null; + countReadWalker.initialize(); + Object accumulator = countReadWalker.reduceInit(); + while (shardStrategy.hasNext()) { Shard shard = shardStrategy.next(); - BoundedReadIterator readIter = null; - try { - readIter = (BoundedReadIterator) dataSource.seek(shard); - } - catch (SimpleDataSourceLoadException ex) { - throw new RuntimeException(ex); - } - //LocusContextProvider locusProvider = new LocusContextProvider( readIter ); - - // set the sam header of the traversal engine - traversalEngine.setSAMHeader(readIter.getHeader()); - - if (!walkerInitialized) { - countReadWalker.initialize(); - accumulator = ((ReadWalker) countReadWalker).reduceInit(); - walkerInitialized = true; - - } if (shard == null) { fail("Shard == null"); } - - accumulator = traversalEngine.traverse(countReadWalker, shard, readIter, accumulator); - readIter.close(); - + ShardDataProvider dataProvider = new ShardDataProvider(shard,dataSource,null); + accumulator = traversalEngine.traverse(countReadWalker, shard, dataProvider, accumulator); + dataProvider.close(); } traversalEngine.printOnTraversalDone("loci", accumulator); @@ -263,7 +222,7 @@ public class TraverseReadsTest extends BaseTest { fail("Count read walker should return an interger."); } if (((Integer) accumulator) != 10000) { - fail("there should be 9721 mapped reads in the index file"); + fail("there should be 10000 mapped reads in the index file"); } }