2011-01-20 20:54:03 +08:00
import org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction
import org.broadinstitute.sting.queue.extensions.gatk._
import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction
import org.broadinstitute.sting.queue.QScript
import org.apache.commons.io.FilenameUtils ;
class DistributedGATKPerformance extends QScript {
qscript =>
@Argument ( shortName = "gatk" , doc = "gatk jar file" , required = true )
var gatkJarFile : File = _
@Argument ( shortName = "outputDir" , doc = "output directory" , required = false )
var outputDir : String = ""
2011-01-26 21:45:40 +08:00
@Argument ( shortName = "dataset" , doc = "selects the datasets to run. If not provided, all datasets will be used" , required = false )
2011-01-20 20:54:03 +08:00
var datasets : List [ String ] = Nil
2011-01-25 00:45:07 +08:00
@Argument ( shortName = "waysParallel" , doc = "selects the datasets to run. If not provided, all datasets will be used" , required = false )
var waysParallelArg : List [ Int ] = Nil
2011-01-20 20:54:03 +08:00
@Argument ( shortName = "long" , doc = "runs long calculations" , required = false )
var long : Boolean = false
2011-01-25 00:45:07 +08:00
@Argument ( shortName = "test" , doc = "runs long calculations" , required = false )
var test : Boolean = false
2011-01-26 21:45:40 +08:00
@Argument ( shortName = "limitTo30Min" , doc = "runs long calculations" , required = false )
var limitTo30Min : Boolean = false
@Argument ( shortName = "trackerDir" , doc = "root directory for distributed tracker files" , required = false )
var trackerDir : String = "" // "/humgen/gsa-scr1/depristo/tmp/"
2011-01-20 20:54:03 +08:00
2011-01-25 00:45:07 +08:00
trait UNIVERSAL_GATK_ARGS extends CommandLineGATK { logging_level = "DEBUG" ; jarFile = gatkJarFile ; memoryLimit = Some ( 2 ) ; }
2011-01-20 20:54:03 +08:00
class Target (
val baseName : String ,
val reference : File ,
val dbsnpFile : String ,
val hapmapFile : String ,
val maskFile : String ,
val bamList : File ,
val goldStandard_VCF : File ,
val intervals : String ,
val titvTarget : Double ,
2011-01-26 21:45:40 +08:00
val isLowpass : Boolean ,
val useBAQ : Boolean ) {
2011-01-20 20:54:03 +08:00
val name = qscript . outputDir + baseName
val clusterFile = new File ( name + ".clusters" )
def rawVCF ( part : String ) = new File ( name + "." + part + ".raw.vcf" )
val filteredVCF = new File ( name + ".filtered.vcf" )
val titvRecalibratedVCF = new File ( name + ".titv.recalibrated.vcf" )
val tsRecalibratedVCF = new File ( name + ".ts.recalibrated.vcf" )
val goldStandardName = qscript . outputDir + "goldStandard/" + baseName
val goldStandardClusterFile = new File ( goldStandardName + ".clusters" )
}
val hg18 = new File ( "/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta" )
val b36 = new File ( "/humgen/1kg/reference/human_b36_both.fasta" )
val b37 = new File ( "/humgen/1kg/reference/human_g1k_v37.fasta" )
val dbSNP_hg18 = "/humgen/gsa-hpprojects/GATK/data/Comparisons/Validated/dbSNP/dbsnp_130_hg18.rod"
val dbSNP_b36 = "/humgen/gsa-hpprojects/GATK/data/Comparisons/Validated/dbSNP/dbsnp_130_b36.rod"
val dbSNP_b37 = "/humgen/gsa-hpprojects/GATK/data/Comparisons/Validated/dbSNP/dbsnp_132_b37.leftAligned.vcf"
val hapmap_hg18 = "/humgen/gsa-hpprojects/GATK/data/Comparisons/Validated/HapMap/3.3/sites_r27_nr.hg18_fwd.vcf"
val hapmap_b36 = "/humgen/gsa-hpprojects/GATK/data/Comparisons/Validated/HapMap/3.3/sites_r27_nr.b36_fwd.vcf"
val hapmap_b37 = "/humgen/gsa-hpprojects/GATK/data/Comparisons/Validated/HapMap/3.3/sites_r27_nr.b37_fwd.vcf"
val indelMask_b36 = "/humgen/1kg/processing/pipeline_test_bams/pilot1.dindel.mask.b36.bed"
val indelMask_b37 = "/humgen/1kg/processing/pipeline_test_bams/pilot1.dindel.mask.b37.bed"
// ToDos:
// reduce the scope of the datasets so the script is more nimble
// figure out how to give names to all the Queue-LSF logs (other than Q-1931@node1434-24.out) so that it is easier to find logs for certain steps
// create gold standard BAQ'd bam files, no reason to always do it on the fly
// Analysis to add at the end of the script:
// auto generation of the cluster plots
// spike in NA12878 to the exomes and to the lowpass, analysis of how much of her variants are being recovered compared to single sample exome or HiSeq calls
// produce Kiran's Venn plots based on comparison between new VCF and gold standard produced VCF
val lowPass : Boolean = true
val targetDataSets : Map [ String , Target ] = Map (
"HiSeq" -> new Target ( "NA12878.HiSeq" , hg18 , dbSNP_hg18 , hapmap_hg18 ,
"/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/1000GenomesProcessingPaper/wgs.v13/HiSeq.WGS.cleaned.indels.10.mask" ,
new File ( "/humgen/gsa-hpprojects/NA12878Collection/bams/NA12878.HiSeq.WGS.bwa.cleaned.recal.bam" ) ,
new File ( "/home/radon01/depristo/work/oneOffProjects/1000GenomesProcessingPaper/wgs.v13/HiSeq.WGS.cleaned.ug.snpfiltered.indelfiltered.vcf" ) ,
2011-01-26 21:45:40 +08:00
"/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/distributedGATK/whole_genome_chunked.hg18.intervals" , 2.07 , ! lowPass , true ) ,
2011-01-20 20:54:03 +08:00
"FIN" -> new Target ( "FIN" , b37 , dbSNP_b37 , hapmap_b37 , indelMask_b37 ,
new File ( "/humgen/1kg/processing/pipeline_test_bams/FIN.79sample.Nov2010.chr20.bam" ) ,
new File ( "/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf" ) , // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED **
2011-01-26 21:45:40 +08:00
"/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals" , 2.3 , lowPass , true ) ,
2011-01-20 20:54:03 +08:00
"WEx" -> new Target ( "NA12878.WEx" , hg18 , dbSNP_hg18 , hapmap_hg18 ,
"/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/1000GenomesProcessingPaper/wgs.v13/GA2.WEx.cleaned.indels.10.mask" ,
new File ( "/humgen/gsa-hpprojects/NA12878Collection/bams/NA12878.WEx.cleaned.recal.bam" ) ,
new File ( "/home/radon01/depristo/work/oneOffProjects/1000GenomesProcessingPaper/wgs.v13/GA2.WEx.cleaned.ug.snpfiltered.indelfiltered.vcf" ) ,
2011-01-26 21:45:40 +08:00
"/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.targets.interval_list" , 2.6 , ! lowPass , true ) ,
2011-01-20 20:54:03 +08:00
"TGPWExGdA" -> new Target ( "1000G.WEx.GdA" , b37 , dbSNP_b37 , hapmap_b37 , indelMask_b37 ,
2011-01-26 21:45:40 +08:00
new File ( "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/distributedGATK/Barcoded_1000G_WEx_Reduced_Plate_1.20.cleaned.list" ) , // BUGBUG: reduce from 60 to 20 people
2011-01-20 20:54:03 +08:00
new File ( "/humgen/gsa-scr1/delangel/NewUG/calls/AugustRelease.filtered_Q50_QD5.0_SB0.0.allSamples.SNPs_hg19.WEx_UG_newUG_MQC.vcf" ) , // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED **
2011-01-26 21:45:40 +08:00
"/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list" , 2.6 , ! lowPass , true ) ,
2011-01-20 20:54:03 +08:00
"LowPassN60" -> new Target ( "lowpass.N60" , b36 , dbSNP_b36 , hapmap_b36 , indelMask_b36 ,
new File ( "/humgen/1kg/analysis/bamsForDataProcessingPapers/lowpass_b36/lowpass.chr20.cleaned.matefixed.bam" ) , // the bam list to call from
new File ( "/home/radon01/depristo/work/oneOffProjects/VQSRCutByNRS/lowpass.N60.chr20.filtered.vcf" ) , // the gold standard VCF file to run through the VQSR
2011-01-26 21:45:40 +08:00
"/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.b36.intervals" , 2.3 , lowPass , true ) , // chunked interval list to use with Queue's scatter/gather functionality
// "LowPassAugust" -> new Target("ALL.august.v4", b37, dbSNP_b37, hapmap_b37, indelMask_b37, // BUGBUG: kill this, it is too large
// new File("/humgen/1kg/processing/allPopulations_chr20_august_release.cleaned.merged.bams/ALL.cleaned.merged.list"),
// new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"),
// "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass, false),
2011-01-20 20:54:03 +08:00
"LowPassEUR363Nov" -> new Target ( "EUR.nov2010" , b37 , dbSNP_b37 , hapmap_b37 , indelMask_b37 ,
new File ( "/humgen/1kg/processing/pipeline_test_bams/EUR.363sample.Nov2010.chr20.bam" ) ,
new File ( "/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf" ) , // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED **
2011-01-26 21:45:40 +08:00
"/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals" , 2.3 , lowPass , false ) ,
2011-01-20 20:54:03 +08:00
"WExTrio" -> new Target ( "NA12878Trio.WEx" , b37 , dbSNP_b37 , hapmap_b37 , indelMask_b37 ,
2011-01-26 21:45:40 +08:00
new File ( "/humgen/gsa-hpprojects/NA12878Collection/bams/CEUTrio.HiSeq.WEx.bwa.cleaned.recal.bams.list" ) ,
2011-01-20 20:54:03 +08:00
new File ( "/humgen/gsa-scr1/delangel/NewUG/calls/AugustRelease.filtered_Q50_QD5.0_SB0.0.allSamples.SNPs_hg19.WEx_UG_newUG_MQC.vcf" ) , // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED **
2011-01-26 21:45:40 +08:00
"/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list" , 2.6 , ! lowPass , true )
2011-01-20 20:54:03 +08:00
)
2011-01-26 21:45:40 +08:00
def getTargetInterval ( target : Target ) : List [ String ] = target . name match {
case "NA12878.HiSeq" => List ( "chr1" )
case "FIN" => List ( "20" )
case _ => List ( target . intervals )
}
2011-01-20 20:54:03 +08:00
def script = {
// Selects the datasets in the -dataset argument and adds them to targets.
var targets : List [ Target ] = List ( )
2011-01-26 21:45:40 +08:00
if ( ! datasets . isEmpty )
for ( ds <- datasets )
targets : := targetDataSets ( ds ) // Could check if ds was mispelled, but this way an exception will be thrown, maybe it's better this way?
else // If -dataset is not specified, all datasets are used.
for ( targetDS <- targetDataSets . valuesIterator ) // for Scala 2.7 or older, use targetDataSets.values
targets : := targetDS
val nWays = if ( test ) List ( 32 ) else { if ( long ) List ( 1 , 2 , 4 , 8 ) else List ( 16 , 32 , 64 , 128 ) }
2011-01-22 02:39:25 +08:00
//val nWays = List(2)
2011-01-20 20:54:03 +08:00
for ( target <- targets ) {
2011-01-25 00:45:07 +08:00
for ( scatterP <- if ( test ) List ( false ) else List ( true , false ) )
2011-01-26 21:45:40 +08:00
for ( nWaysParallel <- nWays ) {
2011-01-22 02:39:25 +08:00
val aname = "ptype_%s.nways_%d" . format ( if ( scatterP ) "sg" else "dist" , nWaysParallel )
def addUG ( ug : UnifiedGenotyper ) = {
if ( ! long )
ug . jobLimitSeconds = Some ( 60 * 60 * 4 )
2011-01-26 21:45:40 +08:00
if ( limitTo30Min )
2011-01-25 00:45:07 +08:00
ug . jobLimitSeconds = Some ( 60 * 30 )
2011-01-22 02:39:25 +08:00
add ( ug ) ;
}
// add scatter/gather or distributed parallelism
if ( scatterP ) {
var ug : UnifiedGenotyper = new UnifiedGenotyper ( target , aname )
ug . scatterCount = nWaysParallel
ug . intervalsString ++= List ( target . intervals )
addUG ( ug )
} else {
2011-01-20 20:54:03 +08:00
for ( part <- 1 to nWaysParallel ) {
2011-01-22 02:39:25 +08:00
var ug : UnifiedGenotyper = new UnifiedGenotyper ( target , aname + ".part" + part )
2011-01-26 21:45:40 +08:00
ug . intervalsString ++= getTargetInterval ( target )
ug . processingTracker = new File ( trackerDir + target . name + "." + aname + ".distributed.txt" )
ug . processingTrackerID = Some ( part )
2011-01-22 02:39:25 +08:00
if ( part == 1 )
ug . performanceLog = new File ( "%s.%s.pf.log" . format ( target . name , aname ) )
2011-01-25 00:45:07 +08:00
ug . processingTrackerStatusFile = new File ( "%s.%s.%d.ptstatus.log" . format ( target . name , aname , part ) )
2011-01-22 02:39:25 +08:00
addUG ( ug )
2011-01-20 20:54:03 +08:00
}
2011-01-22 02:39:25 +08:00
}
2011-01-20 20:54:03 +08:00
}
}
}
// 1.) Call SNPs with UG
2011-01-22 02:39:25 +08:00
class UnifiedGenotyper ( t : Target , aname : String ) extends org . broadinstitute . sting . queue . extensions . gatk . UnifiedGenotyper with UNIVERSAL_GATK_ARGS {
2011-01-20 20:54:03 +08:00
this . reference_sequence = t . reference
this . dcov = Some ( if ( t . isLowpass ) { 50 } else { 250 } )
this . stand_call_conf = Some ( if ( t . isLowpass ) { 4.0 } else { 30.0 } )
this . stand_emit_conf = Some ( if ( t . isLowpass ) { 4.0 } else { 30.0 } )
this . input_file : += t.bamList
2011-01-22 02:39:25 +08:00
this . out = t . rawVCF ( aname )
2011-01-26 21:45:40 +08:00
this . baq = Some ( if ( t . useBAQ ) { org . broadinstitute . sting . utils . baq . BAQ . CalculationMode . RECALCULATE } else { org . broadinstitute . sting . utils . baq . BAQ . CalculationMode . OFF } )
2011-01-22 02:39:25 +08:00
this . analysisName = t . name + "_UG." + aname
2011-01-20 20:54:03 +08:00
if ( t . dbsnpFile . endsWith ( ".rod" ) )
this . DBSNP = new File ( t . dbsnpFile )
else if ( t . dbsnpFile . endsWith ( ".vcf" ) )
this . rodBind : += RodBind ( " dbsnp " , " VCF " , t . dbsnpFile )
}
}