More powerful management routines for my pipeline.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3351 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2010-05-12 13:37:39 +00:00
parent 3f07611187
commit d3c33d4b3f
3 changed files with 34 additions and 16 deletions

View File

@ -109,7 +109,7 @@ def executeJob(job, farm_queue = None, just_print_commands = False, debug = True
job.jobID = justPrintJobIDCounter job.jobID = justPrintJobIDCounter
justPrintJobIDCounter += 1 justPrintJobIDCounter += 1
elif farm_queue: elif farm_queue:
print 'job.executionString', job.executionString #print 'job.executionString', job.executionString
result = subprocess.Popen([job.executionString, ""], shell=True, stdout=subprocess.PIPE).communicate()[0] result = subprocess.Popen([job.executionString, ""], shell=True, stdout=subprocess.PIPE).communicate()[0]
p = re.compile('Job <(\d+)> is submitted to queue') p = re.compile('Job <(\d+)> is submitted to queue')
job.jobID = p.match(result).group(1) job.jobID = p.match(result).group(1)

View File

@ -17,11 +17,11 @@ GATK_JAR = GATK_STABLE_JAR
# add to GATK to enable dbSNP aware cleaning # add to GATK to enable dbSNP aware cleaning
# -D /humgen/gsa-scr1/GATK_Data/dbsnp_129_hg18.rod # -D /humgen/gsa-scr1/GATK_Data/dbsnp_129_hg18.rod
#hg18 = ['chrM'] + ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY'] hg18 = ['chrM'] + ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY']
#b36 = [str(i) for i in range(1,23)] + ['X', 'Y', 'MT'] b36 = [str(i) for i in range(1,23)] + ['X', 'Y', 'MT']
hg18 = ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY'] #hg18 = ['chr' + str(i) for i in range(1,23)] + ['chrX', 'chrY']
b36 = [str(i) for i in range(1,23)] + ['X', 'Y'] #b36 = [str(i) for i in range(1,23)] + ['X', 'Y']
HG18_TO_B36 = { HG18_TO_B36 = {
'hg18' : 'b36', 'hg18' : 'b36',
@ -45,11 +45,12 @@ def appendExtension(path, newExt, addExtension = True):
# return os.path.join(OPTIONS.dir, s) # return os.path.join(OPTIONS.dir, s)
class PipelineArgs: class PipelineArgs:
def __init__( self, GATK_JAR = GATK_JAR, ref = 'hg18', name = None, memory = '4g' ): def __init__( self, GATK_JAR = GATK_JAR, ref = 'hg18', name = None, memory = '4g', excludeChrs = [] ):
self.GATK = 'java -Xmx%s -Djava.io.tmpdir=/broad/shptmp/depristo/tmp/ -jar ' + GATK_JAR + ' -R /seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta -l INFO ' self.GATK = 'java -Xmx%s -Djava.io.tmpdir=/broad/shptmp/depristo/tmp/ -jar ' + GATK_JAR + ' -R /seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta -l INFO '
self.ref = ref self.ref = ref
self.name = name self.name = name
self.memory = memory self.memory = memory
self.excludeChrs = excludeChrs
def convertToB36(self): def convertToB36(self):
return self.ref == 'b36' return self.ref == 'b36'
@ -71,6 +72,9 @@ class PipelineArgs:
cmd = hg18args_to_b36(cmd) cmd = hg18args_to_b36(cmd)
return cmd return cmd
def chrsToSplitBy(self, chrs):
return filter(lambda x: x not in self.excludeChrs, chrs)
# #
# General features # General features
# #
@ -97,7 +101,7 @@ def splitGATKCommandByChr( myPipelineArgs, cmd, outputsToParallelize, mergeComma
chrCmd = FarmJob(chr_cmd_str, jobName = cmd.jobName + '.byChr' + chr, dependencies = cmd.dependencies) chrCmd = FarmJob(chr_cmd_str, jobName = cmd.jobName + '.byChr' + chr, dependencies = cmd.dependencies)
return chrCmd, chrOutputMap return chrCmd, chrOutputMap
splits = map( makeChrCmd, hg18 ) splits = map( makeChrCmd, myPipelineArgs.chrsToSplitBy(hg18) )
splitCommands = map(lambda x: x[0], splits) splitCommands = map(lambda x: x[0], splits)
def mergeCommand1(i): def mergeCommand1(i):

View File

@ -9,6 +9,7 @@ import faiReader
import math import math
import shutil import shutil
import string import string
import picard_utils
from madPipelineUtils import * from madPipelineUtils import *
def main(): def main():
@ -42,7 +43,7 @@ def main():
inputBam, outputRoot = args[1:] inputBam, outputRoot = args[1:]
outputBamList = outputRoot + '.bams.list' outputBamList = outputRoot + '.bams.list'
STAGES = ['targets', 'realign', 'index'] STAGES = ['targets', 'realign', 'index', 'merge']
for stage in stages: for stage in stages:
if stage not in STAGES: if stage not in STAGES:
sys.exit('unknown stage ' + stage) sys.exit('unknown stage ' + stage)
@ -57,6 +58,7 @@ def main():
return name in stages return name in stages
out = open(outputBamList, 'w') out = open(outputBamList, 'w')
realignInfo = []
for chr in hg18: for chr in hg18:
lastJobs = None lastJobs = None
@ -66,23 +68,30 @@ def main():
allJobs.append(newjobs) allJobs.append(newjobs)
if newjobs != []: if newjobs != []:
lastJobs = newjobs lastJobs = newjobs
return [], lastJobs return lastJobs
newJobs = []
def execStage(name, func, args = [], lastJobs = []): def execStage(name, func, args = [], lastJobs = []):
if OPTIONS.verbose: print 'Name is', name if OPTIONS.verbose: print 'Name is', name
newJobs, results = func(myPipelineArgs, chr, inputBam, outputRoot + '.' + chr, args, lastJobs) newJobs, results = func(myPipelineArgs, chr, inputBam, outputRoot + '.' + chr, args, lastJobs)
if includeStage(name): newJobs, lastJobs = updateNewJobs(newJobs, lastJobs) if includeStage(name): lastJobs = updateNewJobs(newJobs, lastJobs)
return newJobs, lastJobs, results return lastJobs, results
newJobs, lastJobs, intervals = execStage('targets', createTargets) lastJobs, intervals = execStage('targets', createTargets)
newJobs, lastJobs, realignedBam = execStage('realign', realign, intervals, lastJobs) realignJobs, realignedBam = execStage('realign', realign, intervals, lastJobs)
realignInfo.append([realignJobs, realignedBam])
# need to merge and then index # need to merge and then index
newJobs, lastJobs, ignore = execStage('index', index, realignedBam, lastJobs) indexJobs, ignore = execStage('index', index, realignedBam, realignJobs)
print >> out, os.path.abspath(realignedBam) print >> out, os.path.abspath(realignedBam)
out.close() out.close()
if 'merge' in stages:
realignerJobs = []
if realignInfo[0][0] != []:
realignerJobs = map(lambda x: x[0][0], realignInfo)
mergerJob = mergeBams(myPipelineArgs, outputRoot + ".bam", map(lambda x: x[1], realignInfo), realignerJobs)
allJobs.append(mergerJob)
print 'EXECUTING JOBS' print 'EXECUTING JOBS'
executeJobs(allJobs, farm_queue = OPTIONS.farmQueue, just_print_commands = OPTIONS.dry) executeJobs(allJobs, farm_queue = OPTIONS.farmQueue, just_print_commands = OPTIONS.dry)
@ -99,5 +108,10 @@ def realign( myPipelineArgs, chr, inputBam, outputRoot, intervals, lastJobs ):
def index( myPipelineArgs, chr, inputBam, outputRoot, realignedBam, lastJobs ): def index( myPipelineArgs, chr, inputBam, outputRoot, realignedBam, lastJobs ):
return indexBAMFile( myPipelineArgs.name, realignedBam, lastJobs ) return indexBAMFile( myPipelineArgs.name, realignedBam, lastJobs )
def mergeBams( myPipelineArgs, outputFilename, bamsToMerge, lastJobs ):
print lastJobs
cmd = picard_utils.mergeBAMCmd( outputFilename, bamsToMerge, compression_level = 5 )
return FarmJob(cmd, jobName = 'merge.' + myPipelineArgs.name, dependencies = lastJobs)
if __name__ == "__main__": if __name__ == "__main__":
main() main()