Make ActiveRegionTraversal robust to excessive coverage

-- Add a per-sample maximum and an overall maximum on the number of reads held in memory by the ART at any one time.  Does this in a new TAROrderedReadCache data structure that uses a reservoir downsampler to limit the total number of reads to a constant amount.  This constant is set by default to 3000 reads * nSamples, up to a global maximum of 1M reads, all controlled via the ActiveRegionTraversalParameters annotation.
-- Added an integration test and associated excessively covered BAM excessiveCoverage.1.121484835.bam (private/testdata) that checks that the system is operating correctly.
-- #resolves GSA-921
This commit is contained in:
Mark DePristo 2013-04-07 15:25:52 -04:00
parent 317dc4c323
commit 1b36db8940
6 changed files with 276 additions and 8 deletions

View File

@ -725,6 +725,15 @@ public class GenomeAnalysisEngine {
rodDataSources = getReferenceOrderedDataSources(referenceMetaDataFiles,referenceDataSource.getReference().getSequenceDictionary(),genomeLocParser,argCollection.unsafe);
}
/**
 * Replaces this engine's reads data source with the given one.
 *
 * Purely for testing purposes. Do not use unless you absolutely positively know what you are doing (or
 * need to absolutely positively kill everyone in the room).
 *
 * @param dataSource the SAMDataSource this engine should use for reads from now on
 */
public void setReadsDataSource(final SAMDataSource dataSource) {
this.readsDataSource = dataSource;
}
/**
* Entry-point function to initialize the samples database from input data and pedigree arguments
*/

View File

@ -0,0 +1,126 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.traversals;
import org.broadinstitute.sting.gatk.downsampling.Downsampler;
import org.broadinstitute.sting.gatk.downsampling.ReservoirDownsampler;
import org.broadinstitute.sting.utils.sam.AlignmentStartComparator;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Subsystem to track a list of all reads currently live in the TraverseActiveRegions system,
* while limiting the total number of reads to a maximum capacity.
*
* User: depristo
* Date: 4/7/13
* Time: 11:23 AM
*/
/**
 * Subsystem to track a list of all reads currently live in the TraverseActiveRegions system,
 * while limiting the total number of reads to a maximum capacity.
 *
 * Internally backed by a reservoir downsampler, so that when more reads than maxCapacity
 * are added, the excess reads are discarded (uniformly at random per the reservoir
 * algorithm) rather than growing memory without bound.
 *
 * User: depristo
 * Date: 4/7/13
 * Time: 11:23 AM
 */
public class TAROrderedReadCache {
    // hard cap on the number of reads the cache will hold at any one time
    private final int maxCapacity;
    // reservoir downsampler that enforces maxCapacity and tracks discard counts
    private final Downsampler<GATKSAMRecord> downsampler;

    /**
     * Create a new empty ReadCache
     * @param maxCapacity the max capacity of the read cache; must be >= 0
     * @throws IllegalArgumentException if maxCapacity is negative
     */
    public TAROrderedReadCache(int maxCapacity) {
        if ( maxCapacity < 0 ) throw new IllegalArgumentException("maxCapacity must be >= 0 but got " + maxCapacity);
        this.maxCapacity = maxCapacity;
        this.downsampler = new ReservoirDownsampler<GATKSAMRecord>(maxCapacity);
    }

    /**
     * What's the maximum number of reads we'll store in the cache?
     * @return a non-negative integer
     */
    public int getMaxCapacity() {
        return maxCapacity;
    }

    /**
     * Add a single read to this cache.  Assumed to be in sorted order w.r.t. the previously added reads
     * @param read a read to add
     * @throws IllegalArgumentException if read is null
     */
    public void add(final GATKSAMRecord read) {
        if ( read == null ) throw new IllegalArgumentException("Read cannot be null");
        downsampler.submit(read);
    }

    /**
     * Add a collection of reads to this cache.  Assumed to be in sorted order w.r.t. the previously added reads and each other
     * @param reads a collection of reads to add
     * @throws IllegalArgumentException if reads is null
     */
    public void addAll(final List<GATKSAMRecord> reads) {
        if ( reads == null ) throw new IllegalArgumentException("Reads cannot be null");
        downsampler.submit(reads);
    }

    /**
     * How many reads are currently in the cache?
     * @return a non-negative integer
     */
    public int size() {
        return downsampler.size();
    }

    /**
     * How many reads were discarded since the last call to popCurrentReads
     * @return the number of reads dropped by the downsampler since the last pop/reset
     */
    public int getNumDiscarded() {
        return downsampler.getNumberOfDiscardedItems();
    }

    /**
     * Removes all reads currently in the cache, and returns them in sorted order (w.r.t. alignmentStart)
     *
     * Flushes this cache, so after this call the cache will contain no reads and all downsampling stats will
     * be reset.
     *
     * @return a list of GATKSAMRecords in this cache
     */
    public List<GATKSAMRecord> popCurrentReads() {
        final List<GATKSAMRecord> maybeUnordered = downsampler.consumeFinalizedItems();
        final List<GATKSAMRecord> ordered;

        if ( downsampler.getNumberOfDiscardedItems() == 0 ) {
            // haven't discarded anything, so the reads are still in their original sorted order
            ordered = maybeUnordered;
        } else {
            // the reservoir may have shuffled the survivors, so we must re-sort: O(n log n)
            // copy defensively in case the downsampler's list is not safely mutable
            ordered = new ArrayList<GATKSAMRecord>(maybeUnordered);
            Collections.sort(ordered, new AlignmentStartComparator());
        }

        // reset the downsampler stats so getNumberOfDiscardedItems is 0
        downsampler.reset();
        return ordered;
    }
}

View File

@ -39,6 +39,7 @@ import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker;
import org.broadinstitute.sting.gatk.walkers.DataSource;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.SampleUtils;
import org.broadinstitute.sting.utils.Utils;
import org.broadinstitute.sting.utils.activeregion.*;
import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
@ -78,7 +79,8 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
private final LinkedList<ActiveRegion> workQueue = new LinkedList<ActiveRegion>();
private LinkedList<GATKSAMRecord> myReads = new LinkedList<GATKSAMRecord>();
private TAROrderedReadCache myReads = null;
private GenomeLoc spanOfLastReadSeen = null;
private ActivityProfile activityProfile = null;
int maxReadsInMemory = 0;
@ -117,6 +119,10 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
workQueue.add(new ActiveRegion(loc, null, true, engine.getGenomeLocParser(), getActiveRegionExtension()));
}
}
final int maxReadsAcrossSamples = annotation.maxReadsToHoldInMemoryPerSample() * SampleUtils.getSAMFileSamples(engine).size();
final int maxReadsToHoldInMemory = Math.min(maxReadsAcrossSamples, annotation.maxReadsToHoldTotal());
myReads = new TAROrderedReadCache(maxReadsToHoldInMemory);
}
// -------------------------------------------------------------------------------------
@ -257,7 +263,7 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
addIsActiveResult(walker, tracker, refContext, locus);
maxReadsInMemory = Math.max(myReads.size(), maxReadsInMemory);
printProgress(locus.getLocation());
printProgress(location);
}
updateCumulativeMetrics(dataProvider.getShard());
@ -522,27 +528,30 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
}
private T processActiveRegion(final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker<M, T> walker) {
final Iterator<GATKSAMRecord> liveReads = myReads.iterator();
while ( liveReads.hasNext() ) {
final List<GATKSAMRecord> stillLive = new LinkedList<GATKSAMRecord>();
for ( final GATKSAMRecord read : myReads.popCurrentReads() ) {
boolean killed = false;
final GATKSAMRecord read = liveReads.next();
final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read );
if( activeRegion.getLocation().overlapsP( readLoc ) ) {
activeRegion.add(read);
if ( ! walker.wantsNonPrimaryReads() ) {
liveReads.remove();
killed = true;
}
} else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) {
activeRegion.add( read );
}
// if the read hasn't already been killed, check if it cannot occur in any more active regions, and maybe kill it
if ( ! killed && readCannotOccurInAnyMoreActiveRegions(read, activeRegion) ) {
liveReads.remove();
killed = true;
}
// keep track of all of the still live active regions
if ( ! killed ) stillLive.add(read);
}
myReads.addAll(stillLive);
if ( logger.isDebugEnabled() ) {
logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive() ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReadSpanLoc());

View File

@ -78,4 +78,20 @@ public @interface ActiveRegionTraversalParameters {
* @return the breadth of the band pass gaussian kernel we want for our traversal
*/
public double bandPassSigma() default BandPassActivityProfile.DEFAULT_SIGMA;
/**
 * What is the maximum number of reads we're willing to hold in memory per sample
 * during the traversal?  This limits our exposure to unusually large amounts
 * of coverage in the engine.  The effective limit scales with the number of samples;
 * see maxReadsToHoldTotal() for the absolute cap regardless of sample count.
 * @return the maximum number of reads per sample we're willing to hold in memory (default 3000)
 */
public int maxReadsToHoldInMemoryPerSample() default 3000;
/**
 * No matter what the per sample value says, we will never hold more than this
 * number of reads in memory at any time.  Provides an upper bound on the total number
 * of reads in the case where we have a lot of samples, so that
 * maxReadsToHoldInMemoryPerSample() * nSamples cannot grow without limit.
 * @return the absolute maximum number of reads to hold in memory (default 1,000,000)
 */
public int maxReadsToHoldTotal() default 1000000;
}

View File

@ -0,0 +1,107 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.traversals;
import net.sf.picard.reference.IndexedFastaSequenceFile;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile;
import org.broadinstitute.sting.utils.sam.ArtificialBAMBuilder;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Unit tests for TAROrderedReadCache.
 *
 * Verifies that the cache respects its maximum capacity, correctly counts discarded
 * reads, resets its stats after popCurrentReads(), and always returns reads sorted
 * by alignment start.
 */
public class TAROrderedReadCacheUnitTest extends BaseTest {
    // reference sequence backing the artificial BAM builder
    private IndexedFastaSequenceFile seq;

    @BeforeClass
    public void setup() throws FileNotFoundException {
        // load the b37 reference once, shared across all test invocations
        seq = new CachingIndexedFastaSequenceFile(new File(b37KGReference));
    }

    @DataProvider(name = "ReadCacheTestData")
    public Object[][] makeReadCacheTestData() {
        final List<Object[]> tests = new ArrayList<Object[]>();

        // exercise the full cross product of coverage depth, locus count,
        // cache capacity, and bulk-vs-incremental insertion
        for ( final int readsPerLocus : Arrays.asList(0, 1, 10, 100) )
            for ( final int lociCount : Arrays.asList(1, 10, 100) )
                for ( final int capacity : Arrays.asList(10, 50, 1000) )
                    for ( final boolean bulkAdd : Arrays.asList(true, false) )
                        tests.add(new Object[]{readsPerLocus, lociCount, capacity, bulkAdd});

        return tests.toArray(new Object[][]{});
    }

    @Test(dataProvider = "ReadCacheTestData")
    public void testReadCache(final int nReadsPerLocus, final int nLoci, final int max, final boolean addAllAtOnce) {
        final TAROrderedReadCache cache = new TAROrderedReadCache(max);

        // a freshly built cache is empty and has discarded nothing
        Assert.assertEquals(cache.getMaxCapacity(), max);
        Assert.assertEquals(cache.getNumDiscarded(), 0);
        Assert.assertEquals(cache.size(), 0);

        final ArtificialBAMBuilder bamBuilder = new ArtificialBAMBuilder(seq, nReadsPerLocus, nLoci);
        final List<GATKSAMRecord> inputReads = bamBuilder.makeReads();

        if ( addAllAtOnce )
            cache.addAll(inputReads);
        else
            for ( final GATKSAMRecord read : inputReads )
                cache.add(read);

        final int totalSubmitted = inputReads.size();
        final int expectedKept = Math.min(totalSubmitted, max);
        final int expectedDiscarded = totalSubmitted - expectedKept;

        Assert.assertEquals(cache.getNumDiscarded(), expectedDiscarded, "wrong number of reads discarded");
        Assert.assertEquals(cache.size(), expectedKept, "wrong number of reads kept");

        final List<GATKSAMRecord> popped = cache.popCurrentReads();

        // popping must drain the cache and reset downsampling statistics
        Assert.assertEquals(cache.size(), 0, "Should be no reads left");
        Assert.assertEquals(cache.getNumDiscarded(), 0, "should have reset stats");
        Assert.assertEquals(popped.size(), expectedKept, "should have 1 read for every read we expected to keep");

        // popped reads must come back in non-decreasing alignment start order
        int previousStart = -1;
        for ( final GATKSAMRecord read : popped ) {
            Assert.assertTrue(previousStart <= read.getAlignmentStart(), "Reads should be sorted but weren't.  Found read with start " + read.getAlignmentStart() + " while last was " + previousStart);
            previousStart = read.getAlignmentStart();
        }
    }
}

View File

@ -470,7 +470,6 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
private List<LocusShardDataProvider> createDataProviders(TraverseActiveRegions traverseActiveRegions, final Walker walker, List<GenomeLoc> intervals, File bamFile) {
GenomeAnalysisEngine engine = new GenomeAnalysisEngine();
engine.setGenomeLocParser(genomeLocParser);
traverseActiveRegions.initialize(engine, walker);
Collection<SAMReaderID> samFiles = new ArrayList<SAMReaderID>();
SAMReaderID readerID = new SAMReaderID(bamFile, new Tags());
@ -486,8 +485,10 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
new ArrayList<ReadTransformer>(),
false, (byte)30, false, true);
engine.setReadsDataSource(dataSource);
final Set<String> samples = SampleUtils.getSAMFileSamples(dataSource.getHeader());
traverseActiveRegions.initialize(engine, walker);
List<LocusShardDataProvider> providers = new ArrayList<LocusShardDataProvider>();
for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) {
for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs(), samples)) {