Keep SNPs and indels separately for batch merging; Add options to DepthOfCoverage to count fragments (to not double-count overlapping reads of same fragment); DepthOfCoverage should now support ReducedReads; Replace recursion with loop in DoC/package.scala (for lists longer than 5000 elements)

This commit is contained in:
Menachem Fromer 2012-11-21 15:56:53 -05:00
parent 8376c28728
commit c8be7c3102
7 changed files with 215 additions and 56 deletions

View File

@ -6,11 +6,10 @@ import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.utils.BaseUtils;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.fragments.FragmentCollection;
import org.broadinstitute.sting.utils.pileup.PileupElement;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
/**
* IF THERE IS NO JAVADOC RIGHT HERE, YELL AT chartl
@ -20,6 +19,21 @@ import java.util.Map;
*/
public class CoverageUtils {
/**
 * Strategy for tallying pileup elements into depth counts: either every read is
 * counted independently, or overlapping reads from the same sequenced fragment
 * are collapsed so the fragment is counted only once.
 */
public enum CountPileupType {
/**
 * Count all reads independently (even if from the same fragment).
 */
COUNT_READS,
/**
 * Count all fragments (even if the reads that compose the fragment are not consistent at that base).
 */
COUNT_FRAGMENTS,
/**
 * Count all fragments (but only if the reads that compose the fragment are consistent at that base).
 */
COUNT_FRAGMENTS_REQUIRE_SAME_BASE
}
/**
* Returns the counts of bases from reads with MAPQ > minMapQ and base quality > minBaseQ in the context
* as an array of ints, indexed by the index fields of BaseUtils
@ -64,10 +78,10 @@ public class CoverageUtils {
}
public static Map<DoCOutputType.Partition,Map<String,int[]>>
getBaseCountsByPartition(AlignmentContext context, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ, Collection<DoCOutputType.Partition> types) {
getBaseCountsByPartition(AlignmentContext context, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ, CountPileupType countType, Collection<DoCOutputType.Partition> types) {
Map<DoCOutputType.Partition,Map<String,int[]>> countsByIDByType = new HashMap<DoCOutputType.Partition,Map<String,int[]>>();
Map<SAMReadGroupRecord,int[]> countsByRG = getBaseCountsByReadGroup(context,minMapQ,maxMapQ,minBaseQ,maxBaseQ);
Map<SAMReadGroupRecord,int[]> countsByRG = getBaseCountsByReadGroup(context,minMapQ,maxMapQ,minBaseQ,maxBaseQ,countType);
for (DoCOutputType.Partition t : types ) {
// iterate through the read group counts and build the type associations
for ( Map.Entry<SAMReadGroupRecord,int[]> readGroupCountEntry : countsByRG.entrySet() ) {
@ -95,31 +109,95 @@ public class CoverageUtils {
}
}
public static Map<SAMReadGroupRecord,int[]> getBaseCountsByReadGroup(AlignmentContext context, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ) {
public static Map<SAMReadGroupRecord,int[]> getBaseCountsByReadGroup(AlignmentContext context, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ, CountPileupType countType) {
Map<SAMReadGroupRecord, int[]> countsByRG = new HashMap<SAMReadGroupRecord,int[]>();
for ( PileupElement e : context.getBasePileup() ) {
if ( e.getMappingQual() >= minMapQ && e.getMappingQual() <= maxMapQ && ( e.getQual() >= minBaseQ && e.getQual() <= maxBaseQ || e.isDeletion() ) ) {
SAMReadGroupRecord readGroup = getReadGroup(e.getRead());
if ( ! countsByRG.keySet().contains(readGroup) ) {
countsByRG.put(readGroup,new int[6]);
updateCounts(countsByRG.get(readGroup),e);
} else {
updateCounts(countsByRG.get(readGroup),e);
List<PileupElement> countPileup = new LinkedList<PileupElement>();
FragmentCollection<PileupElement> fpile;
switch (countType) {
case COUNT_READS:
for (PileupElement e : context.getBasePileup())
if (countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ))
countPileup.add(e);
break;
case COUNT_FRAGMENTS: // ignore base identities and put in FIRST base that passes filters:
fpile = context.getBasePileup().getStartSortedPileup().toFragments();
for (PileupElement e : fpile.getSingletonReads())
if (countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ))
countPileup.add(e);
for (List<PileupElement> overlappingPair : fpile.getOverlappingPairs()) {
// iterate over all elements in fragment:
for (PileupElement e : overlappingPair) {
if (countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ)) {
countPileup.add(e); // add the first passing element per fragment
break;
}
}
}
}
break;
case COUNT_FRAGMENTS_REQUIRE_SAME_BASE:
fpile = context.getBasePileup().getStartSortedPileup().toFragments();
for (PileupElement e : fpile.getSingletonReads())
if (countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ))
countPileup.add(e);
for (List<PileupElement> overlappingPair : fpile.getOverlappingPairs()) {
PileupElement firstElem = null;
PileupElement addElem = null;
// iterate over all elements in fragment:
for (PileupElement e : overlappingPair) {
if (firstElem == null)
firstElem = e;
else if (e.getBase() != firstElem.getBase()) {
addElem = null;
break;
}
// will add the first passing element per base-consistent fragment:
if (addElem == null && countElement(e, minMapQ, maxMapQ, minBaseQ, maxBaseQ))
addElem = e;
}
if (addElem != null)
countPileup.add(addElem);
}
break;
default:
throw new UserException("Must use valid CountPileupType");
}
for (PileupElement e : countPileup) {
SAMReadGroupRecord readGroup = getReadGroup(e.getRead());
if (!countsByRG.keySet().contains(readGroup))
countsByRG.put(readGroup, new int[6]);
updateCounts(countsByRG.get(readGroup), e);
}
return countsByRG;
}
/**
 * Decides whether a single pileup element passes the depth-counting quality filters.
 *
 * @param e        the pileup element to test
 * @param minMapQ  minimum mapping quality (inclusive)
 * @param maxMapQ  maximum mapping quality (inclusive)
 * @param minBaseQ minimum base quality (inclusive)
 * @param maxBaseQ maximum base quality (inclusive)
 * @return true if the element's mapping quality is in range and either its base
 *         quality is in range or the element is a deletion (deletion elements are
 *         exempted from the base-quality bounds)
 */
private static boolean countElement(PileupElement e, int minMapQ, int maxMapQ, byte minBaseQ, byte maxBaseQ) {
    final boolean mappingQualInRange = e.getMappingQual() >= minMapQ && e.getMappingQual() <= maxMapQ;
    final boolean baseQualInRange = (e.getQual() >= minBaseQ && e.getQual() <= maxBaseQ) || e.isDeletion();
    return mappingQualInRange && baseQualInRange;
}
private static void updateCounts(int[] counts, PileupElement e) {
if ( e.isDeletion() ) {
counts[BaseUtils.DELETION_INDEX]++;
counts[BaseUtils.DELETION_INDEX] += e.getRepresentativeCount();
} else if ( BaseUtils.basesAreEqual((byte) 'N', e.getBase()) ) {
counts[BaseUtils.NO_CALL_INDEX]++;
counts[BaseUtils.NO_CALL_INDEX] += e.getRepresentativeCount();
} else {
try {
counts[BaseUtils.simpleBaseToBaseIndex(e.getBase())]++;
counts[BaseUtils.simpleBaseToBaseIndex(e.getBase())] += e.getRepresentativeCount();
} catch (ArrayIndexOutOfBoundsException exc) {
throw new ReviewedStingException("Expected a simple base, but actually received"+(char)e.getBase());
}

View File

@ -129,11 +129,15 @@ public class DepthOfCoverage extends LocusWalker<Map<DoCOutputType.Partition,Map
int minMappingQuality = -1;
@Argument(fullName = "maxMappingQuality", doc = "Maximum mapping quality of reads to count towards depth. Defaults to 2^31-1 (Integer.MAX_VALUE).", required = false)
int maxMappingQuality = Integer.MAX_VALUE;
@Argument(fullName = "minBaseQuality", shortName = "mbq", doc = "Minimum quality of bases to count towards depth. Defaults to -1.", required = false)
byte minBaseQuality = -1;
@Argument(fullName = "maxBaseQuality", doc = "Maximum quality of bases to count towards depth. Defaults to 127 (Byte.MAX_VALUE).", required = false)
byte maxBaseQuality = Byte.MAX_VALUE;
@Argument(fullName = "countType", doc = "How should overlapping reads from the same fragment be handled?", required = false)
CoverageUtils.CountPileupType countType = CoverageUtils.CountPileupType.COUNT_READS;
/**
* Instead of reporting depth, report the base pileup at each locus
*/
@ -373,7 +377,7 @@ public class DepthOfCoverage extends LocusWalker<Map<DoCOutputType.Partition,Map
//System.out.printf("\t[log]\t%s",ref.getLocus());
}
return CoverageUtils.getBaseCountsByPartition(context,minMappingQuality,maxMappingQuality,minBaseQuality,maxBaseQuality,partitionTypes);
return CoverageUtils.getBaseCountsByPartition(context,minMappingQuality,maxMappingQuality,minBaseQuality,maxBaseQuality,countType,partitionTypes);
} else {
return null;
}

View File

@ -254,19 +254,32 @@ public abstract class AbstractReadBackedPileup<RBP extends AbstractReadBackedPil
* Returns a new ReadBackedPileup where only one read from an overlapping read
* pair is retained. If the two reads in question disagree to their basecall,
* neither read is retained. If they agree on the base, the read with the higher
* quality observation is retained
* base quality observation is retained
*
* @return the newly filtered pileup
*/
@Override
public RBP getOverlappingFragmentFilteredPileup() {
public ReadBackedPileup getOverlappingFragmentFilteredPileup() {
return getOverlappingFragmentFilteredPileup(true, true);
}
/**
* Returns a new ReadBackedPileup where only one read from an overlapping read
* pair is retained. If discardDiscordant and the two reads in question disagree to their basecall,
* neither read is retained. Otherwise, the read with the higher
* quality (base or mapping, depending on baseQualNotMapQual) observation is retained
*
* @return the newly filtered pileup
*/
@Override
public RBP getOverlappingFragmentFilteredPileup(boolean discardDiscordant, boolean baseQualNotMapQual) {
if (pileupElementTracker instanceof PerSamplePileupElementTracker) {
PerSamplePileupElementTracker<PE> tracker = (PerSamplePileupElementTracker<PE>) pileupElementTracker;
PerSamplePileupElementTracker<PE> filteredTracker = new PerSamplePileupElementTracker<PE>();
for (final String sample : tracker.getSamples()) {
PileupElementTracker<PE> perSampleElements = tracker.getElements(sample);
AbstractReadBackedPileup<RBP, PE> pileup = createNewPileup(loc, perSampleElements).getOverlappingFragmentFilteredPileup();
AbstractReadBackedPileup<RBP, PE> pileup = createNewPileup(loc, perSampleElements).getOverlappingFragmentFilteredPileup(discardDiscordant, baseQualNotMapQual);
filteredTracker.addElements(sample, pileup.pileupElementTracker);
}
return (RBP) createNewPileup(loc, filteredTracker);
@ -284,11 +297,16 @@ public abstract class AbstractReadBackedPileup<RBP extends AbstractReadBackedPil
// if the reads disagree at this position, throw them both out. Otherwise
// keep the element with the higher quality score
if (existing.getBase() != p.getBase()) {
if (discardDiscordant && existing.getBase() != p.getBase()) {
filteredPileup.remove(readName);
} else {
if (existing.getQual() < p.getQual()) {
filteredPileup.put(readName, p);
if (baseQualNotMapQual) {
if (existing.getQual() < p.getQual())
filteredPileup.put(readName, p);
}
else {
if (existing.getMappingQual() < p.getMappingQual())
filteredPileup.put(readName, p);
}
}
}
@ -998,6 +1016,32 @@ public abstract class AbstractReadBackedPileup<RBP extends AbstractReadBackedPil
return quals2String(getQuals());
}
/**
* Returns a new ReadBackedPileup that is sorted by start coordinate of the reads.
*
* @return
*/
@Override
public ReadBackedPileup getStartSortedPileup() {
final TreeSet<PE> sortedElements = new TreeSet<PE>(new Comparator<PE>() {
@Override
public int compare(PE element1, PE element2) {
final int difference = element1.getRead().getAlignmentStart() - element2.getRead().getAlignmentStart();
return difference != 0 ? difference : element1.getRead().getReadName().compareTo(element2.getRead().getReadName());
}
});
UnifiedPileupElementTracker<PE> tracker = (UnifiedPileupElementTracker<PE>) pileupElementTracker;
for (PE pile : tracker)
sortedElements.add(pile);
UnifiedPileupElementTracker<PE> sortedTracker = new UnifiedPileupElementTracker<PE>();
for (PE pile : sortedElements)
sortedTracker.add(pile);
return (RBP) createNewPileup(this.getLocation(), sortedTracker);
}
@Override
public FragmentCollection<PileupElement> toFragments() {
return FragmentUtils.create(this);

View File

@ -60,6 +60,16 @@ public interface ReadBackedPileup extends Iterable<PileupElement>, HasGenomeLoca
*/
public ReadBackedPileup getOverlappingFragmentFilteredPileup();
/**
* Returns a new ReadBackedPileup where only one read from an overlapping read
* pair is retained. If discardDiscordant and the two reads in question disagree to their basecall,
* neither read is retained. Otherwise, the read with the higher
* quality (base or mapping, depending on baseQualNotMapQual) observation is retained
*
* @return the newly filtered pileup
*/
public ReadBackedPileup getOverlappingFragmentFilteredPileup(boolean discardDiscordant, boolean baseQualNotMapQual);
/**
* Returns a new ReadBackedPileup that is free of mapping quality zero reads in this pileup. Note that this
* does not copy the data, so both ReadBackedPileups should not be changed. Doesn't make an unnecessary copy
@ -261,6 +271,13 @@ public interface ReadBackedPileup extends Iterable<PileupElement>, HasGenomeLoca
*/
public byte[] getMappingQuals();
/**
* Returns a new ReadBackedPileup that is sorted by start coordinate of the reads.
*
* @return
*/
public ReadBackedPileup getStartSortedPileup();
/**
* Converts this pileup into a FragmentCollection (see FragmentUtils for documentation)
* @return

View File

@ -8,6 +8,7 @@ import org.broadinstitute.sting.commandline.Hidden
import java.io.{PrintStream, PrintWriter}
import org.broadinstitute.sting.utils.text.XReadLines
import collection.JavaConversions._
import org.broadinstitute.sting.gatk.walkers.coverage.CoverageUtils
class xhmmCNVpipeline extends QScript {
qscript =>
@ -15,22 +16,22 @@ class xhmmCNVpipeline extends QScript {
@Input(doc = "bam input, as .bam or as a list of files", shortName = "I", required = true)
var bams: File = _
@Argument(doc = "gatk jar file", shortName = "J", required = true)
@Input(doc = "gatk jar file", shortName = "J", required = true)
var gatkJarFile: File = _
@Argument(doc = "xhmm executable file", shortName = "xhmmExec", required = true)
@Input(doc = "xhmm executable file", shortName = "xhmmExec", required = true)
var xhmmExec: File = _
@Argument(doc = "Plink/Seq executable file", shortName = "pseqExec", required = true)
@Input(doc = "Plink/Seq executable file", shortName = "pseqExec", required = true)
var pseqExec: File = _
@Argument(doc = "Plink/Seq SEQDB file (Reference genome sequence)", shortName = "SEQDB", required = true)
var pseqSeqDB: String = _
@Argument(shortName = "R", doc = "ref", required = true)
@Input(shortName = "R", doc = "ref", required = true)
var referenceFile: File = _
@Argument(shortName = "L", doc = "Intervals", required = false)
@Input(shortName = "L", doc = "Intervals", required = false)
var intervals: File = _
@Argument(doc = "level of parallelism for BAM DoC. By default is set to 0 [no scattering].", shortName = "scatter", required = false)
@ -42,6 +43,15 @@ class xhmmCNVpipeline extends QScript {
@Output(doc = "Base name for files to output", shortName = "o", required = true)
var outputBase: File = _
@Hidden
@Argument(doc = "How should overlapping reads from the same fragment be handled?", shortName = "countType", required = false)
// TODO: change this to be the default once reads can be ordered properly for FragmentUtils.create():
//
// Don't want to double-count (but also don't mind counting base-inconsistencies in overlap):
//var countType = CoverageUtils.CountPileupType.COUNT_FRAGMENTS
//
var countType = CoverageUtils.CountPileupType.COUNT_READS
@Argument(doc = "Maximum depth (before GATK down-sampling kicks in...)", shortName = "MAX_DEPTH", required = false)
var MAX_DEPTH = 20000
@ -56,6 +66,9 @@ class xhmmCNVpipeline extends QScript {
@Argument(doc = "Minimum read mapping quality", shortName = "MMQ", required = false)
var minMappingQuality = 0
@Argument(doc = "Minimum base quality to be counted in depth", shortName = "MBQ", required = false)
var minBaseQuality = 0
@Argument(doc = "Memory (in GB) required for storing the whole matrix in memory", shortName = "wholeMatrixMemory", required = false)
var wholeMatrixMemory = -1
@ -159,7 +172,7 @@ class xhmmCNVpipeline extends QScript {
var docs: List[DoC] = List[DoC]()
for (group <- groups) {
Console.out.printf("Group is %s%n", group)
docs ::= new DoC(group.bams, group.DoC_output, MAX_DEPTH, minMappingQuality, scatterCountInput, START_BIN, NUM_BINS, Nil) with CommandLineGATKArgs
docs ::= new DoC(group.bams, group.DoC_output, countType, MAX_DEPTH, minMappingQuality, minBaseQuality, scatterCountInput, START_BIN, NUM_BINS, Nil) with CommandLineGATKArgs
}
addAll(docs)

View File

@ -6,9 +6,10 @@ import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFu
import org.broadinstitute.sting.gatk.downsampling.DownsampleType
import org.broadinstitute.sting.commandline.{Input, Gather, Output}
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.gatk.walkers.coverage.CoverageUtils
package object DoC {
class DoC(val bams: List[File], val DoC_output: File, val MAX_DEPTH: Int, val minMappingQuality: Int, val scatterCountInput: Int, val START_BIN: Int, val NUM_BINS: Int, val minCoverageCalcs: Seq[Int]) extends CommandLineGATK with ScatterGatherableFunction {
class DoC(val bams: List[File], val DoC_output: File, val countType: CoverageUtils.CountPileupType, val MAX_DEPTH: Int, val minMappingQuality: Int, val minBaseQuality: Int, val scatterCountInput: Int, val START_BIN: Int, val NUM_BINS: Int, val minCoverageCalcs: Seq[Int]) extends CommandLineGATK with ScatterGatherableFunction {
val DOC_OUTPUT_SUFFIX: String = ".sample_interval_summary"
// So that the output files of this DoC run get deleted once they're used further downstream:
@ -32,8 +33,9 @@ package object DoC {
override def commandLine = super.commandLine +
" --omitDepthOutputAtEachBase" +
" --omitLocusTable" +
" --minBaseQuality 0" +
" --minMappingQuality " + minMappingQuality +
" --minBaseQuality " + minBaseQuality +
optional("--countType", countType, spaceSeparated=true, escape=true, format="%s") +
" --start " + START_BIN + " --stop " + MAX_DEPTH + " --nBins " + NUM_BINS +
(if (!minCoverageCalcs.isEmpty) minCoverageCalcs.map(cov => " --summaryCoverageThreshold " + cov).reduceLeft(_ + "" + _) else "") +
" --includeRefNSites" +
@ -42,7 +44,7 @@ package object DoC {
override def shortDescription = "DoC: " + DoC_output
}
class DoCwithDepthOutputAtEachBase(bams: List[File], DoC_output: File, MAX_DEPTH: Int, minMappingQuality: Int, scatterCountInput: Int, START_BIN: Int, NUM_BINS: Int, minCoverageCalcs: Seq[Int]) extends DoC(bams, DoC_output, MAX_DEPTH: Int, minMappingQuality, scatterCountInput, START_BIN, NUM_BINS, minCoverageCalcs) {
class DoCwithDepthOutputAtEachBase(bams: List[File], DoC_output: File, countType: CoverageUtils.CountPileupType, MAX_DEPTH: Int, minMappingQuality: Int, minBaseQuality: Int, scatterCountInput: Int, START_BIN: Int, NUM_BINS: Int, minCoverageCalcs: Seq[Int]) extends DoC(bams, DoC_output, countType: CoverageUtils.CountPileupType, MAX_DEPTH: Int, minMappingQuality, minBaseQuality, scatterCountInput, START_BIN, NUM_BINS, minCoverageCalcs) {
// HACK for DoC to work properly within Queue:
@Output
@Gather(classOf[org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction])
@ -52,15 +54,21 @@ package object DoC {
}
def buildDoCgroups(samples: List[String], sampleToBams: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]], samplesPerJob: Int, outputBase: File): List[Group] = {
var l: List[Group] = Nil
def buildDoCgroupsHelper(samples: List[String], count: Int): List[Group] = (samples splitAt samplesPerJob) match {
case (Nil, y) =>
return Nil
case (subsamples, remaining) =>
return new Group("group" + count, outputBase, subsamples, VCF_BAM_utilities.findBAMsForSamples(subsamples, sampleToBams)) :: buildDoCgroupsHelper(remaining, count + 1)
var remaining = samples
var subsamples: List[String] = Nil
var count = 1
while (!remaining.isEmpty) {
val splitRes = (remaining splitAt samplesPerJob)
subsamples = splitRes._1
remaining = splitRes._2
l ::= new Group("group" + count, outputBase, subsamples, VCF_BAM_utilities.findBAMsForSamples(subsamples, sampleToBams))
count = count + 1
}
return buildDoCgroupsHelper(samples, 0)
return l
}
// A group has a list of samples and bam files to use for DoC

View File

@ -26,36 +26,31 @@ object VCF_BAM_utilities {
case _ => throw new RuntimeException("Unexpected BAM input type: " + bamsIn + "; only permitted extensions are .bam and .list")
}
def getMapOfBAMsForSample(bams: List[File]): scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = bams match {
case Nil => return scala.collection.mutable.Map.empty[String, scala.collection.mutable.Set[File]]
case x :: y =>
val m: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = getMapOfBAMsForSample(y)
val bamSamples: List[String] = getSamplesInBAM(x)
def getMapOfBAMsForSample(bams: List[File]): scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = {
var m = scala.collection.mutable.Map.empty[String, scala.collection.mutable.Set[File]]
for (bam <- bams) {
val bamSamples: List[String] = getSamplesInBAM(bam)
for (s <- bamSamples) {
if (!m.contains(s))
m += s -> scala.collection.mutable.Set.empty[File]
m(s) = m(s) + x
m(s) += bam
}
}
return m
}
def findBAMsForSamples(samples: List[String], sampleToBams: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]]): List[File] = {
var s = scala.collection.mutable.Set.empty[File]
def findBAMsForSamplesHelper(samples: List[String]): scala.collection.mutable.Set[File] = samples match {
case Nil => scala.collection.mutable.Set.empty[File]
case x :: y =>
var bamsForSampleX: scala.collection.mutable.Set[File] = scala.collection.mutable.Set.empty[File]
if (sampleToBams.contains(x))
bamsForSampleX = sampleToBams(x)
return bamsForSampleX ++ findBAMsForSamplesHelper(y)
for (sample <- samples) {
if (sampleToBams.contains(sample))
s ++= sampleToBams(sample)
}
val l: List[File] = Nil
return l ++ findBAMsForSamplesHelper(samples)
return l ++ s
}
}