From c8be7c3102d7764b0141665bc64955062da8c00a Mon Sep 17 00:00:00 2001 From: Menachem Fromer Date: Wed, 21 Nov 2012 15:56:53 -0500 Subject: [PATCH] Keep SNPs and indels separately for batch merging; Add options to DepthOfCoverage to count fragments (to not double-count overlapping reads of same fragment); DepthOfCoverage should now support ReducedReads; Replace recusrion with loop in DoC/package.scala (for lists longer than 5000 elements) --- .../gatk/walkers/coverage/CoverageUtils.java | 114 +++++++++++++++--- .../walkers/coverage/DepthOfCoverage.java | 6 +- .../pileup/AbstractReadBackedPileup.java | 56 ++++++++- .../sting/utils/pileup/ReadBackedPileup.java | 17 +++ .../queue/qscripts/CNV/xhmmCNVpipeline.scala | 25 +++- .../sting/queue/util/DoC/package.scala | 26 ++-- .../sting/queue/util/VCF_BAM_utilities.scala | 27 ++--- 7 files changed, 215 insertions(+), 56 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/coverage/CoverageUtils.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/coverage/CoverageUtils.java index a41e55166..21532823b 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/coverage/CoverageUtils.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/coverage/CoverageUtils.java @@ -6,11 +6,10 @@ import org.broadinstitute.sting.gatk.contexts.AlignmentContext; import org.broadinstitute.sting.utils.BaseUtils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.fragments.FragmentCollection; import org.broadinstitute.sting.utils.pileup.PileupElement; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.*; /** * IF THERE IS NO JAVADOC RIGHT HERE, YELL AT chartl @@ -20,6 +19,21 @@ import java.util.Map; */ public class CoverageUtils { + public enum CountPileupType { + /** + * Count all reads independently (even if from the same fragment). + */ + COUNT_READS, + /** + * Count all fragments (even if the reads that compose the fragment are not consistent at that base). + */ + COUNT_FRAGMENTS, + /** + * Count all fragments (but only if the reads that compose the fragment are consistent at that base). + */ + COUNT_FRAGMENTS_REQUIRE_SAME_BASE + } + /** * Returns the counts of bases from reads with MAPQ > minMapQ and base quality > minBaseQ in the context * as an array of ints, indexed by the index fields of BaseUtils @@ -64,10 +78,10 @@ public class CoverageUtils { } public static Map> - getBaseCountsByPartition(AlignmentContext context, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ, Collection types) { + getBaseCountsByPartition(AlignmentContext context, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ, CountPileupType countType, Collection types) { Map> countsByIDByType = new HashMap>(); - Map countsByRG = getBaseCountsByReadGroup(context,minMapQ,maxMapQ,minBaseQ,maxBaseQ); + Map countsByRG = getBaseCountsByReadGroup(context,minMapQ,maxMapQ,minBaseQ,maxBaseQ,countType); for (DoCOutputType.Partition t : types ) { // iterate through the read group counts and build the type associations for ( Map.Entry readGroupCountEntry : countsByRG.entrySet() ) { @@ -95,31 +109,95 @@ public class CoverageUtils { } } - public static Map getBaseCountsByReadGroup(AlignmentContext context, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ) { + public static Map getBaseCountsByReadGroup(AlignmentContext context, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ, CountPileupType countType) { Map countsByRG = new HashMap(); - for ( PileupElement e : context.getBasePileup() ) { - if ( e.getMappingQual() >= minMapQ && e.getMappingQual() <= maxMapQ && ( e.getQual() >= minBaseQ && e.getQual() <= maxBaseQ || e.isDeletion() ) ) { - SAMReadGroupRecord readGroup = getReadGroup(e.getRead()); - if ( ! countsByRG.keySet().contains(readGroup) ) { - countsByRG.put(readGroup,new int[6]); - updateCounts(countsByRG.get(readGroup),e); - } else { - updateCounts(countsByRG.get(readGroup),e); + + List countPileup = new LinkedList(); + FragmentCollection fpile; + + switch (countType) { + + case COUNT_READS: + for (PileupElement e : context.getBasePileup()) + if (countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ)) + countPileup.add(e); + break; + + case COUNT_FRAGMENTS: // ignore base identities and put in FIRST base that passes filters: + fpile = context.getBasePileup().getStartSortedPileup().toFragments(); + + for (PileupElement e : fpile.getSingletonReads()) + if (countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ)) + countPileup.add(e); + + for (List overlappingPair : fpile.getOverlappingPairs()) { + // iterate over all elements in fragment: + for (PileupElement e : overlappingPair) { + if (countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ)) { + countPileup.add(e); // add the first passing element per fragment + break; + } + } } - } + break; + + case COUNT_FRAGMENTS_REQUIRE_SAME_BASE: + fpile = context.getBasePileup().getStartSortedPileup().toFragments(); + + for (PileupElement e : fpile.getSingletonReads()) + if (countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ)) + countPileup.add(e); + + for (List overlappingPair : fpile.getOverlappingPairs()) { + PileupElement firstElem = null; + PileupElement addElem = null; + + // iterate over all elements in fragment: + for (PileupElement e : overlappingPair) { + if (firstElem == null) + firstElem = e; + else if (e.getBase() != firstElem.getBase()) { + addElem = null; + break; + } + + // will add the first passing element per base-consistent fragment: + if (addElem == null && countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ)) + addElem = e; + } + + if (addElem != null) + countPileup.add(addElem); + } + break; + + default: + throw new UserException("Must use valid CountPileupType"); + } + + for (PileupElement e : countPileup) { + SAMReadGroupRecord readGroup = getReadGroup(e.getRead()); + if (!countsByRG.keySet().contains(readGroup)) + countsByRG.put(readGroup, new int[6]); + + updateCounts(countsByRG.get(readGroup), e); } return countsByRG; } + private static boolean countElement(PileupElement e, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ) { + return (e.getMappingQual() >= minMapQ && e.getMappingQual() <= maxMapQ && ( e.getQual() >= minBaseQ && e.getQual() <= maxBaseQ || e.isDeletion() )); + } + private static void updateCounts(int[] counts, PileupElement e) { if ( e.isDeletion() ) { - counts[BaseUtils.DELETION_INDEX]++; + counts[BaseUtils.DELETION_INDEX] += e.getRepresentativeCount(); } else if ( BaseUtils.basesAreEqual((byte) 'N', e.getBase()) ) { - counts[BaseUtils.NO_CALL_INDEX]++; + counts[BaseUtils.NO_CALL_INDEX] += e.getRepresentativeCount(); } else { try { - counts[BaseUtils.simpleBaseToBaseIndex(e.getBase())]++; + counts[BaseUtils.simpleBaseToBaseIndex(e.getBase())] += e.getRepresentativeCount(); } catch (ArrayIndexOutOfBoundsException exc) { throw new ReviewedStingException("Expected a simple base, but actually received"+(char)e.getBase()); } diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/coverage/DepthOfCoverage.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/coverage/DepthOfCoverage.java index 44b0d74ca..fe9942662 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/coverage/DepthOfCoverage.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/coverage/DepthOfCoverage.java @@ -129,11 +129,15 @@ public class DepthOfCoverage extends LocusWalker tracker = (PerSamplePileupElementTracker) pileupElementTracker; PerSamplePileupElementTracker filteredTracker = new PerSamplePileupElementTracker(); for (final String sample : tracker.getSamples()) { PileupElementTracker perSampleElements = tracker.getElements(sample); - AbstractReadBackedPileup pileup = createNewPileup(loc, perSampleElements).getOverlappingFragmentFilteredPileup(); + AbstractReadBackedPileup pileup = createNewPileup(loc, perSampleElements).getOverlappingFragmentFilteredPileup(discardDiscordant, baseQualNotMapQual); filteredTracker.addElements(sample, pileup.pileupElementTracker); } return (RBP) createNewPileup(loc, filteredTracker); @@ -284,11 +297,16 @@ public abstract class AbstractReadBackedPileup sortedElements = new TreeSet(new Comparator() { + @Override + public int compare(PE element1, PE element2) { + final int difference = element1.getRead().getAlignmentStart() - element2.getRead().getAlignmentStart(); + return difference != 0 ? difference : element1.getRead().getReadName().compareTo(element2.getRead().getReadName()); + } + }); + UnifiedPileupElementTracker tracker = (UnifiedPileupElementTracker) pileupElementTracker; + for (PE pile : tracker) + sortedElements.add(pile); + + UnifiedPileupElementTracker sortedTracker = new UnifiedPileupElementTracker(); + for (PE pile : sortedElements) + sortedTracker.add(pile); + + return (RBP) createNewPileup(this.getLocation(), sortedTracker); + } + @Override public FragmentCollection toFragments() { return FragmentUtils.create(this); diff --git a/public/java/src/org/broadinstitute/sting/utils/pileup/ReadBackedPileup.java b/public/java/src/org/broadinstitute/sting/utils/pileup/ReadBackedPileup.java index f15468840..be61bad99 100644 --- a/public/java/src/org/broadinstitute/sting/utils/pileup/ReadBackedPileup.java +++ b/public/java/src/org/broadinstitute/sting/utils/pileup/ReadBackedPileup.java @@ -60,6 +60,16 @@ public interface ReadBackedPileup extends Iterable, HasGenomeLoca */ public ReadBackedPileup getOverlappingFragmentFilteredPileup(); + /** + * Returns a new ReadBackedPileup where only one read from an overlapping read + * pair is retained. If discardDiscordant and the two reads in question disagree to their basecall, + * neither read is retained. Otherwise, the read with the higher + * quality (base or mapping, depending on baseQualNotMapQual) observation is retained + * + * @return the newly filtered pileup + */ + public ReadBackedPileup getOverlappingFragmentFilteredPileup(boolean discardDiscordant, boolean baseQualNotMapQual); + /** * Returns a new ReadBackedPileup that is free of mapping quality zero reads in this pileup. Note that this * does not copy the data, so both ReadBackedPileups should not be changed. Doesn't make an unnecessary copy @@ -261,6 +271,13 @@ public interface ReadBackedPileup extends Iterable, HasGenomeLoca */ public byte[] getMappingQuals(); + /** + * Returns a new ReadBackedPileup that is sorted by start coordinate of the reads. + * + * @return + */ + public ReadBackedPileup getStartSortedPileup(); + /** * Converts this pileup into a FragmentCollection (see FragmentUtils for documentation) * @return diff --git a/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/CNV/xhmmCNVpipeline.scala b/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/CNV/xhmmCNVpipeline.scala index 8db089484..c556913ab 100644 --- a/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/CNV/xhmmCNVpipeline.scala +++ b/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/CNV/xhmmCNVpipeline.scala @@ -8,6 +8,7 @@ import org.broadinstitute.sting.commandline.Hidden import java.io.{PrintStream, PrintWriter} import org.broadinstitute.sting.utils.text.XReadLines import collection.JavaConversions._ +import org.broadinstitute.sting.gatk.walkers.coverage.CoverageUtils class xhmmCNVpipeline extends QScript { qscript => @@ -15,22 +16,22 @@ class xhmmCNVpipeline extends QScript { @Input(doc = "bam input, as .bam or as a list of files", shortName = "I", required = true) var bams: File = _ - @Argument(doc = "gatk jar file", shortName = "J", required = true) + @Input(doc = "gatk jar file", shortName = "J", required = true) var gatkJarFile: File = _ - @Argument(doc = "xhmm executable file", shortName = "xhmmExec", required = true) + @Input(doc = "xhmm executable file", shortName = "xhmmExec", required = true) var xhmmExec: File = _ - @Argument(doc = "Plink/Seq executable file", shortName = "pseqExec", required = true) + @Input(doc = "Plink/Seq executable file", shortName = "pseqExec", required = true) var pseqExec: File = _ @Argument(doc = "Plink/Seq SEQDB file (Reference genome sequence)", shortName = "SEQDB", required = true) var pseqSeqDB: String = _ - @Argument(shortName = "R", doc = "ref", required = true) + @Input(shortName = "R", doc = "ref", required = true) var referenceFile: File = _ - @Argument(shortName = "L", doc = "Intervals", required = false) + @Input(shortName = "L", doc = "Intervals", required = false) var intervals: File = _ @Argument(doc = "level of parallelism for BAM DoC. By default is set to 0 [no scattering].", shortName = "scatter", required = false) @@ -42,6 +43,15 @@ class xhmmCNVpipeline extends QScript { @Output(doc = "Base name for files to output", shortName = "o", required = true) var outputBase: File = _ + @Hidden + @Argument(doc = "How should overlapping reads from the same fragment be handled?", shortName = "countType", required = false) + // TODO: change this to be the default once reads can be ordered properly for FragmentUtils.create(): + // + // Don't want to double-count (but also don't mind counting base-inconsistencies in overlap): + //var countType = CoverageUtils.CountPileupType.COUNT_FRAGMENTS + // + var countType = CoverageUtils.CountPileupType.COUNT_READS + @Argument(doc = "Maximum depth (before GATK down-sampling kicks in...)", shortName = "MAX_DEPTH", required = false) var MAX_DEPTH = 20000 @@ -56,6 +66,9 @@ class xhmmCNVpipeline extends QScript { @Argument(doc = "Minimum read mapping quality", shortName = "MMQ", required = false) var minMappingQuality = 0 + @Argument(doc = "Minimum base quality to be counted in depth", shortName = "MBQ", required = false) + var minBaseQuality = 0 + @Argument(doc = "Memory (in GB) required for storing the whole matrix in memory", shortName = "wholeMatrixMemory", required = false) var wholeMatrixMemory = -1 @@ -159,7 +172,7 @@ class xhmmCNVpipeline extends QScript { var docs: List[DoC] = List[DoC]() for (group <- groups) { Console.out.printf("Group is %s%n", group) - docs ::= new DoC(group.bams, group.DoC_output, MAX_DEPTH, minMappingQuality, scatterCountInput, START_BIN, NUM_BINS, Nil) with CommandLineGATKArgs + docs ::= new DoC(group.bams, group.DoC_output, countType, MAX_DEPTH, minMappingQuality, minBaseQuality, scatterCountInput, START_BIN, NUM_BINS, Nil) with CommandLineGATKArgs } addAll(docs) diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/DoC/package.scala b/public/scala/src/org/broadinstitute/sting/queue/util/DoC/package.scala index f35db4aa3..2b19b0f8e 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/DoC/package.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/DoC/package.scala @@ -6,9 +6,10 @@ import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFu import org.broadinstitute.sting.gatk.downsampling.DownsampleType import org.broadinstitute.sting.commandline.{Input, Gather, Output} import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.gatk.walkers.coverage.CoverageUtils package object DoC { - class DoC(val bams: List[File], val DoC_output: File, val MAX_DEPTH: Int, val minMappingQuality: Int, val scatterCountInput: Int, val START_BIN: Int, val NUM_BINS: Int, val minCoverageCalcs: Seq[Int]) extends CommandLineGATK with ScatterGatherableFunction { + class DoC(val bams: List[File], val DoC_output: File, val countType: CoverageUtils.CountPileupType, val MAX_DEPTH: Int, val minMappingQuality: Int, val minBaseQuality: Int, val scatterCountInput: Int, val START_BIN: Int, val NUM_BINS: Int, val minCoverageCalcs: Seq[Int]) extends CommandLineGATK with ScatterGatherableFunction { val DOC_OUTPUT_SUFFIX: String = ".sample_interval_summary" // So that the output files of this DoC run get deleted once they're used further downstream: @@ -32,8 +33,9 @@ package object DoC { override def commandLine = super.commandLine + " --omitDepthOutputAtEachBase" + " --omitLocusTable" + - " --minBaseQuality 0" + " --minMappingQuality " + minMappingQuality + + " --minBaseQuality " + minBaseQuality + + optional("--countType", countType, spaceSeparated=true, escape=true, format="%s") + " --start " + START_BIN + " --stop " + MAX_DEPTH + " --nBins " + NUM_BINS + (if (!minCoverageCalcs.isEmpty) minCoverageCalcs.map(cov => " --summaryCoverageThreshold " + cov).reduceLeft(_ + "" + _) else "") + " --includeRefNSites" + @@ -42,7 +44,7 @@ package object DoC { override def shortDescription = "DoC: " + DoC_output } - class DoCwithDepthOutputAtEachBase(bams: List[File], DoC_output: File, MAX_DEPTH: Int, minMappingQuality: Int, scatterCountInput: Int, START_BIN: Int, NUM_BINS: Int, minCoverageCalcs: Seq[Int]) extends DoC(bams, DoC_output, MAX_DEPTH: Int, minMappingQuality, scatterCountInput, START_BIN, NUM_BINS, minCoverageCalcs) { + class DoCwithDepthOutputAtEachBase(bams: List[File], DoC_output: File, countType: CoverageUtils.CountPileupType, MAX_DEPTH: Int, minMappingQuality: Int, minBaseQuality: Int, scatterCountInput: Int, START_BIN: Int, NUM_BINS: Int, minCoverageCalcs: Seq[Int]) extends DoC(bams, DoC_output, countType: CoverageUtils.CountPileupType, MAX_DEPTH: Int, minMappingQuality, minBaseQuality, scatterCountInput, START_BIN, NUM_BINS, minCoverageCalcs) { // HACK for DoC to work properly within Queue: @Output @Gather(classOf[org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction]) @@ -52,15 +54,21 @@ package object DoC { } def buildDoCgroups(samples: List[String], sampleToBams: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]], samplesPerJob: Int, outputBase: File): List[Group] = { + var l: List[Group] = Nil - def buildDoCgroupsHelper(samples: List[String], count: Int): List[Group] = (samples splitAt samplesPerJob) match { - case (Nil, y) => - return Nil - case (subsamples, remaining) => - return new Group("group" + count, outputBase, subsamples, VCF_BAM_utilities.findBAMsForSamples(subsamples, sampleToBams)) :: buildDoCgroupsHelper(remaining, count + 1) + var remaining = samples + var subsamples: List[String] = Nil + var count = 1 + + while (!remaining.isEmpty) { + val splitRes = (remaining splitAt samplesPerJob) + subsamples = splitRes._1 + remaining = splitRes._2 + l ::= new Group("group" + count, outputBase, subsamples, VCF_BAM_utilities.findBAMsForSamples(subsamples, sampleToBams)) + count = count + 1 } - return buildDoCgroupsHelper(samples, 0) + return l } // A group has a list of samples and bam files to use for DoC diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/VCF_BAM_utilities.scala b/public/scala/src/org/broadinstitute/sting/queue/util/VCF_BAM_utilities.scala index 1f18858e1..3fe867981 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/VCF_BAM_utilities.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/VCF_BAM_utilities.scala @@ -26,36 +26,31 @@ object VCF_BAM_utilities { case _ => throw new RuntimeException("Unexpected BAM input type: " + bamsIn + "; only permitted extensions are .bam and .list") } - def getMapOfBAMsForSample(bams: List[File]): scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = bams match { - case Nil => return scala.collection.mutable.Map.empty[String, scala.collection.mutable.Set[File]] - - case x :: y => - val m: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = getMapOfBAMsForSample(y) - val bamSamples: List[String] = getSamplesInBAM(x) + def getMapOfBAMsForSample(bams: List[File]): scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = { + var m = scala.collection.mutable.Map.empty[String, scala.collection.mutable.Set[File]] + for (bam <- bams) { + val bamSamples: List[String] = getSamplesInBAM(bam) for (s <- bamSamples) { if (!m.contains(s)) m += s -> scala.collection.mutable.Set.empty[File] - m(s) = m(s) + x + m(s) += bam } + } return m } def findBAMsForSamples(samples: List[String], sampleToBams: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]]): List[File] = { + var s = scala.collection.mutable.Set.empty[File] - def findBAMsForSamplesHelper(samples: List[String]): scala.collection.mutable.Set[File] = samples match { - case Nil => scala.collection.mutable.Set.empty[File] - - case x :: y => - var bamsForSampleX: scala.collection.mutable.Set[File] = scala.collection.mutable.Set.empty[File] - if (sampleToBams.contains(x)) - bamsForSampleX = sampleToBams(x) - return bamsForSampleX ++ findBAMsForSamplesHelper(y) + for (sample <- samples) { + if (sampleToBams.contains(sample)) + s ++= sampleToBams(sample) } val l: List[File] = Nil - return l ++ findBAMsForSamplesHelper(samples) + return l ++ s } }