From 45c794d0665acf183472e2298405012617a6cead Mon Sep 17 00:00:00 2001 From: ebanks Date: Fri, 11 Sep 2009 13:09:37 +0000 Subject: [PATCH] pipeline is complete git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@1583 348d0f76-0448-11de-a6fe-93d51630548a --- python/RunPilot2Pipeline.py | 174 +++++++++++++++++++----------------- 1 file changed, 91 insertions(+), 83 deletions(-) diff --git a/python/RunPilot2Pipeline.py b/python/RunPilot2Pipeline.py index dd7cf4888..5f6ee69f5 100755 --- a/python/RunPilot2Pipeline.py +++ b/python/RunPilot2Pipeline.py @@ -41,19 +41,6 @@ if __name__ == "__main__": def outputDir(suffix): return os.path.join(OPTIONS.outputdir, suffix) - intervals_dir = outputDir("intervals") - cleaner_output = outputDir("cleaner") - injector_output = outputDir("bams") - snp_output = outputDir("calls/unfiltered_snps") - filter_output = outputDir("calls/filtered_snps") - indel_output = outputDir("calls/indels") - final_bam_dir = "/humgen/gsa-hphome1/projects/1kg_pilot2/useTheseBamsForAnalyses" - - samples = ["NA12878","NA12891","NA12892","NA19238","NA19239","NA19240"] - techs = ["SLX"] - chrs = range(1, 23) + ["X"] - DoCs = [82,91,70,56,68,86] - # Official genome-wide Depth of Coverage tables for pilot 2, freeze 5: # NA12878 NA12891 NA12892 NA19238 NA19239 NA19240 # 454: 36 18 @@ -61,12 +48,31 @@ if __name__ == "__main__": # SOLID: 37 64 # 454+SLD: 64 77 # ALL: 138 150 + DoC_454 = {"NA12878":36,"NA19240":18} + DoC_slx = {"NA12878":82,"NA12891":91,"NA12892":70,"NA19238":56,"NA19239":68,"NA19240":86} + DoC_solid = {"NA12878":37,"NA19240":64} + DoC_454solid = {"NA12878":64,"NA19240":77} + DoC_all = {"NA12878":138,"NA19240":150} + DoC_hash = {"454":DoC_454,"SLX":DoC_slx,"SOLID":DoC_solid,"454SOLID":DoC_454solid,"ALL":DoC_all} + MQ_hash = {"SLX":100,"SOLID":5,"454":5,"454SOLID":10,"ALL":110} - for sample, DoC in zip(samples, DoCs): + intervals_dir = outputDir("intervals") + cleaner_output = outputDir("cleaner") + injector_output = outputDir("bams") + snp_output = outputDir("calls/unfiltered_snps") + filter_output = outputDir("calls/filtered_snps") + indel_output = outputDir("calls/indels") + final_bam_dir = outputDir("useTheseBamsForAnalyses") + #final_bam_dir = "/humgen/gsa-hphome1/projects/1kg_pilot2/useTheseBamsForAnalyses" + + samples = ["NA12878","NA12891","NA12892","NA19238","NA19239","NA19240"] + techs = ["SLX"] + chrs = range(1, 23) + ["X"] + + for sample in samples: # # Actually do some work here # - MQs = [100,5,5] def finalBam(tech): return os.path.join(final_bam_dir, "%s.%s.bam" % ( sample, tech )) def outputFileTech(root, tech, name): @@ -80,124 +86,126 @@ if __name__ == "__main__": if sample in ["NA12878", "NA19240"]: myTechs = techs + ["SOLID","454"] - for tech,mappingQuality in zip(myTechs,MQs): + for tech in myTechs: - myChrs = chrs - if sample in ["NA12891", "NA19239"]: - myChrs = chrs + ["Y"] - def badSnpsChr( tech, chr ): - return os.path.join(cleaner_output, "%s.chr%s.%s.realigner.badsnps" % ( sample, chr, tech )) + if ( tech != "454" ): + myChrs = chrs + if sample in ["NA12891", "NA19239"]: + myChrs = chrs + ["Y"] + def badSnpsChr( tech, chr ): + return os.path.join(cleaner_output, "%s.chr%s.%s.realigner.badsnps" % ( sample, chr, tech )) - for chr in myChrs: + def makeJobName(suffix): + return sample + "." + tech + "." + suffix + def makeJobClass(suffix): + return sample + ".*." + suffix - bam = "/broad/1KG/DCC/ftp/pilot_data/%s/alignment/%s.chrom%s.%s.SRP000032.2009_07.bam" % ( sample, sample, chr, tech ) + for chr in myChrs: - def outputFile(root, name): - return os.path.join(root, "%s.chr%s.%s.%s" % ( sample, chr, tech, name )) - def MismatchIntervals(bam, outputFile, intervals): - return config.gatkCmd('MismatchIntervals') + " -o " + outputFile + " -L " + intervals + " -I " + bam - def IndelIntervals(bam, outputFile, intervals): - return config.gatkCmd('IndelIntervals') + " -o " + outputFile + " -L " + intervals + " -I " + bam - def MergeIntervals(bam, files, outputFile, intervals): - return config.gatkCmd('MergeIntervals') + " -o " + outputFile + " ".join(map( lambda x: " -intervals " + x, files )) + " -L " + intervals + " -I " + bam - def CleanIntervals(bam, outputFile, intervals, snpfile): - return config.gatkCmd('IntervalCleaner') + " -O " + outputFile + " -L " + intervals + " -I " + bam - def Injector(bam, outputFile, intervals, inputfile): - return config.gatkCmd('CleanedReadInjector') + " --output_bam " + outputFile + " -L " + intervals + " -I " + bam + " --cleaned_reads " + inputfile + bam = "/broad/1KG/DCC/ftp/pilot_data/%s/alignment/%s.chrom%s.%s.SRP000032.2009_07.bam" % ( sample, sample, chr, tech ) - jobid = None - mergeid = None - bamToCallFrom = finalBam(tech) + def outputFile(root, name): + return os.path.join(root, "%s.chr%s.%s.%s" % ( sample, chr, tech, name )) + def MismatchIntervals(bam, outputFile, intervals): + return config.gatkCmd('MismatchIntervals') + " -o " + outputFile + " -L " + intervals + " -I " + bam + def IndelIntervals(bam, outputFile, intervals): + return config.gatkCmd('IndelIntervals') + " -o " + outputFile + " -L " + intervals + " -I " + bam + def MergeIntervals(bam, files, outputFile, intervals): + return config.gatkCmd('MergeIntervals') + " -o " + outputFile + " ".join(map( lambda x: " -intervals " + x, files )) + " -L " + intervals + " -I " + bam + def CleanIntervals(bam, outputFile, intervals, snpfile): + return config.gatkCmd('IntervalCleaner') + " -O " + outputFile + " -L " + intervals + " -I " + bam + def Injector(bam, outputFile, intervals, inputfile): + return config.gatkCmd('CleanedReadInjector') + " --output_bam " + outputFile + " -L " + intervals + " -I " + bam + " --cleaned_reads " + inputfile + + jobid = None - if ( tech != "454" ): mismatchIntervalsFile = outputFile(intervals_dir, "mismatches.intervals") cmd = MismatchIntervals(bam, mismatchIntervalsFile, str(chr)) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, mismatchIntervalsFile, just_print_commands = OPTIONS.dry, waitID = jobid) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, mismatchIntervalsFile, just_print_commands = OPTIONS.dry, waitID = None, jobName = makeJobName("phase1." + str(chr))) indelIntervalsFile = outputFile(intervals_dir, "indels.intervals") cmd = IndelIntervals(bam, indelIntervalsFile, str(chr)) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, indelIntervalsFile, just_print_commands = OPTIONS.dry, waitID = jobid) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, indelIntervalsFile, just_print_commands = OPTIONS.dry, waitID = None, jobName = makeJobName("phase1." + str(chr))) mergedIntervalsFile = outputFile(intervals_dir, "merged.intervals") cmd = MergeIntervals(bam, [mismatchIntervalsFile, indelIntervalsFile], mergedIntervalsFile, str(chr)) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, mergedIntervalsFile, just_print_commands = OPTIONS.dry, waitID = jobid) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, mergedIntervalsFile, just_print_commands = OPTIONS.dry, waitID = makeJobName("phase1." + str(chr))) cleanedFile = outputFile(cleaner_output, "bam") badsnpsFile = badSnpsChr(tech, str(chr)) cmd = CleanIntervals(bam, cleanedFile, mergedIntervalsFile, badsnpsFile) jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, cleanedFile, just_print_commands = OPTIONS.dry, waitID = jobid) - injectedFile = outputFile(injector_output, "bam") cmd = Injector(bam, injectedFile, str(chr), cleanedFile) jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, injectedFile, just_print_commands = OPTIONS.dry, waitID = jobid) -# HOW DO I CALL MergeBAMBatch FROM HERE? -# def MergeBams(): -# return "MergeBAMBatch.py -d " + final_bam_dir + " -q " + OPTIONS.farmQueue + " -s '" + bamToCallFrom + "\t" + os.path.join(injector_output, "%s.chr*.%s.bam" % ( sample, tech )) + "'" -# cmd = MergeBams() -# jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, bamToCallFrom, just_print_commands = OPTIONS.dry, waitID = jobid) - mergeid = jobid + cmd = "samtools index " + injectedFile + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, injectedFile + ".bai", just_print_commands = OPTIONS.dry, waitID = jobid, jobName = makeJobName("phase2")) - cmd = "cat " - for chr in myChrs: - cmd = cmd + " " + badSnpsChr(tech, chr) - cmd = cmd + " > " + badSnps(tech) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, badSnps(tech), just_print_commands = OPTIONS.dry, waitID = mergeid) + def MergeBams(outputFile): + return "MergeBAMBatch.py -d " + cleaner_output + " -q " + OPTIONS.farmQueue + " -s '" + outputFile + "\t" + os.path.join(cleaner_output, "%s.chr*.%s.bam" % ( sample, tech )) + "'" + + cmd = MergeBams(finalBam(tech)) + mergeJobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, finalBam(tech), just_print_commands = OPTIONS.dry, waitID = makeJobName("phase2"), jobName = makeJobName("phase3")) + + cmd = "cat " + for chr in myChrs: + cmd = cmd + " " + badSnpsChr(tech, chr) + cmd = cmd + " > " + badSnps(tech) + badsnpsJobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, badSnps(tech), just_print_commands = OPTIONS.dry, waitID = makeJobName("phase2"), jobName = makeJobName("phase4")) def IndelCaller(bam, outputFile, fraction): return config.gatkCmd('IndelGenotyper') + " -O " + outputFile + " -I " + bam + " -minFraction " + fraction def SnpCaller(bam, outputFile): - return config.gatkCmd('SingleSampleGenotyper') + " -o " + outputFile + " -I " + bam + return config.gatkCmd('SingleSampleGenotyper') + " -varout " + outputFile + " -I " + bam def VarFiltration(bam, outputHead, snpcalls, badsnps, indelcalls, depth, mq): - return config.gatkCmd('VariantFiltration') + " -VOH " + outputHead + " -I " + bam + " -B variant,Variants," + snpcalls + ",cleaned,CleanedOutSnp," + badsnps + ",indels,SimpleIndel," + indelcalls + " -X DepthOfCoverage:max=" + depth + " -X MappingQualityZero:max=" + mq + return config.gatkCmd('VariantFiltration') + " -VOH " + outputHead + " -I " + bam + " -B variant,Variants," + snpcalls + ",cleaned,CleanedOutSNP," + badsnps + ",indels,SimpleIndel," + indelcalls + " -X DepthOfCoverage:max=" + depth + " -X MappingQualityZero:max=" + mq def VarFiltration454(bam, outputHead, snpcalls, depth, mq): return config.gatkCmd('VariantFiltration') + " -VOH " + outputHead + " -I " + bam + " -B variant,Variants," + snpcalls + " -X DepthOfCoverage:max=" + depth + " -X MappingQualityZero:max=" + mq + waitid = makeJobName("phase3") + if ( tech == "454" ): + waitid = None + bamToCallFrom = finalBam(tech) indelsFileHigh = outputFileTech(indel_output, tech, "high.calls") cmd = IndelCaller(bamToCallFrom, indelsFileHigh, "0.3") - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, indelsFileHigh, just_print_commands = OPTIONS.dry, waitID = mergeid) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, indelsFileHigh, just_print_commands = OPTIONS.dry, waitID = waitid) indelsFileLow = indelsForFiltering(tech) cmd = IndelCaller(bamToCallFrom, indelsFileLow, "0.1") - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, indelsFileLow, just_print_commands = OPTIONS.dry, waitID = mergeid) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, indelsFileLow, just_print_commands = OPTIONS.dry, waitID = waitid, jobName = makeJobName("phase4")) snpsFile = outputFileTech(snp_output, tech, "calls") cmd = SnpCaller(bamToCallFrom, snpsFile) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, snpsFile, just_print_commands = OPTIONS.dry, waitID = jobid) # wait on the low indel calls + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, snpsFile, just_print_commands = OPTIONS.dry, waitID = waitid, jobName = makeJobName("phase4")) varFiltFile = os.path.join(filter_output, "%s.%s" % ( sample, tech )) if ( tech != "454" ): - cmd = VarFiltration(bamToCallFrom, varFiltFile, snpsFile, badsnpsFile, indelsFileLow, str(DoC), str(mappingQuality)) + cmd = VarFiltration(bamToCallFrom, varFiltFile, snpsFile, badSnps(tech), indelsFileLow, str(DoC_hash[tech][sample]), str(MQ_hash[tech])) else: - cmd = VarFiltration454(bamToCallFrom, varFiltFile, snpsFile, str(DoC), str(mappingQuality)) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, varFiltFile, just_print_commands = OPTIONS.dry, waitID = jobid) + cmd = VarFiltration454(bamToCallFrom, varFiltFile, snpsFile, str(DoC_hash[tech][sample]), str(MQ_hash[tech])) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, varFiltFile, just_print_commands = OPTIONS.dry, waitID = makeJobName("phase4")) def SnpCaller(bams, outputFile): - return config.gatkCmd('SingleSampleGenotyper') + " -o " + outputFile + " ".join(map( lambda x: " -I " + x, bams )) + return config.gatkCmd('SingleSampleGenotyper') + " -varout " + outputFile + " ".join(map( lambda x: " -I " + x, bams )) def VarFiltration(bams, outputHead, snpcalls, badsnps, indelcalls, depth, mq): - return config.gatkCmd('VariantFiltration') + " -VOH " + outputHead + " -B variant,Variants," + snpcalls + ",cleaned,CleanedOutSnp," + badsnps + ",indels,SimpleIndel," + indelcalls + " -X DepthOfCoverage:max=" + depth + " -X MappingQualityZero:max=" + mq + " ".join(map( lambda x: " -I " + x, bams )) + return config.gatkCmd('VariantFiltration') + " -VOH " + outputHead + " -B variant,Variants," + snpcalls + ",cleaned,CleanedOutSNP," + badsnps + ",indels,SimpleIndel," + indelcalls + " -X DepthOfCoverage:max=" + depth + " -X MappingQualityZero:max=" + mq + " ".join(map( lambda x: " -I " + x, bams )) -# -# HOW DO I MAKE THESE JOBS DEPEND ON THE MERGE IDS OF THE INDIVIDUAL SAMPLES??? -# (Or until everything else is done?) -# - solid454SnpsFile = outputFileTech(snp_output, "454-SOLID", "calls") - cmd = SnpCaller([finalBam("SOLID"),finalBam("454")], solid454SnpsFile) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, solid454SnpsFile, just_print_commands = OPTIONS.dry, waitID = allMergeIds) + if sample in ["NA12878", "NA19240"]: - solid454VarFiltFile = os.path.join(filter_output, "%s.454-SOLID" % ( sample )) - cmd = VarFiltration([finalBam("SOLID"),finalBam("454")], solid454VarFiltFile, solid454SnpsFile, badSnps("SOLID"), indelsForFiltering("SOLID"), str(DoC), str(mappingQuality)) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, allVarFiltFile, just_print_commands = OPTIONS.dry, waitID = jobid) + solid454SnpsFile = outputFileTech(snp_output, "454-SOLID", "calls") + cmd = SnpCaller([finalBam("SOLID"),finalBam("454")], solid454SnpsFile) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, solid454SnpsFile, just_print_commands = OPTIONS.dry, waitID = makeJobClass("phase3")) - allSnpsFile = outputFileTech(snp_output, "allTechs", "calls") - cmd = SnpCaller([finalBam("SLX"),finalBam("SOLID"),finalBam("454")], solid454SnpsFile) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, allSnpsFile, just_print_commands = OPTIONS.dry, waitID = allMergeIds) - allVarFiltFile = os.path.join(filter_output, "%s.allTechs" % ( sample )) - cmd = VarFiltration([finalBam("SLX"),finalBam("SOLID"),finalBam("454")], varFiltFile, allSnpsFile, badSnps("SLX"), indelsForFiltering("SLX"), str(DoC), str(mappingQuality)) - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, allVarFiltFile, just_print_commands = OPTIONS.dry, waitID = jobid) -# -# HOW DO I COMBINE ALL OF THE MQs AND DOCs? -# + solid454VarFiltFile = os.path.join(filter_output, "%s.454-SOLID" % ( sample )) + cmd = VarFiltration([finalBam("SOLID"),finalBam("454")], solid454VarFiltFile, solid454SnpsFile, badSnps("SOLID"), indelsForFiltering("SOLID"), str(DoC_hash["454SOLID"][sample]), str(MQ_hash["454SOLID"])) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, solid454VarFiltFile, just_print_commands = OPTIONS.dry, waitID = jobid) + allSnpsFile = outputFileTech(snp_output, "allTechs", "calls") + cmd = SnpCaller([finalBam("SLX"),finalBam("SOLID"),finalBam("454")], allSnpsFile) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, allSnpsFile, just_print_commands = OPTIONS.dry, waitID = makeJobClass("phase3")) + allVarFiltFile = os.path.join(filter_output, "%s.allTechs" % ( sample )) + cmd = VarFiltration([finalBam("SLX"),finalBam("SOLID"),finalBam("454")], allVarFiltFile, allSnpsFile, badSnps("SLX"), indelsForFiltering("SLX"), str(DoC_hash["ALL"][sample]), str(MQ_hash["ALL"])) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, allVarFiltFile, just_print_commands = OPTIONS.dry, waitID = jobid)