diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 1187039bb..f17450247 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -570,11 +570,29 @@ public class GenomeAnalysisEngine { else if(walker instanceof ActiveRegionWalker) { if (readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.coordinate) throw new UserException.MissortedBAM(SAMFileHeader.SortOrder.coordinate, "Active region walkers can only traverse coordinate-sorted data. Please resort your input BAM file(s) or set the Sort Order tag in the header appropriately."); - if(intervals == null) - return readsDataSource.createShardIteratorOverMappedReads(referenceDataSource.getReference().getSequenceDictionary(),new LocusShardBalancer()); - else - return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new LocusShardBalancer()); - } + + switch(argCollection.activeRegionShardType) { + case LOCUSSHARD: + if(intervals == null) + return readsDataSource.createShardIteratorOverMappedReads(referenceDataSource.getReference().getSequenceDictionary(),new LocusShardBalancer()); + else + return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new LocusShardBalancer()); + case READSHARD: + // Use the legacy ReadShardBalancer if legacy downsampling is enabled + ShardBalancer readShardBalancer = downsamplingMethod != null && downsamplingMethod.useLegacyDownsampler ? 
+ new LegacyReadShardBalancer() : + new ReadShardBalancer(); + + if(intervals == null) + return readsDataSource.createShardIteratorOverMappedReads(referenceDataSource.getReference().getSequenceDictionary(), readShardBalancer); + else + return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), readShardBalancer); + case ACTIVEREGIONSHARD: + throw new UserException.CommandLineException("Not implemented."); + default: + throw new UserException.CommandLineException("Invalid active region shard type."); + } + } else if(walker instanceof ReadWalker || walker instanceof ReadPairWalker || walker instanceof DuplicateWalker) { // Apply special validation to read pair walkers. if(walker instanceof ReadPairWalker) { diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/LocusShardDataProvider.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/LocusShardDataProvider.java index 4888b9f41..1607469eb 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/LocusShardDataProvider.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/LocusShardDataProvider.java @@ -44,6 +44,22 @@ public class LocusShardDataProvider extends ShardDataProvider { this.locusIterator = locusIterator; } + /** + * Create a data provider based on an input provider + * Used only by ExperimentalReadShardTraverseActiveRegions + * @param dataProvider + * @param sourceInfo + * @param genomeLocParser + * @param locus + * @param locusIterator + */ + public LocusShardDataProvider(ShardDataProvider dataProvider, ReadProperties sourceInfo, GenomeLocParser genomeLocParser, GenomeLoc locus, LocusIterator locusIterator) { + super(dataProvider.getShard(),genomeLocParser,dataProvider.getReference(),dataProvider.getReferenceOrderedData()); + this.sourceInfo = sourceInfo; + this.locus = locus; + 
this.locusIterator = locusIterator; + } + /** * Returns information about the source of the reads. * @return Info about the source of the reads. diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalReadShardTraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalReadShardTraverseActiveRegions.java index 516a7dc34..672d37f7f 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalReadShardTraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/ExperimentalReadShardTraverseActiveRegions.java @@ -1,32 +1,35 @@ package org.broadinstitute.sting.gatk.traversals; +import net.sf.samtools.SAMFileHeader; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.WalkerManager; import org.broadinstitute.sting.gatk.contexts.AlignmentContext; import org.broadinstitute.sting.gatk.contexts.ReferenceContext; import org.broadinstitute.sting.gatk.datasources.providers.*; +import org.broadinstitute.sting.gatk.datasources.reads.Shard; +import org.broadinstitute.sting.gatk.executive.WindowMaker; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.ActiveRegionExtension; import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker; import org.broadinstitute.sting.gatk.walkers.DataSource; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.SampleUtils; import org.broadinstitute.sting.utils.activeregion.ActiveRegion; import org.broadinstitute.sting.utils.activeregion.ActivityProfile; import org.broadinstitute.sting.utils.activeregion.ActivityProfileResult; -import org.broadinstitute.sting.utils.pileup.PileupElement; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; import java.util.*; -public class ExperimentalReadShardTraverseActiveRegions extends TraversalEngine,LocusShardDataProvider> { +public 
class ExperimentalReadShardTraverseActiveRegions extends TraversalEngine,ReadShardDataProvider> { /** * our log, which we want to capture anything from this class */ protected final static Logger logger = Logger.getLogger(TraversalEngine.class); private final LinkedList workQueue = new LinkedList(); - private final LinkedHashSet myReads = new LinkedHashSet(); + private final LinkedList myReads = new LinkedList(); @Override public String getTraversalUnits() { @@ -35,71 +38,65 @@ public class ExperimentalReadShardTraverseActiveRegions extends TraversalEn @Override public T traverse( final ActiveRegionWalker walker, - final LocusShardDataProvider dataProvider, + final ReadShardDataProvider readDataProvider, T sum) { - logger.debug(String.format("TraverseActiveRegion.traverse: Shard is %s", dataProvider)); + logger.debug(String.format("TraverseActiveRegion.traverse: Read Shard is %s", readDataProvider)); - final LocusView locusView = new AllLocusView(dataProvider); - - final LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider ); final int activeRegionExtension = walker.getClass().getAnnotation(ActiveRegionExtension.class).extension(); final int maxRegionSize = walker.getClass().getAnnotation(ActiveRegionExtension.class).maxRegion(); - int minStart = Integer.MAX_VALUE; + final ReadView readView = new ReadView(readDataProvider); + final List activeRegions = new LinkedList(); - ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions() ); + ActivityProfile profile = new ActivityProfile(engine.getGenomeLocParser(), walker.hasPresetActiveRegions()); - ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView); + Shard readShard = readDataProvider.getShard(); + SAMFileHeader header = readShard.getReadProperties().getHeader(); + WindowMaker windowMaker = new WindowMaker(readShard, engine.getGenomeLocParser(), + readView.iterator(), 
readShard.getGenomeLocs(), SampleUtils.getSAMFileSamples(header)); - // We keep processing while the next reference location is within the interval - GenomeLoc prevLoc = null; - while( locusView.hasNext() ) { - final AlignmentContext locus = locusView.next(); - final GenomeLoc location = locus.getLocation(); + for(WindowMaker.WindowMakerIterator iterator: windowMaker) { + LocusShardDataProvider locusDataProvider = new LocusShardDataProvider(readDataProvider, + iterator.getSourceInfo(), engine.getGenomeLocParser(), iterator.getLocus(), iterator); - // Grab all the previously unseen reads from this pileup and add them to the massive read list - // Note that this must occur before we leave because we are outside the intervals because - // reads may occur outside our intervals but overlap them in the future - // TODO -- this whole HashSet logic should be changed to a linked list of reads with - // TODO -- subsequent pass over them to find the ones overlapping the active regions - for( final PileupElement p : locus.getBasePileup() ) { - final GATKSAMRecord read = p.getRead(); - if( !myReads.contains(read) ) { - myReads.add(read); + final LocusView locusView = new AllLocusView(locusDataProvider); + final LocusReferenceView referenceView = new LocusReferenceView( walker, locusDataProvider ); + ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, locusDataProvider, locusView); + + // We keep processing while the next reference location is within the interval + GenomeLoc prevLoc = null; + while( locusView.hasNext() ) { + final AlignmentContext locus = locusView.next(); + final GenomeLoc location = locus.getLocation(); + + if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) { + // we've move across some interval boundary, restart profile + profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize); } - // If this is the last pileup for this shard calculate the minimum alignment start so that 
we know - // which active regions in the work queue are now safe to process - minStart = Math.min(minStart, read.getAlignmentStart()); + readDataProvider.getShard().getReadMetrics().incrementNumIterations(); + + // create reference context. Note that if we have a pileup of "extended events", the context will + // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup). + final ReferenceContext refContext = referenceView.getReferenceContext(location); + + // Iterate forward to get all reference ordered data covering this location + final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext); + + // Call the walkers isActive function for this locus and add them to the list to be integrated later + profile.add(walkerActiveProb(walker, tracker, refContext, locus, location)); + + prevLoc = location; + + printProgress(locus.getLocation()); } - // skip this location -- it's not part of our engine intervals - if ( outsideEngineIntervals(location) ) - continue; - - if ( prevLoc != null && location.getStart() != prevLoc.getStop() + 1 ) { - // we've move across some interval boundary, restart profile - profile = incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize); - } - - dataProvider.getShard().getReadMetrics().incrementNumIterations(); - - // create reference context. Note that if we have a pileup of "extended events", the context will - // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup). 
- final ReferenceContext refContext = referenceView.getReferenceContext(location); - - // Iterate forward to get all reference ordered data covering this location - final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext); - - // Call the walkers isActive function for this locus and add them to the list to be integrated later - profile.add(walkerActiveProb(walker, tracker, refContext, locus, location)); - - prevLoc = location; - - printProgress(locus.getLocation()); + locusDataProvider.close(); } - updateCumulativeMetrics(dataProvider.getShard()); + windowMaker.close(); + + updateCumulativeMetrics(readDataProvider.getShard()); if ( ! profile.isEmpty() ) incorporateActiveRegions(profile, activeRegions, activeRegionExtension, maxRegionSize); @@ -113,7 +110,7 @@ public class ExperimentalReadShardTraverseActiveRegions extends TraversalEn if( last.isActive == first.isActive && last.getLocation().contiguousP(first.getLocation()) && last.getLocation().size() + first.getLocation().size() <= maxRegionSize ) { workQueue.removeLast(); activeRegions.remove(first); - workQueue.add( new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension) ); + workQueue.addLast(new ActiveRegion(last.getLocation().union(first.getLocation()), first.isActive, this.engine.getGenomeLocParser(), activeRegionExtension)); } } workQueue.addAll( activeRegions ); @@ -121,21 +118,13 @@ public class ExperimentalReadShardTraverseActiveRegions extends TraversalEn logger.debug("Integrated " + profile.size() + " isActive calls into " + activeRegions.size() + " regions." 
); - // now go and process all of the active regions - sum = processActiveRegions(walker, sum, minStart, dataProvider.getLocus().getContig()); + // now process the active regions, where possible + boolean emptyQueue = false; + sum = processActiveRegions(walker, sum, emptyQueue); return sum; } - /** - * Is the loc outside of the intervals being requested for processing by the GATK? - * @param loc - * @return - */ - private boolean outsideEngineIntervals(final GenomeLoc loc) { - return engine.getIntervals() != null && ! engine.getIntervals().overlaps(loc); - } - /** * Take the individual isActive calls and integrate them into contiguous active regions and * add these blocks of work to the work queue @@ -191,12 +180,12 @@ public class ExperimentalReadShardTraverseActiveRegions extends TraversalEn // // -------------------------------------------------------------------------------- - private T processActiveRegions( final ActiveRegionWalker walker, T sum, final int minStart, final String currentContig ) { + private T processActiveRegions( final ActiveRegionWalker walker, T sum, boolean emptyQueue ) { if( walker.activeRegionOutStream != null ) { writeActiveRegionsToStream(walker); return sum; } else { - return callWalkerMapOnActiveRegions(walker, sum, minStart, currentContig); + return callWalkerMapOnActiveRegions(walker, sum, emptyQueue); } } @@ -214,70 +203,99 @@ public class ExperimentalReadShardTraverseActiveRegions extends TraversalEn } } - private T callWalkerMapOnActiveRegions( final ActiveRegionWalker walker, T sum, final int minStart, final String currentContig ) { - // Since we've traversed sufficiently past this point (or this contig!) 
in the workQueue we can unload those regions and process them +    private T callWalkerMapOnActiveRegions( final ActiveRegionWalker<M,T> walker, T sum, boolean emptyQueue ) { +        final int lastRegionStart = workQueue.getLast().getLocation().getStart(); +        final String lastRegionContig = workQueue.getLast().getLocation().getContig(); + +        // If we've traversed sufficiently past the beginning of the workQueue we can unload those regions and process them         // TODO can implement parallel traversal here -        while( workQueue.peek() != null ) { -            final GenomeLoc extendedLoc = workQueue.peek().getExtendedLoc(); -            if ( extendedLoc.getStop() < minStart || (currentContig != null && !workQueue.peek().getExtendedLoc().getContig().equals(currentContig))) { -                final ActiveRegion activeRegion = workQueue.remove(); -                sum = processActiveRegion(activeRegion, sum, walker); -            } else { -                break; +        while( workQueue.peekFirst() != null ) { +            ActiveRegion firstRegion = workQueue.getFirst(); +            final String firstRegionContig = firstRegion.getLocation().getContig(); +            if (emptyQueue || !firstRegionContig.equals(lastRegionContig)) { +                sum = processFirstActiveRegion(sum, walker); +            } +            else { +                final int firstRegionMaxReadStop = walker.wantsExtendedReads() ? 
firstRegion.getExtendedMaxReadStop() : firstRegion.getMaxReadStop(); +                if (lastRegionStart > firstRegionMaxReadStop) { +                    sum = processFirstActiveRegion( sum, walker ); +                } +                else { +                    break; +                } +            }         }         return sum;     } -    private T processActiveRegion( final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker<M,T> walker ) { -        final ArrayList<GATKSAMRecord> placedReads = new ArrayList<GATKSAMRecord>(); -        for( final GATKSAMRecord read : myReads ) { -            final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read ); -            if( activeRegion.getLocation().overlapsP( readLoc ) ) { +    /** +     * Process the first active region and all remaining reads which overlap +     * +     * Remove the first active region from the queue +     * (NB: some reads associated with this active region may have already been processed) +     * +     * Remove all of these reads from the queue +     * (NB: some may be associated with other active regions) +     * +     * @param sum +     * @param walker +     * @return +     */ +    private T processFirstActiveRegion( final T sum, final ActiveRegionWalker<M,T> walker ) { +        final ActiveRegion firstRegion = workQueue.removeFirst(); + +        GATKSAMRecord firstRead = myReads.peekFirst(); // don't remove because it may not be placed here +        GenomeLoc firstReadLoc = this.engine.getGenomeLocParser().createGenomeLoc( firstRead ); + +        while ( firstRegion.getLocation().overlapsP( firstReadLoc ) || +                (walker.wantsExtendedReads() && firstRegion.getExtendedLoc().overlapsP( firstReadLoc ))) { +            if( firstRegion.getLocation().overlapsP( firstReadLoc ) ) {                 // The region which the highest amount of overlap is chosen as the primary region for the read (tie breaking is done as right most region) -                long maxOverlap = activeRegion.getLocation().sizeOfOverlap( readLoc ); -                ActiveRegion bestRegion = activeRegion; +                long maxOverlap = firstRegion.getLocation().sizeOfOverlap( firstReadLoc ); +                ActiveRegion bestRegion = firstRegion;                 for( final ActiveRegion otherRegionToTest : workQueue ) { -                    if( 
otherRegionToTest.getLocation().sizeOfOverlap(readLoc) >= maxOverlap ) { - maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( readLoc ); + if( otherRegionToTest.getLocation().sizeOfOverlap(firstReadLoc) >= maxOverlap ) { + maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap( firstReadLoc ); bestRegion = otherRegionToTest; } } - bestRegion.add( read ); + bestRegion.add( firstRead ); // The read is also added to all other regions in which it overlaps but marked as non-primary if( walker.wantsNonPrimaryReads() ) { - if( !bestRegion.equals(activeRegion) ) { - activeRegion.add( read ); + if( !bestRegion.equals(firstRegion) ) { + firstRegion.add(firstRead); } for( final ActiveRegion otherRegionToTest : workQueue ) { if( !bestRegion.equals(otherRegionToTest) ) { // check for non-primary vs. extended - if ( otherRegionToTest.getLocation().overlapsP( readLoc ) ) { - otherRegionToTest.add( read ); - } else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( readLoc ) ) { - otherRegionToTest.add( read ); + if ( otherRegionToTest.getLocation().overlapsP( firstReadLoc ) ) { + otherRegionToTest.add( firstRead ); + } else if ( walker.wantsExtendedReads() && otherRegionToTest.getExtendedLoc().overlapsP( firstReadLoc ) ) { + otherRegionToTest.add( firstRead ); } } } } - placedReads.add( read ); - // check for non-primary vs. extended - } else if( activeRegion.getLocation().overlapsP( readLoc ) ) { - if ( walker.wantsNonPrimaryReads() ) { - activeRegion.add( read ); - } - } else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) { - activeRegion.add( read ); - } - } - myReads.removeAll( placedReads ); // remove all the reads which have been placed into their active region - // WARNING: This hashset relies on reads being exactly equal when they are placed in the list as when they are removed. So the ActiveRegionWalker can't modify the reads in any way. 
- logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReferenceLoc()); - final M x = walker.map(activeRegion, null); - return walker.reduce( x, sum ); + // check for non-primary vs. extended + } else if( firstRegion.getLocation().overlapsP( firstReadLoc ) ) { + if ( walker.wantsNonPrimaryReads() ) { + firstRegion.add( firstRead ); + } + } else if( walker.wantsExtendedReads() && firstRegion.getExtendedLoc().overlapsP( firstReadLoc )) { + firstRegion.add( firstRead ); + } + + myReads.removeFirst(); + firstRead = myReads.peekFirst(); + firstReadLoc = this.engine.getGenomeLocParser().createGenomeLoc( firstRead ); + } + + logger.debug(">> Map call with " + firstRegion.getReads().size() + " " + (firstRegion.isActive ? "active" : "inactive") + " reads @ " + firstRegion.getLocation() + " with full extent: " + firstRegion.getReferenceLoc()); + final M x = walker.map( firstRegion, null ); + return walker.reduce(x, sum); } /** @@ -285,6 +303,7 @@ public class ExperimentalReadShardTraverseActiveRegions extends TraversalEn * Ugly for now but will be cleaned up when we push this functionality more into the engine */ public T endTraversal( final Walker walker, T sum) { - return processActiveRegions((ActiveRegionWalker)walker, sum, Integer.MAX_VALUE, null); + boolean emptyQueue = true; + return processActiveRegions((ActiveRegionWalker)walker, sum, emptyQueue); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRegion.java b/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRegion.java index 3e3bb220a..d1199ad3d 100644 --- a/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRegion.java +++ b/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRegion.java @@ -26,14 +26,19 @@ public class ActiveRegion implements HasGenomeLocation { private final 
GenomeLocParser genomeLocParser; public final boolean isActive; + // maximum stop position of all reads with start position in this active region + // Used only by ExperimentalReadShardTraverseActiveRegions + // NB: these reads may not be associated with this active region! + private int maxReadStop; + public ActiveRegion( final GenomeLoc activeRegionLoc, final boolean isActive, final GenomeLocParser genomeLocParser, final int extension ) { this.activeRegionLoc = activeRegionLoc; this.isActive = isActive; this.genomeLocParser = genomeLocParser; - this.extension = extension; extendedLoc = genomeLocParser.createGenomeLocOnContig(activeRegionLoc.getContig(), activeRegionLoc.getStart() - extension, activeRegionLoc.getStop() + extension); fullExtentReferenceLoc = extendedLoc; + maxReadStop = activeRegionLoc.getStart(); } @Override @@ -94,6 +99,18 @@ public class ActiveRegion implements HasGenomeLocation { public void remove( final GATKSAMRecord read ) { reads.remove( read ); } public void removeAll( final ArrayList readsToRemove ) { reads.removeAll( readsToRemove ); } + public void setMaxReadStop(int maxReadStop) { + this.maxReadStop = maxReadStop; + } + + public int getMaxReadStop() { + return maxReadStop; + } + + public int getExtendedMaxReadStop() { + return maxReadStop + extension; + } + public boolean equalExceptReads(final ActiveRegion other) { if ( activeRegionLoc.compareTo(other.activeRegionLoc) != 0 ) return false; if ( isActive != other.isActive ) return false; diff --git a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java index 645ba3f3f..1204e639e 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java @@ -4,6 +4,9 @@ import com.google.java.contract.PreconditionError; import 
net.sf.samtools.*; import org.broadinstitute.sting.commandline.Tags; import org.broadinstitute.sting.gatk.arguments.GATKArgumentCollection; +import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; +import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; +import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.reads.*; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.utils.GenomeLocSortedSet; @@ -17,7 +20,6 @@ import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.gatk.contexts.AlignmentContext; import org.broadinstitute.sting.gatk.contexts.ReferenceContext; -import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.executive.WindowMaker; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; @@ -97,7 +99,9 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { } } - private final TraverseActiveRegions t = new TraverseActiveRegions(); + private final TraverseActiveRegions traverse = new TraverseActiveRegions(); + private final ExperimentalReadShardTraverseActiveRegions readShardTraverse = new ExperimentalReadShardTraverseActiveRegions(); + private final ExperimentalActiveRegionShardTraverseActiveRegions activeRegionShardTraverse = new ExperimentalActiveRegionShardTraverseActiveRegions(); private IndexedFastaSequenceFile reference; private SAMSequenceDictionary dictionary; @@ -108,6 +112,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { private static final String testBAM = "TraverseActiveRegionsUnitTest.bam"; private static final String testBAI = "TraverseActiveRegionsUnitTest.bai"; + private static final 
ExperimentalActiveRegionShardType shardType = ExperimentalActiveRegionShardType.LOCUSSHARD; + @BeforeClass private void init() throws FileNotFoundException { reference = new CachingIndexedFastaSequenceFile(new File(hg19Reference)); @@ -175,8 +181,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { private List getIsActiveIntervals(DummyActiveRegionWalker walker, List intervals) { List activeIntervals = new ArrayList(); - for (LocusShardDataProvider dataProvider : createDataProviders(intervals, testBAM)) { - t.traverse(walker, dataProvider, 0); + for (ShardDataProvider dataProvider : createDataProviders(intervals, testBAM)) { + traverse(walker, dataProvider, 0); activeIntervals.addAll(walker.isActiveCalls); } @@ -413,10 +419,10 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { } private Map getActiveRegions(DummyActiveRegionWalker walker, List intervals) { - for (LocusShardDataProvider dataProvider : createDataProviders(intervals, testBAM)) - t.traverse(walker, dataProvider, 0); + for (ShardDataProvider dataProvider : createDataProviders(intervals, testBAM)) + traverse(walker, dataProvider, 0); - t.endTraversal(walker, 0); + endTraversal(walker, 0); return walker.mappedActiveRegions; } @@ -477,13 +483,12 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { return record; } - private List createDataProviders(List intervals, String bamFile) { + private List createDataProviders(List intervals, String bamFile) { GenomeAnalysisEngine engine = new GenomeAnalysisEngine(); engine.setGenomeLocParser(genomeLocParser); GATKArgumentCollection arguments = new GATKArgumentCollection(); - arguments.activeRegionShardType = ExperimentalActiveRegionShardType.LOCUSSHARD; // make explicit + arguments.activeRegionShardType = shardType; // make explicit engine.setArguments(arguments); - t.initialize(engine); Collection samFiles = new ArrayList(); SAMReaderID readerID = new SAMReaderID(new File(bamFile), new Tags()); @@ -491,13 +496,51 @@ public 
class TraverseActiveRegionsUnitTest extends BaseTest { SAMDataSource dataSource = new SAMDataSource(samFiles, new ThreadAllocation(), null, genomeLocParser); - List providers = new ArrayList(); - for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) { - for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs())) { - providers.add(new LocusShardDataProvider(shard, shard.getReadProperties(), genomeLocParser, window.getLocus(), window, reference, new ArrayList())); - } + List providers = new ArrayList(); + + switch (shardType) { + case LOCUSSHARD: + traverse.initialize(engine); + for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) { + for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs())) { + providers.add(new LocusShardDataProvider(shard, shard.getReadProperties(), genomeLocParser, window.getLocus(), window, reference, new ArrayList())); + } + } + break; + case READSHARD: + readShardTraverse.initialize(engine); + for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new ReadShardBalancer())) { + providers.add(new ReadShardDataProvider(shard, genomeLocParser, shard.iterator(), reference, new ArrayList())); + } + break; + default: // other types not implemented } return providers; } + + private void traverse(DummyActiveRegionWalker walker, ShardDataProvider dataProvider, int i) { + switch (shardType) { + case LOCUSSHARD: + traverse.traverse(walker, (LocusShardDataProvider) dataProvider, i); + break; + case READSHARD: + readShardTraverse.traverse(walker, (ReadShardDataProvider) dataProvider, i); + break; + default: // other types not implemented + } + } + + private void 
endTraversal(DummyActiveRegionWalker walker, int i) { + switch (shardType) { + case LOCUSSHARD: + traverse.endTraversal(walker, i); + break; + case READSHARD: + readShardTraverse.endTraversal(walker, i); + break; + default: // other types not implemented + } + } + }