From 1b36db8940dcfae5a85cfb501cac4f671ef4f28a Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sun, 7 Apr 2013 15:25:52 -0400 Subject: [PATCH] Make ActiveRegionTraversal robust to excessive coverage -- Add a maximum per sample and overall maximum number of reads held in memory by the ART at any one time. Does this in a new TAROrderedReadCache data structure that uses a reservior downsampler to limit the total number of reads to a constant amount. This constant is set to be by default 3000 reads * nSamples to a global maximum of 1M reads, all controlled via the ActiveRegionTraversalParameters annotation. -- Added an integration test and associated excessively covered BAM excessiveCoverage.1.121484835.bam (private/testdata) that checks that the system is operating correctly. -- #resolves GSA-921 --- .../sting/gatk/GenomeAnalysisEngine.java | 9 ++ .../gatk/traversals/TAROrderedReadCache.java | 126 ++++++++++++++++++ .../traversals/TraverseActiveRegions.java | 23 +++- .../ActiveRegionTraversalParameters.java | 16 +++ .../TAROrderedReadCacheUnitTest.java | 107 +++++++++++++++ .../TraverseActiveRegionsUnitTest.java | 3 +- 6 files changed, 276 insertions(+), 8 deletions(-) create mode 100644 public/java/src/org/broadinstitute/sting/gatk/traversals/TAROrderedReadCache.java create mode 100644 public/java/test/org/broadinstitute/sting/gatk/traversals/TAROrderedReadCacheUnitTest.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 2d8b9cd9a..fed33c1cb 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -725,6 +725,15 @@ public class GenomeAnalysisEngine { rodDataSources = getReferenceOrderedDataSources(referenceMetaDataFiles,referenceDataSource.getReference().getSequenceDictionary(),genomeLocParser,argCollection.unsafe); } + /** + * Purely for testing purposes. Do not use unless you absolutely positively know what you are doing (or + * need to absolutely positively kill everyone in the room) + * @param dataSource + */ + public void setReadsDataSource(final SAMDataSource dataSource) { + this.readsDataSource = dataSource; + } + /** * Entry-point function to initialize the samples database from input data and pedigree arguments */ diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TAROrderedReadCache.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TAROrderedReadCache.java new file mode 100644 index 000000000..80da8f8eb --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TAROrderedReadCache.java @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.gatk.traversals; + +import org.broadinstitute.sting.gatk.downsampling.Downsampler; +import org.broadinstitute.sting.gatk.downsampling.ReservoirDownsampler; +import org.broadinstitute.sting.utils.sam.AlignmentStartComparator; +import org.broadinstitute.sting.utils.sam.GATKSAMRecord; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Subsystem to track a list of all reads currently live in the TraverseActiveRegions system, + * while limiting the total number of reads to a maximum capacity. + * + * User: depristo + * Date: 4/7/13 + * Time: 11:23 AM + */ +public class TAROrderedReadCache { + final int maxCapacity; + final Downsampler downsampler; + + /** + * Create a new empty ReadCache + * @param maxCapacity the max capacity of the read cache. + */ + public TAROrderedReadCache(int maxCapacity) { + if ( maxCapacity < 0 ) throw new IllegalArgumentException("maxCapacity must be >= 0 but got " + maxCapacity); + this.maxCapacity = maxCapacity; + this.downsampler = new ReservoirDownsampler(maxCapacity); + } + + /** + * What's the maximum number of reads we'll store in the cache? + * @return a positive integer + */ + public int getMaxCapacity() { + return maxCapacity; + } + + /** + * Add a single read to this cache. Assumed to be in sorted order w.r.t. the previously added reads + * @param read a read to add + */ + public void add(final GATKSAMRecord read) { + if ( read == null ) throw new IllegalArgumentException("Read cannot be null"); + downsampler.submit(read); + } + + /** + * Add a collection of reads to this cache. Assumed to be in sorted order w.r.t. the previously added reads and each other + * @param reads a collection of reads to add + */ + public void addAll(final List reads) { + if ( reads == null ) throw new IllegalArgumentException("Reads cannot be null"); + downsampler.submit(reads); + } + + /** + * How many reads are currently in the cache? + * @return a positive integer + */ + public int size() { + return downsampler.size(); + } + + /** + * How many reads were discarded since the last call to popCurrentReads + * @return + */ + public int getNumDiscarded() { + return downsampler.getNumberOfDiscardedItems(); + } + + /** + * Removes all reads currently in the cache, and returns them in sorted order (w.r.t. alignmentStart) + * + * Flushes this cache, so after this call the cache will contain no reads and all downsampling stats will + * be reset. + * + * @return a list of GATKSAMRecords in this cache + */ + public List popCurrentReads() { + final List maybeUnordered = downsampler.consumeFinalizedItems(); + + final List ordered; + if ( downsampler.getNumberOfDiscardedItems() == 0 ) { + // haven't discarded anything, so the reads are ordered properly + ordered = maybeUnordered; + } else { + // we need to sort these damn things: O(n log n) + ordered = new ArrayList(maybeUnordered); + Collections.sort(ordered, new AlignmentStartComparator()); + } + + // reset the downsampler stats so getNumberOfDiscardedItems is 0 + downsampler.reset(); + return ordered; + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java index 908755a24..1daaaf1da 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java @@ -39,6 +39,7 @@ import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker; import org.broadinstitute.sting.gatk.walkers.DataSource; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.SampleUtils; import org.broadinstitute.sting.utils.Utils; import org.broadinstitute.sting.utils.activeregion.*; import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; @@ -78,7 +79,8 @@ public class TraverseActiveRegions extends TraversalEngine workQueue = new LinkedList(); - private LinkedList myReads = new LinkedList(); + private TAROrderedReadCache myReads = null; + private GenomeLoc spanOfLastReadSeen = null; private ActivityProfile activityProfile = null; int maxReadsInMemory = 0; @@ -117,6 +119,10 @@ public class TraverseActiveRegions extends TraversalEngine extends TraversalEngine extends TraversalEngine walker) { - final Iterator liveReads = myReads.iterator(); - while ( liveReads.hasNext() ) { + final List stillLive = new LinkedList(); + for ( final GATKSAMRecord read : myReads.popCurrentReads() ) { boolean killed = false; - final GATKSAMRecord read = liveReads.next(); final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read ); if( activeRegion.getLocation().overlapsP( readLoc ) ) { activeRegion.add(read); if ( ! walker.wantsNonPrimaryReads() ) { - liveReads.remove(); killed = true; } } else if( walker.wantsExtendedReads() && activeRegion.getExtendedLoc().overlapsP( readLoc )) { activeRegion.add( read ); } + // if the read hasn't already been killed, check if it cannot occur in any more active regions, and maybe kill it if ( ! killed && readCannotOccurInAnyMoreActiveRegions(read, activeRegion) ) { - liveReads.remove(); + killed = true; } + + // keep track of all of the still live active regions + if ( ! killed ) stillLive.add(read); } + myReads.addAll(stillLive); if ( logger.isDebugEnabled() ) { logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive() ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReadSpanLoc()); diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/ActiveRegionTraversalParameters.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/ActiveRegionTraversalParameters.java index cdb45db7b..5560946ea 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/ActiveRegionTraversalParameters.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/ActiveRegionTraversalParameters.java @@ -78,4 +78,20 @@ public @interface ActiveRegionTraversalParameters { * @return the breadth of the band pass gaussian kernel we want for our traversal */ public double bandPassSigma() default BandPassActivityProfile.DEFAULT_SIGMA; + + /** + * What is the maximum number of reads we're willing to hold in memory per sample + * during the traversal? This limits our exposure to unusually large amounts + * of coverage in the engine. + * @return the maximum number of reads we're willing to hold in memory + */ + public int maxReadsToHoldInMemoryPerSample() default 3000; + + /** + * No matter what the per sample value says, we will never hold more than this + * number of reads in memory at any time. Provides an upper bound on the total number + * of reads in the case where we have a lot of samples. + * @return the maximum number of reads to hold in memory + */ + public int maxReadsToHoldTotal() default 1000000; } diff --git a/public/java/test/org/broadinstitute/sting/gatk/traversals/TAROrderedReadCacheUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/traversals/TAROrderedReadCacheUnitTest.java new file mode 100644 index 000000000..f3e1ce44b --- /dev/null +++ b/public/java/test/org/broadinstitute/sting/gatk/traversals/TAROrderedReadCacheUnitTest.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2012 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR + * THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.gatk.traversals; + +import net.sf.picard.reference.IndexedFastaSequenceFile; +import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile; +import org.broadinstitute.sting.utils.sam.ArtificialBAMBuilder; +import org.broadinstitute.sting.utils.sam.GATKSAMRecord; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TAROrderedReadCacheUnitTest extends BaseTest { + // example fasta index file, can be deleted if you don't use the reference + private IndexedFastaSequenceFile seq; + + @BeforeClass + public void setup() throws FileNotFoundException { + // sequence + seq = new CachingIndexedFastaSequenceFile(new File(b37KGReference)); + } + + @DataProvider(name = "ReadCacheTestData") + public Object[][] makeReadCacheTestData() { + List tests = new ArrayList(); + + for ( final int nReadsPerLocus : Arrays.asList(0, 1, 10, 100) ) { + for ( final int nLoci : Arrays.asList(1, 10, 100) ) { + for ( final int max : Arrays.asList(10, 50, 1000) ) { + for ( final boolean addAllAtOnce : Arrays.asList(true, false) ) { + tests.add(new Object[]{nReadsPerLocus, nLoci, max, addAllAtOnce}); + } + } + } + } + + return tests.toArray(new Object[][]{}); + } + + @Test(dataProvider = "ReadCacheTestData") + public void testReadCache(final int nReadsPerLocus, final int nLoci, final int max, final boolean addAllAtOnce) { + final TAROrderedReadCache cache = new TAROrderedReadCache(max); + + Assert.assertEquals(cache.getMaxCapacity(), max); + Assert.assertEquals(cache.getNumDiscarded(), 0); + Assert.assertEquals(cache.size(), 0); + + final ArtificialBAMBuilder bamBuilder = new ArtificialBAMBuilder(seq, nReadsPerLocus, nLoci); + final List reads = bamBuilder.makeReads(); + + if ( addAllAtOnce ) { + cache.addAll(reads); + } else { + for ( final GATKSAMRecord read : reads ) { + cache.add(read); + } + } + + final int nTotalReads = reads.size(); + final int nExpectedToKeep = Math.min(nTotalReads, max); + final int nExpectedToDiscard = nTotalReads - nExpectedToKeep; + Assert.assertEquals(cache.getNumDiscarded(), nExpectedToDiscard, "wrong number of reads discarded"); + Assert.assertEquals(cache.size(), nExpectedToKeep, "wrong number of reads kept"); + + final List cacheReads = cache.popCurrentReads(); + Assert.assertEquals(cache.size(), 0, "Should be no reads left"); + Assert.assertEquals(cache.getNumDiscarded(), 0, "should have reset stats"); + Assert.assertEquals(cacheReads.size(), nExpectedToKeep, "should have 1 read for every read we expected to keep"); + + int lastStart = -1; + for ( final GATKSAMRecord read : cacheReads ) { + Assert.assertTrue(lastStart <= read.getAlignmentStart(), "Reads should be sorted but weren't. Found read with start " + read.getAlignmentStart() + " while last was " + lastStart); + lastStart = read.getAlignmentStart(); + } + } +} diff --git a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java index 0384260fa..b6106d4bc 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java @@ -470,7 +470,6 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { private List createDataProviders(TraverseActiveRegions traverseActiveRegions, final Walker walker, List intervals, File bamFile) { GenomeAnalysisEngine engine = new GenomeAnalysisEngine(); engine.setGenomeLocParser(genomeLocParser); - traverseActiveRegions.initialize(engine, walker); Collection samFiles = new ArrayList(); SAMReaderID readerID = new SAMReaderID(bamFile, new Tags()); @@ -486,8 +485,10 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { new ArrayList(), false, (byte)30, false, true); + engine.setReadsDataSource(dataSource); final Set samples = SampleUtils.getSAMFileSamples(dataSource.getHeader()); + traverseActiveRegions.initialize(engine, walker); List providers = new ArrayList(); for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) { for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs(), samples)) {