diff --git a/python/farm_commands2.py b/python/farm_commands2.py index 14dd74ece..1afab77d9 100755 --- a/python/farm_commands2.py +++ b/python/farm_commands2.py @@ -7,6 +7,12 @@ import re import unittest import tempfile +def all(iterable): + for element in iterable: + if not element: + return False + return True + # maximum number of unnamed jobs allowed sa dependencies MAX_UNNAMED_DEPENDENCIES = 10 diff --git a/python/madPipelineUtils.py b/python/madPipelineUtils.py new file mode 100755 index 000000000..2b24c6cd2 --- /dev/null +++ b/python/madPipelineUtils.py @@ -0,0 +1,118 @@ +from farm_commands2 import * +import os.path +import sys +from optparse import OptionParser +from datetime import date +import glob +import operator +import faiReader +import math +import shutil +import string + +GATK_STABLE = '/home/radon01/depristo/dev/GenomeAnalysisTKStable/trunk/dist/GenomeAnalysisTK.jar' +GATK_DEV = '/home/radon01/depristo/dev/GenomeAnalysisTK/trunk/dist/GenomeAnalysisTK.jar' +GATK = 'java -Xmx%s -Djava.io.tmpdir=/broad/shptmp/depristo/tmp/ -jar ' + GATK_STABLE + ' -R /seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta -l INFO ' + +# add to GATK to enable dbSNP aware cleaning +# -D /humgen/gsa-scr1/GATK_Data/dbsnp_129_hg18.rod + +#hg18 = ['chrM'] + ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY'] +#b36 = [str(i) for i in range(1,23)] + ['X', 'Y', 'MT'] + +hg18 = ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY'] +b36 = [str(i) for i in range(1,23)] + ['X', 'Y'] + + +HG18_TO_B36 = { + 'hg18' : 'b36', + '/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta' : '/broad/1KG/reference/human_b36_both.fasta', + '/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta.fai' : '/broad/1KG/reference/human_b36_both.fasta.fai', + 'chrM' : 'MT', + 'chr' : '' } + +def hg18args_to_b36(s): + for key, value in HG18_TO_B36.iteritems(): + s = s.replace(key, value) + return s + +def appendExtension(path, newExt, addExtension = True): + root, basename = os.path.split(path) + basename, ext = os.path.splitext(basename) + print root, basename, ext, path + s = basename + '.' + newExt + if addExtension: s += ext + return os.path.join(root, s) +# return os.path.join(OPTIONS.dir, s) + +class PipelineArgs: + def __init__( self, GATK = GATK, ref = 'hg18', name = None, memory = '4g' ): + self.GATK = GATK + self.ref = ref + self.name = name + self.memory = memory + + def convertToB36(self): + return this.ref == 'b36' + + def addGATKArg(self, arg): + if arg[0] != ' ': + arg = ' ' + arg + self.GATK += arg + + def getCommandName(self, suffix): + if self.name != None: + return self.name + "." + suffix + else: + return suffix + + def finalizedGATKCommand(self, args): + cmd = (self.GATK % self.memory) + args + if self.convertToB36: + cmd = hg18args_to_b36(cmd) + return cmd + +# +# General features +# +def simpleGATKCommand( pargs, name, args, lastJobs ): + cmd = pargs.finalizedGATKCommand(args) + return [FarmJob(cmd, jobName = pargs.getCommandName(name), dependencies = lastJobs)] + +# +# Takes a simpleGATKCommand and splits it by chromosome, merging output +# +def splitGATKCommandByChr( myPipelineArgs, cmd, outputsToParallelize, mergeCommands ): + def makeChrCmd(chr): + chrOutputMap = map(lambda x: appendExtension(x, chr), outputsToParallelize) + chr_cmd_str = cmd.cmd_str_from_user + for x, y in zip(outputsToParallelize, chrOutputMap): + chr_cmd_str = chr_cmd_str.replace(x, y) + chrCmd = FarmJob(chr_cmd_str, jobName = cmd.jobName + '.byChr' + chr, dependencies = cmd.dependencies) + return chrCmd, chrOutputMap + + splits = map( makeChrCmd, hg18 ) + splitCommands = map(lambda x: x[0], splits) + + def mergeCommand1(i): + mergeCommand = mergeCommands[i] + mergeFile = outputsToParallelize[i] + splitFiles = map(lambda x: x[1][i], splits) + return FarmJob(mergeCommand(splitFiles, mergeFile), jobName = cmd.jobName + '.merge', dependencies = splitCommands) + + mergeCommands = map(mergeCommand1, range(len(outputsToParallelize))) + + return splitCommands + mergeCommands + +def mergeVCFs(splitFiles, mergeFile): + splitFilesString = ' '.join(splitFiles) + cmd = "python ~/dev/GenomeAnalysisTK/trunk/python/mergeVCFs.py -a -f /seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta.fai %s > %s" % (splitFilesString, mergeFile) + return cmd + +def mergeByCat(splitFiles, mergeFile): + splitFilesString = ' '.join(splitFiles) + return "cat %s > %s" % (splitFilesString, mergeFile) + +def indexBAMFile( name, bamFile, lastJobs ): + cmd = 'samtools index ' + bamFile + return [FarmJob(cmd, jobName = 'samtools.index.' + name, dependencies = lastJobs)], None diff --git a/python/makeIndelMask.py b/python/makeIndelMask.py new file mode 100755 index 000000000..8eb323f54 --- /dev/null +++ b/python/makeIndelMask.py @@ -0,0 +1,39 @@ +from farm_commands2 import * +import os.path +import sys +from optparse import OptionParser +from datetime import date +import glob +import operator +import faiReader +import math +import shutil +import string + +def main(): + global OPTIONS + usage = "usage: %prog [options] indelCalls.bed maskSize indelsMask.bed" + parser = OptionParser(usage=usage) + #parser.add_option("", "--dry", dest="dry", + # action='store_true', default=False, + # help="If provided, nothing actually gets run, just a dry run") + + (OPTIONS, args) = parser.parse_args() + if len(args) != 3: + parser.error("incorrect number of arguments") + + indelCalls, maskSize, indelsMask = args + maskSize = int(maskSize) + + out = open(indelsMask, 'w') + for line in open(indelCalls): + # chr1 71996 72005 -AAAAAAAAA:4/6 + chr, indelStart, indelStop, notes = line.split() + maskStart = int(indelStart) - maskSize + maskStop = int(indelStop) + maskSize + maskNotes = notes + ":+/-" + str(maskSize) + print >> out, '\t'.join([chr, str(maskStart), str(maskStop), maskNotes]) + out.close() + +if __name__ == "__main__": + main() diff --git a/python/realignBamByChr.py b/python/realignBamByChr.py new file mode 100755 index 000000000..79aa999dd --- /dev/null +++ b/python/realignBamByChr.py @@ -0,0 +1,103 @@ +from farm_commands2 import * +import os.path +import sys +from optparse import OptionParser +from datetime import date +import glob +import operator +import faiReader +import math +import shutil +import string +from madPipelineUtils import * + +def main(): + global OPTIONS + usage = "usage: %prog [options] stages input.bam outputRoot" + parser = OptionParser(usage=usage) + parser.add_option("", "--dry", dest="dry", + action='store_true', default=False, + help="If provided, nothing actually gets run, just a dry run") + parser.add_option("", "--b36", dest="useB36", + action='store_true', default=False, + help="If provided, BAM is assumed to aligned to b36 named chromosomes") + parser.add_option("-v", "--verbose", dest="verbose", + action='store_true', default=False, + help="") + parser.add_option("-n", "--name", dest="name", + type="string", default="realignBamByChr", + help="Farm queue to send processing jobs to") + parser.add_option("-q", "--farm", dest="farmQueue", + type="string", default=None, + help="Farm queue to send processing jobs to") + parser.add_option("-e", "--extraArgs", dest="extraArgs", + type="string", default=None, + help="") + + (OPTIONS, args) = parser.parse_args() + if len(args) != 3: + parser.error("incorrect number of arguments") + + stages = map(string.lower, args[0].split(",")) + inputBam, outputRoot = args[1:] + outputBamList = outputRoot + '.bams.list' + + STAGES = ['targets', 'realign', 'index'] + for stage in stages: + if stage not in STAGES: + sys.exit('unknown stage ' + stage) + + myPipelineArgs = PipelineArgs(name = OPTIONS.name) + if ( OPTIONS.useB36 ): + myPipelineArgs.ref = 'b36' + + allJobs = [] + + def includeStage(name): + return name in stages + + out = open(outputBamList, 'w') + for chr in hg18: + lastJobs = None + + def updateNewJobs(newjobs, lastJobs): + if OPTIONS.verbose: + print 'New jobs', newjobs + allJobs.append(newjobs) + if newjobs != []: + lastJobs = newjobs + return [], lastJobs + + newJobs = [] + + def execStage(name, func, args = [], lastJobs = []): + if OPTIONS.verbose: print 'Name is', name + newJobs, results = func(myPipelineArgs, chr, inputBam, outputRoot + '.' + chr, args, lastJobs) + if includeStage(name): newJobs, lastJobs = updateNewJobs(newJobs, lastJobs) + return newJobs, lastJobs, results + + newJobs, lastJobs, intervals = execStage('targets', createTargets) + newJobs, lastJobs, realignedBam = execStage('realign', realign, intervals, lastJobs) + # need to merge and then index + newJobs, lastJobs, ignore = execStage('index', index, realignedBam, lastJobs) + print >> out, os.path.abspath(realignedBam) + + out.close() + print 'EXECUTING JOBS' + executeJobs(allJobs, farm_queue = OPTIONS.farmQueue, just_print_commands = OPTIONS.dry) + +def createTargets( myPipelineArgs, chr, inputBam, outputRoot, args, lastJobs ): + outputIntervals = outputRoot + ".intervals" + GATKArgs = '-T RealignerTargetCreator -I %s -o %s -mrl 10000 -L %s' % (inputBam, outputIntervals, chr) + return simpleGATKCommand( myPipelineArgs, 'CreateInterval' + chr, GATKArgs, lastJobs ), outputIntervals + +def realign( myPipelineArgs, chr, inputBam, outputRoot, intervals, lastJobs ): + outputBAM = outputRoot + ".bam" + GATKArgs = '-T IndelRealigner -I %s -targetIntervals %s --output %s -sort ON_DISK -mrl 100000 -L %s' % (inputBam, intervals, outputBAM, chr) + return simpleGATKCommand( myPipelineArgs, 'Realign' + chr, GATKArgs, lastJobs ), outputBAM + +def index( myPipelineArgs, chr, inputBam, outputRoot, realignedBam, lastJobs ): + return indexBAMFile( myPipelineArgs.name, realignedBam, lastJobs ) + +if __name__ == "__main__": + main() diff --git a/python/vcf_b36_to_hg18.py b/python/vcf_b36_to_hg18.py new file mode 100755 index 000000000..57102b3ae --- /dev/null +++ b/python/vcf_b36_to_hg18.py @@ -0,0 +1,38 @@ +from farm_commands2 import * +import os.path +import sys +from optparse import OptionParser +from datetime import date +import glob +import operator +import faiReader +import math +import shutil +import string + +def main(): + global OPTIONS + usage = "usage: %prog [options] b36VCF hg18VCF" + parser = OptionParser(usage=usage) + parser.add_option("", "--dry", dest="dry", + action='store_true', default=False, + help="If provided, nothing actually gets run, just a dry run") + + (OPTIONS, args) = parser.parse_args() + if len(args) != 2: + parser.error("incorrect number of arguments") + + b36vcf, hg18vcf = args + + out = open(hg18vcf, 'w') + for line in open(b36vcf): + length = len(line) + if length > 2 and line[0] != '#': + if line[0:2] == 'MT': + sys.exit('Cannot convert MT containing VCFs, sorry') + line = 'chr' + line + out.write(line) + out.close() + +if __name__ == "__main__": + main()