diff --git a/scala/qscript/chartl/batchMergePipeline.q b/scala/qscript/chartl/batchMergePipeline.q new file mode 100755 index 000000000..b00dfe7c0 --- /dev/null +++ b/scala/qscript/chartl/batchMergePipeline.q @@ -0,0 +1,39 @@ +import java.io.{FileReader, BufferedReader} +import org.broadinstitute.sting.datasources.pipeline.Pipeline +import org.broadinstitute.sting.queue.extensions.gatk.CommandLineGATK +import org.broadinstitute.sting.queue.pipeline.{ProjectManagement, BamProcessing, VariantCalling} +import org.broadinstitute.sting.queue.{QException, QScript} +import collection.JavaConversions._ +import org.broadinstitute.sting.utils.yaml.YamlUtils + +class batchMergePipeline extends QScript { + batchMerge => + + @Argument(doc="VCF list",shortName="vcfs") var vcfList: File = _ + @Argument(doc="bam list",shortName="bams") var bamList: File = _ + @Argument(doc="sting dir",shortName="sting") var stingDir: String = _ + @Argument(doc="reference file",shortName="ref") var ref: File = _ + @Argument(doc="batched output",shortName="batch") var batchOut: File = _ + + def addAll( cmds : List[CommandLineFunction]) : Unit = { cmds.foreach( c => add(c)) } + + def script = { + + var vcfs : List[File] = extractFileEntries(vcfList) + var bams : List[File] = extractFileEntries(bamList) + var pmLib = new ProjectManagement(stingDir) + addAll(pmLib.MergeBatches(vcfs,bams,batchOut,ref)) + } + + def extractFileEntries(in: File): List[File] = { + var reader : BufferedReader = new BufferedReader(new FileReader(in)) + var entries : List[File] = Nil + var line : String = reader.readLine + while ( line != null ) { + entries :+= new File(line) + line = reader.readLine() + } + + return entries + } +} \ No newline at end of file diff --git a/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala b/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala new file mode 100755 index 000000000..6cb7f7700 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala @@ -0,0 +1,81 @@ +package org.broadinstitute.sting.queue.pipeline + +import org.broadinstitute.sting.commandline._ +import org.broadinstitute.sting.queue.util._ +import java.io.File +import org.broadinstitute.sting.datasources.pipeline.Pipeline +import org.broadinstitute.sting.gatk.DownsampleType +import org.broadinstitute.sting.queue.extensions.gatk._ +import org.broadinstitute.sting.utils.yaml.YamlUtils +import org.broadinstitute.sting.queue.function.CommandLineFunction + +class ProjectManagement(stingPath: String) { + + pm => + + var stingDirPath : String = stingPath + + class PassFilterSites(vcf_files: List[File], out_list: File) extends CommandLineFunction { + @Input(doc="List of VCFs to extract PF sites from") var vcfs = vcf_files + @Output(doc="The file to write the site list to") var out_intervals = out_list + @Argument(doc="Path to the SortByRef script") var sortByRef: String = _ + @Argument(doc="Path to the reference file on disk") var ref: File = _ + + def commandLine = { + "grep PASS %s | tr ':' '\\t' | awk '{print $2\"\\t\"$3}' | sort -n -k2,2 | uniq | perl %s - %s.fai | awk '{print $1\":\"$2}' > %s".format( + vcf_files.foldLeft[String]("")( (b,a) => b + " " + a.getAbsolutePath), sortByRef, ref.getAbsolutePath, out_list.getAbsolutePath + ) + } + } + + def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File) : List[CommandLineFunction] = { + var cmds : List[CommandLineFunction] = Nil + var pfSites : PassFilterSites = new PassFilterSites(callVCFs,swapExt(mergedVCF,".vcf",".pf.intervals.list")) + pfSites.sortByRef = pm.stingDirPath+"perl/sortByRef.pl" + pfSites.ref = ref + + cmds :+= pfSites + + var calcs: List[UGCalcLikelihoods] = allBams.map( a => LikelihoodCalc(a,ref,pfSites.out_intervals) ) + + cmds ++= calcs + + cmds :+= VariantCallMerge(calcs.map( a => a.out), ref, pfSites.out_intervals, mergedVCF) + + return cmds + + } + + def LikelihoodCalc( bam: File, ref: File, intervals: File ) : UGCalcLikelihoods = { + var calc = new UGCalcLikelihoods + calc.input_file :+= bam + calc.reference_sequence = ref + calc.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") + calc.intervals :+= intervals + calc.downsample_to_coverage = Some(300) + calc.memoryLimit = Some(2) + calc.min_base_quality_score = Some(22) + calc.min_mapping_quality_score = Some(20) + calc.genotype = true + calc.output_all_callable_bases = true + calc.out = swapExt(bam,".bam",".likelihoods.vcf") + + return calc + } + + def VariantCallMerge( likelihoods: List[File], ref: File, intervals: File, output: File) : UGCallVariants = { + var call = new UGCallVariants + call.reference_sequence = ref + call.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") + call.intervals :+= intervals + call.memoryLimit = Some(4) + call.out = output + call.rodBind ++= likelihoods.map( a => new RodBind(a.getName.replace(".vcf",""),"vcf",a) ) + + return call + } + + def swapExt(file: File, oldExtension: String, newExtension: String) = + new File(file.getName.stripSuffix(oldExtension) + newExtension) + +} \ No newline at end of file