Cleanup in preparation for ByLoci traversal. Also did some work minimizing unit tests.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@643 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
ee777c89de
commit
6e394490cb
|
|
@ -0,0 +1,131 @@
|
|||
package org.broadinstitute.sting.gatk.dataSources.providers;
|
||||
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.LocusContext;
|
||||
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
/**
|
||||
* User: hanna
|
||||
* Date: May 8, 2009
|
||||
* Time: 3:09:57 PM
|
||||
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
|
||||
* Software and documentation are copyright 2005 by the Broad Institute.
|
||||
* All rights are reserved.
|
||||
*
|
||||
* Users acknowledge that this software is supplied without any warranty or support.
|
||||
* The Broad Institute is not responsible for its use, misuse, or
|
||||
* functionality.
|
||||
*/
|
||||
|
||||
/**
|
||||
* An umbrella class that examines the data passed to the microscheduler and
|
||||
* tries to assemble as much as possible with it.
|
||||
*/
|
||||
public class ShardDataProvider {
|
||||
/**
|
||||
* The raw collection of reads.
|
||||
*/
|
||||
private final StingSAMIterator reads;
|
||||
|
||||
/**
|
||||
* Information about the locus. Can be accessed mutually exclusively
|
||||
* with the reads iterator.
|
||||
*/
|
||||
private final LocusContextProvider locusContextProvider;
|
||||
|
||||
/**
|
||||
* Provider of reference data for this particular shard.
|
||||
*/
|
||||
private final ReferenceProvider referenceProvider;
|
||||
|
||||
/**
|
||||
* Users should only drive using the reads directly or using the locus.
|
||||
* Perhaps in the future we can support direct simultaneous access to both.
|
||||
*/
|
||||
private enum LastAccessType { READS, LOCUS };
|
||||
private LastAccessType lastAccessed = null;
|
||||
|
||||
/**
|
||||
* Can this data source provide reads?
|
||||
* @return True if reads are available, false otherwise.
|
||||
*/
|
||||
public boolean hasReads() {
|
||||
return reads != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Can this data source provide a locus context?
|
||||
* @return True if possible, false otherwise.
|
||||
*/
|
||||
public boolean hasLocusContext() {
|
||||
return locusContextProvider != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Can this data source provide reference information?
|
||||
* @return True if possible, false otherwise.
|
||||
*/
|
||||
public boolean hasReference() {
|
||||
return referenceProvider != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets an iterator over all the reads bound by this shard.
|
||||
* WARNING: Right now, this cannot be concurrently accessed with getLocusContext().
|
||||
* @return An iterator over all reads in this shard.
|
||||
*/
|
||||
public StingSAMIterator getReadIterator() {
|
||||
if( LastAccessType.LOCUS.equals(lastAccessed) )
|
||||
throw new UnsupportedOperationException("Cannot mix direct access to reads with access to locus context");
|
||||
lastAccessed = LastAccessType.READS;
|
||||
return reads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a locus context for a particular region on the genome.
|
||||
* WARNING: Right now, this cannot be concurrently accessed with the read iterator.
|
||||
* WARNING: Right now, accesses must be sequential along the genome.
|
||||
* @param genomeLoc The location for which to determine the context.
|
||||
* @return The context associated with this location.
|
||||
*/
|
||||
public LocusContext getLocusContext( GenomeLoc genomeLoc ) {
|
||||
if( LastAccessType.READS.equals(lastAccessed) )
|
||||
throw new UnsupportedOperationException("Cannot mix direct access to reads with access to locus context");
|
||||
lastAccessed = LastAccessType.LOCUS;
|
||||
return locusContextProvider.getLocusContext( genomeLoc );
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the reference base associated with this particular point on the genome.
|
||||
* @param genomeLoc Region for which to retrieve the base. GenomeLoc must represent a 1-base region.
|
||||
* @return The base at the position represented by this genomeLoc.
|
||||
*/
|
||||
public char getReferenceBase( GenomeLoc genomeLoc ) {
|
||||
return referenceProvider.getReferenceBase(genomeLoc);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a data provider for the shard given the reads and reference.
|
||||
* @param shard The chunk of data over which traversals happen.
|
||||
* @param reads A window into the reads for a given region.
|
||||
* @param reference A getter for a section of the reference.
|
||||
*/
|
||||
public ShardDataProvider( Shard shard, SAMDataSource reads, IndexedFastaSequenceFile reference ) {
|
||||
// Provide basic reads information.
|
||||
this.reads = reads.seek( shard );
|
||||
// Watch out! the locus context provider will start prefetching data off the queue. Only create this
|
||||
// if absolutely necessary.
|
||||
this.locusContextProvider = !(shard instanceof ReadShard) ? new LocusContextProvider(this.reads) : null;
|
||||
this.referenceProvider = (reference != null) ? new ReferenceProvider(reference,shard) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retire this shard.
|
||||
*/
|
||||
public void close() {
|
||||
reads.close();
|
||||
}
|
||||
}
|
||||
|
|
@ -1,12 +1,10 @@
|
|||
package org.broadinstitute.sting.gatk.executive;
|
||||
|
||||
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.GenomeAnalysisTK;
|
||||
import org.broadinstitute.sting.gatk.OutputTracker;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||
|
|
@ -60,16 +58,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
* @param refFile Reference for driving the traversal.
|
||||
* @param nThreadsToUse maximum number of threads to use to do the work
|
||||
*/
|
||||
protected HierarchicalMicroScheduler( List<File> reads, File refFile, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods, int nThreadsToUse ) {
|
||||
super( reads, refFile );
|
||||
|
||||
|
||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
||||
traversalEngine = new TraverseLociByReference( reads, refFile, rods );
|
||||
}
|
||||
|
||||
public TraversalEngine getTraversalEngine() {
|
||||
return traversalEngine;
|
||||
protected HierarchicalMicroScheduler( Walker walker, List<File> reads, File refFile, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods, int nThreadsToUse ) {
|
||||
super( walker, reads, refFile, rods );
|
||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
||||
if( !(traversalEngine instanceof TraverseLociByReference) )
|
||||
throw new UnsupportedOperationException("Traversal engine supports only traverse loci by reference at this time.");
|
||||
}
|
||||
|
||||
public void execute( Walker walker, List<GenomeLoc> intervals ) {
|
||||
|
|
@ -78,8 +71,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
throw new IllegalArgumentException("Hierarchical microscheduler only works with TreeReducible walkers");
|
||||
|
||||
ShardStrategy shardStrategy = getShardStrategy( reference, intervals );
|
||||
SAMDataSource dataSource = getReadsDataSource();
|
||||
|
||||
ReduceTree reduceTree = new ReduceTree( this );
|
||||
|
||||
walker.initialize();
|
||||
|
|
@ -100,7 +91,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
if( isTreeReduceReady() )
|
||||
queueNextTreeReduce( walker );
|
||||
else if( isShardTraversePending() )
|
||||
queueNextShardTraverse( walker, dataSource, reduceTree );
|
||||
queueNextShardTraverse( walker, reduceTree );
|
||||
}
|
||||
|
||||
// Merge any lingering output files. If these files aren't ready,
|
||||
|
|
@ -208,19 +199,19 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
/**
|
||||
* Queues the next traversal of a walker from the traversal tasks queue.
|
||||
* @param walker Walker to apply to the dataset.
|
||||
* @param dataSource Source of the reads
|
||||
* @param reduceTree Tree of reduces to which to add this shard traverse.
|
||||
*/
|
||||
protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource, ReduceTree reduceTree ) {
|
||||
protected Future queueNextShardTraverse( Walker walker, ReduceTree reduceTree ) {
|
||||
if( traverseTasks.size() == 0 )
|
||||
throw new IllegalStateException( "Cannot traverse; no pending traversals exist.");
|
||||
|
||||
Shard shard = traverseTasks.remove();
|
||||
OutputMerger outputMerger = new OutputMerger();
|
||||
|
||||
ShardTraverser traverser = new ShardTraverser( traversalEngine,
|
||||
walker,
|
||||
traverseTasks.remove(),
|
||||
reference,
|
||||
dataSource,
|
||||
shard,
|
||||
getShardDataProvider(shard),
|
||||
outputMerger );
|
||||
|
||||
Future traverseResult = threadPool.submit(traverser);
|
||||
|
|
|
|||
|
|
@ -1,15 +1,8 @@
|
|||
package org.broadinstitute.sting.gatk.executive;
|
||||
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraverseByReads;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraverseReads;
|
||||
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
||||
|
|
@ -21,25 +14,14 @@ import java.util.List;
|
|||
/** A micro-scheduling manager for single-threaded execution of a traversal. */
|
||||
public class LinearMicroScheduler extends MicroScheduler {
|
||||
|
||||
private boolean isAReadWalker = false;
|
||||
|
||||
/**
|
||||
* Create a new linear microscheduler to process the given reads and reference.
|
||||
*
|
||||
* @param reads Reads file(s) to process.
|
||||
* @param refFile Reference for driving the traversal.
|
||||
*/
|
||||
protected LinearMicroScheduler(List<File> reads, File refFile, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods, Walker walker) {
|
||||
super(reads, refFile);
|
||||
|
||||
// determine if we're a read walker: they get a slightly different, but not in any way worse execute methodology. I pinky swear...
|
||||
isAReadWalker = (walker instanceof ReadWalker) ? true : false;
|
||||
|
||||
if (isAReadWalker) {
|
||||
traversalEngine = new TraverseByReads(reads, refFile, rods);
|
||||
} else {
|
||||
traversalEngine = new TraverseLociByReference(reads, refFile, rods);
|
||||
}
|
||||
protected LinearMicroScheduler( Walker walker, List<File> reads, File refFile, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods ) {
|
||||
super(walker, reads, refFile, rods);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -50,25 +32,14 @@ public class LinearMicroScheduler extends MicroScheduler {
|
|||
*/
|
||||
public void execute(Walker walker, List<GenomeLoc> locations) {
|
||||
ShardStrategy shardStrategy = getShardStrategy(reference, locations);
|
||||
SAMDataSource dataSource = getReadsDataSource();
|
||||
|
||||
walker.initialize();
|
||||
Object accumulator = walker.reduceInit();
|
||||
|
||||
for (Shard shard : shardStrategy) {
|
||||
|
||||
StingSAMIterator readShard = dataSource.seek(shard);
|
||||
|
||||
ReferenceProvider referenceProvider = new ReferenceProvider(reference, shard);
|
||||
LocusContextProvider locusProvider = new LocusContextProvider(readShard);
|
||||
|
||||
if (!isAReadWalker) {
|
||||
accumulator = ((TraverseLociByReference) traversalEngine).traverse(walker, shard, referenceProvider, locusProvider, accumulator);
|
||||
} else {
|
||||
accumulator = ((TraverseReads) traversalEngine).traverse(walker, shard, readShard, accumulator);
|
||||
}
|
||||
|
||||
readShard.close();
|
||||
ShardDataProvider dataProvider = getShardDataProvider( shard );
|
||||
accumulator = traversalEngine.traverse(walker, shard, dataProvider, accumulator);
|
||||
dataProvider.close();
|
||||
}
|
||||
|
||||
traversalEngine.printOnTraversalDone(accumulator);
|
||||
|
|
|
|||
|
|
@ -4,14 +4,19 @@ import edu.mit.broad.picard.reference.ReferenceSequenceFile;
|
|||
import org.apache.log4j.Logger;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraverseByReads;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
|
||||
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.StingException;
|
||||
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
|
||||
|
||||
import java.io.File;
|
||||
|
|
@ -30,15 +35,14 @@ import java.util.List;
|
|||
* Shards and schedules data in manageable chunks.
|
||||
*/
|
||||
public abstract class MicroScheduler {
|
||||
private List<File> reads;
|
||||
private static long SHARD_SIZE = 100000L;
|
||||
|
||||
protected static Logger logger = Logger.getLogger(MicroScheduler.class);
|
||||
|
||||
protected IndexedFastaSequenceFile reference;
|
||||
|
||||
protected TraversalEngine traversalEngine = null;
|
||||
protected final TraversalEngine traversalEngine;
|
||||
protected final IndexedFastaSequenceFile reference;
|
||||
|
||||
private final SAMDataSource reads;
|
||||
|
||||
/**
|
||||
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
|
||||
|
|
@ -50,11 +54,11 @@ public abstract class MicroScheduler {
|
|||
public static MicroScheduler create( Walker walker, List<File> reads, File ref, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods, int nThreadsToUse ) {
|
||||
if( walker instanceof TreeReducible && nThreadsToUse > 1 ) {
|
||||
logger.info("Creating hierarchical microscheduler");
|
||||
return new HierarchicalMicroScheduler( reads, ref, rods, nThreadsToUse );
|
||||
return new HierarchicalMicroScheduler( walker, reads, ref, rods, nThreadsToUse );
|
||||
}
|
||||
else {
|
||||
logger.info("Creating linear microscheduler");
|
||||
return new LinearMicroScheduler( reads, ref, rods, walker );
|
||||
return new LinearMicroScheduler( walker, reads, ref, rods );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -63,8 +67,14 @@ public abstract class MicroScheduler {
|
|||
* @param reads The reads.
|
||||
* @param refFile File pointer to the reference.
|
||||
*/
|
||||
protected MicroScheduler( List<File> reads, File refFile ) {
|
||||
this.reads = reads;
|
||||
protected MicroScheduler( Walker walker, List<File> reads, File refFile, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods ) {
|
||||
if (walker instanceof ReadWalker) {
|
||||
traversalEngine = new TraverseByReads(reads, refFile, rods);
|
||||
} else {
|
||||
traversalEngine = new TraverseLociByReference(reads, refFile, rods);
|
||||
}
|
||||
|
||||
this.reads = getReadsDataSource( reads );
|
||||
this.reference = openReferenceSequenceFile( refFile );
|
||||
}
|
||||
|
||||
|
|
@ -105,22 +115,30 @@ public abstract class MicroScheduler {
|
|||
return shardStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets an window into all the data that can be viewed as a single shard.
|
||||
* @param shard The section of data to view.
|
||||
* @return An accessor for all the data in this shard.
|
||||
*/
|
||||
protected ShardDataProvider getShardDataProvider( Shard shard ) {
|
||||
return new ShardDataProvider( shard, reads, reference );
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a data source for the given set of reads.
|
||||
* @return A data source for the given set of reads.
|
||||
*/
|
||||
protected SAMDataSource getReadsDataSource() {
|
||||
SAMDataSource dataSource = null;
|
||||
private SAMDataSource getReadsDataSource( List<File> reads ) {
|
||||
List<File> unpackedReads = null;
|
||||
try {
|
||||
dataSource = new SAMDataSource( TraversalEngine.unpackReads(reads) );
|
||||
}
|
||||
catch( SimpleDataSourceLoadException ex ) {
|
||||
throw new RuntimeException( ex );
|
||||
unpackedReads = TraversalEngine.unpackReads(reads);
|
||||
}
|
||||
catch( FileNotFoundException ex ) {
|
||||
throw new RuntimeException( ex );
|
||||
throw new StingException( "Cannot unpack list of reads files", ex );
|
||||
}
|
||||
|
||||
SAMDataSource dataSource = new SAMDataSource( unpackedReads );
|
||||
|
||||
// Side effect: initialize the traversal engine with reads data.
|
||||
// TODO: Give users a dedicated way of getting the header so that the MicroScheduler
|
||||
// doesn't have to bend over backward providing legacy getters and setters.
|
||||
|
|
|
|||
|
|
@ -1,20 +1,13 @@
|
|||
package org.broadinstitute.sting.gatk.executive;
|
||||
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
|
||||
import org.broadinstitute.sting.gatk.GenomeAnalysisTK;
|
||||
import org.broadinstitute.sting.gatk.OutputTracker;
|
||||
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import net.sf.samtools.SAMRecord;
|
||||
import net.sf.samtools.util.CloseableIterator;
|
||||
/**
|
||||
* User: hanna
|
||||
* Date: Apr 29, 2009
|
||||
|
|
@ -34,42 +27,31 @@ public class ShardTraverser implements Callable {
|
|||
private Walker walker;
|
||||
private TraverseLociByReference traversalEngine;
|
||||
private Shard shard;
|
||||
private IndexedFastaSequenceFile reference;
|
||||
private SAMDataSource reads;
|
||||
private ShardDataProvider dataProvider;
|
||||
private OutputMerger output;
|
||||
|
||||
public ShardTraverser( TraverseLociByReference traversalEngine,
|
||||
Walker walker,
|
||||
Shard shard,
|
||||
IndexedFastaSequenceFile reference,
|
||||
SAMDataSource reads,
|
||||
ShardDataProvider dataProvider,
|
||||
OutputMerger output ) {
|
||||
this.walker = walker;
|
||||
this.traversalEngine = traversalEngine;
|
||||
this.shard = shard;
|
||||
this.reference = reference;
|
||||
this.reads = reads;
|
||||
this.dataProvider = dataProvider;
|
||||
this.output = output;
|
||||
}
|
||||
|
||||
public Object call() {
|
||||
Object accumulator = walker.reduceInit();
|
||||
|
||||
CloseableIterator<SAMRecord> readShard = null;
|
||||
readShard = reads.seek( shard );
|
||||
|
||||
ReferenceProvider referenceProvider = new ReferenceProvider( reference, shard );
|
||||
LocusContextProvider locusProvider = new LocusContextProvider( readShard );
|
||||
|
||||
OutputTracker outputTracker = GenomeAnalysisTK.Instance.getOutputTracker();
|
||||
|
||||
outputTracker.setLocalStreams( output.getOutStream(), output.getErrStream() );
|
||||
|
||||
try {
|
||||
accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator );
|
||||
accumulator = traversalEngine.traverse( walker, shard, dataProvider, accumulator );
|
||||
}
|
||||
finally {
|
||||
readShard.close();
|
||||
|
||||
dataProvider.close();
|
||||
output.complete();
|
||||
outputTracker.removeLocalStreams();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
|
|||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider;
|
||||
import org.broadinstitute.sting.utils.*;
|
||||
import org.broadinstitute.sting.utils.fasta.FastaSequenceFile2;
|
||||
|
||||
|
|
@ -287,7 +289,7 @@ public abstract class TraversalEngine {
|
|||
* @param <T> Type of the computation.
|
||||
*/
|
||||
public <T> void printOnTraversalDone( T sum ) {
|
||||
throw new UnsupportedOperationException( "This method should be overridden." );
|
||||
throw new UnsupportedOperationException( "This method is a required override for new traversal engines. Please port your traversal engine to the new style." );
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -545,6 +547,14 @@ public abstract class TraversalEngine {
|
|||
return null;
|
||||
}
|
||||
|
||||
public <M,T> T traverse( Walker<M,T> walker,
|
||||
Shard shard,
|
||||
ShardDataProvider dataProvider,
|
||||
T sum ) {
|
||||
throw new UnsupportedOperationException( "This method is a required override for new traversal engines. Please port your traversal engine to the new style." );
|
||||
}
|
||||
|
||||
|
||||
// --------------------------------------------------------------------------------------------------------------
|
||||
//
|
||||
// traversal by loci functions
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
|||
import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.InvalidPositionException;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
|
||||
|
|
@ -45,10 +46,10 @@ public class TraverseLociByReference extends TraversalEngine {
|
|||
throw new UnsupportedOperationException("This traversal type not supported by TraverseLociByReference");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <M,T> T traverse( Walker<M,T> walker,
|
||||
Shard shard,
|
||||
ReferenceProvider referenceProvider,
|
||||
LocusContextProvider locusProvider,
|
||||
ShardDataProvider dataProvider,
|
||||
T sum ) {
|
||||
logger.debug(String.format("TraverseLociByReference.traverse Genomic interval is %s", shard.getGenomeLoc()));
|
||||
|
||||
|
|
@ -67,9 +68,8 @@ public class TraverseLociByReference extends TraversalEngine {
|
|||
// Iterate forward to get all reference ordered data covering this locus
|
||||
final RefMetaDataTracker tracker = getReferenceOrderedDataAtLocus( site );
|
||||
|
||||
LocusContext locus = locusProvider.getLocusContext( site );
|
||||
|
||||
char refBase = referenceProvider.getReferenceBase( site );
|
||||
LocusContext locus = dataProvider.getLocusContext( site );
|
||||
char refBase = dataProvider.getReferenceBase( site );
|
||||
|
||||
if ( DOWNSAMPLE_BY_COVERAGE )
|
||||
locus.downsampleToCoverage(downsamplingCoverage);
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import org.apache.log4j.Logger;
|
|||
import org.broadinstitute.sting.gatk.LocusContext;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.ReadShard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
|
||||
|
|
@ -74,7 +74,7 @@ public class TraverseReads extends TraversalEngine {
|
|||
*/
|
||||
public <M, T> T traverse(Walker<M, T> walker,
|
||||
Shard shard,
|
||||
StingSAMIterator iter,
|
||||
ShardDataProvider dataProvider,
|
||||
T sum) {
|
||||
|
||||
logger.debug(String.format("TraverseReads.traverse Genomic interval is %s", ((ReadShard) shard).getSize()));
|
||||
|
|
@ -82,12 +82,15 @@ public class TraverseReads extends TraversalEngine {
|
|||
if (!(walker instanceof ReadWalker))
|
||||
throw new IllegalArgumentException("Walker isn't a read walker!");
|
||||
|
||||
if( !dataProvider.hasReads() )
|
||||
throw new IllegalArgumentException("Unable to traverse reads; no read data is available.");
|
||||
|
||||
ReadWalker<M, T> readWalker = (ReadWalker<M, T>) walker;
|
||||
|
||||
int readCNT = 0;
|
||||
|
||||
// while we still have more reads
|
||||
for (SAMRecord read : iter) {
|
||||
for (SAMRecord read : dataProvider.getReadIterator()) {
|
||||
|
||||
// our locus context
|
||||
LocusContext locus = null;
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
|
|||
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
|
||||
import org.broadinstitute.sting.gatk.dataSources.providers.ShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||
|
|
@ -127,51 +128,31 @@ public class TraverseReadsTest extends BaseTest {
|
|||
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS,
|
||||
ref.getSequenceDictionary(),
|
||||
readSize);
|
||||
SAMDataSource dataSource = null;
|
||||
|
||||
List<File> unpackedReads = null;
|
||||
try {
|
||||
dataSource = new SAMDataSource(TraversalEngine.unpackReads(bamList));
|
||||
dataSource.viewUnmappedReads(true);
|
||||
//dataSource.viewUnmappedReads(false);
|
||||
}
|
||||
catch (SimpleDataSourceLoadException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
unpackedReads = TraversalEngine.unpackReads(bamList);
|
||||
}
|
||||
catch (FileNotFoundException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
SAMDataSource dataSource = new SAMDataSource(unpackedReads);
|
||||
dataSource.viewUnmappedReads(false);
|
||||
|
||||
boolean walkerInitialized = false;
|
||||
Object accumulator = null;
|
||||
countReadWalker.initialize();
|
||||
Object accumulator = countReadWalker.reduceInit();
|
||||
|
||||
while (shardStrategy.hasNext()) {
|
||||
Shard shard = shardStrategy.next();
|
||||
BoundedReadIterator readIter = null;
|
||||
try {
|
||||
readIter = (BoundedReadIterator) dataSource.seek(shard);
|
||||
}
|
||||
catch (SimpleDataSourceLoadException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
//LocusContextProvider locusProvider = new LocusContextProvider( readIter );
|
||||
|
||||
// set the sam header of the traversal engine
|
||||
traversalEngine.setSAMHeader(readIter.getHeader());
|
||||
|
||||
if (!walkerInitialized) {
|
||||
countReadWalker.initialize();
|
||||
accumulator = ((ReadWalker<?, ?>) countReadWalker).reduceInit();
|
||||
walkerInitialized = true;
|
||||
|
||||
}
|
||||
if (shard == null) {
|
||||
fail("Shard == null");
|
||||
}
|
||||
|
||||
|
||||
accumulator = traversalEngine.traverse(countReadWalker, shard, readIter, accumulator);
|
||||
readIter.close();
|
||||
ShardDataProvider dataProvider = new ShardDataProvider(shard,dataSource,null);
|
||||
accumulator = traversalEngine.traverse(countReadWalker, shard, dataProvider, accumulator);
|
||||
dataProvider.close();
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -208,52 +189,30 @@ public class TraverseReadsTest extends BaseTest {
|
|||
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS,
|
||||
ref.getSequenceDictionary(),
|
||||
readSize);
|
||||
SAMDataSource dataSource = null;
|
||||
List<File> unpackedReads = null;
|
||||
try {
|
||||
dataSource = new SAMDataSource(TraversalEngine.unpackReads(bamList));
|
||||
dataSource.viewUnmappedReads(true);
|
||||
//dataSource.viewUnmappedReads(false);
|
||||
}
|
||||
catch (SimpleDataSourceLoadException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
unpackedReads = TraversalEngine.unpackReads(bamList);
|
||||
}
|
||||
catch (FileNotFoundException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
SAMDataSource dataSource = new SAMDataSource(unpackedReads);
|
||||
dataSource.viewUnmappedReads(true);
|
||||
|
||||
boolean walkerInitialized = false;
|
||||
Object accumulator = null;
|
||||
countReadWalker.initialize();
|
||||
Object accumulator = countReadWalker.reduceInit();
|
||||
|
||||
while (shardStrategy.hasNext()) {
|
||||
Shard shard = shardStrategy.next();
|
||||
BoundedReadIterator readIter = null;
|
||||
try {
|
||||
readIter = (BoundedReadIterator) dataSource.seek(shard);
|
||||
}
|
||||
catch (SimpleDataSourceLoadException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
//LocusContextProvider locusProvider = new LocusContextProvider( readIter );
|
||||
|
||||
// set the sam header of the traversal engine
|
||||
traversalEngine.setSAMHeader(readIter.getHeader());
|
||||
|
||||
if (!walkerInitialized) {
|
||||
countReadWalker.initialize();
|
||||
accumulator = ((ReadWalker<?, ?>) countReadWalker).reduceInit();
|
||||
walkerInitialized = true;
|
||||
|
||||
}
|
||||
if (shard == null) {
|
||||
fail("Shard == null");
|
||||
}
|
||||
|
||||
|
||||
accumulator = traversalEngine.traverse(countReadWalker, shard, readIter, accumulator);
|
||||
readIter.close();
|
||||
|
||||
ShardDataProvider dataProvider = new ShardDataProvider(shard,dataSource,null);
|
||||
accumulator = traversalEngine.traverse(countReadWalker, shard, dataProvider, accumulator);
|
||||
dataProvider.close();
|
||||
}
|
||||
|
||||
traversalEngine.printOnTraversalDone("loci", accumulator);
|
||||
|
|
@ -263,7 +222,7 @@ public class TraverseReadsTest extends BaseTest {
|
|||
fail("Count read walker should return an interger.");
|
||||
}
|
||||
if (((Integer) accumulator) != 10000) {
|
||||
fail("there should be 9721 mapped reads in the index file");
|
||||
fail("there should be 10000 mapped reads in the index file");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue