From 094418483241554323c1e60a03d250cf45412875 Mon Sep 17 00:00:00 2001 From: chartl Date: Sun, 5 Dec 2010 02:33:54 +0000 Subject: [PATCH] Major refactoring of library and full calling pipeline (v2) structure. Arguments to the full calling qscript (and indeed, any qscript that wants them) are now specified via the PipelineArgumentCollection Libraries require a Pipeline object for instantiation -- eliminating their previous dependence on yaml files Functions added to PipelineUtils to build out the proper Pipeline object from the PipelineArgumentCollection, which now contains additional arguments to specify pipeline properties (name, ref, bams, dbsnp, interval list); which are mutually exclusive with the yaml file. Pipeline length reduced to a mere 62 lines. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4790 348d0f76-0448-11de-a6fe-93d51630548a --- scala/qscript/chartl/fullCallingPipelineV2.q | 86 ++++--------------- .../sting/queue/pipeline/BamProcessing.scala | 7 +- .../pipeline/PipelineArgumentCollection.scala | 86 +++++++++++++++++++ .../queue/pipeline/ProjectManagement.scala | 18 ++-- .../sting/queue/pipeline/VariantCalling.scala | 8 +- .../sting/queue/util/PipelineUtils.scala | 46 ++++++++++ 6 files changed, 169 insertions(+), 82 deletions(-) create mode 100755 scala/src/org/broadinstitute/sting/queue/pipeline/PipelineArgumentCollection.scala diff --git a/scala/qscript/chartl/fullCallingPipelineV2.q b/scala/qscript/chartl/fullCallingPipelineV2.q index 2c3976d70..0f0efe01c 100755 --- a/scala/qscript/chartl/fullCallingPipelineV2.q +++ b/scala/qscript/chartl/fullCallingPipelineV2.q @@ -1,79 +1,31 @@ +import org.broadinstitute.sting.commandline.ArgumentCollection import org.broadinstitute.sting.datasources.pipeline.Pipeline import org.broadinstitute.sting.queue.extensions.gatk.CommandLineGATK -import org.broadinstitute.sting.queue.pipeline.{BamProcessing,VariantCalling} +import org.broadinstitute.sting.queue.pipeline._ +import org.broadinstitute.sting.queue.util.PipelineUtils import org.broadinstitute.sting.queue.{QException, QScript} import collection.JavaConversions._ import org.broadinstitute.sting.utils.yaml.YamlUtils class fullCallingPipelineV2 extends QScript { - qscript => - - @Argument(doc="Number of cleaning jobs", shortName="cleaningJobs", required=false) - var cleaningJobs: Int = 1 - - @Argument(doc="the YAML file specifying inputs, interval lists, reference sequence, etc.", shortName="Y") - var yamlFile: File = _ - - @Input(doc="path to trigger track (for UnifiedGenotyper)", shortName="trigger", required=false) - var trigger: File = _ - - @Input(doc="path to refseqTable (for GenomicAnnotator)", shortName="refseqTable") - var refseqTable: File = _ - - @Input(doc="path to Picard FixMateInformation.jar. See http://picard.sourceforge.net/ .", required=false) - var picardFixMatesJar: File = new java.io.File("/seq/software/picard/current/bin/FixMateInformation.jar") - - @Input(doc="path to GATK jar", shortName="gatk") - var gatkJar: File = _ - - @Input(doc="target Ti/Tv ratio for recalibration", shortName="titv", required=true) - var target_titv: Float = _ - - @Input(doc="per-sample downsampling level",shortName="dcov",required=false) - var downsampling_coverage = 300 - - @Input(doc="level of parallelism for UnifiedGenotyper", shortName="snpScatter", required=false) - var num_snp_scatter_jobs = 20 - - @Input(doc="level of parallelism for IndelGenotyperV2", shortName="indelScatter", required=false) - var num_indel_scatter_jobs = 5 - - @Input(doc="Skip indel-cleaning for BAM files (for testing only)", shortName="skipCleaning", required=false) - var skip_cleaning = false - - //@Input(doc="ADPR script") - //var adprScript: File = _ - - //@Input(doc="Sequencing maching name (for use by adpr)") - //var machine: String = _ - - //@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection") - //var protocol: String = _ + fcp => + @ArgumentCollection var pipelineArgs = new PipelineArgumentCollection + private var pipeline: Pipeline = _ - trait CommandLineGATKArgs extends CommandLineGATK { - this.intervals :+= qscript.pipeline.getProject.getIntervalList - this.jarFile = qscript.gatkJar - this.reference_sequence = qscript.pipeline.getProject.getReferenceFile - this.memoryLimit = Some(4) - } - - - // ------------ SETUP THE PIPELINE ----------- // - - def script = { - pipeline = YamlUtils.load(classOf[Pipeline], qscript.yamlFile) - var callingLib: VariantCalling = new VariantCalling(qscript.yamlFile,qscript.gatkJar) - var cleaningLib: BamProcessing = new BamProcessing(qscript.yamlFile,qscript.gatkJar,qscript.picardFixMatesJar) + pipelineArgs.verifyArguments + pipeline = PipelineUtils.loadPipelineFromPAC(pipelineArgs) + var callingLib: VariantCalling = new VariantCalling(pipeline,fcp.pipelineArgs.gatkJar) + var cleaningLib: BamProcessing = new BamProcessing(pipeline,fcp.pipelineArgs.gatkJar,fcp.pipelineArgs.picardFixMatesJar) - val projectBase: String = qscript.pipeline.getProject.getName + val projectBase: String = fcp.pipeline.getProject.getName val cleanedBase: String = projectBase + ".cleaned" val uncleanedBase: String = projectBase + ".uncleaned" // there are commands that use all the bam files - val recalibratedSamples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains("recalibrated")) + val recalibratedSamples = fcp.pipeline.getSamples.filter( u => ( u.getBamFiles.contains("recalibrated") || u.getBamFiles.contains("cleaned") ) ) var bamsToClean: List[(File,File)] = Nil var recalBams: List[File] = Nil @@ -90,11 +42,11 @@ class fullCallingPipelineV2 extends QScript { cleanedBams :+= sample.getBamFiles.get("cleaned") } - if ( !qscript.skip_cleaning ) { - addAll(cleaningLib.StandardIndelRealign(bamsToClean,qscript.cleaningJobs)) + if ( !fcp.pipelineArgs.skip_cleaning ) { + addAll(cleaningLib.StandardIndelRealign(bamsToClean,fcp.pipelineArgs.cleaningJobs)) } - if (!qscript.skip_cleaning) { + if (!fcp.pipelineArgs.skip_cleaning ) { endToEnd(cleanedBase, cleanedBams, callingLib) } else { endToEnd(uncleanedBase, recalBams, callingLib) @@ -107,12 +59,8 @@ class fullCallingPipelineV2 extends QScript { var handfilt_vcf = new File(base+"_snps.handfiltered.annotated.vcf") var indel_vcf = new File(base+"_indel_calls.vcf") - addAll(lib.StandardCallingPipeline(bamFiles,indel_vcf,recal_vcf,handfilt_vcf,qscript.target_titv,qscript.refseqTable)) + addAll(lib.StandardCallingPipeline(bamFiles,indel_vcf,recal_vcf,handfilt_vcf,fcp.pipelineArgs.target_titv,fcp.pipelineArgs.refseqTable)) } - def addAll(clfs: List[CommandLineFunction]) = { - for ( c <- clfs ) { - add(c) - } - } + def addAll(clfs: List[CommandLineFunction]) = { clfs.foreach(c => add(c)) } } diff --git a/scala/src/org/broadinstitute/sting/queue/pipeline/BamProcessing.scala b/scala/src/org/broadinstitute/sting/queue/pipeline/BamProcessing.scala index cbf9f9356..74cfd05f2 100755 --- a/scala/src/org/broadinstitute/sting/queue/pipeline/BamProcessing.scala +++ b/scala/src/org/broadinstitute/sting/queue/pipeline/BamProcessing.scala @@ -16,10 +16,13 @@ import org.broadinstitute.sting.queue.util.{PipelineUtils, IOUtils} import org.broadinstitute.sting.commandline.{Output, Input} import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction -class BamProcessing(yaml: File, gatkJar: File, fixMatesJar: File) { +class BamProcessing(attribs: Pipeline, gatkJar: File, fixMatesJar: File) { library => - var attributes: Pipeline = YamlUtils.load(classOf[Pipeline],yaml) + var attributes : Pipeline = attribs + + def this(yaml: File, gatkJar: File, fixMatesJar: File) = this(YamlUtils.load(classOf[Pipeline],yaml),gatkJar,fixMatesJar) + trait StandardCommandLineGATK extends CommandLineGATK { this.reference_sequence = library.attributes.getProject.getReferenceFile diff --git a/scala/src/org/broadinstitute/sting/queue/pipeline/PipelineArgumentCollection.scala b/scala/src/org/broadinstitute/sting/queue/pipeline/PipelineArgumentCollection.scala new file mode 100755 index 000000000..e7c741cef --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/pipeline/PipelineArgumentCollection.scala @@ -0,0 +1,86 @@ +package org.broadinstitute.sting.queue.pipeline + +import org.broadinstitute.sting.commandline.{Input, Argument, Hidden} +import java.io.File + + +class PipelineArgumentCollection { + + @Argument(doc="Number of cleaning jobs", shortName="cleaningJobs", required=false) + var cleaningJobs: Int = 1 + + @Argument(doc="the YAML file specifying inputs, interval lists, reference sequence, etc.", shortName="Y", required=false) + var yamlFile: File = _ + + @Input(doc="path to trigger track (for UnifiedGenotyper)", shortName="trigger", required=false) + var trigger: File = _ + + @Input(doc="path to refseqTable (for GenomicAnnotator)", shortName="refseqTable") + var refseqTable: File = _ + + @Input(doc="path to Picard FixMateInformation.jar. See http://picard.sourceforge.net/ .", required=false) + var picardFixMatesJar: File = new java.io.File("/seq/software/picard/current/bin/FixMateInformation.jar") + + @Input(doc="path to GATK jar", shortName="gatk") + var gatkJar: File = _ + + @Input(doc="target Ti/Tv ratio for recalibration", shortName="titv", required=true) + var target_titv: Float = _ + + @Input(doc="per-sample downsampling level",shortName="dcov",required=false) + var downsampling_coverage = 300 + + @Input(doc="level of parallelism for UnifiedGenotyper", shortName="snpScatter", required=false) + var num_snp_scatter_jobs = 20 + + @Input(doc="level of parallelism for IndelGenotyperV2", shortName="indelScatter", required=false) + var num_indel_scatter_jobs = 5 + + @Input(doc="Skip indel-cleaning for BAM files (for testing only)", shortName="skipCleaning", required=false) + var skip_cleaning = false + + @Input(doc="List of samples and bams (in the form sample_id k1:v1,k2:v2 "+ + "cleaned:/path/to/cleaned.bam,recalibrated:/path/to/recal.bam,unreacalibrated:/path/to/unrecal.bam)."+ + "Mutually exclusive with YAML",required=false, shortName="pBams") + var projectBams: File = _ + + @Input(doc="The project name. Mutually exclusive with YAML.", required = false, shortName="pName") + var projectName: String = _ + + @Input(doc="The reference file. Mutually exclusive with YAML.", required=false, shortName="pRef") + var projectRef: File = _ + + @Input(doc="The project interval list. Mutually exclusive with YAML.", required=false, shortName="pInt") + var projectIntervals: File = _ + + @Input(doc="The project dbsnp. Mutually exclusive with YAML",required=false, shortName="pDB") + var projectDBSNP: File = _ + + //@Input(doc="ADPR script") + //var adprScript: File = _ + + //@Input(doc="Sequencing maching name (for use by adpr)") + //var machine: String = _ + + //@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection") + //var protocol: String = _ + + def verifyArguments = { + // if no yaml file we require all the mutually exclusive arguments + if ( yamlFile == null ) { + if ( projectBams == null ) throw new IllegalArgumentException("No YAML file provided; and no project bam list. Pipeline requires either YAML file, or full project specs.") + + if ( projectName == null ) throw new IllegalArgumentException("No YAML file provided; and no project name. Pipeline requires either YAML file, or full project specs.") + + if ( projectRef == null) throw new IllegalArgumentException("No YAML file provided; and no project reference. Pipeline requires either YAML file, or full project specs.") + + if ( projectIntervals == null ) throw new IllegalArgumentException("No YAML file provided; and no project intervals. Pipeline requires either YAML file, or full project specs.") + + if ( projectDBSNP == null ) throw new IllegalArgumentException("No YAML file provided; and no project DBSNP. Pipeline requires either YAML file, or full project specs.") + } else { + if ( projectBams != null || projectName != null || projectRef != null || projectIntervals != null || projectDBSNP != null ) + throw new IllegalArgumentException("YAML file provided, along with other project spec arguments. YAML file is mutually exclusive with project specs.") + } + } + +} \ No newline at end of file diff --git a/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala b/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala index 8f3e30f0b..c255819b2 100755 --- a/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala +++ b/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala @@ -15,22 +15,22 @@ class ProjectManagement(stingPath: String) { var stingDirPath : String = stingPath - class PassFilterSites(vcf_files: List[File], out_list: File) extends CommandLineFunction { + class PassFilterAlleles(vcf_files: List[File], out_list: File) extends CommandLineFunction { @Input(doc="List of VCFs to extract PF sites from") var vcfs = vcf_files @Output(doc="The file to write the site list to") var out_intervals = out_list @Argument(doc="Path to the SortByRef script") var sortByRef: String = _ @Argument(doc="Path to the reference file on disk") var ref: File = _ def commandLine = { - "grep PASS %s | tr ':' '\\t' | awk '{print $2\"\\t\"$3}' | sort -n -k2,2 | uniq | perl %s - %s.fai | awk '{print $1\":\"$2}' > %s".format( - vcf_files.foldLeft[String]("")( (b,a) => b + " " + a.getAbsolutePath), sortByRef, ref.getAbsolutePath, out_list.getAbsolutePath + "egrep \"FORMAT|format\" %s | cut -f1-8 > %s ; grep PASS %s | tr ':' '\\t' | awk '{print $2\"\\t\"$3\"\\t\"$4\"\\t\"$5\"\\t\"$6\"\\t.\\t.\\t.\"}' | sort -n -k2,2 | uniq | perl %s - %s.fai >> %s".format( + vcf_files(1).getAbsolutePath, out_list.getAbsolutePath, vcf_files.foldLeft[String]("")( (b,a) => b + " " + a.getAbsolutePath), sortByRef, ref.getAbsolutePath, out_list.getAbsolutePath ) } } def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File) : List[CommandLineFunction] = { var cmds : List[CommandLineFunction] = Nil - var pfSites : PassFilterSites = new PassFilterSites(callVCFs,swapExt(mergedVCF,".vcf",".pf.intervals.list")) + var pfSites : PassFilterAlleles = new PassFilterAlleles(callVCFs,swapExt(mergedVCF,".vcf",".pf.alleles.vcf")) pfSites.sortByRef = pm.stingDirPath+"perl/sortByRef.pl" pfSites.ref = ref @@ -46,12 +46,11 @@ class ProjectManagement(stingPath: String) { } - def LikelihoodCalc( bam: File, ref: File, intervals: File ) : UGCalcLikelihoods = { + def LikelihoodCalc( bam: File, ref: File, alleleVCF: File ) : UGCalcLikelihoods = { var calc = new UGCalcLikelihoods calc.input_file :+= bam calc.reference_sequence = ref calc.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") - calc.intervals :+= intervals calc.downsample_to_coverage = Some(300) calc.memoryLimit = Some(2) calc.min_base_quality_score = Some(22) @@ -59,15 +58,18 @@ class ProjectManagement(stingPath: String) { calc.genotype = true calc.output_all_callable_bases = true calc.out = swapExt(bam,".bam",".likelihoods.vcf") + calc.rodBind :+= new RodBind("allele","VCF",alleleVCF) + calc.BTI = "allele" return calc } - def VariantCallMerge( likelihoods: List[File], ref: File, intervals: File, output: File) : UGCallVariants = { + def VariantCallMerge( likelihoods: List[File], ref: File, intervalVCF: File, output: File) : UGCallVariants = { var call = new UGCallVariants call.reference_sequence = ref call.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") - call.intervals :+= intervals + call.rodBind :+= new RodBind("allele","vcf",intervalVCF) + call.BTI = "allele" call.memoryLimit = Some(4) call.out = output call.rodBind ++= likelihoods.map( a => new RodBind("variant"+a.getName.replace(".vcf",""),"vcf",a) ) diff --git a/scala/src/org/broadinstitute/sting/queue/pipeline/VariantCalling.scala b/scala/src/org/broadinstitute/sting/queue/pipeline/VariantCalling.scala index 024ecfd3f..3983e3538 100755 --- a/scala/src/org/broadinstitute/sting/queue/pipeline/VariantCalling.scala +++ b/scala/src/org/broadinstitute/sting/queue/pipeline/VariantCalling.scala @@ -8,12 +8,14 @@ import org.broadinstitute.sting.queue.extensions.gatk._ import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.queue.function.CommandLineFunction -class VariantCalling(yaml: File,gatkJar: File) { +class VariantCalling(attribs: Pipeline,gatkJar: File) { vc => // load attributes - var attributes: Pipeline = YamlUtils.load(classOf[Pipeline], yaml) - + var attributes = attribs + + def this(yaml: File, gatkJar: File) = this(YamlUtils.load(classOf[Pipeline],yaml),gatkJar) + /** * Trait to propagate basic attributes throughout the library */ diff --git a/scala/src/org/broadinstitute/sting/queue/util/PipelineUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/PipelineUtils.scala index 19c34071e..ec6aa28d7 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/PipelineUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/PipelineUtils.scala @@ -5,6 +5,10 @@ import java.io.File import org.broadinstitute.sting.utils.GenomeLocParser import collection.JavaConversions._ import org.broadinstitute.sting.utils.interval.IntervalUtils +import org.broadinstitute.sting.queue.pipeline.PipelineArgumentCollection +import org.broadinstitute.sting.utils.yaml.YamlUtils +import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, PipelineProject, Pipeline} +import org.broadinstitute.sting.utils.text.XReadLines class PipelineUtils { @@ -52,4 +56,46 @@ object PipelineUtils{ splitContigs.map{case (size, contigs) => contigs} } + + def loadPipelineFromPAC(args: PipelineArgumentCollection) : Pipeline = { + if ( args.yamlFile != null ) { + return YamlUtils.load(classOf[Pipeline], args.yamlFile) + } else { + return loadPipelineFromSpec(args.projectName,args.projectRef,args.projectIntervals,args.projectDBSNP,args.projectBams) + } + } + + def loadPipelineFromSpec(name: String, ref: File, ivals: File, dbsnp: File, pBamList: File) : Pipeline = { + var newPipeline : Pipeline = new Pipeline + var pipeProject : PipelineProject = new PipelineProject + var pipeSamples : List[PipelineSample] = ((new XReadLines(pBamList)).readLines).toList.map( bamSpecToSample ) + + pipeProject.setName(name) + pipeProject.setReferenceFile(ref) + pipeProject.setIntervalList(ivals) + pipeProject.setDbsnpFile(dbsnp) + + newPipeline.setProject(pipeProject) + newPipeline.setSamples(pipeSamples) + + return newPipeline + } + + //todo -- find a better name for this function + def bamSpecToSample(spec: String) : PipelineSample = { + var sam : PipelineSample = new PipelineSample + var spStr : Array[String] = spec.split("\\s") + sam.setId(spStr(0)) + var tagStr : Array[String] = spStr(1).split(",") + var tagMap : java.util.HashMap[String,String] = new java.util.HashMap[String,String](tagStr.size) + tagStr.filter( u => ! u.equals("")).foreach( u => tagMap.put(u.split(":")(0),u.split(":")(1)) ) + sam.setTags(tagMap) + var bamStr : Array[String] = spStr(2).split(",") + var bamMap : java.util.HashMap[String,File] = new java.util.HashMap[String,File](bamStr.size) + bamStr.foreach( u => bamMap.put( u.split(":")(0), new File(u.split(":")(1) ) ) ) + sam.setBamFiles(bamMap) + + return sam + + } } \ No newline at end of file