Added a pipeline for merging batches. For now takes a file containing a list of VCFs, and a file containing a list of bams. Does not do anything smart (e.g. if you leave out some .bams or add some extra ones, you will not be warned). Heavy lifting done in (the beginnings of) a library for managing multi-batch or multi-project tasks.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4771 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
chartl 2010-12-02 07:31:59 +00:00
parent 99b942b0b4
commit 220fb0c44a
2 changed files with 120 additions and 0 deletions

View File

@ -0,0 +1,39 @@
import java.io.{FileReader, BufferedReader}
import org.broadinstitute.sting.datasources.pipeline.Pipeline
import org.broadinstitute.sting.queue.extensions.gatk.CommandLineGATK
import org.broadinstitute.sting.queue.pipeline.{ProjectManagement, BamProcessing, VariantCalling}
import org.broadinstitute.sting.queue.{QException, QScript}
import collection.JavaConversions._
import org.broadinstitute.sting.utils.yaml.YamlUtils
class batchMergePipeline extends QScript {
batchMerge =>
@Argument(doc="VCF list",shortName="vcfs") var vcfList: File = _
@Argument(doc="bam list",shortName="bams") var bamList: File = _
@Argument(doc="sting dir",shortName="sting") var stingDir: String = _
@Argument(doc="reference file",shortName="ref") var ref: File = _
@Argument(doc="batched output",shortName="batch") var batchOut: File = _
def addAll( cmds : List[CommandLineFunction]) : Unit = { cmds.foreach( c => add(c)) }
def script = {
var vcfs : List[File] = extractFileEntries(vcfList)
var bams : List[File] = extractFileEntries(bamList)
var pmLib = new ProjectManagement(stingDir)
addAll(pmLib.MergeBatches(vcfs,bams,batchOut,ref))
}
def extractFileEntries(in: File): List[File] = {
var reader : BufferedReader = new BufferedReader(new FileReader(in))
var entries : List[File] = Nil
var line : String = reader.readLine
while ( line != null ) {
entries :+= new File(line)
line = reader.readLine()
}
return entries
}
}

View File

@ -0,0 +1,81 @@
package org.broadinstitute.sting.queue.pipeline
import org.broadinstitute.sting.commandline._
import org.broadinstitute.sting.queue.util._
import java.io.File
import org.broadinstitute.sting.datasources.pipeline.Pipeline
import org.broadinstitute.sting.gatk.DownsampleType
import org.broadinstitute.sting.queue.extensions.gatk._
import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.broadinstitute.sting.queue.function.CommandLineFunction
class ProjectManagement(stingPath: String) {
pm =>
var stingDirPath : String = stingPath
class PassFilterSites(vcf_files: List[File], out_list: File) extends CommandLineFunction {
@Input(doc="List of VCFs to extract PF sites from") var vcfs = vcf_files
@Output(doc="The file to write the site list to") var out_intervals = out_list
@Argument(doc="Path to the SortByRef script") var sortByRef: String = _
@Argument(doc="Path to the reference file on disk") var ref: File = _
def commandLine = {
"grep PASS %s | tr ':' '\\t' | awk '{print $2\"\\t\"$3}' | sort -n -k2,2 | uniq | perl %s - %s.fai | awk '{print $1\":\"$2}' > %s".format(
vcf_files.foldLeft[String]("")( (b,a) => b + " " + a.getAbsolutePath), sortByRef, ref.getAbsolutePath, out_list.getAbsolutePath
)
}
}
def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File) : List[CommandLineFunction] = {
var cmds : List[CommandLineFunction] = Nil
var pfSites : PassFilterSites = new PassFilterSites(callVCFs,swapExt(mergedVCF,".vcf",".pf.intervals.list"))
pfSites.sortByRef = pm.stingDirPath+"perl/sortByRef.pl"
pfSites.ref = ref
cmds :+= pfSites
var calcs: List[UGCalcLikelihoods] = allBams.map( a => LikelihoodCalc(a,ref,pfSites.out_intervals) )
cmds ++= calcs
cmds :+= VariantCallMerge(calcs.map( a => a.out), ref, pfSites.out_intervals, mergedVCF)
return cmds
}
def LikelihoodCalc( bam: File, ref: File, intervals: File ) : UGCalcLikelihoods = {
var calc = new UGCalcLikelihoods
calc.input_file :+= bam
calc.reference_sequence = ref
calc.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar")
calc.intervals :+= intervals
calc.downsample_to_coverage = Some(300)
calc.memoryLimit = Some(2)
calc.min_base_quality_score = Some(22)
calc.min_mapping_quality_score = Some(20)
calc.genotype = true
calc.output_all_callable_bases = true
calc.out = swapExt(bam,".bam",".likelihoods.vcf")
return calc
}
def VariantCallMerge( likelihoods: List[File], ref: File, intervals: File, output: File) : UGCallVariants = {
var call = new UGCallVariants
call.reference_sequence = ref
call.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar")
call.intervals :+= intervals
call.memoryLimit = Some(4)
call.out = output
call.rodBind ++= likelihoods.map( a => new RodBind(a.getName.replace(".vcf",""),"vcf",a) )
return call
}
def swapExt(file: File, oldExtension: String, newExtension: String) =
new File(file.getName.stripSuffix(oldExtension) + newExtension)
}