Minor improvements to my crappy old python job management system. Mauricio's first task is to retire all of this code and move the DPP pipeline over to Queue

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4810 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2010-12-09 04:44:16 +00:00
parent 722819688a
commit fabb42924c
2 changed files with 10 additions and 6 deletions

View File

@ -17,12 +17,13 @@ def all(iterable):
MAX_UNNAMED_DEPENDENCIES = 10
class FarmJob:
def __init__( self, cmd_str_from_user, jobName = None, outputHead = None, outputFile = None, dependencies = [], dependencyNameString = None, dieOnFail = False):
def __init__( self, cmd_str_from_user, jobName = None, outputHead = None, outputFile = None, dependencies = [], dependencyNameString = None, dieOnFail = False, memlimit = None):
self.cmd_str_from_user = cmd_str_from_user
self.jobName = jobName
self.outputHead = outputHead
self.outputFile = outputFile
self.dieOnFail = dieOnFail
self.memlimit = memlimit
self.dependencies = dependencies
if self.dependencies == None:
@ -136,6 +137,9 @@ def buildExecutionString(job, farm_queue = None, debug = True):
cmd_str = "bsub -r -q " + farm_queue
if farm_stdout != None:
cmd_str += " -o " + farm_stdout
if ( job.memlimit != None ):
cmd_str += " -R \"rusage[mem=" + job.memlimit[0:-1] + "]\""
# fixme
if job.dependencies != []:

View File

@ -45,7 +45,7 @@ def appendExtension(path, newExt, addExtension = True):
# return os.path.join(OPTIONS.dir, s)
class PipelineArgs:
def __init__( self, GATK_JAR = GATK_JAR, ref = 'hg18', name = None, memory = '3g', excludeChrs = [] ):
def __init__( self, GATK_JAR = GATK_JAR, ref = 'hg18', name = None, memory = '2g', excludeChrs = [] ):
self.GATK = 'java -Xmx%s -Djava.io.tmpdir=/broad/shptmp/depristo/tmp/ -jar ' + GATK_JAR + ' -R /seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta -l INFO '
self.ref = ref
self.name = name
@ -81,7 +81,7 @@ class PipelineArgs:
#
def simpleGATKCommand( pargs, name, args, lastJobs ):
cmd = pargs.finalizedGATKCommand(args)
return [FarmJob(cmd, jobName = pargs.getCommandName(name), dependencies = lastJobs)]
return [FarmJob(cmd, jobName = pargs.getCommandName(name), dependencies = lastJobs, memlimit = pargs.memory)]
#
# Takes a simpleGATKCommand and splits it by chromosome, merging output
@ -99,7 +99,7 @@ def splitGATKCommandByChr( myPipelineArgs, cmd, outputsToParallelize, mergeComma
if myPipelineArgs.convertToB36():
chr_cmd_str = hg18args_to_b36(chr_cmd_str)
chrCmd = FarmJob(chr_cmd_str, jobName = cmd.jobName + '.byChr' + chr, dependencies = cmd.dependencies)
chrCmd = FarmJob(chr_cmd_str, jobName = cmd.jobName + '.byChr' + chr, dependencies = cmd.dependencies, memlimit = myPipelineArgs.memory)
return chrCmd, chrOutputMap
#print '######################################### chrsToSplitBy', myPipelineArgs.chrsToSplitBy(hg18)
@ -110,7 +110,7 @@ def splitGATKCommandByChr( myPipelineArgs, cmd, outputsToParallelize, mergeComma
mergeCommand = mergeCommands[i]
mergeFile = outputsToParallelize[i]
splitFiles = map(lambda x: x[1][i], splits)
return FarmJob(mergeCommand(splitFiles, mergeFile), jobName = cmd.jobName + '.merge', dependencies = splitCommands)
return FarmJob(mergeCommand(splitFiles, mergeFile), jobName = cmd.jobName + '.merge', dependencies = splitCommands, memlimit = myPipelineArgs.memory)
mergeCommands = map(mergeCommand1, range(len(outputsToParallelize)))
@ -127,4 +127,4 @@ def mergeByCat(splitFiles, mergeFile):
def indexBAMFile( name, bamFile, lastJobs ):
cmd = 'samtools index ' + bamFile
return [FarmJob(cmd, jobName = 'samtools.index.' + name, dependencies = lastJobs)], None
return [FarmJob(cmd, jobName = 'samtools.index.' + name, dependencies = lastJobs, memlimit = "1g")], None