diff --git a/python/farm_commands2.py b/python/farm_commands2.py index 239828f5b..4da53adb3 100755 --- a/python/farm_commands2.py +++ b/python/farm_commands2.py @@ -17,12 +17,13 @@ def all(iterable): MAX_UNNAMED_DEPENDENCIES = 10 class FarmJob: - def __init__( self, cmd_str_from_user, jobName = None, outputHead = None, outputFile = None, dependencies = [], dependencyNameString = None, dieOnFail = False): + def __init__( self, cmd_str_from_user, jobName = None, outputHead = None, outputFile = None, dependencies = [], dependencyNameString = None, dieOnFail = False, memlimit = None): self.cmd_str_from_user = cmd_str_from_user self.jobName = jobName self.outputHead = outputHead self.outputFile = outputFile self.dieOnFail = dieOnFail + self.memlimit = memlimit self.dependencies = dependencies if self.dependencies == None: @@ -136,6 +137,9 @@ def buildExecutionString(job, farm_queue = None, debug = True): cmd_str = "bsub -r -q " + farm_queue if farm_stdout != None: cmd_str += " -o " + farm_stdout + + if ( job.memlimit != None ): + cmd_str += " -R \"rusage[mem=" + job.memlimit[0:-1] + "]\"" # fixme if job.dependencies != []: diff --git a/python/madPipelineUtils.py b/python/madPipelineUtils.py index 3db856bd2..2fc78d41e 100755 --- a/python/madPipelineUtils.py +++ b/python/madPipelineUtils.py @@ -45,7 +45,7 @@ def appendExtension(path, newExt, addExtension = True): # return os.path.join(OPTIONS.dir, s) class PipelineArgs: - def __init__( self, GATK_JAR = GATK_JAR, ref = 'hg18', name = None, memory = '3g', excludeChrs = [] ): + def __init__( self, GATK_JAR = GATK_JAR, ref = 'hg18', name = None, memory = '2g', excludeChrs = [] ): self.GATK = 'java -Xmx%s -Djava.io.tmpdir=/broad/shptmp/depristo/tmp/ -jar ' + GATK_JAR + ' -R /seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta -l INFO ' self.ref = ref self.name = name @@ -81,7 +81,7 @@ class PipelineArgs: # def simpleGATKCommand( pargs, name, args, lastJobs ): cmd = pargs.finalizedGATKCommand(args) - return [FarmJob(cmd, jobName = pargs.getCommandName(name), dependencies = lastJobs)] + return [FarmJob(cmd, jobName = pargs.getCommandName(name), dependencies = lastJobs, memlimit = pargs.memory)] # # Takes a simpleGATKCommand and splits it by chromosome, merging output @@ -99,7 +99,7 @@ def splitGATKCommandByChr( myPipelineArgs, cmd, outputsToParallelize, mergeComma if myPipelineArgs.convertToB36(): chr_cmd_str = hg18args_to_b36(chr_cmd_str) - chrCmd = FarmJob(chr_cmd_str, jobName = cmd.jobName + '.byChr' + chr, dependencies = cmd.dependencies) + chrCmd = FarmJob(chr_cmd_str, jobName = cmd.jobName + '.byChr' + chr, dependencies = cmd.dependencies, memlimit = myPipelineArgs.memory) return chrCmd, chrOutputMap #print '######################################### chrsToSplitBy', myPipelineArgs.chrsToSplitBy(hg18) @@ -110,7 +110,7 @@ def splitGATKCommandByChr( myPipelineArgs, cmd, outputsToParallelize, mergeComma mergeCommand = mergeCommands[i] mergeFile = outputsToParallelize[i] splitFiles = map(lambda x: x[1][i], splits) - return FarmJob(mergeCommand(splitFiles, mergeFile), jobName = cmd.jobName + '.merge', dependencies = splitCommands) + return FarmJob(mergeCommand(splitFiles, mergeFile), jobName = cmd.jobName + '.merge', dependencies = splitCommands, memlimit = myPipelineArgs.memory) mergeCommands = map(mergeCommand1, range(len(outputsToParallelize))) @@ -127,4 +127,4 @@ def mergeByCat(splitFiles, mergeFile): def indexBAMFile( name, bamFile, lastJobs ): cmd = 'samtools index ' + bamFile - return [FarmJob(cmd, jobName = 'samtools.index.' + name, dependencies = lastJobs)], None + return [FarmJob(cmd, jobName = 'samtools.index.' + name, dependencies = lastJobs, memlimit = "1g")], None