From d3c33d4b3f1bb239b717bbff02ccc75466651a41 Mon Sep 17 00:00:00 2001 From: depristo Date: Wed, 12 May 2010 13:37:39 +0000 Subject: [PATCH] more powerful management routines for my pipeline git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3351 348d0f76-0448-11de-a6fe-93d51630548a --- python/farm_commands2.py | 2 +- python/madPipelineUtils.py | 16 ++++++++++------ python/realignBamByChr.py | 32 +++++++++++++++++++++++--------- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/python/farm_commands2.py b/python/farm_commands2.py index 1afab77d9..1e8488b54 100755 --- a/python/farm_commands2.py +++ b/python/farm_commands2.py @@ -109,7 +109,7 @@ def executeJob(job, farm_queue = None, just_print_commands = False, debug = True job.jobID = justPrintJobIDCounter justPrintJobIDCounter += 1 elif farm_queue: - print 'job.executionString', job.executionString + #print 'job.executionString', job.executionString result = subprocess.Popen([job.executionString, ""], shell=True, stdout=subprocess.PIPE).communicate()[0] p = re.compile('Job <(\d+)> is submitted to queue') job.jobID = p.match(result).group(1) diff --git a/python/madPipelineUtils.py b/python/madPipelineUtils.py index 83f275f30..560a78d03 100755 --- a/python/madPipelineUtils.py +++ b/python/madPipelineUtils.py @@ -17,11 +17,11 @@ GATK_JAR = GATK_STABLE_JAR # add to GATK to enable dbSNP aware cleaning # -D /humgen/gsa-scr1/GATK_Data/dbsnp_129_hg18.rod -#hg18 = ['chrM'] + ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY'] -#b36 = [str(i) for i in range(1,23)] + ['X', 'Y', 'MT'] +hg18 = ['chrM'] + ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY'] +b36 = [str(i) for i in range(1,23)] + ['X', 'Y', 'MT'] -hg18 = ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY'] -b36 = [str(i) for i in range(1,23)] + ['X', 'Y'] +#hg18 = ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY'] +#b36 = [str(i) for i in range(1,23)] + ['X', 'Y'] HG18_TO_B36 = { 'hg18' : 'b36', @@ -45,11 +45,12 @@ def appendExtension(path, newExt, addExtension = True): # return os.path.join(OPTIONS.dir, s) class PipelineArgs: - def __init__( self, GATK_JAR = GATK_JAR, ref = 'hg18', name = None, memory = '4g' ): + def __init__( self, GATK_JAR = GATK_JAR, ref = 'hg18', name = None, memory = '4g', excludeChrs = [] ): self.GATK = 'java -Xmx%s -Djava.io.tmpdir=/broad/shptmp/depristo/tmp/ -jar ' + GATK_JAR + ' -R /seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta -l INFO ' self.ref = ref self.name = name self.memory = memory + self.excludeChrs = excludeChrs def convertToB36(self): return self.ref == 'b36' @@ -71,6 +72,9 @@ class PipelineArgs: cmd = hg18args_to_b36(cmd) return cmd + def chrsToSplitBy(self, chrs): + return filter(lambda x: x not in self.excludeChrs, chrs) + # # General features # @@ -97,7 +101,7 @@ def splitGATKCommandByChr( myPipelineArgs, cmd, outputsToParallelize, mergeComma chrCmd = FarmJob(chr_cmd_str, jobName = cmd.jobName + '.byChr' + chr, dependencies = cmd.dependencies) return chrCmd, chrOutputMap - splits = map( makeChrCmd, hg18 ) + splits = map( makeChrCmd, myPipelineArgs.chrsToSplitBy(hg18) ) splitCommands = map(lambda x: x[0], splits) def mergeCommand1(i): diff --git a/python/realignBamByChr.py b/python/realignBamByChr.py index 9b70c0fc3..c1f9ec260 100755 --- a/python/realignBamByChr.py +++ b/python/realignBamByChr.py @@ -9,6 +9,7 @@ import faiReader import math import shutil import string +import picard_utils from madPipelineUtils import * def main(): @@ -42,7 +43,7 @@ def main(): inputBam, outputRoot = args[1:] outputBamList = outputRoot + '.bams.list' - STAGES = ['targets', 'realign', 'index'] + STAGES = ['targets', 'realign', 'index', 'merge'] for stage in stages: if stage not in STAGES: sys.exit('unknown stage ' + stage) @@ -57,6 +58,7 @@ def main(): return name in stages out = open(outputBamList, 'w') + realignInfo = [] for chr in hg18: lastJobs = None @@ -66,23 +68,30 @@ def main(): allJobs.append(newjobs) if newjobs != []: lastJobs = newjobs - return [], lastJobs - - newJobs = [] + return lastJobs def execStage(name, func, args = [], lastJobs = []): if OPTIONS.verbose: print 'Name is', name newJobs, results = func(myPipelineArgs, chr, inputBam, outputRoot + '.' + chr, args, lastJobs) - if includeStage(name): newJobs, lastJobs = updateNewJobs(newJobs, lastJobs) - return newJobs, lastJobs, results + if includeStage(name): lastJobs = updateNewJobs(newJobs, lastJobs) + return lastJobs, results - newJobs, lastJobs, intervals = execStage('targets', createTargets) - newJobs, lastJobs, realignedBam = execStage('realign', realign, intervals, lastJobs) + lastJobs, intervals = execStage('targets', createTargets) + realignJobs, realignedBam = execStage('realign', realign, intervals, lastJobs) + realignInfo.append([realignJobs, realignedBam]) # need to merge and then index - newJobs, lastJobs, ignore = execStage('index', index, realignedBam, lastJobs) + indexJobs, ignore = execStage('index', index, realignedBam, realignJobs) print >> out, os.path.abspath(realignedBam) out.close() + + if 'merge' in stages: + realignerJobs = [] + if realignInfo[0][0] != []: + realignerJobs = map(lambda x: x[0][0], realignInfo) + mergerJob = mergeBams(myPipelineArgs, outputRoot + ".bam", map(lambda x: x[1], realignInfo), realignerJobs) + allJobs.append(mergerJob) + print 'EXECUTING JOBS' executeJobs(allJobs, farm_queue = OPTIONS.farmQueue, just_print_commands = OPTIONS.dry) @@ -99,5 +108,10 @@ def realign( myPipelineArgs, chr, inputBam, outputRoot, intervals, lastJobs ): def index( myPipelineArgs, chr, inputBam, outputRoot, realignedBam, lastJobs ): return indexBAMFile( myPipelineArgs.name, realignedBam, lastJobs ) +def mergeBams( myPipelineArgs, outputFilename, bamsToMerge, lastJobs ): + print lastJobs + cmd = picard_utils.mergeBAMCmd( outputFilename, bamsToMerge, compression_level = 5 ) + return FarmJob(cmd, jobName = 'merge.' + myPipelineArgs.name, dependencies = lastJobs) + if __name__ == "__main__": main()