Major refactoring of library and full calling pipeline (v2) structure.

Arguments to the full calling qscript (and indeed, any qscript that wants them) are now specified via the PipelineArgumentCollection

Libraries require a Pipeline object for instantiation -- eliminating their previous dependence on yaml files

Functions added to PipelineUtils to build out the proper Pipeline object from the PipelineArgumentCollection, which now contains 
additional arguments to specify pipeline properties (name, ref, bams, dbsnp, interval list); which are mutually exclusive with
the yaml file.

Pipeline length reduced to a mere 62 lines.




git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4790 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
chartl 2010-12-05 02:33:54 +00:00
parent bef48e7a42
commit 0944184832
6 changed files with 169 additions and 82 deletions

View File

@ -1,79 +1,31 @@
import org.broadinstitute.sting.commandline.ArgumentCollection
import org.broadinstitute.sting.datasources.pipeline.Pipeline import org.broadinstitute.sting.datasources.pipeline.Pipeline
import org.broadinstitute.sting.queue.extensions.gatk.CommandLineGATK import org.broadinstitute.sting.queue.extensions.gatk.CommandLineGATK
import org.broadinstitute.sting.queue.pipeline.{BamProcessing,VariantCalling} import org.broadinstitute.sting.queue.pipeline._
import org.broadinstitute.sting.queue.util.PipelineUtils
import org.broadinstitute.sting.queue.{QException, QScript} import org.broadinstitute.sting.queue.{QException, QScript}
import collection.JavaConversions._ import collection.JavaConversions._
import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.utils.yaml.YamlUtils
class fullCallingPipelineV2 extends QScript { class fullCallingPipelineV2 extends QScript {
qscript => fcp =>
@Argument(doc="Number of cleaning jobs", shortName="cleaningJobs", required=false) @ArgumentCollection var pipelineArgs = new PipelineArgumentCollection
var cleaningJobs: Int = 1
@Argument(doc="the YAML file specifying inputs, interval lists, reference sequence, etc.", shortName="Y")
var yamlFile: File = _
@Input(doc="path to trigger track (for UnifiedGenotyper)", shortName="trigger", required=false)
var trigger: File = _
@Input(doc="path to refseqTable (for GenomicAnnotator)", shortName="refseqTable")
var refseqTable: File = _
@Input(doc="path to Picard FixMateInformation.jar. See http://picard.sourceforge.net/ .", required=false)
var picardFixMatesJar: File = new java.io.File("/seq/software/picard/current/bin/FixMateInformation.jar")
@Input(doc="path to GATK jar", shortName="gatk")
var gatkJar: File = _
@Input(doc="target Ti/Tv ratio for recalibration", shortName="titv", required=true)
var target_titv: Float = _
@Input(doc="per-sample downsampling level",shortName="dcov",required=false)
var downsampling_coverage = 300
@Input(doc="level of parallelism for UnifiedGenotyper", shortName="snpScatter", required=false)
var num_snp_scatter_jobs = 20
@Input(doc="level of parallelism for IndelGenotyperV2", shortName="indelScatter", required=false)
var num_indel_scatter_jobs = 5
@Input(doc="Skip indel-cleaning for BAM files (for testing only)", shortName="skipCleaning", required=false)
var skip_cleaning = false
//@Input(doc="ADPR script")
//var adprScript: File = _
//@Input(doc="Sequencing maching name (for use by adpr)")
//var machine: String = _
//@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection")
//var protocol: String = _
private var pipeline: Pipeline = _ private var pipeline: Pipeline = _
trait CommandLineGATKArgs extends CommandLineGATK {
this.intervals :+= qscript.pipeline.getProject.getIntervalList
this.jarFile = qscript.gatkJar
this.reference_sequence = qscript.pipeline.getProject.getReferenceFile
this.memoryLimit = Some(4)
}
// ------------ SETUP THE PIPELINE ----------- //
def script = { def script = {
pipeline = YamlUtils.load(classOf[Pipeline], qscript.yamlFile) pipelineArgs.verifyArguments
var callingLib: VariantCalling = new VariantCalling(qscript.yamlFile,qscript.gatkJar) pipeline = PipelineUtils.loadPipelineFromPAC(pipelineArgs)
var cleaningLib: BamProcessing = new BamProcessing(qscript.yamlFile,qscript.gatkJar,qscript.picardFixMatesJar) var callingLib: VariantCalling = new VariantCalling(pipeline,fcp.pipelineArgs.gatkJar)
var cleaningLib: BamProcessing = new BamProcessing(pipeline,fcp.pipelineArgs.gatkJar,fcp.pipelineArgs.picardFixMatesJar)
val projectBase: String = qscript.pipeline.getProject.getName val projectBase: String = fcp.pipeline.getProject.getName
val cleanedBase: String = projectBase + ".cleaned" val cleanedBase: String = projectBase + ".cleaned"
val uncleanedBase: String = projectBase + ".uncleaned" val uncleanedBase: String = projectBase + ".uncleaned"
// there are commands that use all the bam files // there are commands that use all the bam files
val recalibratedSamples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains("recalibrated")) val recalibratedSamples = fcp.pipeline.getSamples.filter( u => ( u.getBamFiles.contains("recalibrated") || u.getBamFiles.contains("cleaned") ) )
var bamsToClean: List[(File,File)] = Nil var bamsToClean: List[(File,File)] = Nil
var recalBams: List[File] = Nil var recalBams: List[File] = Nil
@ -90,11 +42,11 @@ class fullCallingPipelineV2 extends QScript {
cleanedBams :+= sample.getBamFiles.get("cleaned") cleanedBams :+= sample.getBamFiles.get("cleaned")
} }
if ( !qscript.skip_cleaning ) { if ( !fcp.pipelineArgs.skip_cleaning ) {
addAll(cleaningLib.StandardIndelRealign(bamsToClean,qscript.cleaningJobs)) addAll(cleaningLib.StandardIndelRealign(bamsToClean,fcp.pipelineArgs.cleaningJobs))
} }
if (!qscript.skip_cleaning) { if (!fcp.pipelineArgs.skip_cleaning ) {
endToEnd(cleanedBase, cleanedBams, callingLib) endToEnd(cleanedBase, cleanedBams, callingLib)
} else { } else {
endToEnd(uncleanedBase, recalBams, callingLib) endToEnd(uncleanedBase, recalBams, callingLib)
@ -107,12 +59,8 @@ class fullCallingPipelineV2 extends QScript {
var handfilt_vcf = new File(base+"_snps.handfiltered.annotated.vcf") var handfilt_vcf = new File(base+"_snps.handfiltered.annotated.vcf")
var indel_vcf = new File(base+"_indel_calls.vcf") var indel_vcf = new File(base+"_indel_calls.vcf")
addAll(lib.StandardCallingPipeline(bamFiles,indel_vcf,recal_vcf,handfilt_vcf,qscript.target_titv,qscript.refseqTable)) addAll(lib.StandardCallingPipeline(bamFiles,indel_vcf,recal_vcf,handfilt_vcf,fcp.pipelineArgs.target_titv,fcp.pipelineArgs.refseqTable))
} }
def addAll(clfs: List[CommandLineFunction]) = { def addAll(clfs: List[CommandLineFunction]) = { clfs.foreach(c => add(c)) }
for ( c <- clfs ) {
add(c)
}
}
} }

View File

@ -16,10 +16,13 @@ import org.broadinstitute.sting.queue.util.{PipelineUtils, IOUtils}
import org.broadinstitute.sting.commandline.{Output, Input} import org.broadinstitute.sting.commandline.{Output, Input}
import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction
class BamProcessing(yaml: File, gatkJar: File, fixMatesJar: File) { class BamProcessing(attribs: Pipeline, gatkJar: File, fixMatesJar: File) {
library => library =>
var attributes: Pipeline = YamlUtils.load(classOf[Pipeline],yaml) var attributes : Pipeline = attribs
def this(yaml: File, gatkJar: File, fixMatesJar: File) = this(YamlUtils.load(classOf[Pipeline],yaml),gatkJar,fixMatesJar)
trait StandardCommandLineGATK extends CommandLineGATK { trait StandardCommandLineGATK extends CommandLineGATK {
this.reference_sequence = library.attributes.getProject.getReferenceFile this.reference_sequence = library.attributes.getProject.getReferenceFile

View File

@ -0,0 +1,86 @@
package org.broadinstitute.sting.queue.pipeline
import org.broadinstitute.sting.commandline.{Input, Argument, Hidden}
import java.io.File
class PipelineArgumentCollection {
@Argument(doc="Number of cleaning jobs", shortName="cleaningJobs", required=false)
var cleaningJobs: Int = 1
@Argument(doc="the YAML file specifying inputs, interval lists, reference sequence, etc.", shortName="Y", required=false)
var yamlFile: File = _
@Input(doc="path to trigger track (for UnifiedGenotyper)", shortName="trigger", required=false)
var trigger: File = _
@Input(doc="path to refseqTable (for GenomicAnnotator)", shortName="refseqTable")
var refseqTable: File = _
@Input(doc="path to Picard FixMateInformation.jar. See http://picard.sourceforge.net/ .", required=false)
var picardFixMatesJar: File = new java.io.File("/seq/software/picard/current/bin/FixMateInformation.jar")
@Input(doc="path to GATK jar", shortName="gatk")
var gatkJar: File = _
@Input(doc="target Ti/Tv ratio for recalibration", shortName="titv", required=true)
var target_titv: Float = _
@Input(doc="per-sample downsampling level",shortName="dcov",required=false)
var downsampling_coverage = 300
@Input(doc="level of parallelism for UnifiedGenotyper", shortName="snpScatter", required=false)
var num_snp_scatter_jobs = 20
@Input(doc="level of parallelism for IndelGenotyperV2", shortName="indelScatter", required=false)
var num_indel_scatter_jobs = 5
@Input(doc="Skip indel-cleaning for BAM files (for testing only)", shortName="skipCleaning", required=false)
var skip_cleaning = false
@Input(doc="List of samples and bams (in the form sample_id k1:v1,k2:v2 "+
"cleaned:/path/to/cleaned.bam,recalibrated:/path/to/recal.bam,unreacalibrated:/path/to/unrecal.bam)."+
"Mutually exclusive with YAML",required=false, shortName="pBams")
var projectBams: File = _
@Input(doc="The project name. Mutually exclusive with YAML.", required = false, shortName="pName")
var projectName: String = _
@Input(doc="The reference file. Mutually exclusive with YAML.", required=false, shortName="pRef")
var projectRef: File = _
@Input(doc="The project interval list. Mutually exclusive with YAML.", required=false, shortName="pInt")
var projectIntervals: File = _
@Input(doc="The project dbsnp. Mutually exclusive with YAML",required=false, shortName="pDB")
var projectDBSNP: File = _
//@Input(doc="ADPR script")
//var adprScript: File = _
//@Input(doc="Sequencing maching name (for use by adpr)")
//var machine: String = _
//@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection")
//var protocol: String = _
def verifyArguments = {
// if no yaml file we require all the mutually exclusive arguments
if ( yamlFile == null ) {
if ( projectBams == null ) throw new IllegalArgumentException("No YAML file provided; and no project bam list. Pipeline requires either YAML file, or full project specs.")
if ( projectName == null ) throw new IllegalArgumentException("No YAML file provided; and no project name. Pipeline requires either YAML file, or full project specs.")
if ( projectRef == null) throw new IllegalArgumentException("No YAML file provided; and no project reference. Pipeline requires either YAML file, or full project specs.")
if ( projectIntervals == null ) throw new IllegalArgumentException("No YAML file provided; and no project intervals. Pipeline requires either YAML file, or full project specs.")
if ( projectDBSNP == null ) throw new IllegalArgumentException("No YAML file provided; and no project DBSNP. Pipeline requires either YAML file, or full project specs.")
} else {
if ( projectBams != null || projectName != null || projectRef != null || projectIntervals != null || projectDBSNP != null )
throw new IllegalArgumentException("YAML file provided, along with other project spec arguments. YAML file is mutually exclusive with project specs.")
}
}
}

View File

@ -15,22 +15,22 @@ class ProjectManagement(stingPath: String) {
var stingDirPath : String = stingPath var stingDirPath : String = stingPath
class PassFilterSites(vcf_files: List[File], out_list: File) extends CommandLineFunction { class PassFilterAlleles(vcf_files: List[File], out_list: File) extends CommandLineFunction {
@Input(doc="List of VCFs to extract PF sites from") var vcfs = vcf_files @Input(doc="List of VCFs to extract PF sites from") var vcfs = vcf_files
@Output(doc="The file to write the site list to") var out_intervals = out_list @Output(doc="The file to write the site list to") var out_intervals = out_list
@Argument(doc="Path to the SortByRef script") var sortByRef: String = _ @Argument(doc="Path to the SortByRef script") var sortByRef: String = _
@Argument(doc="Path to the reference file on disk") var ref: File = _ @Argument(doc="Path to the reference file on disk") var ref: File = _
def commandLine = { def commandLine = {
"grep PASS %s | tr ':' '\\t' | awk '{print $2\"\\t\"$3}' | sort -n -k2,2 | uniq | perl %s - %s.fai | awk '{print $1\":\"$2}' > %s".format( "egrep \"FORMAT|format\" %s | cut -f1-8 > %s ; grep PASS %s | tr ':' '\\t' | awk '{print $2\"\\t\"$3\"\\t\"$4\"\\t\"$5\"\\t\"$6\"\\t.\\t.\\t.\"}' | sort -n -k2,2 | uniq | perl %s - %s.fai >> %s".format(
vcf_files.foldLeft[String]("")( (b,a) => b + " " + a.getAbsolutePath), sortByRef, ref.getAbsolutePath, out_list.getAbsolutePath vcf_files(1).getAbsolutePath, out_list.getAbsolutePath, vcf_files.foldLeft[String]("")( (b,a) => b + " " + a.getAbsolutePath), sortByRef, ref.getAbsolutePath, out_list.getAbsolutePath
) )
} }
} }
def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File) : List[CommandLineFunction] = { def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File) : List[CommandLineFunction] = {
var cmds : List[CommandLineFunction] = Nil var cmds : List[CommandLineFunction] = Nil
var pfSites : PassFilterSites = new PassFilterSites(callVCFs,swapExt(mergedVCF,".vcf",".pf.intervals.list")) var pfSites : PassFilterAlleles = new PassFilterAlleles(callVCFs,swapExt(mergedVCF,".vcf",".pf.alleles.vcf"))
pfSites.sortByRef = pm.stingDirPath+"perl/sortByRef.pl" pfSites.sortByRef = pm.stingDirPath+"perl/sortByRef.pl"
pfSites.ref = ref pfSites.ref = ref
@ -46,12 +46,11 @@ class ProjectManagement(stingPath: String) {
} }
def LikelihoodCalc( bam: File, ref: File, intervals: File ) : UGCalcLikelihoods = { def LikelihoodCalc( bam: File, ref: File, alleleVCF: File ) : UGCalcLikelihoods = {
var calc = new UGCalcLikelihoods var calc = new UGCalcLikelihoods
calc.input_file :+= bam calc.input_file :+= bam
calc.reference_sequence = ref calc.reference_sequence = ref
calc.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") calc.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar")
calc.intervals :+= intervals
calc.downsample_to_coverage = Some(300) calc.downsample_to_coverage = Some(300)
calc.memoryLimit = Some(2) calc.memoryLimit = Some(2)
calc.min_base_quality_score = Some(22) calc.min_base_quality_score = Some(22)
@ -59,15 +58,18 @@ class ProjectManagement(stingPath: String) {
calc.genotype = true calc.genotype = true
calc.output_all_callable_bases = true calc.output_all_callable_bases = true
calc.out = swapExt(bam,".bam",".likelihoods.vcf") calc.out = swapExt(bam,".bam",".likelihoods.vcf")
calc.rodBind :+= new RodBind("allele","VCF",alleleVCF)
calc.BTI = "allele"
return calc return calc
} }
def VariantCallMerge( likelihoods: List[File], ref: File, intervals: File, output: File) : UGCallVariants = { def VariantCallMerge( likelihoods: List[File], ref: File, intervalVCF: File, output: File) : UGCallVariants = {
var call = new UGCallVariants var call = new UGCallVariants
call.reference_sequence = ref call.reference_sequence = ref
call.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") call.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar")
call.intervals :+= intervals call.rodBind :+= new RodBind("allele","vcf",intervalVCF)
call.BTI = "allele"
call.memoryLimit = Some(4) call.memoryLimit = Some(4)
call.out = output call.out = output
call.rodBind ++= likelihoods.map( a => new RodBind("variant"+a.getName.replace(".vcf",""),"vcf",a) ) call.rodBind ++= likelihoods.map( a => new RodBind("variant"+a.getName.replace(".vcf",""),"vcf",a) )

View File

@ -8,11 +8,13 @@ import org.broadinstitute.sting.queue.extensions.gatk._
import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.function.CommandLineFunction
class VariantCalling(yaml: File,gatkJar: File) { class VariantCalling(attribs: Pipeline,gatkJar: File) {
vc => vc =>
// load attributes // load attributes
var attributes: Pipeline = YamlUtils.load(classOf[Pipeline], yaml) var attributes = attribs
def this(yaml: File, gatkJar: File) = this(YamlUtils.load(classOf[Pipeline],yaml),gatkJar)
/** /**
* Trait to propagate basic attributes throughout the library * Trait to propagate basic attributes throughout the library

View File

@ -5,6 +5,10 @@ import java.io.File
import org.broadinstitute.sting.utils.GenomeLocParser import org.broadinstitute.sting.utils.GenomeLocParser
import collection.JavaConversions._ import collection.JavaConversions._
import org.broadinstitute.sting.utils.interval.IntervalUtils import org.broadinstitute.sting.utils.interval.IntervalUtils
import org.broadinstitute.sting.queue.pipeline.PipelineArgumentCollection
import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, PipelineProject, Pipeline}
import org.broadinstitute.sting.utils.text.XReadLines
class PipelineUtils { class PipelineUtils {
@ -52,4 +56,46 @@ object PipelineUtils{
splitContigs.map{case (size, contigs) => contigs} splitContigs.map{case (size, contigs) => contigs}
} }
def loadPipelineFromPAC(args: PipelineArgumentCollection) : Pipeline = {
if ( args.yamlFile != null ) {
return YamlUtils.load(classOf[Pipeline], args.yamlFile)
} else {
return loadPipelineFromSpec(args.projectName,args.projectRef,args.projectIntervals,args.projectDBSNP,args.projectBams)
}
}
def loadPipelineFromSpec(name: String, ref: File, ivals: File, dbsnp: File, pBamList: File) : Pipeline = {
var newPipeline : Pipeline = new Pipeline
var pipeProject : PipelineProject = new PipelineProject
var pipeSamples : List[PipelineSample] = ((new XReadLines(pBamList)).readLines).toList.map( bamSpecToSample )
pipeProject.setName(name)
pipeProject.setReferenceFile(ref)
pipeProject.setIntervalList(ivals)
pipeProject.setDbsnpFile(dbsnp)
newPipeline.setProject(pipeProject)
newPipeline.setSamples(pipeSamples)
return newPipeline
}
//todo -- find a better name for this function
def bamSpecToSample(spec: String) : PipelineSample = {
var sam : PipelineSample = new PipelineSample
var spStr : Array[String] = spec.split("\\s")
sam.setId(spStr(0))
var tagStr : Array[String] = spStr(1).split(",")
var tagMap : java.util.HashMap[String,String] = new java.util.HashMap[String,String](tagStr.size)
tagStr.filter( u => ! u.equals("")).foreach( u => tagMap.put(u.split(":")(0),u.split(":")(1)) )
sam.setTags(tagMap)
var bamStr : Array[String] = spStr(2).split(",")
var bamMap : java.util.HashMap[String,File] = new java.util.HashMap[String,File](bamStr.size)
bamStr.foreach( u => bamMap.put( u.split(":")(0), new File(u.split(":")(1) ) ) )
sam.setBamFiles(bamMap)
return sam
}
} }