Initial updates for ReadShard

This commit is contained in:
Joel Thibault 2013-01-03 13:57:36 -05:00
parent 14a3ac0e3c
commit e7553545ef
5 changed files with 242 additions and 129 deletions

View File

@ -570,11 +570,29 @@ public class GenomeAnalysisEngine {
else if(walker instanceof ActiveRegionWalker) {
if (readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.coordinate)
throw new UserException.MissortedBAM(SAMFileHeader.SortOrder.coordinate, "Active region walkers can only traverse coordinate-sorted data. Please resort your input BAM file(s) or set the Sort Order tag in the header appropriately.");
if(intervals == null)
return readsDataSource.createShardIteratorOverMappedReads(referenceDataSource.getReference().getSequenceDictionary(),new LocusShardBalancer());
else
return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new LocusShardBalancer());
}
switch(argCollection.activeRegionShardType) {
case LOCUSSHARD:
if(intervals == null)
return readsDataSource.createShardIteratorOverMappedReads(referenceDataSource.getReference().getSequenceDictionary(),new LocusShardBalancer());
else
return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new LocusShardBalancer());
case READSHARD:
// Use the legacy ReadShardBalancer if legacy downsampling is enabled
ShardBalancer readShardBalancer = downsamplingMethod != null && downsamplingMethod.useLegacyDownsampler ?
new LegacyReadShardBalancer() :
new ReadShardBalancer();
if(intervals == null)
return readsDataSource.createShardIteratorOverMappedReads(referenceDataSource.getReference().getSequenceDictionary(), readShardBalancer);
else
return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), readShardBalancer);
case ACTIVEREGIONSHARD:
throw new UserException.CommandLineException("Not implemented.");
default:
throw new UserException.CommandLineException("Invalid active region shard type.");
}
}
else if(walker instanceof ReadWalker || walker instanceof ReadPairWalker || walker instanceof DuplicateWalker) {
// Apply special validation to read pair walkers.
if(walker instanceof ReadPairWalker) {

View File

@ -44,6 +44,22 @@ public class LocusShardDataProvider extends ShardDataProvider {
this.locusIterator = locusIterator;
}
/**
* Create a data provider based on an input provider
* Used only by ExperimentalReadShardTraverseActiveRegions
* @param dataProvider
* @param sourceInfo
* @param genomeLocParser
* @param locus
* @param locusIterator
*/
public LocusShardDataProvider(ShardDataProvider dataProvider, ReadProperties sourceInfo, GenomeLocParser genomeLocParser, GenomeLoc locus, LocusIterator locusIterator) {
super(dataProvider.getShard(),genomeLocParser,dataProvider.getReference(),dataProvider.getReferenceOrderedData());
this.sourceInfo = sourceInfo;
this.locus = locus;
this.locusIterator = locusIterator;
}
/**
* Returns information about the source of the reads.
* @return Info about the source of the reads.

View File

@ -1,32 +1,35 @@
package org.broadinstitute.sting.gatk.traversals;
import net.sf.samtools.SAMFileHeader;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.WalkerManager;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.datasources.providers.*;
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.executive.WindowMaker;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension;
import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker;
import org.broadinstitute.sting.gatk.walkers.DataSource;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.SampleUtils;
import org.broadinstitute.sting.utils.activeregion.ActiveRegion;
import org.broadinstitute.sting.utils.activeregion.ActivityProfile;
import org.broadinstitute.sting.utils.activeregion.ActivityProfileResult;
import org.broadinstitute.sting.utils.pileup.PileupElement;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.*;
public class ExperimentalReadShardTraverseActiveRegions<M,T> extends TraversalEngine<M,T,ActiveRegionWalker<M,T>,LocusShardDataProvider> {
public class ExperimentalReadShardTraverseActiveRegions <M,T> extends TraversalEngine<M,T,ActiveRegionWalker<M,T>,ReadShardDataProvider> {
/**
* our log, which we want to capture anything from this class
*/
protected final static Logger logger = Logger.getLogger(TraversalEngine.class);
private final LinkedList<ActiveRegion> workQueue = new LinkedList<ActiveRegion>();
private final LinkedHashSet<GATKSAMRecord> myReads = new LinkedHashSet<GATKSAMRecord>();
private final LinkedList<GATKSAMRecord> myReads = new LinkedList<GATKSAMRecord>();
@Override
public String getTraversalUnits() {
@ -35,71 +38,65 @@ public class ExperimentalReadShardTraverseActiveRegions<M,T> extends TraversalEn
@Override
public T traverse( final ActiveRegionWalker<M,T> walker,
final LocusShardDataProvider dataProvider,
final ReadShardDataProvider readDataProvider,
T sum) {
logger.debug(String.format("TraverseActiveRegion.traverse: Shard is %s", dataProvider));
logger.debug(String.format("TraverseActiveRegion.traverse: Read Shard is %s", readDataProvider));
final LocusView locusView = new AllLocusView(dataProvider);
final LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider );
final int activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension();
final int maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion();
int minStart = Integer.MAX_VALUE;
final ReadView readView = new ReadView(readDataProvider);
final List<ActiveRegion> activeRegions = new LinkedList<ActiveRegion>();
ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions() );
ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions());
ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView);
Shard readShard = readDataProvider.getShard();
SAMFileHeader header = readShard.getReadProperties().getHeader();
WindowMaker windowMaker = new WindowMaker(readShard, engine.getGenomeLocParser(),
readView.iterator(), readShard.getGenomeLocs(), SampleUtils.getSAMFileSamples(header));
// We keep processing while the next reference location is within the interval
GenomeLoc prevLoc = null;
while( locusView.hasNext() ) {
final AlignmentContext locus = locusView.next();
final GenomeLoc location = locus.getLocation();
for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
LocusShardDataProvider locusDataProvider = new LocusShardDataProvider(readDataProvider,
iterator.getSourceInfo(), engine.getGenomeLocParser(), iterator.getLocus(), iterator);
// Grab all the previously unseen reads from this pileup and add them to the massive read list
// Note that this must occur before we leave because we are outside the intervals because
// reads may occur outside our intervals but overlap them in the future
// TODO -- this whole HashSet logic should be changed to a linked list of reads with
// TODO -- subsequent pass over them to find the ones overlapping the active regions
for( final PileupElement p : locus.getBasePileup() ) {
final GATKSAMRecord read = p.getRead();
if( !myReads.contains(read) ) {
myReads.add(read);
final LocusView locusView = new AllLocusView(locusDataProvider);
final LocusReferenceView referenceView = new LocusReferenceView( walker, locusDataProvider );
ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, locusDataProvider, locusView);
// We keep processing while the next reference location is within the interval
GenomeLoc prevLoc = null;
while( locusView.hasNext() ) {
final AlignmentContext locus = locusView.next();
final GenomeLoc location = locus.getLocation();
if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) {
// we've move across some interval boundary, restart profile
profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
}
// If this is the last pileup for this shard calculate the minimum alignment start so that we know
// which active regions in the work queue are now safe to process
minStart = Math.min(minStart, read.getAlignmentStart());
readDataProvider.getShard().getReadMetrics().incrementNumIterations();
// create reference context. Note that if we have a pileup of "extended events", the context will
// hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
final ReferenceContext refContext = referenceView.getReferenceContext(location);
// Iterate forward to get all reference ordered data covering this location
final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext);
// Call the walkers isActive function for this locus and add them to the list to be integrated later
profile.add(walkerActiveProb(walker, tracker, refContext, locus, location));
prevLoc = location;
printProgress(locus.getLocation());
}
// skip this location -- it's not part of our engine intervals
if ( outsideEngineIntervals(location) )
continue;
if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) {
// we've move across some interval boundary, restart profile
profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
}
dataProvider.getShard().getReadMetrics().incrementNumIterations();
// create reference context. Note that if we have a pileup of "extended events", the context will
// hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
final ReferenceContext refContext = referenceView.getReferenceContext(location);
// Iterate forward to get all reference ordered data covering this location
final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext);
// Call the walkers isActive function for this locus and add them to the list to be integrated later
profile.add(walkerActiveProb(walker, tracker, refContext, locus, location));
prevLoc = location;
printProgress(locus.getLocation());
locusDataProvider.close();
}
updateCumulativeMetrics(dataProvider.getShard());
windowMaker.close();
updateCumulativeMetrics(readDataProvider.getShard());
if ( ! profile.isEmpty() )
incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize);
@ -113,7 +110,7 @@ public class ExperimentalReadShardTraverseActiveRegions<M,T> extends TraversalEn
if( last.isActive == first.isActive && last.getLocation().contiguousP(first.getLocation()) && last.getLocation().size() + first.getLocation().size() <= maxRegionSize ) {
workQueue.removeLast();
activeRegions.remove(first);
workQueue.add( new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension) );
workQueue.addLast(new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension));
}
}
workQueue.addAll( activeRegions );
@ -121,21 +118,13 @@ public class ExperimentalReadShardTraverseActiveRegions<M,T> extends TraversalEn
logger.debug("Integrated " + profile.size() + " isActive calls into " + activeRegions.size() + " regions." );
// now go and process all of the active regions
sum = processActiveRegions(walker, sum, minStart, dataProvider.getLocus().getContig());
// now process the active regions, where possible
boolean emptyQueue = false;
sum = processActiveRegions(walker, sum, emptyQueue);
return sum;
}
/**
* Is the loc outside of the intervals being requested for processing by the GATK?
* @param loc
* @return
*/
private boolean outsideEngineIntervals(final GenomeLoc loc) {
return engine.getIntervals() != null && ! engine.getIntervals().overlaps(loc);
}
/**
* Take the individual isActive calls and integrate them into contiguous active regions and
* add these blocks of work to the work queue
@ -191,12 +180,12 @@ public class ExperimentalReadShardTraverseActiveRegions<M,T> extends TraversalEn
//
// --------------------------------------------------------------------------------
private T processActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, final int minStart, final String currentContig ) {
private T processActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, boolean emptyQueue ) {
if( walker.activeRegionOutStream != null ) {
writeActiveRegionsToStream(walker);
return sum;
} else {
return callWalkerMapOnActiveRegions(walker, sum, minStart, currentContig);
return callWalkerMapOnActiveRegions(walker, sum, emptyQueue);
}
}
@ -214,70 +203,99 @@ public class ExperimentalReadShardTraverseActiveRegions<M,T> extends TraversalEn
}
}
private T callWalkerMapOnActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, final int minStart, final String currentContig ) {
// Since we've traversed sufficiently past this point (or this contig!) in the workQueue we can unload those regions and process them
private T callWalkerMapOnActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, boolean emptyQueue ) {
final int lastRegionStart = workQueue.getLast().getLocation().getStart();
final String lastRegionContig = workQueue.getLast().getLocation().getContig();
// If we've traversed sufficiently past the beginning of the workQueue we can unload those regions and process them
// TODO can implement parallel traversal here
while( workQueue.peek() != null ) {
final GenomeLoc extendedLoc = workQueue.peek().getExtendedLoc();
if ( extendedLoc.getStop() < minStart || (currentContig != null && !workQueue.peek().getExtendedLoc().getContig().equals(currentContig))) {
final ActiveRegion activeRegion = workQueue.remove();
sum = processActiveRegion(activeRegion, sum, walker);
} else {
break;
while( workQueue.peekFirst() != null ) {
ActiveRegion firstRegion = workQueue.getFirst();
final String firstRegionContig = firstRegion.getLocation().getContig();
if (emptyQueue || firstRegionContig != lastRegionContig) {
sum = processFirstActiveRegion(sum, walker);
}
else {
final int firstRegionMaxReadStop = walker.wantsExtendedReads() ? firstRegion.getMaxReadStop() : firstRegion.getExtendedMaxReadStop();
if (lastRegionStart > firstRegionMaxReadStop) {
sum = processFirstActiveRegion( sum, walker );
}
else {
break;
}
}
}
return sum;
}
private T processActiveRegion( final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker<M,T> walker ) {
final ArrayList<GATKSAMRecord> placedReads = new ArrayList<GATKSAMRecord>();
for( final GATKSAMRecord read : myReads ) {
final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read );
if( activeRegion.getLocation().overlapsP( readLoc ) ) {
/**
* Process the first active region and all remaining reads which overlap
*
* Remove the first active region from the queue
* (NB: some reads associated with this active region may have already been processed)
*
* Remove all of these reads from the queue
* (NB: some may be associated with other active regions)
*
* @param sum
* @param walker
* @return
*/
private T processFirstActiveRegion( final T sum, final ActiveRegionWalker<M,T> walker ) {
final ActiveRegion firstRegion = workQueue.removeFirst();
GATKSAMRecord firstRead = myReads.peekFirst(); // don't remove because it may not be placed here
GenomeLoc firstReadLoc = this.engine.getGenomeLocParser().createGenomeLoc( firstRead );
while ( firstRegion.getLocation().overlapsP( firstReadLoc ) ||
(walker.wantsExtendedReads() && firstRegion.getExtendedLoc().overlapsP( firstReadLoc ))) {
if( firstRegion.getLocation().overlapsP( firstReadLoc ) ) {
// The region which the highest amount of overlap is chosen as the primary region for the read (tie breaking is done as right most region)
long maxOverlap = activeRegion.getLocation().sizeOfOverlap( readLoc );
ActiveRegion bestRegion = activeRegion;
long maxOverlap = firstRegion.getLocation().sizeOfOverlap( firstReadLoc );
ActiveRegion bestRegion = firstRegion;
for( final ActiveRegion otherRegionToTest : workQueue ) {
if( otherRegionToTest.getLocation().sizeOfOverlap(readLoc) >= maxOverlap ) {
maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( readLoc );
if( otherRegionToTest.getLocation().sizeOfOverlap(firstReadLoc) >= maxOverlap ) {
maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( firstReadLoc );
bestRegion = otherRegionToTest;
}
}
bestRegion.add( read );
bestRegion.add( firstRead );
// The read is also added to all other regions in which it overlaps but marked as non-primary
if( walker.wantsNonPrimaryReads() ) {
if( !bestRegion.equals(activeRegion) ) {
activeRegion.add( read );
if( !bestRegion.equals(firstRegion) ) {
firstRegion.add(firstRead);
}
for( final ActiveRegion otherRegionToTest : workQueue ) {
if( !bestRegion.equals(otherRegionToTest) ) {
// check for non-primary vs. extended
if ( otherRegionToTest.getLocation().overlapsP( readLoc ) ) {
otherRegionToTest.add( read );
} else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( readLoc ) ) {
otherRegionToTest.add( read );
if ( otherRegionToTest.getLocation().overlapsP( firstReadLoc ) ) {
otherRegionToTest.add( firstRead );
} else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( firstReadLoc ) ) {
otherRegionToTest.add( firstRead );
}
}
}
}
placedReads.add( read );
// check for non-primary vs. extended
} else if( activeRegion.getLocation().overlapsP( readLoc ) ) {
if ( walker.wantsNonPrimaryReads() ) {
activeRegion.add( read );
}
} else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) {
activeRegion.add( read );
}
}
myReads.removeAll( placedReads ); // remove all the reads which have been placed into their active region
// WARNING: This hashset relies on reads being exactly equal when they are placed in the list as when they are removed. So the ActiveRegionWalker can't modify the reads in any way.
logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReferenceLoc());
final M x = walker.map(activeRegion, null);
return walker.reduce( x, sum );
// check for non-primary vs. extended
} else if( firstRegion.getLocation().overlapsP( firstReadLoc ) ) {
if ( walker.wantsNonPrimaryReads() ) {
firstRegion.add( firstRead );
}
} else if( walker.wantsExtendedReads() && firstRegion.getExtendedLoc().overlapsP( firstReadLoc )) {
firstRegion.add( firstRead );
}
myReads.removeFirst();
firstRead = myReads.peekFirst();
firstReadLoc = this.engine.getGenomeLocParser().createGenomeLoc( firstRead );
}
logger.debug(">> Map call with " + firstRegion.getReads().size() + " " + (firstRegion.isActive ? "active" : "inactive") + " reads @ " + firstRegion.getLocation() + " with full extent: " + firstRegion.getReferenceLoc());
final M x = walker.map( firstRegion, null );
return walker.reduce(x, sum);
}
/**
@ -285,6 +303,7 @@ public class ExperimentalReadShardTraverseActiveRegions<M,T> extends TraversalEn
* Ugly for now but will be cleaned up when we push this functionality more into the engine
*/
public T endTraversal( final Walker<M,T> walker, T sum) {
return processActiveRegions((ActiveRegionWalker<M,T>)walker, sum, Integer.MAX_VALUE, null);
boolean emptyQueue = true;
return processActiveRegions((ActiveRegionWalker<M,T>)walker, sum, emptyQueue);
}
}

View File

@ -26,14 +26,19 @@ public class ActiveRegion implements HasGenomeLocation {
private final GenomeLocParser genomeLocParser;
public final boolean isActive;
// maximum stop position of all reads with start position in this active region
// Used only by ExperimentalReadShardTraverseActiveRegions
// NB: these reads may not be associated with this active region!
private int maxReadStop;
public ActiveRegion( final GenomeLoc activeRegionLoc, final boolean isActive, final GenomeLocParser genomeLocParser, final int extension ) {
this.activeRegionLoc = activeRegionLoc;
this.isActive = isActive;
this.genomeLocParser = genomeLocParser;
this.extension = extension;
extendedLoc = genomeLocParser.createGenomeLocOnContig(activeRegionLoc.getContig(), activeRegionLoc.getStart() - extension, activeRegionLoc.getStop() + extension);
fullExtentReferenceLoc = extendedLoc;
maxReadStop = activeRegionLoc.getStart();
}
@Override
@ -94,6 +99,18 @@ public class ActiveRegion implements HasGenomeLocation {
public void remove( final GATKSAMRecord read ) { reads.remove( read ); }
public void removeAll( final ArrayList<GATKSAMRecord> readsToRemove ) { reads.removeAll( readsToRemove ); }
public void setMaxReadStop(int maxReadStop) {
this.maxReadStop = maxReadStop;
}
public int getMaxReadStop() {
return maxReadStop;
}
public int getExtendedMaxReadStop() {
return maxReadStop + extension;
}
public boolean equalExceptReads(final ActiveRegion other) {
if ( activeRegionLoc.compareTo(other.activeRegionLoc) != 0 ) return false;
if ( isActive != other.isActive ) return false;

View File

@ -4,6 +4,9 @@ import com.google.java.contract.PreconditionError;
import net.sf.samtools.*;
import org.broadinstitute.sting.commandline.Tags;
import org.broadinstitute.sting.gatk.arguments.GATKArgumentCollection;
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.reads.*;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
@ -17,7 +20,6 @@ import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
import org.broadinstitute.sting.gatk.executive.WindowMaker;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
@ -97,7 +99,9 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
}
}
private final TraverseActiveRegions<Integer, Integer> t = new TraverseActiveRegions<Integer, Integer>();
private final TraverseActiveRegions<Integer, Integer> traverse = new TraverseActiveRegions<Integer, Integer>();
private final ExperimentalReadShardTraverseActiveRegions<Integer, Integer> readShardTraverse = new ExperimentalReadShardTraverseActiveRegions<Integer, Integer>();
private final ExperimentalActiveRegionShardTraverseActiveRegions<Integer, Integer> activeRegionShardTraverse = new ExperimentalActiveRegionShardTraverseActiveRegions<Integer, Integer>();
private IndexedFastaSequenceFile reference;
private SAMSequenceDictionary dictionary;
@ -108,6 +112,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
private static final String testBAM = "TraverseActiveRegionsUnitTest.bam";
private static final String testBAI = "TraverseActiveRegionsUnitTest.bai";
private static final ExperimentalActiveRegionShardType shardType = ExperimentalActiveRegionShardType.LOCUSSHARD;
@BeforeClass
private void init() throws FileNotFoundException {
reference = new CachingIndexedFastaSequenceFile(new File(hg19Reference));
@ -175,8 +181,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
private List<GenomeLoc> getIsActiveIntervals(DummyActiveRegionWalker walker, List<GenomeLoc> intervals) {
List<GenomeLoc> activeIntervals = new ArrayList<GenomeLoc>();
for (LocusShardDataProvider dataProvider : createDataProviders(intervals, testBAM)) {
t.traverse(walker, dataProvider, 0);
for (ShardDataProvider dataProvider : createDataProviders(intervals, testBAM)) {
traverse(walker, dataProvider, 0);
activeIntervals.addAll(walker.isActiveCalls);
}
@ -413,10 +419,10 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
}
private Map<GenomeLoc, ActiveRegion> getActiveRegions(DummyActiveRegionWalker walker, List<GenomeLoc> intervals) {
for (LocusShardDataProvider dataProvider : createDataProviders(intervals, testBAM))
t.traverse(walker, dataProvider, 0);
for (ShardDataProvider dataProvider : createDataProviders(intervals, testBAM))
traverse(walker, dataProvider, 0);
t.endTraversal(walker, 0);
endTraversal(walker, 0);
return walker.mappedActiveRegions;
}
@ -477,13 +483,12 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
return record;
}
private List<LocusShardDataProvider> createDataProviders(List<GenomeLoc> intervals, String bamFile) {
private List<ShardDataProvider> createDataProviders(List<GenomeLoc> intervals, String bamFile) {
GenomeAnalysisEngine engine = new GenomeAnalysisEngine();
engine.setGenomeLocParser(genomeLocParser);
GATKArgumentCollection arguments = new GATKArgumentCollection();
arguments.activeRegionShardType = ExperimentalActiveRegionShardType.LOCUSSHARD; // make explicit
arguments.activeRegionShardType = shardType; // make explicit
engine.setArguments(arguments);
t.initialize(engine);
Collection<SAMReaderID> samFiles = new ArrayList<SAMReaderID>();
SAMReaderID readerID = new SAMReaderID(new File(bamFile), new Tags());
@ -491,13 +496,51 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
SAMDataSource dataSource = new SAMDataSource(samFiles, new ThreadAllocation(), null, genomeLocParser);
List<LocusShardDataProvider> providers = new ArrayList<LocusShardDataProvider>();
for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) {
for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs())) {
providers.add(new LocusShardDataProvider(shard, shard.getReadProperties(), genomeLocParser, window.getLocus(), window, reference, new ArrayList<ReferenceOrderedDataSource>()));
}
List<ShardDataProvider> providers = new ArrayList<ShardDataProvider>();
switch (shardType) {
case LOCUSSHARD:
traverse.initialize(engine);
for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) {
for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs())) {
providers.add(new LocusShardDataProvider(shard, shard.getReadProperties(), genomeLocParser, window.getLocus(), window, reference, new ArrayList<ReferenceOrderedDataSource>()));
}
}
break;
case READSHARD:
readShardTraverse.initialize(engine);
for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new ReadShardBalancer())) {
providers.add(new ReadShardDataProvider(shard, genomeLocParser, shard.iterator(), reference, new ArrayList<ReferenceOrderedDataSource>()));
}
break;
default: // other types not implemented
}
return providers;
}
private void traverse(DummyActiveRegionWalker walker, ShardDataProvider dataProvider, int i) {
switch (shardType) {
case LOCUSSHARD:
traverse.traverse(walker, (LocusShardDataProvider) dataProvider, i);
break;
case READSHARD:
readShardTraverse.traverse(walker, (ReadShardDataProvider) dataProvider, i);
break;
default: // other types not implemented
}
}
private void endTraversal(DummyActiveRegionWalker walker, int i) {
switch (shardType) {
case LOCUSSHARD:
traverse.endTraversal(walker, i);
break;
case READSHARD:
readShardTraverse.endTraversal(walker, i);
break;
default: // other types not implemented
}
}
}