From 6c6e52def9998299edcbb237a81d35ea214416b8 Mon Sep 17 00:00:00 2001 From: kshakir Date: Thu, 19 May 2011 22:47:02 +0000 Subject: [PATCH] Renamed FCP to HybridSelectionPipeline. Reviewed pipelines with dev team. HSP updates: - Calling SNPs and Indels at the same time then using SelectVariants to separate them for filtering - Moved logs next to the files like in WGP - Flattened outputs into one directory - The file names for the final outputs are now .vcf and .eval - Updated test to pass the chr20 intervals instead of a boolean - Removed MultiFCP WGP updates: - Only cleaning and calling chromosomes 1-22, X, Y, MT - Splitting SNPs from indels, filtering indels, then merging the selected SNPs and selected Indels back together to make sure there are no collisions in CombineVariants - Still running VQSR on the recombined SNPs plus hard filtered indels - Using hard indel filters from delangel - Reduced number of tranches with rpoplin - Changed prior for dbsnp from 10 to 8 with rpoplin - Assuming identical samples on both CombineVariants - Explicitly using variant merge option UNION even though it's the default - Not setting the default genotype merge option PRIORITIZE - Generating a vcf and eval for each tranche git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5825 348d0f76-0448-11de-a6fe-93d51630548a --- .../qscript/playground/FullCallingPipeline.q | 191 ------------------ .../playground/HybridSelectionPipeline.scala | 176 ++++++++++++++++ .../playground/MultiFullCallingPipeline.scala | 99 --------- .../playground/WholeGenomePipeline.scala | 171 ++++++++-------- .../sting/queue/pipeline/PipelineTest.scala | 8 +- ...cala => HybridSelectionPipelineTest.scala} | 18 +- .../MultiFullCallingPipelineTest.scala | 108 ---------- 7 files changed, 281 insertions(+), 490 deletions(-) delete mode 100755 scala/qscript/playground/FullCallingPipeline.q create mode 100755 scala/qscript/playground/HybridSelectionPipeline.scala delete mode 100644 scala/qscript/playground/MultiFullCallingPipeline.scala rename scala/test/org/broadinstitute/sting/queue/pipeline/playground/{FullCallingPipelineTest.scala => HybridSelectionPipelineTest.scala} (86%) delete mode 100644 scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala diff --git a/scala/qscript/playground/FullCallingPipeline.q b/scala/qscript/playground/FullCallingPipeline.q deleted file mode 100755 index b6a14ce6f..000000000 --- a/scala/qscript/playground/FullCallingPipeline.q +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (c) 2011, The Broad Institute - * - * Permission is hereby granted, free of charge, to any person - * obtaining a copy of this software and associated documentation - * files (the "Software"), to deal in the Software without - * restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following - * conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES - * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. - */ - -import org.broadinstitute.sting.datasources.pipeline.Pipeline -import org.broadinstitute.sting.queue.extensions.gatk._ -import org.broadinstitute.sting.queue.function.ListWriterFunction -import org.broadinstitute.sting.queue.library.ipf.intervals.ExpandIntervals -import org.broadinstitute.sting.queue.QScript -import collection.JavaConversions._ -import org.broadinstitute.sting.utils.broad.PicardPipeline - -class FullCallingPipeline extends QScript { - qscript => - - @Argument(doc="the YAML file specifying inputs, interval lists, reference sequence, etc.", shortName="Y") - var yamlFile: File = _ - - @Input(doc="level of parallelism for UnifiedGenotyper (both for SNPs and indels). By default is set to 20.", shortName="varScatter", required=false) - var variantCallerScatterCount = 20 - - @Argument(doc="memory limit for UnifiedGenotyper (both for SNPs and indels). By default is set to 4g.", shortName="varMemory", required=false) - var variantCallerMemory = 4 - - @Argument(doc="expand each target in input intervals by the specified number of bases (50 bases by default)", shortName="expand", required=false) - var expandIntervals = 50 - - private var pipeline: Pipeline = _ - - trait CommandLineGATKArgs extends CommandLineGATK { - this.reference_sequence = qscript.pipeline.getProject.getReferenceFile - this.intervals = List(qscript.pipeline.getProject.getIntervalList) - this.memoryLimit = 4 - } - - def script() { - pipeline = PicardPipeline.parse(qscript.yamlFile) - - val projectBase: String = qscript.pipeline.getProject.getName - val base = projectBase + ".cleaned" - val bamType = "cleaned" - - // Make the bam list - val writeBamList = new ListWriterFunction - writeBamList.analysisName = base + "_BamList" - writeBamList.jobOutputFile = ".queue/logs/Overall/WriteBamList.out" - writeBamList.inputFiles = qscript.pipeline.getSamples.filter(_.getBamFiles.contains(bamType)).map(_.getBamFiles.get(bamType)).toList - writeBamList.listFile = "Resources/" + base +".bamfiles.list" - add(writeBamList) - - val ei = new ExpandIntervals( - qscript.pipeline.getProject.getIntervalList, - 1, - qscript.expandIntervals, - "Resources/" + base + ".flanks.interval_list", - qscript.pipeline.getProject.getReferenceFile, - "INTERVALS", - "INTERVALS") - ei.jobOutputFile = ".queue/logs/Overall/ExpandIntervals.out" - - if (qscript.expandIntervals > 0) { - add(ei) - } - - trait ExpandedIntervals extends CommandLineGATK { - if (qscript.expandIntervals > 0) { - this.intervals :+= ei.outList - } - } - - // Call indels - val indels = new UnifiedGenotyper with CommandLineGATKArgs with ExpandedIntervals - indels.analysisName = base + "_indels" - indels.jobOutputFile = ".queue/logs/IndelCalling/UnifiedGenotyper.indels.out" - indels.downsample_to_coverage = 600 - indels.genotype_likelihoods_model = org.broadinstitute.sting.gatk.walkers.genotyper.GenotypeLikelihoodsCalculationModel.Model.INDEL - indels.GSA_PRODUCTION_ONLY = true /* use old indel genotyper */ - indels.input_file = List(writeBamList.listFile) - indels.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getGenotypeDbsnpType, qscript.pipeline.getProject.getGenotypeDbsnp) - indels.out = "IndelCalls/" + base+".indels.vcf" - indels.scatterCount = qscript.variantCallerScatterCount - indels.memoryLimit = qscript.variantCallerMemory - add(indels) - - // Filter indels - val filteredIndels = new VariantFiltration with CommandLineGATKArgs with ExpandedIntervals - filteredIndels.analysisName = base + "_filteredIndels" - filteredIndels.jobOutputFile = ".queue/logs/IndelCalling/VariantFiltration.indels.out" - filteredIndels.filterName ++= List("IndelQUALFilter","IndelSBFilter","IndelQDFilter") - filteredIndels.filterExpression ++= List("\"QUAL<30.0\"","\"SB>-1.0\"","\"QD<2.0\"") - filteredIndels.variantVCF = indels.out - filteredIndels.out = swapExt("IndelCalls", indels.out, ".vcf",".filtered.vcf") - add(filteredIndels) - - // Call snps - val snps = new UnifiedGenotyper with CommandLineGATKArgs with ExpandedIntervals - snps.analysisName = base+"_snps" - snps.jobOutputFile = ".queue/logs/SNPCalling/UnifiedGenotyper.snps.out" - snps.downsample_to_coverage = 600 - snps.genotype_likelihoods_model = org.broadinstitute.sting.gatk.walkers.genotyper.GenotypeLikelihoodsCalculationModel.Model.SNP - snps.input_file = List(writeBamList.listFile) - snps.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getGenotypeDbsnpType, qscript.pipeline.getProject.getGenotypeDbsnp) - snps.out = "SnpCalls/" + base+".snps.vcf" - snps.scatterCount = qscript.variantCallerScatterCount - snps.memoryLimit = qscript.variantCallerMemory - add(snps) - - // Filter snps - val filteredSNPs = new VariantFiltration with CommandLineGATKArgs with ExpandedIntervals - filteredSNPs.analysisName = base+"_filteredSNPs" - filteredSNPs.jobOutputFile = ".queue/logs/SNPCalling/VariantFiltration.snps.out" - filteredSNPs.filterName ++= List("SNPSBFilter","SNPQDFilter","SNPHRunFilter") - filteredSNPs.filterExpression ++= List("\"SB>=0.10\"","\"QD<5.0\"","\"HRun>=4\"") - filteredSNPs.clusterWindowSize = 10 - filteredSNPs.clusterSize = 3 - filteredSNPs.rodBind :+= RodBind("mask", "VCF", filteredIndels.out) - filteredSNPs.variantVCF = snps.out - filteredSNPs.out = swapExt("SnpCalls",snps.out,".vcf",".filtered.vcf") - add(filteredSNPs) - - // Combine indels and snps into one VCF - val combineAll = new CombineVariants with CommandLineGATKArgs with ExpandedIntervals - combineAll.analysisName = base + "_combineAll" - combineAll.jobOutputFile = ".queue/logs/Combined/CombineVariants.out" - combineAll.variantmergeoption = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION - combineAll.rod_priority_list = "Indels,SNPs" - combineAll.rodBind :+= RodBind("Indels", "VCF", filteredIndels.out) - combineAll.rodBind :+= RodBind("SNPs", "VCF", filteredSNPs.out) - combineAll.out = "CombinedCalls/" + base + ".snps_and_indels.filtered.vcf" - add(combineAll) - - // Annotate variants - val annotated = new GenomicAnnotator with CommandLineGATKArgs with ExpandedIntervals - annotated.analysisName = base+"_annotated" - annotated.jobOutputFile = ".queue/logs/Combined/GenomicAnnotator.out" - annotated.rodToIntervalTrackName = "variant" - annotated.rodBind :+= RodBind("variant", "VCF", combineAll.out) - annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.pipeline.getProject.getRefseqTable) - annotated.out = base + ".snps_and_indels.filtered.annotated.vcf" - add(annotated) - - // Variant eval the standard region - val stdEval = new VariantEval with CommandLineGATKArgs - stdEval.analysisName = base+"_StandardVariantEval" - stdEval.jobOutputFile = ".queue/logs/Overall/VariantEval.std.out" - stdEval.doNotUseAllStandardStratifications = true - stdEval.doNotUseAllStandardModules = true - stdEval.evalModule = List("SimpleMetricsByAC", "TiTvVariantEvaluator", "CountVariants") - stdEval.stratificationModule = List("EvalRod", "CompRod", "Novelty", "Filter", "FunctionalClass", "Sample") - stdEval.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getEvalDbsnpType, qscript.pipeline.getProject.getEvalDbsnp) - stdEval.rodBind :+= RodBind("eval", "VCF", annotated.out) - stdEval.out = swapExt(annotated.out, ".vcf", ".eval") - add(stdEval) - - if (qscript.expandIntervals > 0) { - // Variant eval the flanking region - val flanksEval = new VariantEval with CommandLineGATKArgs - flanksEval.analysisName = base+"_FlanksVariantEval" - flanksEval.jobOutputFile = ".queue/logs/Overall/VariantEval.flanks.out" - flanksEval.intervals = List(ei.outList) - flanksEval.doNotUseAllStandardStratifications = true - flanksEval.doNotUseAllStandardModules = true - flanksEval.evalModule = List("SimpleMetricsByAC", "TiTvVariantEvaluator", "CountVariants") - flanksEval.stratificationModule = List("EvalRod", "CompRod", "Novelty", "Filter", "FunctionalClass", "Sample") - flanksEval.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getEvalDbsnpType, qscript.pipeline.getProject.getEvalDbsnp) - flanksEval.rodBind :+= RodBind("eval", "VCF", annotated.out) - flanksEval.out = swapExt(annotated.out, ".vcf", ".flanks.eval") - add(flanksEval) - } - } -} diff --git a/scala/qscript/playground/HybridSelectionPipeline.scala b/scala/qscript/playground/HybridSelectionPipeline.scala new file mode 100755 index 000000000..9fce689e5 --- /dev/null +++ b/scala/qscript/playground/HybridSelectionPipeline.scala @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +import org.broadinstitute.sting.datasources.pipeline.Pipeline +import org.broadinstitute.sting.queue.extensions.gatk._ +import org.broadinstitute.sting.queue.function.ListWriterFunction +import org.broadinstitute.sting.queue.library.ipf.intervals.ExpandIntervals +import org.broadinstitute.sting.queue.QScript +import collection.JavaConversions._ +import org.broadinstitute.sting.utils.broad.PicardPipeline + +class HybridSelectionPipeline extends QScript { + qscript => + + @Argument(doc="the YAML file specifying inputs, interval lists, reference sequence, etc.", shortName="Y") + var yamlFile: File = _ + + @Input(doc="level of parallelism for UnifiedGenotyper. By default is set to 20.", shortName="varScatter", required=false) + var variantCallerScatterCount = 20 + + @Argument(doc="memory limit for UnifiedGenotyper. By default is set to 4g.", shortName="varMemory", required=false) + var variantCallerMemory = 4 + + @Argument(doc="expand each target in input intervals by the specified number of bases (50 bases by default)", shortName="expand", required=false) + var expandIntervals = 50 + + private var pipeline: Pipeline = _ + + trait CommandLineGATKArgs extends CommandLineGATK { + this.reference_sequence = qscript.pipeline.getProject.getReferenceFile + this.intervals = List(qscript.pipeline.getProject.getIntervalList) + this.memoryLimit = 4 + } + + def script() { + pipeline = PicardPipeline.parse(qscript.yamlFile) + + val projectBase = qscript.pipeline.getProject.getName + val bamType = "cleaned" + + val writeBamList = new ListWriterFunction + writeBamList.inputFiles = qscript.pipeline.getSamples.filter(_.getBamFiles.contains(bamType)).map(_.getBamFiles.get(bamType)).toList + writeBamList.listFile = projectBase +".bam.list" + writeBamList.jobOutputFile = writeBamList.listFile + ".out" + add(writeBamList) + + val flankIntervals = projectBase + ".flanks.intervals" + + if (qscript.expandIntervals > 0) { + val ei = new ExpandIntervals( + qscript.pipeline.getProject.getIntervalList, + 1, + qscript.expandIntervals, + flankIntervals, + qscript.pipeline.getProject.getReferenceFile, + "INTERVALS", + "INTERVALS") + ei.jobOutputFile = ei.outList + ".out" + + add(ei) + } + + trait ExpandedIntervals extends CommandLineGATK { + if (qscript.expandIntervals > 0) + this.intervals :+= flankIntervals + } + + val call = new UnifiedGenotyper with CommandLineGATKArgs with ExpandedIntervals + call.input_file = List(writeBamList.listFile) + call.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getGenotypeDbsnpType, qscript.pipeline.getProject.getGenotypeDbsnp) + call.downsample_to_coverage = 600 + call.genotype_likelihoods_model = org.broadinstitute.sting.gatk.walkers.genotyper.GenotypeLikelihoodsCalculationModel.Model.BOTH + call.GSA_PRODUCTION_ONLY = true + call.out = projectBase + ".unfiltered.vcf" + call.jobOutputFile = call.out + ".out" + call.scatterCount = qscript.variantCallerScatterCount + call.memoryLimit = qscript.variantCallerMemory + add(call) + + val selectSNPs = new SelectVariants with CommandLineGATKArgs with ExpandedIntervals + selectSNPs.selectSNPs = true + selectSNPs.rodBind :+= RodBind("variant", "VCF", call.out) + selectSNPs.out = projectBase + ".snps.unfiltered.vcf" + selectSNPs.jobOutputFile = selectSNPs.out + ".out" + add(selectSNPs) + + val selectIndels = new SelectVariants with CommandLineGATKArgs with ExpandedIntervals + selectIndels.selectIndels = true + selectIndels.rodBind :+= RodBind("variant", "VCF", call.out) + selectIndels.out = projectBase + ".indels.unfiltered.vcf" + selectIndels.jobOutputFile = selectIndels.out + ".out" + add(selectIndels) + + val filterSNPs = new VariantFiltration with CommandLineGATKArgs with ExpandedIntervals + filterSNPs.variantVCF = selectSNPs.out + filterSNPs.filterName = List("SNP_SB", "SNP_QD", "SNP_HRun") + filterSNPs.filterExpression = List("\"SB>=0.10\"", "\"QD<5.0\"", "\"HRun>=4\"") + filterSNPs.clusterWindowSize = 10 + filterSNPs.clusterSize = 3 + filterSNPs.out = projectBase + ".snps.filtered.vcf" + filterSNPs.jobOutputFile = filterSNPs.out + ".out" + add(filterSNPs) + + val filterIndels = new VariantFiltration with CommandLineGATKArgs with ExpandedIntervals + filterIndels.variantVCF = selectIndels.out + filterIndels.filterName = List("Indel_QUAL", "Indel_SB", "Indel_QD") + filterIndels.filterExpression = List("\"QUAL<30.0\"", "\"SB>-1.0\"", "\"QD<2.0\"") + filterIndels.out = projectBase + ".indels.filtered.vcf" + filterIndels.jobOutputFile = filterIndels.out + ".out" + add(filterIndels) + + val combineSNPsIndels = new CombineVariants with CommandLineGATKArgs with ExpandedIntervals + combineSNPsIndels.rodBind :+= RodBind("indels", "VCF", filterIndels.out) + combineSNPsIndels.rodBind :+= RodBind("snps", "VCF", filterSNPs.out) + combineSNPsIndels.rod_priority_list = "indels,snps" + combineSNPsIndels.variantmergeoption = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION + combineSNPsIndels.assumeIdenticalSamples = true + combineSNPsIndels.out = projectBase + ".unannotated.vcf" + combineSNPsIndels.jobOutputFile = combineSNPsIndels.out + ".out" + add(combineSNPsIndels) + + val annotate = new GenomicAnnotator with CommandLineGATKArgs with ExpandedIntervals + annotate.rodBind :+= RodBind("variant", "VCF", combineSNPsIndels.out) + annotate.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.pipeline.getProject.getRefseqTable) + annotate.rodToIntervalTrackName = "variant" + annotate.out = projectBase + ".vcf" + annotate.jobOutputFile = annotate.out + ".out" + add(annotate) + + val targetEval = new VariantEval with CommandLineGATKArgs + targetEval.rodBind :+= RodBind("eval", "VCF", annotate.out) + targetEval.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getEvalDbsnpType, qscript.pipeline.getProject.getEvalDbsnp) + targetEval.doNotUseAllStandardStratifications = true + targetEval.doNotUseAllStandardModules = true + targetEval.evalModule = List("SimpleMetricsByAC", "TiTvVariantEvaluator", "CountVariants") + targetEval.stratificationModule = List("EvalRod", "CompRod", "Novelty", "Filter", "FunctionalClass", "Sample") + targetEval.out = projectBase + ".eval" + targetEval.jobOutputFile = targetEval.out + ".out" + add(targetEval) + + if (qscript.expandIntervals > 0) { + val flanksEval = new VariantEval with CommandLineGATKArgs + flanksEval.rodBind :+= RodBind("eval", "VCF", annotate.out) + flanksEval.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getEvalDbsnpType, qscript.pipeline.getProject.getEvalDbsnp) + flanksEval.intervals = List(flankIntervals) + flanksEval.doNotUseAllStandardStratifications = true + flanksEval.doNotUseAllStandardModules = true + flanksEval.evalModule = List("SimpleMetricsByAC", "TiTvVariantEvaluator", "CountVariants") + flanksEval.stratificationModule = List("EvalRod", "CompRod", "Novelty", "Filter", "FunctionalClass", "Sample") + flanksEval.out = projectBase + ".flanks.eval" + flanksEval.jobOutputFile = flanksEval.out + ".out" + add(flanksEval) + } + } +} diff --git a/scala/qscript/playground/MultiFullCallingPipeline.scala b/scala/qscript/playground/MultiFullCallingPipeline.scala deleted file mode 100644 index bbcb8eea7..000000000 --- a/scala/qscript/playground/MultiFullCallingPipeline.scala +++ /dev/null @@ -1,99 +0,0 @@ -import collection.JavaConversions -import org.broadinstitute.sting.queue.function.JavaCommandLineFunction -import org.broadinstitute.sting.queue.QScript -import org.broadinstitute.sting.queue.util.IOUtils -import org.broadinstitute.sting.utils.text.XReadLines - -class MultiFullCallingPipeline extends QScript { - qscript => - - @Input(doc="Sting home", shortName="stingHome") - var stingHome: File = _ - - @Input(doc="yaml lists to run", shortName="YL") - var yamlList: File = _ - - @Argument(doc="number of jobs per batch", shortName="BS") - var batchSize: Int = _ - - @Argument(doc="pipeline status to", shortName="PS", required = false) - var pipelineStatusTo: String = _ - - @Argument(doc="pipeline job queue", shortName="PJQ", required = false) - var pipelineJobQueue: String = _ - - @Argument(doc="pipeline short queue", shortName="PSQ", required = false) - var pipelineShortQueue: String = _ - - @Argument(doc="pipeline priority", shortName="PP", required = false) - var pipelinePriority: Option[Int] = None - - @Argument(doc="pipeline retry", shortName="PR", required = false) - var pipelineRetry: Option[Int] = None - - @Argument(doc="run with -tearScript", shortName="TS") - var runWithTearScript = false - - def script { - // Global arguments for all pipeline runs - stingHome = IOUtils.absolute(stingHome) - val queueJar = new File(stingHome, "dist/Queue.jar") - val pipelineScript = new File(stingHome, "scala/qscript/playground/FullCallingPipeline.q") - val gatkJar = new File(stingHome, "dist/GenomeAnalysisTK.jar") - val tearScript = if (runWithTearScript) new File(stingHome, "R/DataProcessingReport/GetTearsheetStats.R") else null - - // Parse the yaml list - var yamls = List.empty[File] - for (yaml <- JavaConversions.asScalaIterator(new XReadLines(yamlList))) - yamls :+= new File(yaml) - - // The list of previous outputs - val lastOuts = new Array[File](batchSize) - for (yamlGroup <- yamls.grouped(batchSize)) { - for ((yaml, i) <- yamlGroup.zipWithIndex) { - // Get the last output for index(i), which is null for the first job. - val lastOut = lastOuts(i) - - // Run the pipeline on the yaml waiting for the last output. - val runPipeline = new RunPipeline(yaml, lastOut) - - // Add this run to the graph. - add(runPipeline) - - // Have the next job at index(i) wait for this output file. - lastOuts(i) = runPipeline.jobOutputFile - } - } - - /** - * Runs a yaml in a pipeline only after a previous pipeline - * run has produced the passed in output file. - */ - class RunPipeline(yamlFile: File, lastOutput: File) extends JavaCommandLineFunction { - private var yamlName = yamlFile.getName.stripSuffix(".yaml") - - @Input(doc="output file to wait for", required=false) - var waitJobOutputFile = lastOutput - - @Output(doc="virtual output file tagging this pipeline as complete") - var pipelineComplete = new File(yamlFile.getParentFile, yamlName + ".mfcp") - - commandDirectory = yamlFile.getParentFile - jobOutputFile = IOUtils.absolute(commandDirectory, yamlName + ".queue.txt") - jarFile = queueJar - memoryLimit = 1 - - override def commandLine = super.commandLine + - optional(" -statusTo ", qscript.pipelineStatusTo) + - optional(" -jobQueue ", qscript.pipelineJobQueue) + - optional(" -shortJobQueue ", qscript.pipelineShortQueue) + - optional(" -jobPriority ", qscript.pipelinePriority) + - optional(" -retry ", qscript.pipelineRetry) + - optional(" -tearScript ", tearScript) + - " -S %s --gatkjar %s -jobProject %s -jobPrefix %s -Y %s -bsub -run" - .format(pipelineScript, gatkJar, yamlName, yamlName, yamlFile) - - override def dotString = "Queue: " + yamlName - } - } -} diff --git a/scala/qscript/playground/WholeGenomePipeline.scala b/scala/qscript/playground/WholeGenomePipeline.scala index 1c467ca90..8227150df 100644 --- a/scala/qscript/playground/WholeGenomePipeline.scala +++ b/scala/qscript/playground/WholeGenomePipeline.scala @@ -24,7 +24,6 @@ import org.broadinstitute.sting.queue.QScript import org.broadinstitute.sting.queue.extensions.gatk._ -import collection.JavaConversions._ import org.broadinstitute.sting.utils.interval.IntervalUtils class WholeGenomePipeline extends QScript { @@ -56,36 +55,35 @@ class WholeGenomePipeline extends QScript { this.memoryLimit = 2 } - case class ChrRange(chr: String, start: Long, stop: Long) { - def toInterval = "%s:%d-%d".format(chr, start, stop) - override def toString = toInterval + case class Interval(chr: String, start: Long, stop: Long) { + override def toString = "%s:%d-%d".format(chr, start, stop) } var runIntervals = List.empty[String] def script() { - val rangeMap = Map( - "cent1" -> new ChrRange("1", 121429168, 121529168), - "cent16" -> new ChrRange("16", 40844464, 40944464), - "chr20" -> new ChrRange("20", 1, 63025520), - "chr20_100k" -> new ChrRange("20", 100001, 200000)) - - var ranges = Traversable.empty[ChrRange] - val chrs = IntervalUtils.getContigSizes(reference) + var intervals = Traversable.empty[Interval] runType = runType.toLowerCase if (runType == "wg") { - // use all chromosomes from 1 to their length - ranges = chrs.map { case (chr, len) => new ChrRange(chr, 1, len.longValue) } - runIntervals = Nil + val contigs = (1 to 22).map(_.toString) ++ List("X", "Y", "MT") + val sizes = IntervalUtils.getContigSizes(reference) + intervals = contigs.map(chr => new Interval(chr, 1, sizes.get(chr).longValue)) + runIntervals = Nil } else { - rangeMap.get(runType) match { + val locs = Map( + "cent1" -> new Interval("1", 121429168, 121529168), + "cent16" -> new Interval("16", 40844464, 40944464), + "chr20" -> new Interval("20", 1, 63025520), + "chr20_100k" -> new Interval("20", 100001, 200000)) + + locs.get(runType) match { case Some(range) => - ranges = List(range) - runIntervals = List(range.toInterval) + intervals = List(range) + runIntervals = List(range.toString) case None => - throw new RuntimeException("Inalid runType: " + runType + ". Must be one of: " + rangeMap.keys.mkString(", ") + ", or wg") + throw new RuntimeException("Invalid runType: " + runType + ". Must be one of: " + locs.keys.mkString(", ") + ", or wg") } } @@ -93,10 +91,10 @@ class WholeGenomePipeline extends QScript { val projectBase = project + "." + runType var chunkVcfs = List.empty[File] - for (range <- ranges) { - val chr = range.chr - val chrStart = range.start - val chrStop = range.stop + for (interval <- intervals) { + val chr = interval.chr + val chrStart = interval.start + val chrStop = interval.stop var start = chrStart var chunkNumber = 1 @@ -157,73 +155,88 @@ class WholeGenomePipeline extends QScript { } } - val combineVCFs = new CombineVariants with CommandLineGATKArgs - combineVCFs.rodBind = chunkVcfs.zipWithIndex.map { case (vcf, index) => RodBind("input"+index, "VCF", vcf) } - combineVCFs.rod_priority_list = chunkVcfs.zipWithIndex.map { case (vcf, index) => "input"+index }.mkString(",") - combineVCFs.variantmergeoption = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION - combineVCFs.genotypemergeoption = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.GenotypeMergeType.PRIORITIZE - combineVCFs.out = projectBase + ".merged.vcf" - combineVCFs.jobOutputFile = combineVCFs.out + ".out" - combineVCFs.assumeIdenticalSamples = true - add(combineVCFs) + val combineChunks = new CombineVariants with CommandLineGATKArgs + combineChunks.rodBind = chunkVcfs.zipWithIndex.map { case (vcf, index) => RodBind("input"+index, "VCF", vcf) } + combineChunks.rod_priority_list = chunkVcfs.zipWithIndex.map { case (vcf, index) => "input"+index }.mkString(",") + combineChunks.variantmergeoption = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION + combineChunks.assumeIdenticalSamples = true + combineChunks.out = projectBase + ".unfiltered.vcf" + combineChunks.jobOutputFile = combineChunks.out + ".out" + add(combineChunks) - val sv = new SelectVariants with CommandLineGATKArgs - sv.selectIndels = true - sv.rodBind :+= RodBind("variant", "VCF", combineVCFs.out) - sv.out = projectBase + ".indels.vcf" - sv.jobOutputFile = sv.out + ".out" - add(sv) + val selectSNPs = new SelectVariants with CommandLineGATKArgs + selectSNPs.selectSNPs = true + selectSNPs.rodBind :+= RodBind("variant", "VCF", combineChunks.out) + selectSNPs.out = projectBase + ".snps.unrecalibrated.vcf" + selectSNPs.jobOutputFile = selectSNPs.out + ".out" + add(selectSNPs) - val filter = new VariantFiltration with CommandLineGATKArgs - filter.variantVCF = sv.out - filter.filterName :+= "HARD_TO_VALIDATE" - filter.filterExpression :+= "\"MQ0 >= 4 && (MQ0 / (1.0 * DP)) > 0.1\"" - filter.out = swapExt(sv.out, ".vcf", ".filtered.vcf") - filter.jobOutputFile = filter.out + ".out" - add(filter) + val selectIndels = new SelectVariants with CommandLineGATKArgs + selectIndels.selectIndels = true + selectIndels.rodBind :+= RodBind("variant", "VCF", combineChunks.out) + selectIndels.out = projectBase + ".indels.unfiltered.vcf" + selectIndels.jobOutputFile = selectIndels.out + ".out" + add(selectIndels) - val recombine = new CombineVariants with CommandLineGATKArgs - recombine.rodBind :+= RodBind("indels", "VCF", sv.out) - recombine.rodBind :+= RodBind("all", "VCF", combineVCFs.out) - recombine.rod_priority_list = "indels,all" - recombine.genotypemergeoption = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.GenotypeMergeType.PRIORITIZE - recombine.out = swapExt(combineVCFs.out, ".vcf", ".filtered.vcf") - recombine.jobOutputFile = recombine.out + ".out" - add(recombine) + val filterIndels = new VariantFiltration with CommandLineGATKArgs + filterIndels.variantVCF = selectIndels.out + filterIndels.filterName = List("Indel_QUAL", "Indel_SB", "Indel_QD", "Indel_HRun", "Indel_HaplotypeScore") + filterIndels.filterExpression = List("\"QUAL<30.0\"", "\"SB>-1.0\"", "\"QD<2.0\"", "\"HRun>15\"", "\"HaplotypeScore>20.0\"") + filterIndels.out = projectBase + ".indels.filtered.vcf" + filterIndels.jobOutputFile = filterIndels.out + ".out" + add(filterIndels) + val combineSNPsIndels = new CombineVariants with CommandLineGATKArgs + combineSNPsIndels.rodBind :+= RodBind("indels", "VCF", selectIndels.out) + combineSNPsIndels.rodBind :+= RodBind("all", "VCF", combineChunks.out) + combineSNPsIndels.rod_priority_list = "indels,all" + combineSNPsIndels.variantmergeoption = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION + combineSNPsIndels.assumeIdenticalSamples = true + combineSNPsIndels.out = projectBase + ".unrecalibrated.vcf" + combineSNPsIndels.jobOutputFile = combineSNPsIndels.out + ".out" + add(combineSNPsIndels) + val vr = new VariantRecalibrator with CommandLineGATKArgs - vr.rodBind :+= RodBind("input", "VCF", recombine.out) + vr.rodBind :+= RodBind("input", "VCF", combineSNPsIndels.out) vr.rodBind :+= RodBind("hapmap", "VCF", hapmap, "known=false,training=true,truth=true,prior=15.0") vr.rodBind :+= RodBind("omni", "VCF", omni, "known=false,training=true,truth=false,prior=12.0") - vr.rodBind :+= RodBind("dbsnp", "VCF", dbsnp, "known=true,training=false,truth=false,prior=10.0") - vr.use_annotation = List("QD", "SB", "HaplotypeScore", "HRun") + vr.rodBind :+= RodBind("dbsnp", "VCF", dbsnp, "known=true,training=false,truth=false,prior=8.0") vr.trustAllPolymorphic = true - vr.TStranche = List("100.0", "99.9", "99.5", "99.3", "99.0", "98.9", "98.8", "98.5", "98.4", "98.3", "98.2", "98.1", "98.0", "97.9", "97.8", "97.5", "97.0", "95.0", "90.0") - vr.tranches_file = swapExt(filter.out, ".vcf", ".tranches") - vr.recal_file = swapExt(filter.out, ".vcf", ".recal") + vr.use_annotation = List("QD", "HaplotypeScore", "HRun", "MQRankSum", "ReadPosRankSum") + vr.TStranche = List( + "100.0", "99.9", "99.5", "99.3", + "99.0", "98.9", "98.8", + "98.5", "98.4", "98.3", "98.2", "98.1", + "98.0", + "97.0", + "95.0", + "90.0") + vr.tranches_file = projectBase + ".tranches" + vr.recal_file = projectBase + ".recal" vr.jobOutputFile = vr.recal_file + ".out" add(vr) - val ar = new ApplyRecalibration with CommandLineGATKArgs - ar.rodBind :+= RodBind("input", "VCF", recombine.out) - ar.tranches_file = vr.tranches_file - ar.recal_file = vr.recal_file - ar.ts_filter_level = 99.0 - ar.ignore_filter :+= "HARD_TO_VALIDATE" - ar.out = swapExt(recombine.out, ".vcf", ".recalibrated.vcf") - ar.jobOutputFile = ar.out + ".out" - add(ar) + for (tranche <- vr.TStranche) { + val ar = new ApplyRecalibration with CommandLineGATKArgs + ar.rodBind :+= RodBind("input", "VCF", combineSNPsIndels.out) + ar.tranches_file = vr.tranches_file + ar.recal_file = vr.recal_file + ar.ts_filter_level = tranche.toDouble + ar.out = projectBase + ".recalibrated." + tranche + ".vcf" + ar.jobOutputFile = ar.out + ".out" + add(ar) - val stdEval = new VariantEval with CommandLineGATKArgs - stdEval.tranchesFile = vr.tranches_file - stdEval.rodBind :+= RodBind("dbsnp", "VCF", dbsnp) - stdEval.rodBind :+= RodBind("eval", "VCF", ar.out) - stdEval.doNotUseAllStandardStratifications = true - stdEval.doNotUseAllStandardModules = true - stdEval.evalModule = List("SimpleMetricsByAC", "TiTvVariantEvaluator", "CountVariants") - stdEval.stratificationModule = List("EvalRod", "CompRod", "Novelty", "Filter", "FunctionalClass", "Sample") - stdEval.out = swapExt(ar.out, ".vcf", ".eval") - stdEval.jobOutputFile = stdEval.out + ".out" - add(stdEval) + val eval = new VariantEval with CommandLineGATKArgs + eval.tranchesFile = vr.tranches_file + eval.rodBind :+= RodBind("eval", "VCF", ar.out) + eval.rodBind :+= RodBind("dbsnp", "VCF", dbsnp) + eval.doNotUseAllStandardStratifications = true + eval.doNotUseAllStandardModules = true + eval.evalModule = List("SimpleMetricsByAC", "TiTvVariantEvaluator", "CountVariants") + eval.stratificationModule = List("EvalRod", "CompRod", "Novelty", "Filter", "FunctionalClass", "Sample") + eval.out = swapExt(ar.out, ".vcf", ".eval") + eval.jobOutputFile = eval.out + ".out" + add(eval) + } } } diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala index 96f6f3482..ca7581bed 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala @@ -31,7 +31,7 @@ import org.broadinstitute.sting.commandline.CommandLineProgram import java.util.Date import java.text.SimpleDateFormat import org.broadinstitute.sting.BaseTest -import org.broadinstitute.sting.queue.{QException, QCommandLine} +import org.broadinstitute.sting.queue.QCommandLine import org.broadinstitute.sting.datasources.pipeline.{Pipeline, PipelineProject, PipelineSample} import org.broadinstitute.sting.utils.broad.PicardAggregationUtils import org.broadinstitute.sting.queue.util.{Logging, ProcessController} @@ -114,17 +114,17 @@ object PipelineTest extends BaseTest with Logging { /** * Creates a new pipeline project for hg19 with b37 132 dbsnp for genotyping, and b37 129 dbsnp for eval. * @param projectName Name of the project. - * @param chr20 True if only chr20 should be evaluated or the whole exome. + * @param intervals The intervals file to use. * @return a new pipeline project. */ - def createHg19Project(projectName: String, chr20: Boolean) = { + def createHg19Project(projectName: String, intervals: String) = { val project = new PipelineProject project.setName(projectName) project.setReferenceFile(new File(BaseTest.hg19Reference)) project.setGenotypeDbsnp(new File(BaseTest.b37dbSNP132)) project.setEvalDbsnp(new File(BaseTest.b37dbSNP129)) project.setRefseqTable(new File(BaseTest.hg19Refseq)) - project.setIntervalList(new File(if (chr20) BaseTest.hg19Chr20Intervals else BaseTest.hg19Intervals)) + project.setIntervalList(new File(intervals)) project } diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/HybridSelectionPipelineTest.scala similarity index 86% rename from scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala rename to scala/test/org/broadinstitute/sting/queue/pipeline/playground/HybridSelectionPipelineTest.scala index 00c1e6c18..08c6227a9 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/HybridSelectionPipelineTest.scala @@ -25,17 +25,17 @@ package org.broadinstitute.sting.queue.pipeline.playground import org.testng.annotations.{DataProvider, Test} -import collection.JavaConversions._ import java.io.File import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, Pipeline} import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.queue.pipeline._ +import org.broadinstitute.sting.BaseTest -class FullCallingPipelineTest { +class HybridSelectionPipelineTest { def datasets = List(k1gChr20Dataset) val k1gChr20Dataset = { - val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20", true) + val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20", BaseTest.hg19Chr20Intervals) dataset.validations :+= new IntegerValidation("CountVariants", "dbsnp.eval.called.all.all.all", "nCalledLoci", 1392) dataset.validations :+= new IntegerValidation("CountVariants", "dbsnp.eval.called.all.known.all", "nCalledLoci", 1143) @@ -47,8 +47,8 @@ class FullCallingPipelineTest { dataset } - def newK1gDataset(projectName: String, chr20: Boolean) = { - val project = PipelineTest.createHg19Project(projectName, chr20) + def newK1gDataset(projectName: String, intervals: String) = { + val project = PipelineTest.createHg19Project(projectName, intervals) var samples = List.empty[PipelineSample] for (k1gBam <- PipelineTest.k1gBams) samples :+= PipelineTest.createK1gSample(projectName, k1gBam) @@ -60,14 +60,14 @@ class FullCallingPipelineTest { datasets.map(dataset => Array(dataset.asInstanceOf[AnyRef])).toArray @Test(dataProvider="datasets") - def testFullCallingPipeline(dataset: PipelineDataset) { + def testHybridSelectionPipeline(dataset: PipelineDataset) { val projectName = dataset.pipeline.getProject.getName - val testName = "FullCallingPipeline-" + projectName + val testName = "HybridSelectionPipeline-" + projectName val yamlFile = writeYaml(testName, dataset.pipeline) // Run the pipeline with the expected inputs. val pipelineCommand = - "-retry 1 -S scala/qscript/playground/FullCallingPipeline.q -jobProject %s -Y %s" + "-retry 1 -S scala/qscript/playground/HybridSelectionPipeline.scala -jobProject %s -Y %s" .format(projectName, yamlFile) val pipelineSpec = new PipelineTestSpec @@ -76,7 +76,7 @@ class FullCallingPipelineTest { pipelineSpec.jobQueue = dataset.jobQueue pipelineSpec.evalSpec = new PipelineTestEvalSpec - pipelineSpec.evalSpec.evalReport = projectName + ".cleaned.snps_and_indels.filtered.annotated.eval" + pipelineSpec.evalSpec.evalReport = projectName + ".eval" pipelineSpec.evalSpec.validations = dataset.validations PipelineTest.executeTest(pipelineSpec) diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala deleted file mode 100644 index d19b59d41..000000000 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (c) 2011, The Broad Institute - * - * Permission is hereby granted, free of charge, to any person - * obtaining a copy of this software and associated documentation - * files (the "Software"), to deal in the Software without - * restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following - * conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES - * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. - */ - -package org.broadinstitute.sting.queue.pipeline.playground - -import collection.JavaConversions._ -import org.broadinstitute.sting.datasources.pipeline.Pipeline -import org.broadinstitute.sting.utils.yaml.YamlUtils -import org.testng.annotations.{Test, DataProvider} -import org.broadinstitute.sting.queue.pipeline.{PipelineTestSpec, PipelineTest} -import java.io.{PrintWriter, File} -import org.apache.commons.io.IOUtils - -class MultiFullCallingPipelineTest { - def datasets = List(k1gChr20Dataset) - - val k1gChr20Dataset = newK1gDataset("Barcoded_1000G_WEx_chr20", true, "hour") - val k1gExomeDataset = newK1gDataset("Barcoded_1000G_WEx", false, "gsa") - - def newK1gDataset(datasetName: String, chr20: Boolean, pipelineJobQueue: String) = { - var dataset = new MultiPipelineDataset - dataset.name = datasetName - dataset.pipelineJobQueue = pipelineJobQueue - for (k1gBam <- PipelineTest.k1gBams) { - val project = PipelineTest.createHg19Project("SingleSample_" + k1gBam.sampleId, chr20) - val sample = PipelineTest.createK1gSample("Sample", k1gBam) - dataset.samplePipelines :+= PipelineTest.createPipeline(project, List(sample)) - } - dataset - } - - @DataProvider(name="datasets")//, parallel=true) - final def convertDatasets: Array[Array[AnyRef]] = - datasets.map(dataset => Array(dataset.asInstanceOf[AnyRef])).toArray - - @Test(dataProvider="datasets", enabled=false) - def testMultiFullCallingPipeline(dataset: MultiPipelineDataset) { - val projectName = dataset.name - val testName = "MultiFullCallingPipeline-" + projectName - - var yamlFiles = List.empty[File] - for (samplePipeline <- dataset.samplePipelines) - yamlFiles :+= writeYaml(testName, samplePipeline) - - val yamlList = writeYamlList(testName, yamlFiles) - - // Run the pipeline with the expected inputs. - val pipelineCommand = ("-retry 1 -BS 3 -PP 100 -S scala/qscript/playground/MultiFullCallingPipeline.scala" + - " -jobProject %s -YL %s -PJQ %s -PR 2 -stingHome %s") - .format(projectName, yamlList, dataset.pipelineJobQueue, PipelineTest.currentStingDir) - - val pipelineSpec = new PipelineTestSpec - pipelineSpec.name = testName - pipelineSpec.args = pipelineCommand - pipelineSpec.jobQueue = "gsa" - - PipelineTest.executeTest(pipelineSpec) - } - - private def writeYaml(testName: String, pipeline: Pipeline) = { - val runDir = PipelineTest.runDir(testName) - val yamlFile = new File(runDir, pipeline.getProject.getName + "/" + pipeline.getProject.getName + ".yaml").getAbsoluteFile - yamlFile.getParentFile.mkdirs - YamlUtils.dump(pipeline, yamlFile) - yamlFile - } - - private def writeYamlList(testName: String, yamlFiles: List[File]) = { - val runDir = PipelineTest.runDir(testName) - val yamlList = new File(runDir, testName + "_yamls.list").getAbsoluteFile - yamlList.getParentFile.mkdirs - val writer = new PrintWriter(yamlList) - try { - for (yamlFile <- yamlFiles) - writer.println(yamlFile.toString) - } finally { - IOUtils.closeQuietly(writer) - } - yamlList - } - - class MultiPipelineDataset (var name: String = null, - var pipelineJobQueue: String = null, - var samplePipelines: List[Pipeline] = Nil) { - override def toString = name - } -}