350 lines
16 KiB
Plaintext
Executable File
350 lines
16 KiB
Plaintext
Executable File
|
|
import org.broadinstitute.sting.commandline.ArgumentSource
|
|
import org.broadinstitute.sting.datasources.pipeline.Pipeline
|
|
import org.broadinstitute.sting.queue.extensions.gatk._
|
|
import org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction
|
|
import org.broadinstitute.sting.queue.extensions.samtools._
|
|
import org.broadinstitute.sting.queue.function.ListWriterFunction
|
|
import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, CloneFunction, ScatterFunction}
|
|
import org.broadinstitute.sting.queue.QScript
|
|
import collection.JavaConversions._
|
|
import org.broadinstitute.sting.utils.yaml.YamlUtils
|
|
|
|
class FullCallingPipeline extends QScript {
|
|
qscript =>
|
|
|
|
@Argument(doc="the YAML file specifying inputs, interval lists, reference sequence, etc.", shortName="Y")
|
|
var yamlFile: File = _
|
|
|
|
@Input(doc="path to trigger track (for UnifiedGenotyper)", shortName="trigger", required=false)
|
|
var trigger: File = _
|
|
|
|
// TODO: Fix command lines that pass -refseqTable
|
|
@Input(doc="path to refseqTable (for GenomicAnnotator) if not present in the YAML", shortName="refseqTable", required=false)
|
|
var refseqTable: File = _
|
|
|
|
@Input(doc="path to Picard FixMateInformation.jar. See http://picard.sourceforge.net/ .", required=false)
|
|
var picardFixMatesJar: File = new java.io.File("/seq/software/picard/current/bin/FixMateInformation.jar")
|
|
|
|
@Input(doc="path to GATK jar")
|
|
var gatkJar: File = _
|
|
|
|
@Input(doc="per-sample downsampling level",shortName="dcov",required=false)
|
|
var downsampling_coverage = 600
|
|
|
|
@Input(doc="level of parallelism for IndelRealigner. By default is set to 1.", shortName="cleanerScatter", required=false)
|
|
var num_cleaner_scatter_jobs = 1
|
|
|
|
@Input(doc="level of parallelism for UnifiedGenotyper. By default is set to 20.", shortName="snpScatter", required=false)
|
|
var num_snp_scatter_jobs = 20
|
|
|
|
//@Input(doc="level of parallelism for IndelGenotyperV2", shortName="indelScatter", required=false)
|
|
//var num_indel_scatter_jobs = 5
|
|
|
|
@Input(doc="Skip indel-cleaning for BAM files (for testing only)", shortName="skipCleaning", required=false)
|
|
var skip_cleaning = false
|
|
|
|
@Input(doc="ADPR script", shortName ="tearScript", required=false)
|
|
var tearScript: File = _
|
|
|
|
//@Input(doc="Sequencing maching name (for use by adpr)")
|
|
//var machine: String = _
|
|
|
|
//@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection")
|
|
//var protocol: String = _
|
|
|
|
// TODO: Fix command lines that pass -bigMemQueue
|
|
@Argument(doc="Unused", shortName="bigMemQueue", required=false)
|
|
var big_mem_queue: String = _
|
|
|
|
@Argument(doc="Job queue for short run jobs (<1hr)", shortName="shortJobQueue", required=false)
|
|
var short_job_queue: String = _
|
|
|
|
|
|
|
|
private var pipeline: Pipeline = _
|
|
|
|
trait CommandLineGATKArgs extends CommandLineGATK {
|
|
this.intervals = List(qscript.pipeline.getProject.getIntervalList)
|
|
this.jarFile = qscript.gatkJar
|
|
this.reference_sequence = qscript.pipeline.getProject.getReferenceFile
|
|
this.memoryLimit = Some(4)
|
|
}
|
|
|
|
|
|
// ------------ SETUP THE PIPELINE ----------- //
|
|
|
|
|
|
def script = {
|
|
pipeline = YamlUtils.load(classOf[Pipeline], qscript.yamlFile)
|
|
|
|
val projectBase: String = qscript.pipeline.getProject.getName
|
|
// TODO: Fix command lines that pass -refseqTable
|
|
if (qscript.refseqTable != null)
|
|
qscript.pipeline.getProject.setRefseqTable(qscript.refseqTable)
|
|
if (qscript.skip_cleaning) {
|
|
//endToEnd(projectBase + ".uncleaned", "recalibrated", adprRscript, seq, expKind)
|
|
|
|
endToEnd(projectBase + ".uncleaned", "recalibrated")
|
|
} else {
|
|
|
|
// there are commands that use all the bam files
|
|
val recalibratedSamples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains("recalibrated"))
|
|
//val adprRScript = qscript.adprScript
|
|
//val seq = qscript.machine
|
|
//val expKind = qscript.protocol
|
|
|
|
for ( sample <- recalibratedSamples ) {
|
|
val sampleId = sample.getId
|
|
// put unclean bams in unclean genotypers in advance, create the extension files
|
|
val bam = sample.getBamFiles.get("recalibrated")
|
|
if (!sample.getBamFiles.contains("cleaned")) {
|
|
sample.getBamFiles.put("cleaned", swapExt("CleanedBams", bam,"bam","cleaned.bam"))
|
|
}
|
|
|
|
val cleaned_bam = sample.getBamFiles.get("cleaned")
|
|
val indel_targets = swapExt("CleanedBams/IntermediateFiles/"+sampleId, bam,"bam","realigner_targets.interval_list")
|
|
|
|
// create the cleaning commands
|
|
val targetCreator = new RealignerTargetCreator with CommandLineGATKArgs
|
|
targetCreator.jobOutputFile = new File(".queue/logs/Cleaning/%s/RealignerTargetCreator.out".format(sampleId))
|
|
targetCreator.jobName = "CreateTargets_"+sampleId
|
|
targetCreator.analysisName = "CreateTargets_"+sampleId
|
|
targetCreator.input_file :+= bam
|
|
targetCreator.out = indel_targets
|
|
targetCreator.memoryLimit = Some(2)
|
|
targetCreator.isIntermediate = true
|
|
|
|
val realigner = new IndelRealigner with CommandLineGATKArgs
|
|
realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId))
|
|
realigner.analysisName = "RealignBam_"+sampleId
|
|
realigner.input_file = targetCreator.input_file
|
|
realigner.targetIntervals = targetCreator.out
|
|
realigner.intervals = Nil
|
|
realigner.intervalsString = Nil
|
|
realigner.scatterCount = num_cleaner_scatter_jobs
|
|
realigner.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getGenotypeDbsnpType, qscript.pipeline.getProject.getGenotypeDbsnp)
|
|
realigner.rodBind :+= RodBind("indels", "VCF", swapExt(realigner.reference_sequence.getParentFile, realigner.reference_sequence, "fasta", "1kg_pilot_indels.vcf"))
|
|
|
|
// if scatter count is > 1, do standard scatter gather, if not, explicitly set up fix mates
|
|
if (realigner.scatterCount > 1) {
|
|
realigner.out = cleaned_bam
|
|
// While gathering run fix mates.
|
|
realigner.setupScatterFunction = {
|
|
case scatter: ScatterFunction =>
|
|
scatter.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather".format(sampleId))
|
|
scatter.jobOutputFile = new File(".queue/logs/Cleaning/%s/Scatter.out".format(sampleId))
|
|
}
|
|
realigner.setupCloneFunction = {
|
|
case (clone: CloneFunction, index: Int) =>
|
|
clone.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Scatter_%s".format(sampleId, index))
|
|
clone.jobOutputFile = new File(".queue/logs/Cleaning/%s/Scatter_%s.out".format(sampleId, index))
|
|
}
|
|
realigner.setupGatherFunction = {
|
|
case (gather: BamGatherFunction, source: ArgumentSource) =>
|
|
gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName))
|
|
gather.jobOutputFile = new File(".queue/logs/Cleaning/%s/FixMates.out".format(sampleId))
|
|
gather.memoryLimit = Some(6)
|
|
gather.jarFile = qscript.picardFixMatesJar
|
|
// Don't pass this AS=true to fix mates!
|
|
gather.assumeSorted = None
|
|
case (gather: GatherFunction, source: ArgumentSource) =>
|
|
gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName))
|
|
gather.jobOutputFile = new File(".queue/logs/Cleaning/%s/Gather_%s.out".format(sampleId, source.field.getName))
|
|
}
|
|
|
|
add(targetCreator,realigner)
|
|
} else {
|
|
realigner.out = swapExt("CleanedBams/IntermediateFiles/"+sampleId,bam,"bam","unfixed.cleaned.bam")
|
|
realigner.isIntermediate = true
|
|
|
|
// Explicitly run fix mates if the function won't be scattered.
|
|
val fixMates = new PicardBamJarFunction {
|
|
// Declare inputs/outputs for dependency tracking.
|
|
@Input(doc="unfixed bam") var unfixed: File = _
|
|
@Output(doc="fixed bam") var fixed: File = _
|
|
def inputBams = List(unfixed)
|
|
def outputBam = fixed
|
|
}
|
|
|
|
fixMates.jobOutputFile = new File(".queue/logs/Cleaning/%s/FixMates.out".format(sampleId))
|
|
fixMates.memoryLimit = Some(6)
|
|
fixMates.jarFile = qscript.picardFixMatesJar
|
|
fixMates.unfixed = realigner.out
|
|
fixMates.fixed = cleaned_bam
|
|
fixMates.analysisName = "FixMates_"+sampleId
|
|
|
|
// Add the fix mates explicitly
|
|
add(targetCreator,realigner,fixMates)
|
|
}
|
|
|
|
var samtoolsindex = new SamtoolsIndexFunction
|
|
samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId))
|
|
samtoolsindex.bamFile = cleaned_bam
|
|
samtoolsindex.analysisName = "index_cleaned_"+sampleId
|
|
samtoolsindex.jobQueue = qscript.short_job_queue
|
|
add(samtoolsindex)
|
|
}
|
|
|
|
//endToEnd(projectBase + ".cleaned", "cleaned", adprRscript, seq, expKind)
|
|
endToEnd(projectBase + ".cleaned", "cleaned")
|
|
}
|
|
}
|
|
|
|
def endToEnd(base: String, bamType: String) = {
|
|
|
|
val samples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains(bamType)).toList
|
|
val bamFiles = samples.map(_.getBamFiles.get(bamType))
|
|
|
|
// step through the un-indel-cleaned graph:
|
|
// 1a. call snps and indels
|
|
val snps = new UnifiedGenotyper with CommandLineGATKArgs
|
|
snps.jobOutputFile = new File(".queue/logs/SNPCalling/UnifiedGenotyper.out")
|
|
snps.analysisName = base+"_SNP_calls"
|
|
snps.input_file = bamFiles
|
|
snps.input_file = bamFiles
|
|
snps.group :+= "Standard"
|
|
snps.out = new File("SnpCalls", base+".vcf")
|
|
snps.downsample_to_coverage = Some(qscript.downsampling_coverage)
|
|
snps.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getGenotypeDbsnpType, qscript.pipeline.getProject.getGenotypeDbsnp)
|
|
snps.memoryLimit = Some(6)
|
|
|
|
snps.scatterCount = qscript.num_snp_scatter_jobs
|
|
snps.setupScatterFunction = {
|
|
case scatter: ScatterFunction =>
|
|
scatter.commandDirectory = new File("SnpCalls/ScatterGather")
|
|
scatter.jobOutputFile = new File(".queue/logs/SNPCalling/ScatterGather/Scatter.out")
|
|
}
|
|
snps.setupCloneFunction = {
|
|
case (clone: CloneFunction, index: Int) =>
|
|
clone.commandDirectory = new File("SnpCalls/ScatterGather/Scatter_%s".format(index))
|
|
clone.jobOutputFile = new File(".queue/logs/SNPCalling/ScatterGather/Scatter_%s.out".format(index))
|
|
}
|
|
snps.setupGatherFunction = {
|
|
case (gather: GatherFunction, source: ArgumentSource) =>
|
|
gather.commandDirectory = new File("SnpCalls/ScatterGather/Gather_%s".format(source.field.getName))
|
|
gather.jobOutputFile = new File(".queue/logs/SNPCalling/ScatterGather/Gather_%s.out".format(source.field.getName))
|
|
}
|
|
|
|
val indels = new UnifiedGenotyper with CommandLineGATKArgs
|
|
indels.jobOutputFile = new File(".queue/logs/IndelCalling/UnifiedGenotyper.out")
|
|
indels.analysisName = base+"_Indel_calls"
|
|
indels.input_file = bamFiles
|
|
indels.input_file = bamFiles
|
|
indels.group :+= "Standard"
|
|
indels.out = new File("IndelCalls", base+".vcf")
|
|
indels.downsample_to_coverage = Some(qscript.downsampling_coverage)
|
|
indels.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getGenotypeDbsnpType, qscript.pipeline.getProject.getGenotypeDbsnp)
|
|
indels.memoryLimit = Some(6)
|
|
indels.genotype_likelihoods_model = Option(org.broadinstitute.sting.gatk.walkers.genotyper.GenotypeLikelihoodsCalculationModel.Model.DINDEL)
|
|
|
|
|
|
|
|
indels.scatterCount = qscript.num_snp_scatter_jobs
|
|
indels.setupScatterFunction = {
|
|
case scatter: ScatterFunction =>
|
|
scatter.commandDirectory = new File("IndelCalls/ScatterGather")
|
|
scatter.jobOutputFile = new File(".queue/logs/IndelCalling/ScatterGather/Scatter.out")
|
|
}
|
|
indels.setupCloneFunction = {
|
|
case (clone: CloneFunction, index: Int) =>
|
|
clone.commandDirectory = new File("IndelCalls/ScatterGather/Scatter_%s".format(index))
|
|
clone.jobOutputFile = new File(".queue/logs/IndelCalling/ScatterGather/Scatter_%s.out".format(index))
|
|
}
|
|
indels.setupGatherFunction = {
|
|
case (gather: GatherFunction, source: ArgumentSource) =>
|
|
gather.commandDirectory = new File("IndelCalls/ScatterGather/Gather_%s".format(source.field.getName))
|
|
gather.jobOutputFile = new File(".queue/logs/IndelCalling/ScatterGather/Gather_%s.out".format(source.field.getName))
|
|
}
|
|
|
|
|
|
// 1b. genomically annotate SNPs -- no longer slow
|
|
val annotated = new GenomicAnnotator with CommandLineGATKArgs
|
|
annotated.jobOutputFile = new File(".queue/logs/SNPCalling/GenomicAnnotator.out")
|
|
annotated.rodBind :+= RodBind("variant", "VCF", snps.out)
|
|
annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.pipeline.getProject.getRefseqTable)
|
|
annotated.out = swapExt("SnpCalls",snps.out,".vcf",".annotated.vcf")
|
|
annotated.rodToIntervalTrackName = "variant"
|
|
annotated.analysisName = base+"_GenomicAnnotator"
|
|
|
|
// 2.a filter on cluster and near indels
|
|
val masker = new VariantFiltration with CommandLineGATKArgs
|
|
masker.jobOutputFile = new File(".queue/logs/SNPCalling/Masker.out")
|
|
masker.variantVCF = annotated.out
|
|
masker.rodBind :+= RodBind("mask", "VCF", indels.out)
|
|
masker.maskName = "NearIndel"
|
|
masker.clusterWindowSize = Some(10)
|
|
masker.clusterSize = Some(3)
|
|
masker.out = swapExt("SnpCalls",annotated.out,".vcf",".indel.masked.vcf")
|
|
masker.analysisName = base+"_Cluster_and_Indel_filter"
|
|
|
|
// 2.b hand filter with standard filter
|
|
val handFilter = new VariantFiltration with CommandLineGATKArgs
|
|
handFilter.jobOutputFile = new File(".queue/logs/SNPCalling/HandFilter.out")
|
|
handFilter.variantVCF = masker.out
|
|
handFilter.rodBind :+= RodBind("mask", "VCF", indels.out)
|
|
//handFilter.filterName ++= List("StrandBias","AlleleBalance","QualByDepth","HomopolymerRun")
|
|
//handFilter.filterExpression ++= List("\"SB>=0.10\"","\"AB>=0.75\"","\"QD<5.0\"","\"HRun>=4\"")
|
|
handFilter.filterName ++= List("StrandBias","QualByDepth","HomopolymerRun")
|
|
handFilter.filterExpression ++= List("\"SB>=0.10\"","\"QD<5.0\"","\"HRun>=4\"")
|
|
handFilter.out = swapExt("SnpCalls",annotated.out,".vcf",".handfiltered.vcf")
|
|
handFilter.analysisName = base+"_HandFilter"
|
|
|
|
|
|
// 4. Variant eval the cut and the hand-filtered vcf files
|
|
val eval = new VariantEval with CommandLineGATKArgs
|
|
eval.jobOutputFile = new File(".queue/logs/SNPCalling/VariantEval.out")
|
|
// eval.rodBind :+= RodBind("evalOptimized", "VCF", cut.out)
|
|
eval.rodBind :+= RodBind("eval", "VCF", handFilter.out)
|
|
eval.evalModule ++= List("FunctionalClassBySample", "SimpleMetricsBySample", "CountFunctionalClasses", "CompOverlap", "CountVariants", "TiTvVariantEvaluator")
|
|
eval.reportLocation = new File("SnpCalls", base+".eval")
|
|
eval.reportType = Option(org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType.R)
|
|
eval.analysisName = base+"_VariantEval"
|
|
eval.rodBind :+= RodBind("dbsnp", qscript.pipeline.getProject.getEvalDbsnpType, qscript.pipeline.getProject.getEvalDbsnp)
|
|
|
|
// 5. Make the bam list
|
|
val listOfBams = new File(base +".BamFiles.list")
|
|
|
|
val writeBamList = new ListWriterFunction
|
|
writeBamList.inputFiles = bamFiles
|
|
writeBamList.listFile = listOfBams
|
|
writeBamList.analysisName = base + "_BamList"
|
|
writeBamList.jobOutputFile = new File(".queue/logs/SNPCalling/bamlist.out")
|
|
|
|
// 6. Run the ADPR and make pretty stuff
|
|
|
|
add(snps, indels, annotated,masker,handFilter,eval,writeBamList)
|
|
if (qscript.tearScript != null){
|
|
class rCommand extends CommandLineFunction{
|
|
@Input(doc="R script")
|
|
var script: File = _
|
|
@Input(doc="pipeline yaml")
|
|
var yaml: File = _
|
|
@Input(doc="list of bams")
|
|
var bamlist: File =_
|
|
@Input(doc="Eval files root")
|
|
var evalroot: File =_
|
|
@Output(doc="tearsheet loc")
|
|
var tearsheet: File =_
|
|
def commandLine = "Rscript %s -yaml %s -bamlist %s -evalroot %s -tearout %s".format(script, yaml, bamlist, evalroot, tearsheet)
|
|
}
|
|
|
|
val adpr = new rCommand
|
|
adpr.bamlist = listOfBams
|
|
adpr.yaml = qscript.yamlFile.getAbsoluteFile
|
|
adpr.script = qscript.tearScript
|
|
adpr.evalroot = eval.reportLocation
|
|
adpr.jobOutputFile = new File(".queue/logs/SNPCalling/adpr.out")
|
|
adpr.tearsheet = new File("SnpCalls", base + ".tearsheet.pdf")
|
|
adpr.analysisName = base + "_ADPR"
|
|
|
|
|
|
add(adpr)
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
}
|