2010-12-02 15:31:59 +08:00
package org.broadinstitute.sting.queue.pipeline
import org.broadinstitute.sting.commandline._
import org.broadinstitute.sting.queue.util._
import java.io.File
import org.broadinstitute.sting.datasources.pipeline.Pipeline
import org.broadinstitute.sting.gatk.DownsampleType
import org.broadinstitute.sting.queue.extensions.gatk._
import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.broadinstitute.sting.queue.function.CommandLineFunction
class ProjectManagement ( stingPath : String ) {
2010-12-08 22:36:23 +08:00
// TODO -- MAJOR scatter/gather numbers are hard-coded. Really need a function to set it to the right number. These are optimized to get everything on 'hour'
2010-12-02 15:31:59 +08:00
pm =>
var stingDirPath : String = stingPath
2010-12-12 13:10:45 +08:00
def this ( f : File ) = this ( f . getAbsolutePath )
2010-12-05 10:33:54 +08:00
class PassFilterAlleles ( vcf_files : List [ File ] , out_list : File ) extends CommandLineFunction {
2010-12-02 15:31:59 +08:00
@Input ( doc = "List of VCFs to extract PF sites from" ) var vcfs = vcf_files
2010-12-07 04:12:23 +08:00
@Output ( doc = "The file to write the site list to" ) var out_vcf = out_list
2010-12-02 15:31:59 +08:00
@Argument ( doc = "Path to the SortByRef script" ) var sortByRef : String = _
@Argument ( doc = "Path to the reference file on disk" ) var ref : File = _
def commandLine = {
2010-12-05 10:33:54 +08:00
"egrep \"FORMAT|format\" %s | cut -f1-8 > %s ; grep PASS %s | tr ':' '\\t' | awk '{print $2\"\\t\"$3\"\\t\"$4\"\\t\"$5\"\\t\"$6\"\\t.\\t.\\t.\"}' | sort -n -k2,2 | uniq | perl %s - %s.fai >> %s" . format (
2010-12-12 13:10:45 +08:00
vcf_files ( 0 ) . getAbsolutePath , out_vcf . getAbsolutePath , vcf_files . foldLeft [ String ] ( "" ) ( ( b , a ) => b + " " + a . getAbsolutePath ) , sortByRef , ref . getAbsolutePath , out_vcf . getAbsolutePath
2010-12-02 15:31:59 +08:00
)
}
}
2010-12-08 22:36:23 +08:00
class Vcf2Intervals ( in_vcf : File , out_intervals : File ) extends CommandLineFunction {
@Input ( doc = "The VCF to convert" ) var vcf : File = in_vcf
@Output ( doc = "The output bed file" ) var intervals : File = out_intervals
def commandLine = {
"grep -v \\\\# %s | awk '{print $1\":\"$2}' | uniq > %s" . format ( vcf . getAbsolutePath , intervals . getAbsolutePath )
}
}
2010-12-07 02:23:54 +08:00
def MergeBatches ( callVCFs : List [ File ] , allBams : List [ File ] , mergedVCF : File , ref : File , size : Int ) : List [ CommandLineFunction ] = {
2010-12-02 15:31:59 +08:00
var cmds : List [ CommandLineFunction ] = Nil
2010-12-05 10:33:54 +08:00
var pfSites : PassFilterAlleles = new PassFilterAlleles ( callVCFs , swapExt ( mergedVCF , ".vcf" , ".pf.alleles.vcf" ) )
2010-12-02 15:31:59 +08:00
pfSites . sortByRef = pm . stingDirPath + "perl/sortByRef.pl"
pfSites . ref = ref
cmds : += pfSites
2010-12-08 22:36:23 +08:00
var ints : Vcf2Intervals = new Vcf2Intervals ( pfSites . out_vcf , swapExt ( pfSites . out_vcf , ".vcf" , ".intervals.list" ) )
cmds : += ints
2010-12-02 15:31:59 +08:00
2010-12-12 13:10:45 +08:00
var calcs : List [ UGCalcLikelihoods ] = allBams . grouped ( size ) . toList . zipWithIndex . map ( u => LikelihoodCalc ( u . _1 , ref , ints . intervals , pfSites . out_vcf , new File ( "MBatch%d.likelihoods.vcf" . format ( u . _2 ) ) ) )
2010-12-02 15:31:59 +08:00
cmds ++= calcs
2010-12-08 22:36:23 +08:00
cmds : += VariantCallMerge ( calcs . map ( a => a . out ) , ref , ints . intervals , mergedVCF )
2010-12-02 15:31:59 +08:00
return cmds
}
2010-12-07 04:12:23 +08:00
2010-12-08 22:36:23 +08:00
def LikelihoodCalc ( bams : List [ File ] , ref : File , intervals : File , alleleVCF : File , outVCF : File ) : UGCalcLikelihoods = {
2010-12-02 15:31:59 +08:00
var calc = new UGCalcLikelihoods
2010-12-07 02:23:54 +08:00
calc . input_file ++= bams
2010-12-02 15:31:59 +08:00
calc . reference_sequence = ref
calc . jarFile = new File ( pm . stingDirPath + "dist/GenomeAnalysisTK.jar" )
calc . downsample_to_coverage = Some ( 300 )
2010-12-07 02:23:54 +08:00
calc . memoryLimit = if ( bams . size < 5 ) Some ( 2 ) else if ( bams . size < 50 ) Some ( 4 ) else Some ( 6 )
2010-12-08 22:36:23 +08:00
calc . scatterCount = if ( bams . size < 5 ) 1 else if ( bams . size < 50 ) 60 else 120
2010-12-02 15:31:59 +08:00
calc . min_base_quality_score = Some ( 22 )
calc . min_mapping_quality_score = Some ( 20 )
calc . genotype = true
calc . output_all_callable_bases = true
2010-12-07 02:23:54 +08:00
calc . out = outVCF
2010-12-05 10:33:54 +08:00
calc . rodBind : += new RodBind ( " allele " , " VCF " , alleleVCF )
2010-12-08 22:36:23 +08:00
calc . intervals : += intervals
2010-12-02 15:31:59 +08:00
return calc
}
2010-12-08 22:36:23 +08:00
def VariantCallMerge ( likelihoods : List [ File ] , ref : File , intervals : File , output : File ) : UGCallVariants = {
2010-12-02 15:31:59 +08:00
var call = new UGCallVariants
call . reference_sequence = ref
call . jarFile = new File ( pm . stingDirPath + "dist/GenomeAnalysisTK.jar" )
2010-12-08 22:36:23 +08:00
call . intervals : += intervals
2010-12-07 02:23:54 +08:00
call . memoryLimit = Some ( 8 )
2010-12-02 15:31:59 +08:00
call . out = output
2010-12-03 02:39:59 +08:00
call . rodBind ++= likelihoods . map ( a => new RodBind ( "variant" + a . getName . replace ( ".vcf" , "" ) , "vcf" , a ) )
2010-12-08 22:36:23 +08:00
call . scatterCount = 30
2010-12-02 15:31:59 +08:00
return call
}
def swapExt ( file : File , oldExtension : String , newExtension : String ) =
new File ( file . getName . stripSuffix ( oldExtension ) + newExtension )
}