Tightening of the batch merging pipeline. Optimized to run on hour queue, so please: if you run this, crush 'hour' with it. Testing is forthcoming, but it merged 700 samples overnight.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4805 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
16e1bbd380
commit
f8dd59c1d1
|
|
@ -4,6 +4,7 @@ import org.broadinstitute.sting.queue.extensions.gatk.CommandLineGATK
|
||||||
import org.broadinstitute.sting.queue.pipeline.{ProjectManagement, BamProcessing, VariantCalling}
|
import org.broadinstitute.sting.queue.pipeline.{ProjectManagement, BamProcessing, VariantCalling}
|
||||||
import org.broadinstitute.sting.queue.{QException, QScript}
|
import org.broadinstitute.sting.queue.{QException, QScript}
|
||||||
import collection.JavaConversions._
|
import collection.JavaConversions._
|
||||||
|
import org.broadinstitute.sting.utils.text.XReadLines
|
||||||
import org.broadinstitute.sting.utils.yaml.YamlUtils
|
import org.broadinstitute.sting.utils.yaml.YamlUtils
|
||||||
|
|
||||||
class batchMergePipeline extends QScript {
|
class batchMergePipeline extends QScript {
|
||||||
|
|
@ -22,18 +23,10 @@ class batchMergePipeline extends QScript {
|
||||||
var vcfs : List[File] = extractFileEntries(vcfList)
|
var vcfs : List[File] = extractFileEntries(vcfList)
|
||||||
var bams : List[File] = extractFileEntries(bamList)
|
var bams : List[File] = extractFileEntries(bamList)
|
||||||
var pmLib = new ProjectManagement(stingDir)
|
var pmLib = new ProjectManagement(stingDir)
|
||||||
addAll(pmLib.MergeBatches(vcfs,bams,batchOut,ref,25))
|
addAll(pmLib.MergeBatches(vcfs,bams,batchOut,ref,20))
|
||||||
}
|
}
|
||||||
|
|
||||||
def extractFileEntries(in: File): List[File] = {
|
def extractFileEntries(in: File): List[File] = {
|
||||||
var reader : BufferedReader = new BufferedReader(new FileReader(in))
|
return (new XReadLines(in)).readLines.toList.map( new File(_) )
|
||||||
var entries : List[File] = Nil
|
|
||||||
var line : String = reader.readLine
|
|
||||||
while ( line != null ) {
|
|
||||||
entries :+= new File(line)
|
|
||||||
line = reader.readLine()
|
|
||||||
}
|
|
||||||
|
|
||||||
return entries
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -27,6 +27,7 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction {
|
||||||
def isScatterGatherable(originalFunction: ScatterGatherableFunction) = {
|
def isScatterGatherable(originalFunction: ScatterGatherableFunction) = {
|
||||||
if (originalFunction.isInstanceOf[CommandLineGATK]) {
|
if (originalFunction.isInstanceOf[CommandLineGATK]) {
|
||||||
val gatk = originalFunction.asInstanceOf[CommandLineGATK]
|
val gatk = originalFunction.asInstanceOf[CommandLineGATK]
|
||||||
|
if ( gatk.BTI != null && gatk.BTIMR == null) throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)")
|
||||||
gatk.reference_sequence != null
|
gatk.reference_sequence != null
|
||||||
} else false
|
} else false
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import org.broadinstitute.sting.utils.yaml.YamlUtils
|
||||||
import org.broadinstitute.sting.queue.function.CommandLineFunction
|
import org.broadinstitute.sting.queue.function.CommandLineFunction
|
||||||
|
|
||||||
class ProjectManagement(stingPath: String) {
|
class ProjectManagement(stingPath: String) {
|
||||||
|
// TODO -- MAJOR scatter/gather numbers are hard-coded. Really need a function to set it to the right number. These are optimized to get everything on 'hour'
|
||||||
|
|
||||||
pm =>
|
pm =>
|
||||||
|
|
||||||
|
|
@ -28,6 +29,15 @@ class ProjectManagement(stingPath: String) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class Vcf2Intervals(in_vcf: File, out_intervals: File) extends CommandLineFunction {
|
||||||
|
@Input(doc="The VCF to convert") var vcf: File = in_vcf
|
||||||
|
@Output(doc="The output bed file") var intervals: File = out_intervals
|
||||||
|
|
||||||
|
def commandLine = {
|
||||||
|
"grep -v \\\\# %s | awk '{print $1\":\"$2}' | uniq > %s".format(vcf.getAbsolutePath,intervals.getAbsolutePath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File, size: Int) : List[CommandLineFunction] = {
|
def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File, size: Int) : List[CommandLineFunction] = {
|
||||||
var cmds : List[CommandLineFunction] = Nil
|
var cmds : List[CommandLineFunction] = Nil
|
||||||
var pfSites : PassFilterAlleles = new PassFilterAlleles(callVCFs,swapExt(mergedVCF,".vcf",".pf.alleles.vcf"))
|
var pfSites : PassFilterAlleles = new PassFilterAlleles(callVCFs,swapExt(mergedVCF,".vcf",".pf.alleles.vcf"))
|
||||||
|
|
@ -35,45 +45,49 @@ class ProjectManagement(stingPath: String) {
|
||||||
pfSites.ref = ref
|
pfSites.ref = ref
|
||||||
|
|
||||||
cmds :+= pfSites
|
cmds :+= pfSites
|
||||||
|
|
||||||
|
var ints : Vcf2Intervals = new Vcf2Intervals(pfSites.out_vcf,swapExt(pfSites.out_vcf,".vcf",".intervals.list"))
|
||||||
|
|
||||||
|
cmds :+= ints
|
||||||
|
|
||||||
var calcs: List[UGCalcLikelihoods] = allBams.grouped(size).toList.zipWithIndex.map(u => LikelihoodCalc(u._1,ref,pfSites.out_vcf, new File("batch%d.likelihoods.vcf".format(u._2))))
|
var calcs: List[UGCalcLikelihoods] = allBams.grouped(size).toList.zipWithIndex.map(u => LikelihoodCalc(u._1,ref,ints.intervals,pfSites.out_vcf, new File("batch%d.likelihoods.vcf".format(u._2))))
|
||||||
|
|
||||||
cmds ++= calcs
|
cmds ++= calcs
|
||||||
|
|
||||||
cmds :+= VariantCallMerge(calcs.map( a => a.out), ref, pfSites.out_vcf, mergedVCF)
|
cmds :+= VariantCallMerge(calcs.map( a => a.out), ref, ints.intervals, mergedVCF)
|
||||||
|
|
||||||
return cmds
|
return cmds
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def LikelihoodCalc( bams: List[File], ref: File, alleleVCF: File, outVCF: File ) : UGCalcLikelihoods = {
|
def LikelihoodCalc( bams: List[File], ref: File, intervals: File, alleleVCF: File, outVCF: File ) : UGCalcLikelihoods = {
|
||||||
var calc = new UGCalcLikelihoods
|
var calc = new UGCalcLikelihoods
|
||||||
calc.input_file ++= bams
|
calc.input_file ++= bams
|
||||||
calc.reference_sequence = ref
|
calc.reference_sequence = ref
|
||||||
calc.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar")
|
calc.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar")
|
||||||
calc.downsample_to_coverage = Some(300)
|
calc.downsample_to_coverage = Some(300)
|
||||||
calc.memoryLimit = if ( bams.size < 5 ) Some(2) else if(bams.size<50) Some(4) else Some(6)
|
calc.memoryLimit = if ( bams.size < 5 ) Some(2) else if(bams.size<50) Some(4) else Some(6)
|
||||||
calc.scatterCount = if (bams.size < 5 ) 1 else if (bams.size < 50) 10 else 50
|
calc.scatterCount = if (bams.size < 5 ) 1 else if (bams.size < 50) 60 else 120
|
||||||
calc.min_base_quality_score = Some(22)
|
calc.min_base_quality_score = Some(22)
|
||||||
calc.min_mapping_quality_score = Some(20)
|
calc.min_mapping_quality_score = Some(20)
|
||||||
calc.genotype = true
|
calc.genotype = true
|
||||||
calc.output_all_callable_bases = true
|
calc.output_all_callable_bases = true
|
||||||
calc.out = outVCF
|
calc.out = outVCF
|
||||||
calc.rodBind :+= new RodBind("allele","VCF",alleleVCF)
|
calc.rodBind :+= new RodBind("allele","VCF",alleleVCF)
|
||||||
calc.BTI = "allele"
|
calc.intervals :+= intervals
|
||||||
|
|
||||||
return calc
|
return calc
|
||||||
}
|
}
|
||||||
|
|
||||||
def VariantCallMerge( likelihoods: List[File], ref: File, intervalVCF: File, output: File) : UGCallVariants = {
|
def VariantCallMerge( likelihoods: List[File], ref: File, intervals: File, output: File) : UGCallVariants = {
|
||||||
var call = new UGCallVariants
|
var call = new UGCallVariants
|
||||||
call.reference_sequence = ref
|
call.reference_sequence = ref
|
||||||
call.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar")
|
call.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar")
|
||||||
call.rodBind :+= new RodBind("allele","vcf",intervalVCF)
|
call.intervals :+= intervals
|
||||||
call.BTI = "allele"
|
|
||||||
call.memoryLimit = Some(8)
|
call.memoryLimit = Some(8)
|
||||||
call.out = output
|
call.out = output
|
||||||
call.rodBind ++= likelihoods.map( a => new RodBind("variant"+a.getName.replace(".vcf",""),"vcf",a) )
|
call.rodBind ++= likelihoods.map( a => new RodBind("variant"+a.getName.replace(".vcf",""),"vcf",a) )
|
||||||
|
call.scatterCount = 30
|
||||||
|
|
||||||
return call
|
return call
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue