#!/usr/bin/env python import os import sys import subprocess import re import unittest import tempfile def all(iterable): for element in iterable: if not element: return False return True # maximum number of unnamed jobs allowed sa dependencies MAX_UNNAMED_DEPENDENCIES = 10 class FarmJob: def __init__( self, cmd_str_from_user, jobName = None, outputHead = None, outputFile = None, dependencies = [], dependencyNameString = None, dieOnFail = False, memlimit = None): self.cmd_str_from_user = cmd_str_from_user self.jobName = jobName self.outputHead = outputHead self.outputFile = outputFile self.dieOnFail = dieOnFail self.memlimit = memlimit self.dependencies = dependencies if self.dependencies == None: self.dependencies = [] elif type(self.dependencies) != list: self.dependencies = [self.dependencies] self.dependencyNameString = dependencyNameString if len(self.dependencies) > MAX_UNNAMED_DEPENDENCIES: depNames = map(FarmJob.getJobName, self.dependencies) if len(filter(None, depNames)) > 1 and len(self.dependencies) != len(filter(None, depNames)): # there are some unnamed and some named deps raise Exception("Bad job names -- some are named and some are unnamed", depName) self.jobID = None # None indicates currently unscheduled self.executionString = None # currently unscheduled self.executed = False self.jobStatus = None def getJobName(self): return self.jobName def getJobIDString(self): if self.jobName == None: if self.jobID == None: return "UNNAMED" else: return str(self.jobID) else: return self.getJobName() def __str__(self): return "[JOB: name=%s id=%s depending on (%s) with cmd=%s]" % (self.getJobName(), self.jobID, ','.join(map(FarmJob.getJobIDString, self.dependencies)), self.cmd_str_from_user) # def __repr__(self): # return self.__str__() def longestCommonPrefix(strings): #print 'LCP', strings if strings == []: return "" else: l = map(len, strings) shortestLen = min(l) #print '*arg', l, shortestLen for i in range(shortestLen): c = strings[0][i] if not all(map(lambda s: c == s[i], strings)): shortestLen = i break return strings[0][0:shortestLen] def jobNameWildcard(jobs): if len(jobs) == 1: return jobs[0].getJobName() else: return longestCommonPrefix(map(FarmJob.getJobName, jobs)) + "*" def executeJobs(allJobs, farm_queue = None, just_print_commands = False, debug = True): for job in allJobs: if type(job) == list: # convenience for lists of lists map( lambda x: executeJob(x, farm_queue, just_print_commands, debug), job) else: print 'Preparing to execute', job if not job.executed: # schedule the dependents executeJobs(job.dependencies, farm_queue, just_print_commands, debug = debug) executeJob(job, farm_queue, just_print_commands, debug = debug) print 'Executed', job justPrintJobIDCounter = 1 def executeJob(job, farm_queue = None, just_print_commands = False, debug = True, die_on_fail = True): global justPrintJobIDCounter job.executed = True # build my execution string job.executionString = buildExecutionString(job, farm_queue, debug = debug) if just_print_commands or (globals().has_key("justPrintCommands") and globals().justPrintCommands): job.jobID = justPrintJobIDCounter justPrintJobIDCounter += 1 elif farm_queue: #print 'job.executionString', job.executionString result = subprocess.Popen([job.executionString, ""], shell=True, stdout=subprocess.PIPE).communicate()[0] p = re.compile('Job <(\d+)> is submitted to queue') job.jobID = p.match(result).group(1) else: # Actually execute the command if we're not just in debugging output mode status = os.system(job.executionString) if not farm_queue: print "<<< Exit code:", status,"\n" if die_on_fail != None and status != 0: print "### Failed with exit code "+str(status)+" while executing command "+job.cmd_str_from_user sys.exit() job.jobStatus = int(status) def buildExecutionString(job, farm_queue = None, debug = True): if farm_queue != None: if job.outputFile != None: farm_stdout = job.outputFile elif job.outputHead != None: farm_stdout = job.outputHead + ".stdout" else: #farm_stdout = None farm_stdout = "%J.lsf.output" cmd_str = "bsub -r -q " + farm_queue if farm_stdout != None: cmd_str += " -o " + farm_stdout if ( job.memlimit != None ): cmd_str += " -R \"rusage[mem=" + job.memlimit[0:-1] + "]\"" # fixme if job.dependencies != []: cmd_str += buildJobDependencyString(job.dependencies) if job.dependencyNameString != None: cmd_str += buildJobDependencyString(job.dependencyNameString) if job.jobName != None: cmd_str += " -J %s" % job.jobName cmd_out, cmd_file = tempfile.mkstemp() cmd_out = open(cmd_file, 'w') cmd_out.write(job.cmd_str_from_user) cmd_out.close() cmd_str += " < " + cmd_file + "" #cmd_str += " '" + job.cmd_str_from_user + "'" if debug: print ">>> Farming via "+cmd_str else: cmd_str = job.cmd_str_from_user if debug: print ">>> Executing "+cmd_str return cmd_str def allJobsAreNamed(jobs): #print map(lambda x: x != None, map(FarmJob.getJobName, jobs)) return all(map(lambda x: x != None, map(FarmJob.getJobName, jobs))) def buildJobDependencyString(depJobs): if type(depJobs) == str: # we are all formally named depString = "ended(%s*)" % depJobs elif allJobsAreNamed(depJobs): # we are all formally named depString = "ended(%s)" % jobNameWildcard(depJobs) else: depString = "&&".join(map(lambda x: "ended(%s)" % x.jobID, depJobs)) return " -w \"%s\"" % depString def cmd(cmd_str_from_user, farm_queue=False, output_head=None, just_print_commands=False, outputFile = None, waitID = None, jobName = None, die_on_fail = False): """if farm_queue != False, submits to queue, other die_on_fail_msg: if != None, die on command failure (non-zero return) and show die_on_fail_msg""" if farm_queue: if outputFile <> None: farm_stdout = outputFile elif output_head <> None: farm_stdout = output_head+".stdout" else: #farm_stdout = None farm_stdout = "%J.lsf.output" cmd_str = "bsub -q "+farm_queue if farm_stdout <> None: cmd_str += " -o " + farm_stdout if waitID <> None: cmd_str += " -w \"ended(%s)\"" % (str(waitID)) if jobName <> None: cmd_str += " -J %s" % (jobName) cmd_str += " '"+cmd_str_from_user + "'" print ">>> Farming via "+cmd_str else: cmd_str = cmd_str_from_user print ">>> Executing "+cmd_str if just_print_commands or (globals().has_key("justPrintCommands") and globals().justPrintCommands): return -1 elif farm_queue: result = subprocess.Popen([cmd_str, ""], shell=True, stdout=subprocess.PIPE).communicate()[0] p = re.compile('Job <(\d+)> is submitted to queue') jobid = p.match(result).group(1) return jobid else: # Actually execute the command if we're not just in debugging output mode status = os.system(cmd_str) if not farm_queue: print "<<< Exit code:", status,"\n" if die_on_fail != None and status != 0: print "### Failed with exit code "+str(status)+" while executing command "+cmd_str_from_user sys.exit() return int(status) # ------------------------------------------------------------------------------------------ # # Unit testing! # # ------------------------------------------------------------------------------------------ class TestFarmCommands(unittest.TestCase): def setUp(self): print '' print '-' * 100 def testMakingJob1(self): print 'testMakingJob:' job = FarmJob("foo") executeJobs([job], just_print_commands = True) def testMakingJob2(self): print 'testMakingJob2:' job = FarmJob("bar", jobName = "barJobs", outputHead = "outputRoot", outputFile = "bar.log", dependencies = [], dieOnFail = True) executeJobs([job], "gsa", just_print_commands = True) def testDepJobs1(self): print 'testDepJobs1:' job1 = FarmJob("job1") job2 = FarmJob("job2") job3 = FarmJob("job3", dependencies = [job1, job2]) executeJobs([job1, job2, job3], "gsa", just_print_commands = True) def testDepJobs2(self): print 'testDepJobs2:' foojob1 = FarmJob("job1", jobName = "foojob1") foojob2 = FarmJob("job2", jobName = "foojob2") barjob1 = FarmJob("barjob1", jobName = "barjob1", dependencies = [foojob1]) barjob2 = FarmJob("barjob2", jobName = "barjob2", dependencies = [foojob1, foojob2]) bazjob1 = FarmJob("baz1", jobName = "baz1", dependencies = [barjob1, barjob2]) executeJobs([bazjob1], "gsa", just_print_commands = True) if __name__ == '__main__': unittest.main()