diff --git a/scala/qscript/chartl/batchMergePipeline.q b/scala/qscript/chartl/batchMergePipeline.q index f78e19a8c..92042c873 100755 --- a/scala/qscript/chartl/batchMergePipeline.q +++ b/scala/qscript/chartl/batchMergePipeline.q @@ -4,6 +4,7 @@ import org.broadinstitute.sting.queue.extensions.gatk.CommandLineGATK import org.broadinstitute.sting.queue.pipeline.{ProjectManagement, BamProcessing, VariantCalling} import org.broadinstitute.sting.queue.{QException, QScript} import collection.JavaConversions._ +import org.broadinstitute.sting.utils.text.XReadLines import org.broadinstitute.sting.utils.yaml.YamlUtils class batchMergePipeline extends QScript { @@ -22,18 +23,10 @@ class batchMergePipeline extends QScript { var vcfs : List[File] = extractFileEntries(vcfList) var bams : List[File] = extractFileEntries(bamList) var pmLib = new ProjectManagement(stingDir) - addAll(pmLib.MergeBatches(vcfs,bams,batchOut,ref,25)) + addAll(pmLib.MergeBatches(vcfs,bams,batchOut,ref,20)) } def extractFileEntries(in: File): List[File] = { - var reader : BufferedReader = new BufferedReader(new FileReader(in)) - var entries : List[File] = Nil - var line : String = reader.readLine - while ( line != null ) { - entries :+= new File(line) - line = reader.readLine() - } - - return entries + return (new XReadLines(in)).readLines.toList.map( new File(_) ) } } \ No newline at end of file diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala index 78206816a..256aab583 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala @@ -27,6 +27,7 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction { def isScatterGatherable(originalFunction: ScatterGatherableFunction) = { if 
(originalFunction.isInstanceOf[CommandLineGATK]) { val gatk = originalFunction.asInstanceOf[CommandLineGATK] + if ( gatk.BTI != null && gatk.BTIMR == null) throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)") gatk.reference_sequence != null } else false } diff --git a/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala b/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala index 008b82515..8592305f4 100755 --- a/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala +++ b/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala @@ -10,6 +10,7 @@ import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.queue.function.CommandLineFunction class ProjectManagement(stingPath: String) { + // TODO -- MAJOR scatter/gather numbers are hard-coded. Really need a function to set it to the right number. These are optimized to get everything on 'hour' pm => @@ -28,6 +29,15 @@ class ProjectManagement(stingPath: String) { } } + class Vcf2Intervals(in_vcf: File, out_intervals: File) extends CommandLineFunction { + @Input(doc="The VCF to convert") var vcf: File = in_vcf + @Output(doc="The output intervals list (one chrom:pos per line)") var intervals: File = out_intervals + + def commandLine = { + "grep -v \\\\# %s | awk '{print $1\":\"$2}' | uniq > %s".format(vcf.getAbsolutePath,intervals.getAbsolutePath) + } + } + def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File, size: Int) : List[CommandLineFunction] = { var cmds : List[CommandLineFunction] = Nil var pfSites : PassFilterAlleles = new PassFilterAlleles(callVCFs,swapExt(mergedVCF,".vcf",".pf.alleles.vcf")) @@ -35,45 +45,49 @@ class ProjectManagement(stingPath: String) { pfSites.ref = ref cmds :+= pfSites + + var ints : Vcf2Intervals = new Vcf2Intervals(pfSites.out_vcf,swapExt(pfSites.out_vcf,".vcf",".intervals.list")) + + cmds :+= ints - var calcs:
List[UGCalcLikelihoods] = allBams.grouped(size).toList.zipWithIndex.map(u => LikelihoodCalc(u._1,ref,pfSites.out_vcf, new File("batch%d.likelihoods.vcf".format(u._2)))) + var calcs: List[UGCalcLikelihoods] = allBams.grouped(size).toList.zipWithIndex.map(u => LikelihoodCalc(u._1,ref,ints.intervals,pfSites.out_vcf, new File("batch%d.likelihoods.vcf".format(u._2)))) cmds ++= calcs - cmds :+= VariantCallMerge(calcs.map( a => a.out), ref, pfSites.out_vcf, mergedVCF) + cmds :+= VariantCallMerge(calcs.map( a => a.out), ref, ints.intervals, mergedVCF) return cmds } - def LikelihoodCalc( bams: List[File], ref: File, alleleVCF: File, outVCF: File ) : UGCalcLikelihoods = { + def LikelihoodCalc( bams: List[File], ref: File, intervals: File, alleleVCF: File, outVCF: File ) : UGCalcLikelihoods = { var calc = new UGCalcLikelihoods calc.input_file ++= bams calc.reference_sequence = ref calc.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") calc.downsample_to_coverage = Some(300) calc.memoryLimit = if ( bams.size < 5 ) Some(2) else if(bams.size<50) Some(4) else Some(6) - calc.scatterCount = if (bams.size < 5 ) 1 else if (bams.size < 50) 10 else 50 + calc.scatterCount = if (bams.size < 5 ) 1 else if (bams.size < 50) 60 else 120 calc.min_base_quality_score = Some(22) calc.min_mapping_quality_score = Some(20) calc.genotype = true calc.output_all_callable_bases = true calc.out = outVCF calc.rodBind :+= new RodBind("allele","VCF",alleleVCF) - calc.BTI = "allele" + calc.intervals :+= intervals return calc } - def VariantCallMerge( likelihoods: List[File], ref: File, intervalVCF: File, output: File) : UGCallVariants = { + def VariantCallMerge( likelihoods: List[File], ref: File, intervals: File, output: File) : UGCallVariants = { var call = new UGCallVariants call.reference_sequence = ref call.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") - call.rodBind :+= new RodBind("allele","vcf",intervalVCF) - call.BTI = "allele" + call.intervals :+= intervals 
call.memoryLimit = Some(8) call.out = output call.rodBind ++= likelihoods.map( a => new RodBind("variant"+a.getName.replace(".vcf",""),"vcf",a) ) + call.scatterCount = 30 return call }