Merge pull request #650 from broadinstitute/xhmm_Qscript

For XHMM and Depth-of-Coverage Qscripts, add ability for user to input s...
This commit is contained in:
Eric Banks 2014-06-10 10:20:21 -04:00
commit a20da6b9aa
2 changed files with 273 additions and 108 deletions

View File

@ -29,16 +29,19 @@ import org.broadinstitute.gatk.queue.extensions.gatk._
import org.broadinstitute.gatk.queue.QScript
import org.broadinstitute.gatk.queue.util.VCF_BAM_utilities
import org.broadinstitute.gatk.queue.extensions.gatk.DoC._
import org.broadinstitute.gatk.utils.commandline.Hidden
import java.io.{PrintStream, PrintWriter}
import org.broadinstitute.gatk.utils.commandline._
import java.io.{File, PrintStream, PrintWriter}
import org.broadinstitute.gatk.utils.text.XReadLines
import collection.JavaConversions._
import org.broadinstitute.gatk.tools.walkers.coverage.CoverageUtils
import org.broadinstitute.gatk.queue.function.scattergather.{CloneFunction, ScatterFunction, GatherFunction, ScatterGatherableFunction}
import org.broadinstitute.gatk.queue.function.{CommandLineFunction, InProcessFunction}
import org.broadinstitute.gatk.utils.io.IOUtils
class xhmmCNVpipeline extends QScript {
qscript =>
@Input(doc = "bam input, as .bam or as a list of files", shortName = "I", required = true)
@Input(doc = "bam input, as as a list of .bam files, or a list of bam files with sample IDs to be used ( as specified at https://www.broadinstitute.org/gatk/gatkdocs/org_broadinstitute_sting_gatk_CommandLineGATK.html#--sample_rename_mapping_file )", shortName = "I", required = true)
var bams: File = _
@Input(doc = "gatk jar file", shortName = "J", required = true)
@ -62,7 +65,7 @@ class xhmmCNVpipeline extends QScript {
@Argument(doc = "level of parallelism for BAM DoC. By default is set to 0 [no scattering].", shortName = "scatter", required = false)
var scatterCountInput = 0
@Argument(doc = "Samples to run together for DoC. By default is set to 1 [one job per sample].", shortName = "samplesPerJob", required = false)
@Argument(doc = "Samples to run together for DoC, CNV discovery, and CNV genotyping. By default is set to 1 [one job per sample].", shortName = "samplesPerJob", required = false)
var samplesPerJob = 1
@Output(doc = "Base name for files to output", shortName = "o", required = true)
@ -104,15 +107,6 @@ class xhmmCNVpipeline extends QScript {
@Argument(shortName = "maxTargRepeats", doc = "Exclude all targets with % of repeat-masked bases greater than this value", required = false)
var maxTargRepeats : Double = 0.1
@Argument(shortName = "sampleIDsMap", doc = "File mapping BAM sample IDs to desired sample IDs", required = false)
var sampleIDsMap: String = ""
@Argument(shortName = "sampleIDsMapFromColumn", doc = "Column number of OLD sample IDs to map", required = false)
var sampleIDsMapFromColumn = 1
@Argument(shortName = "sampleIDsMapToColumn", doc = "Column number of NEW sample IDs to map", required = false)
var sampleIDsMapToColumn = 2
@Argument(shortName = "rawFilters", doc = "xhmm command-line parameters to filter targets and samples from raw data", required = false)
var targetSampleFiltersString: String = ""
@ -184,19 +178,23 @@ class xhmmCNVpipeline extends QScript {
this.logging_level = "INFO"
}
val sampleToBams: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = VCF_BAM_utilities.getMapOfBAMsForSample(VCF_BAM_utilities.parseBAMsInput(bams))
val samples: List[String] = sampleToBams.keys.toList
val parseMixedInputBamList = parseBamListWithOptionalSampleMappings(bams)
val processMixedInputBamList = new ProcessBamListWithOptionalSampleMappings(parseMixedInputBamList, outputBase.getPath)
add(processMixedInputBamList)
val samples: List[String] = parseMixedInputBamList.sampleToBams.keys.toList
Console.out.printf("Samples are %s%n", samples)
val groups: List[Group] = buildDoCgroups(samples, sampleToBams, samplesPerJob, outputBase)
val groups: List[Group] = buildDoCgroups(samples, parseMixedInputBamList.sampleToBams, samplesPerJob, outputBase)
var docs: List[DoC] = List[DoC]()
for (group <- groups) {
Console.out.printf("Group is %s%n", group)
docs ::= new DoC(group.bams, group.DoC_output, countType, MAX_DEPTH, minMappingQuality, minBaseQuality, scatterCountInput, START_BIN, NUM_BINS, Nil) with CommandLineGATKArgs
docs ::= new DoC(group.bams, group.DoC_output, countType, MAX_DEPTH, minMappingQuality, minBaseQuality, scatterCountInput, START_BIN, NUM_BINS, Nil, Some(processMixedInputBamList.bamSampleMap)) with CommandLineGATKArgs
}
addAll(docs)
val mergeDepths = new MergeGATKdepths(docs.map(u => u.intervalSampleOut), outputBase.getPath + RD_OUTPUT_SUFFIX, "_mean_cvg", xhmmExec, sampleIDsMap, sampleIDsMapFromColumn, sampleIDsMapToColumn, None, false) with WholeMatrixMemoryLimit with LongRunTime
val mergeDepths = new MergeGATKdepths(docs.map(u => u.intervalSampleOut), outputBase.getPath + RD_OUTPUT_SUFFIX, "_mean_cvg", xhmmExec, None, false) with WholeMatrixMemoryLimit with LongRunTime
add(mergeDepths)
var excludeTargets : List[File] = List[File]()
@ -210,6 +208,7 @@ class xhmmCNVpipeline extends QScript {
excludeTargets ::= excludeTargetsBasedOnGC.out
}
class CalculateRepeatComplexity(outFile : String) extends CommandLineFunction {
@Input(doc="")
var intervals: File = prepTargets.out
@ -233,7 +232,7 @@ class xhmmCNVpipeline extends QScript {
" && " + calcRepeatMaskedPercent +
" && " + extractRepeatMaskedPercent
def commandLine = command
override def commandLine = command
override def description = "Calculate the percentage of each target that is repeat-masked in the reference sequence: " + command
}
@ -262,9 +261,103 @@ class xhmmCNVpipeline extends QScript {
val filterOriginal = new FilterOriginalData(mergeDepths.mergedDoC, filterCenterDepths, filterZscore)
add(filterOriginal)
// Scattered job for "xhmm --discover": calls CNVs on the PCA-normalized, z-scored
// read-depth matrix. One clone runs per sample group (see SamplesScatterable); the
// per-group .xcnv / .aux_xcnv / posterior outputs are gathered back into single files.
class DiscoverCNVs(inputParam: File, origRDParam: File) extends SamplesScatterable(xhmmExec, groups) with LongRunTime {
  // Normalized, filtered, z-scored read-depth matrix to discover CNVs in:
  @Input(doc = "")
  val input = inputParam

  // xhmm model parameters file:
  @Input(doc = "")
  val xhmmParams = xhmmParamsArg

  // Original (unnormalized) read depths, filtered to the same targets/samples:
  @Input(doc = "")
  val origRD = origRDParam

  // Per-group CNV call tables are concatenated by SimpleTextGatherFunction:
  @Output
  @Gather(classOf[org.broadinstitute.gatk.queue.function.scattergather.SimpleTextGatherFunction])
  val xcnv: File = new File(outputBase.getPath + ".xcnv")

  @Output
  @Gather(classOf[org.broadinstitute.gatk.queue.function.scattergather.SimpleTextGatherFunction])
  val aux_xcnv: File = new File(outputBase.getPath + ".aux_xcnv")

  // Set as an @Output, so that its value is updated in the cloned jobs being scattered:
  // (nothing is actually merged for it — see DummyGatherFunction)
  @Output
  @Gather(classOf[DummyGatherFunction])
  val posteriorsBase = outputBase

  // Posterior-probability files written by xhmm next to the calls, one per copy state
  // (DIP/DEL/DUP, per the file names):
  @Output
  @Gather(classOf[org.broadinstitute.gatk.queue.function.scattergather.SimpleTextGatherFunction])
  val dipPosteriors: File = new File(posteriorsBase.getPath + ".posteriors.DIP.txt")

  @Output
  @Gather(classOf[org.broadinstitute.gatk.queue.function.scattergather.SimpleTextGatherFunction])
  val delPosteriors: File = new File(posteriorsBase.getPath + ".posteriors.DEL.txt")

  @Output
  @Gather(classOf[org.broadinstitute.gatk.queue.function.scattergather.SimpleTextGatherFunction])
  val dupPosteriors: File = new File(posteriorsBase.getPath + ".posteriors.DUP.txt")

  // addCommand appends "--keepSampleIDs <file>" when this clone is restricted to a
  // subset of samples (see SamplesScatterable.addCommand):
  override def commandLine =
    xhmmExec + " --discover" +
    " -p " + xhmmParams +
    " -r " + input +
    " -R " + origRD +
    " -c " + xcnv +
    " -a " + aux_xcnv +
    " -s " + posteriorsBase +
    " " + discoverCommandLineParams +
    " " + addCommand

  override def description = "Discovers CNVs in normalized data: " + commandLine
}
val discover = new DiscoverCNVs(filterZscore.filteredZscored, filterOriginal.sameFiltered)
add(discover)
// Base scattered job for "xhmm --genotype": genotypes previously-discovered CNVs
// (inXcnv) against the normalized data, writing a VCF named outName. Per-group VCF
// pieces are merged back together by MergeVCFsGatherFunction.
abstract class BaseGenotypeCNVs(inputParam: File, xcnv: File, origRDParam: File, outName: String) extends SamplesScatterable(xhmmExec, groups) with LongRunTime {
  // Normalized, filtered, z-scored read-depth matrix:
  @Input(doc = "")
  val input = inputParam

  // xhmm model parameters file:
  @Input(doc = "")
  val xhmmParams = xhmmParamsArg

  // Original (unnormalized) read depths, filtered to the same targets/samples:
  @Input(doc = "")
  val origRD = origRDParam

  // CNV calls to genotype (the .xcnv output of DiscoverCNVs):
  @Input(doc = "")
  val inXcnv = xcnv

  // Gathered with xhmm's own --mergeVCFs mode rather than simple concatenation:
  @Output
  @Gather(classOf[MergeVCFsGatherFunction])
  val vcf: File = new File(outName)

  // addCommand appends "--keepSampleIDs <file>" for the per-group clones:
  override def commandLine =
    xhmmExec + " --genotype" +
    " -p " + xhmmParams +
    " -r " + input +
    " -g " + inXcnv +
    " -F " + referenceFile +
    " -R " + origRD +
    " -v " + vcf +
    " " + genotypeCommandLineParams +
    " " + addCommand
}
// Genotypes the discovered CNVs in all samples, writing <outputBase>.vcf.
// The "--genotype" command line itself comes from BaseGenotypeCNVs.
class GenotypeCNVs(inputParam: File, xcnv: File, origRDParam: File) extends BaseGenotypeCNVs(inputParam, xcnv, origRDParam, outputBase.getPath + ".vcf") {
  override def description = "Genotypes discovered CNVs in all samples: " + commandLine
}
// Genotypes the discovered CNVs and also their sub-segments (of up to
// maxTargetsInSubsegment targets each), writing <outputBase>.subsegments.vcf.
class GenotypeCNVandSubsegments(inputParam: File, xcnv: File, origRDParam: File) extends BaseGenotypeCNVs(inputParam, xcnv, origRDParam, outputBase.getPath + ".subsegments.vcf") {
  // Extend the base "--genotype" invocation with the sub-segment genotyping options:
  override def commandLine =
    s"${super.commandLine} --subsegments --maxTargetsInSubsegment $maxTargetsInSubsegment --genotypeQualThresholdWhenNoExact $subsegmentGenotypeThreshold"

  override def description =
    s"Genotypes discovered CNVs (and their sub-segments, of up to $maxTargetsInSubsegment targets) in all samples: $commandLine"
}
val genotype = new GenotypeCNVs(filterZscore.filteredZscored, discover.xcnv, filterOriginal.sameFiltered)
add(genotype)
@ -330,7 +423,7 @@ class xhmmCNVpipeline extends QScript {
if (targetSampleFiltersString != "")
command += " " + targetSampleFiltersString
def commandLine = command
override def commandLine = command
override def description = "Filters samples and targets and then mean-centers the targets: " + command
}
@ -353,7 +446,7 @@ class xhmmCNVpipeline extends QScript {
" -r " + input +
" --PCAfiles " + PCAbase
def commandLine = command
override def commandLine = command
override def description = "Runs PCA on mean-centered data: " + command
}
@ -382,7 +475,7 @@ class xhmmCNVpipeline extends QScript {
if (PCAnormalizeMethodString != "")
command += " " + PCAnormalizeMethodString
def commandLine = command
override def commandLine = command
override def description = "Normalizes mean-centered data using PCA information: " + command
}
@ -408,7 +501,7 @@ class xhmmCNVpipeline extends QScript {
if (targetSampleNormalizedFiltersString != "")
command += " " + targetSampleNormalizedFiltersString
def commandLine = command
override def commandLine = command
override def description = "Filters and z-score centers (by sample) the PCA-normalized data: " + command
}
@ -433,100 +526,90 @@ class xhmmCNVpipeline extends QScript {
sampFilters.map(u => " --excludeSamples " + u).reduceLeft(_ + "" + _) +
" -o " + sameFiltered
def commandLine = command
override def commandLine = command
override def description = "Filters original read-depth data to be the same as filtered, normalized data: " + command
}
}
class DiscoverCNVs(inputParam: File, origRDParam: File) extends CommandLineFunction with LongRunTime {
@Input(doc = "")
val input = inputParam
@Input(doc = "")
val xhmmParams = xhmmParamsArg
abstract class SamplesScatterable(val xhmmExec: File, val groups: List[Group]) extends ScatterGatherableFunction with CommandLineFunction {
this.scatterCount = groups.size
this.scatterClass = classOf[SamplesScatterFunction]
@Input(doc = "")
val origRD = origRDParam
@Input(doc = "", required=false)
var keepSampleIDs: Option[String] = None
@Output
val xcnv: File = new File(outputBase.getPath + ".xcnv")
def addCommand = if (keepSampleIDs.isDefined) ("--keepSampleIDs " + keepSampleIDs.get) else ""
}
@Output
val aux_xcnv: File = new File(outputBase.getPath + ".aux_xcnv")
class SamplesScatterFunction extends ScatterFunction with InProcessFunction {
protected var groups: List[Group] = _
override def scatterCount = groups.size
val posteriorsBase = outputBase.getPath
@Output(doc="Scatter function outputs")
var scatterSamples: Seq[File] = Nil
@Output
val dipPosteriors: File = new File(posteriorsBase + ".posteriors.DIP.txt")
@Output
val delPosteriors: File = new File(posteriorsBase + ".posteriors.DEL.txt")
@Output
val dupPosteriors: File = new File(posteriorsBase + ".posteriors.DUP.txt")
var command: String =
xhmmExec + " --discover" +
" -p " + xhmmParams +
" -r " + input +
" -R " + origRD +
" -c " + xcnv +
" -a " + aux_xcnv +
" -s " + posteriorsBase +
" " + discoverCommandLineParams
def commandLine = command
override def description = "Discovers CNVs in normalized data: " + command
override def init() {
this.groups = this.originalFunction.asInstanceOf[SamplesScatterable].groups
}
abstract class BaseGenotypeCNVs(inputParam: File, xcnv: File, origRDParam: File) extends CommandLineFunction with LongRunTime {
@Input(doc = "")
val input = inputParam
@Input(doc = "")
val xhmmParams = xhmmParamsArg
@Input(doc = "")
val origRD = origRDParam
@Input(doc = "")
val inXcnv = xcnv
var command: String =
xhmmExec + " --genotype" +
" -p " + xhmmParams +
" -r " + input +
" -g " + inXcnv +
" -F " + referenceFile +
" -R " + origRD +
" " + genotypeCommandLineParams
override def bindCloneInputs(cloneFunction: CloneFunction, index: Int) {
val scatterPart = IOUtils.absolute(cloneFunction.commandDirectory, "keepSampleIDs.txt")
cloneFunction.setFieldValue("keepSampleIDs", Some(scatterPart))
this.scatterSamples :+= scatterPart
}
class GenotypeCNVs(inputParam: File, xcnv: File, origRDParam: File) extends BaseGenotypeCNVs(inputParam, xcnv, origRDParam) {
@Output
val vcf: File = new File(outputBase.getPath + ".vcf")
override def run() {
if (groups.size != this.scatterSamples.size)
throw new Exception("Internal inconsistency error in scattering jobs")
command +=
" -v " + vcf
(groups, this.scatterSamples).zipped foreach {
(group, sampsFile) => {
val sampsWriter = new PrintWriter(new PrintStream(sampsFile))
def commandLine = command
override def description = "Genotypes discovered CNVs in all samples: " + command
}
class GenotypeCNVandSubsegments(inputParam: File, xcnv: File, origRDParam: File) extends BaseGenotypeCNVs(inputParam, xcnv, origRDParam) {
@Output
val vcf: File = new File(outputBase.getPath + ".subsegments.vcf")
command +=
" -v " + vcf +
" --subsegments" +
" --maxTargetsInSubsegment " + maxTargetsInSubsegment +
" --genotypeQualThresholdWhenNoExact " + subsegmentGenotypeThreshold
def commandLine = command
override def description = "Genotypes discovered CNVs (and their sub-segments, of up to " + maxTargetsInSubsegment + " targets) in all samples: " + command
for (samp <- group.samples) {
try {
sampsWriter.printf("%s%n", samp)
}
catch {
case e: Exception => throw e
}
}
sampsWriter.close
}
}
}
}
// Merges the per-sample-group VCFs produced by scattered xhmm genotyping jobs into a
// single VCF, using xhmm's "--mergeVCFs" mode (all pieces cover the same loci).
trait MergeVCFs extends CommandLineFunction {
  // Path to the xhmm executable; filled in by the concrete subclass (e.g.
  // MergeVCFsGatherFunction copies it from the scattered function):
  var xhmmExec: File = _

  @Input(doc = "")
  var inputVCFs: List[File] = Nil

  @Output
  var mergedVCF: File = null

  override def commandLine =
    xhmmExec + " --mergeVCFs" +
    // mkString is safe on an empty inputVCFs list (reduceLeft would throw
    // UnsupportedOperationException) and yields the same concatenation otherwise:
    inputVCFs.map(input => " --mergeVCF " + input).mkString +
    " -v " + mergedVCF

  override def description = "Combines VCF outputs for multiple samples (at same loci): " + commandLine
}
// Gather half of the scatter/gather pair for genotyping: merges the per-group VCF
// pieces (gatherParts) into the final VCF (originalOutput) via the MergeVCFs command.
class MergeVCFsGatherFunction extends MergeVCFs with GatherFunction {
  override def freezeFieldValues() {
    super.freezeFieldValues()
    // Pull the xhmm executable path from the scattered function whose parts we gather:
    this.xhmmExec = originalFunction.asInstanceOf[SamplesScatterable].xhmmExec
    this.inputVCFs = this.gatherParts.toList
    this.mergedVCF = this.originalOutput
  }
}
// No-op gather: used (via @Gather) for outputs such as DiscoverCNVs.posteriorsBase,
// which are declared @Output only so Queue updates their value in the scattered
// clones — there is nothing to merge afterwards.
class DummyGatherFunction extends InProcessFunction with GatherFunction {
  override def run() {}
}

View File

@ -25,16 +25,19 @@
package org.broadinstitute.gatk.queue.extensions.gatk
import java.io.File
import org.broadinstitute.gatk.queue.util.VCF_BAM_utilities
import java.io.{PrintStream, PrintWriter, File}
import org.broadinstitute.gatk.queue.function.scattergather.ScatterGatherableFunction
import org.broadinstitute.gatk.engine.downsampling.DownsampleType
import org.broadinstitute.gatk.utils.commandline.{Input, Gather, Output}
import org.broadinstitute.gatk.queue.function.CommandLineFunction
import org.broadinstitute.gatk.queue.function.{InProcessFunction, CommandLineFunction}
import org.broadinstitute.gatk.tools.walkers.coverage.CoverageUtils
import scala.collection.JavaConversions._
import scala.Some
import org.broadinstitute.gatk.utils.text.XReadLines
import org.broadinstitute.gatk.queue.util.VCF_BAM_utilities
package object DoC {
class DoC(val bams: List[File], val DoC_output: File, val countType: CoverageUtils.CountPileupType, val MAX_DEPTH: Int, val minMappingQuality: Int, val minBaseQuality: Int, val scatterCountInput: Int, val START_BIN: Int, val NUM_BINS: Int, val minCoverageCalcs: Seq[Int]) extends CommandLineGATK with ScatterGatherableFunction {
class DoC(val bams: List[File], val DoC_output: File, val countType: CoverageUtils.CountPileupType, val MAX_DEPTH: Int, val minMappingQuality: Int, val minBaseQuality: Int, val scatterCountInput: Int, val START_BIN: Int, val NUM_BINS: Int, val minCoverageCalcs: Seq[Int], val sampleRenameMappingFile: Option[File] = None) extends CommandLineGATK with ScatterGatherableFunction {
val DOC_OUTPUT_SUFFIX: String = ".sample_interval_summary"
// So that the output files of this DoC run get deleted once they're used further downstream:
@ -43,6 +46,8 @@ package object DoC {
this.analysis_type = "DepthOfCoverage"
this.input_file = bams
if (sampleRenameMappingFile.isDefined)
this.sample_rename_mapping_file = sampleRenameMappingFile.get
this.downsample_to_coverage = Some(MAX_DEPTH)
this.downsampling_type = DownsampleType.BY_SAMPLE
@ -69,7 +74,7 @@ package object DoC {
override def shortDescription = "DoC: " + DoC_output
}
class DoCwithDepthOutputAtEachBase(bams: List[File], DoC_output: File, countType: CoverageUtils.CountPileupType, MAX_DEPTH: Int, minMappingQuality: Int, minBaseQuality: Int, scatterCountInput: Int, START_BIN: Int, NUM_BINS: Int, minCoverageCalcs: Seq[Int]) extends DoC(bams, DoC_output, countType: CoverageUtils.CountPileupType, MAX_DEPTH: Int, minMappingQuality, minBaseQuality, scatterCountInput, START_BIN, NUM_BINS, minCoverageCalcs) {
class DoCwithDepthOutputAtEachBase(bams: List[File], DoC_output: File, countType: CoverageUtils.CountPileupType, MAX_DEPTH: Int, minMappingQuality: Int, minBaseQuality: Int, scatterCountInput: Int, START_BIN: Int, NUM_BINS: Int, minCoverageCalcs: Seq[Int], sampleRenameMappingFile: Option[File] = None) extends DoC(bams, DoC_output, countType: CoverageUtils.CountPileupType, MAX_DEPTH: Int, minMappingQuality, minBaseQuality, scatterCountInput, START_BIN, NUM_BINS, minCoverageCalcs, sampleRenameMappingFile) {
// HACK for DoC to work properly within Queue:
@Output
@Gather(classOf[org.broadinstitute.gatk.queue.function.scattergather.SimpleTextGatherFunction])
@ -109,7 +114,7 @@ package object DoC {
override def toString(): String = String.format("[Group %s [%s] with samples %s against bams %s]", name, DoC_output, samples, bams)
}
class MergeGATKdepths(DoCsToCombine: List[File], outFile: String, columnSuffix: String, xhmmExec: File, sampleIDsMap: String, sampleIDsMapFromColumn: Int, sampleIDsMapToColumn: Int, rdPrecisionArg: Option[Int], outputTargetsBySamples: Boolean) extends CommandLineFunction {
class MergeGATKdepths(DoCsToCombine: List[File], outFile: String, columnSuffix: String, xhmmExec: File, rdPrecisionArg: Option[Int], outputTargetsBySamples: Boolean, sampleIDsMap: String = "", sampleIDsMapFromColumn: Int = 1, sampleIDsMapToColumn: Int = 2) extends CommandLineFunction {
@Input(doc = "")
var inputDoCfiles: List[File] = DoCsToCombine
@ -153,4 +158,81 @@ package object DoC {
override def description = "Sort all target intervals, merge overlapping ones, and print the resulting interval list: " + command
}
// Mutable holder for the result of parsing a BAM list file in which each row is either
// "<bam>" or "<bam><TAB><sampleID>" (an optional explicit sample mapping per BAM).
class ParsedBamListWithOptionalSampleMappings(bamsFile: File) {
  // The original input list file:
  var bams = bamsFile
  // Every BAM encountered, with or without an explicit sample mapping:
  var allBams = List[File]()
  // BAMs whose sample IDs must be derived from the BAM files themselves:
  var bamsWithoutSampleMapping = List[File]()
  // sample ID -> BAMs, for rows that supplied an explicit sample ID:
  var userMappedSampleToBams = scala.collection.mutable.Map.empty[String, scala.collection.mutable.Set[File]]
  // Union of derived and user-supplied mappings (filled in by the parser):
  var sampleToBams = scala.collection.mutable.Map.empty[String, scala.collection.mutable.Set[File]]
}
/**
 * Parses a BAM list file where each row is "<bam>" or "<bam><TAB><sampleID>".
 * Sample mappings for the unmapped BAMs are derived via VCF_BAM_utilities and combined
 * with the user-supplied mappings; a sample may not appear in both sources, and a
 * user-supplied sample name may be given only once.
 */
def parseBamListWithOptionalSampleMappings(bamsFile: File): ParsedBamListWithOptionalSampleMappings = {
  val parsed = new ParsedBamListWithOptionalSampleMappings(bamsFile)

  for (line <- asScalaIterator(new XReadLines(parsed.bams))) {
    val fields = line.split("\\t")
    if (fields.length < 1 || fields.length > 2)
      throw new Exception("Invalid row in " + bamsFile.getPath + " : " + line)

    val bamFile = new File(fields(0))
    parsed.allBams ::= bamFile

    if (fields.length == 2) {
      val sampleName = fields(1)
      // Each explicitly-named sample may map to only one BAM row:
      if (parsed.userMappedSampleToBams.contains(sampleName))
        throw new Exception("Cannot map multiple BAM files to the same sample name: " + sampleName)
      parsed.userMappedSampleToBams += sampleName -> (scala.collection.mutable.Set.empty[File] + bamFile)
    }
    else {
      parsed.bamsWithoutSampleMapping ::= bamFile
    }
  }

  // Derive sample IDs for the BAMs that carried no explicit mapping:
  val autoMappedSampleToBams = VCF_BAM_utilities.getMapOfBAMsForSample(parsed.bamsWithoutSampleMapping)

  // A sample must come from exactly one source (derived XOR user-supplied):
  val overlappingSamples = autoMappedSampleToBams.keys.toList.intersect(parsed.userMappedSampleToBams.keys.toList)
  if (overlappingSamples.nonEmpty)
    throw new Exception("Cannot have the same sample mapped to different BAMs: " + overlappingSamples.toString)

  parsed.sampleToBams = autoMappedSampleToBams
  parsed.userMappedSampleToBams.foreach { keyVal => parsed.sampleToBams += keyVal._1 -> keyVal._2 }

  parsed
}
// In-process job that materializes the parsed BAM list into the two files used
// downstream:
//   <outputBase>.bam.list       - all input BAMs, one path per line
//   <outputBase>.bam_sample.txt - "<bam><TAB><sample>" rows for the user-supplied
//                                 sample renamings (fed to DoC's sample-rename mapping)
class ProcessBamListWithOptionalSampleMappings(parsedBamList: ParsedBamListWithOptionalSampleMappings, outputBase: String) extends InProcessFunction {
  @Input(doc="")
  var bams: File = parsedBamList.bams

  @Output(doc="")
  var bamsList: File = new File(outputBase + ".bam.list")

  @Output(doc="")
  var bamSampleMap: File = new File(outputBase + ".bam_sample.txt")

  def run = {
    // Write the flat list of all BAMs:
    val bamsListWriter = new PrintWriter(new PrintStream(bamsList))
    try {
      for (bam <- parsedBamList.allBams) {
        bamsListWriter.printf("%s%n", bam)
      }
    }
    finally {
      // Close even if a write fails, so the file handle is not leaked:
      bamsListWriter.close()
    }

    // Write the BAM -> sample renaming map (user-supplied mappings only):
    val bamSampleMapWriter = new PrintWriter(new PrintStream(bamSampleMap))
    try {
      for ((sampleName, sampleNameBams) <- parsedBamList.userMappedSampleToBams) {
        sampleNameBams.foreach { bam => bamSampleMapWriter.printf("%s\t%s%n", bam, sampleName) }
      }
    }
    finally {
      bamSampleMapWriter.close()
    }
  }
}
}