2010-03-19 21:11:06 +08:00
#!/usr/bin/env python
import os
import sys
import subprocess
import re
import unittest
import tempfile
2010-04-26 20:34:04 +08:00
def all(iterable):
    """Return True when every element of *iterable* is truthy.

    Iteration stops at the first falsy element; an empty iterable gives True.

    NOTE(review): this definition shadows the builtin of the same name --
    presumably a fallback for interpreters that lack it; confirm before
    removing.
    """
    # The generator only yields falsy elements, so reaching the loop body at
    # all means a counter-example exists.
    for _falsy in (item for item in iterable if not item):
        return False
    return True
2010-03-19 21:11:06 +08:00
# Maximum number of unnamed jobs allowed as dependencies.  FarmJob only checks
# for a mix of named and unnamed dependencies once a job has more dependencies
# than this.
MAX_UNNAMED_DEPENDENCIES = 10


class FarmJob:
    """A command string plus the bookkeeping needed to run or submit it.

    Attributes set by the constructor:
      cmd_str_from_user    -- the command exactly as supplied by the caller
      jobName              -- optional name (None when unnamed)
      outputHead           -- optional prefix used to derive an output file name
      outputFile           -- optional explicit output file name
      dieOnFail            -- stored as given
      dependencies         -- list of jobs this job depends on
      dependencyNameString -- optional name prefix of jobs to depend on
      jobID                -- None until the job has been scheduled
      executionString      -- None until the job has been scheduled
      executed             -- False until the job has been handed to an executor
      jobStatus            -- None until a status has been recorded
    """

    def __init__(self, cmd_str_from_user, jobName=None, outputHead=None, outputFile=None, dependencies=None, dependencyNameString=None, dieOnFail=False):
        """Create an unscheduled job.

        dependencies may be None (no dependencies), a list or tuple of jobs,
        or a single job, which is wrapped in a list.

        Raises Exception when there are more than MAX_UNNAMED_DEPENDENCIES
        dependencies and they mix named and unnamed jobs.
        """
        self.cmd_str_from_user = cmd_str_from_user
        self.jobName = jobName
        self.outputHead = outputHead
        self.outputFile = outputFile
        self.dieOnFail = dieOnFail

        # The default is None rather than a list literal: a literal default
        # is created once and would be shared by every job built without
        # explicit dependencies.
        if dependencies is None:
            dependencies = []
        elif isinstance(dependencies, tuple):
            dependencies = list(dependencies)
        elif not isinstance(dependencies, list):
            dependencies = [dependencies]
        self.dependencies = dependencies

        self.dependencyNameString = dependencyNameString

        if len(self.dependencies) > MAX_UNNAMED_DEPENDENCIES:
            depNames = [dep.getJobName() for dep in self.dependencies]
            namedCount = len([name for name in depNames if name])
            # NOTE(review): "> 1" lets exactly one named dependency among
            # unnamed ones through; confirm whether "> 0" was intended.
            if namedCount > 1 and namedCount != len(self.dependencies):
                # there are some unnamed and some named deps
                raise Exception(" Bad job names -- some are named and some are unnamed ", depNames)

        self.jobID = None  # None indicates currently unscheduled
        self.executionString = None  # currently unscheduled
        self.executed = False
        self.jobStatus = None

    def getJobName(self):
        """Return the job's name, or None when it is unnamed."""
        return self.jobName

    def getJobIDString(self):
        """Return the name if there is one, else the job id as a string,
        else a fixed placeholder for unnamed, unscheduled jobs."""
        if self.jobName is not None:
            return self.getJobName()
        if self.jobID is None:
            return " UNNAMED "
        return str(self.jobID)

    def __str__(self):
        depIDs = ' , '.join([dep.getJobIDString() for dep in self.dependencies])
        return " [JOB: name= %s id= %s depending on ( %s ) with cmd= %s ] " % (self.getJobName(), self.jobID, depIDs, self.cmd_str_from_user)

    # def __repr__(self):
    #     return self.__str__()
def longestCommonPrefix(strings):
    """Return the longest prefix shared by every string in *strings*.

    *strings* may be any iterable of strings; it is copied into a list first.
    With no strings at all the result is a single space, the value this
    function has always returned for an empty list; when the strings share no
    leading characters the result is the empty string.
    """
    strings = list(strings)
    if not strings:
        return " "
    # os.path.commonprefix compares character by character, which is exactly
    # the comparison needed here.
    return os.path.commonprefix(strings)
def jobNameWildcard(jobs):
    """Return a name pattern covering *jobs*.

    A single job gives its own name; any other number of jobs gives the
    longest common prefix of their names followed by a wildcard marker.
    """
    if len(jobs) != 1:
        names = [FarmJob.getJobName(job) for job in jobs]
        return longestCommonPrefix(names) + " * "
    return jobs[0].getJobName()
def executeJobs(allJobs, farm_queue=None, just_print_commands=False, debug=True):
    """Execute every job in *allJobs*, running each job's dependencies first.

    An entry that is itself a list is processed recursively with the same
    options.  A job whose ``executed`` flag is already set is neither
    re-executed nor has its dependencies revisited.  The remaining arguments
    are passed through to executeJob unchanged.
    """
    for job in allJobs:
        if isinstance(job, list):
            # convenience for lists of lists -- recurse rather than calling
            # executeJob directly, so nested jobs get the same "already
            # executed?" check and dependency scheduling as top-level ones
            executeJobs(job, farm_queue, just_print_commands, debug=debug)
            continue

        print(" ".join([' Preparing to execute ', str(job)]))
        if not job.executed:
            # schedule the dependents
            executeJobs(job.dependencies, farm_queue, just_print_commands, debug=debug)
            executeJob(job, farm_queue, just_print_commands, debug=debug)
        print(" ".join([' Executed ', str(job)]))
# Next fake job id handed out by executeJob when commands are only printed.
justPrintJobIDCounter = 1


def executeJob(job, farm_queue=None, just_print_commands=False, debug=True, die_on_fail=True):
    """Run or submit a single job and record the outcome on the job object.

    Always sets ``job.executed`` and ``job.executionString`` (built by
    buildExecutionString), then does exactly one of:

    * print-only mode (``just_print_commands`` is true, or a module-level
      ``justPrintCommands`` exists and is true): nothing is run and
      ``job.jobID`` receives the next value of ``justPrintJobIDCounter``;
    * ``farm_queue`` is set: the execution string is run through the shell
      and ``job.jobID`` is taken from the "Job <N> is submitted to queue"
      line of its output;
    * otherwise: the command is run with os.system and ``job.jobStatus``
      receives the returned status as an int.

    die_on_fail keeps its historical meaning: any value other than None
    (including False) makes a non-zero status terminate the process.
    NOTE(review): ``job.dieOnFail`` is not consulted here; confirm whether it
    should be.

    Raises RuntimeError when the farm submission output contains no job id.
    Raises SystemExit (status 1) when a local command fails.
    """
    global justPrintJobIDCounter
    job.executed = True

    # build my execution string
    job.executionString = buildExecutionString(job, farm_queue, debug=debug)

    # globals() is a plain dict, so the optional flag has to be looked up by
    # key; .get() also covers the "flag not defined" case.
    if just_print_commands or globals().get("justPrintCommands"):
        job.jobID = justPrintJobIDCounter
        justPrintJobIDCounter += 1
    elif farm_queue:
        # With shell=True only the first list item is the command; per the
        # subprocess documentation further items become arguments of the
        # shell itself.  universal_newlines makes the output text.
        result = subprocess.Popen([job.executionString, " "], shell=True, stdout=subprocess.PIPE, universal_newlines=True).communicate()[0]
        submitted = re.search(r'Job <(\d+)> is submitted to queue', result)
        if submitted is None:
            raise RuntimeError("Could not find a job id in the submission output: %r" % (result,))
        job.jobID = submitted.group(1)
    else:
        # Actually execute the command if we're not just in debugging output mode
        status = os.system(job.executionString)
        print(" ".join([" <<< Exit code: ", str(status), " \n "]))
        if die_on_fail is not None and status != 0:
            print(" ### Failed with exit code " + str(status) + " while executing command " + job.cmd_str_from_user)
            # Exit non-zero so that callers of this script see the failure.
            sys.exit(1)
        job.jobStatus = int(status)
def buildExecutionString(job, farm_queue=None, debug=True):
    """Return the shell command that runs *job*.

    With ``farm_queue`` None the result is the job's own command string.
    Otherwise it is a ``bsub`` submission to that queue:

    * ``-o`` is ``job.outputFile`` if set, else ``job.outputHead`` +
      ".stdout" if set, else "%J.lsf.output";
    * a ``-w`` clause is added for ``job.dependencies`` and another for
      ``job.dependencyNameString`` when present;
    * ``-J`` carries ``job.jobName`` when present;
    * the user's command is written to a new temporary file that is fed to
      bsub on stdin.  The file is not removed here.

    When *debug* is true the resulting command is printed.
    """
    if farm_queue is None:
        cmd_str = job.cmd_str_from_user
        if debug:
            print(" >>> Executing " + cmd_str)
        return cmd_str

    if job.outputFile is not None:
        farm_stdout = job.outputFile
    elif job.outputHead is not None:
        farm_stdout = job.outputHead + ".stdout"
    else:
        # No embedded spaces: the name is pasted unquoted into the shell
        # command, and LSF expands "%J" in -o to the job id.
        farm_stdout = "%J.lsf.output"

    cmd_str = " bsub -q " + farm_queue + " -o " + farm_stdout

    # fixme
    if job.dependencies:
        cmd_str += buildJobDependencyString(job.dependencies)
    if job.dependencyNameString is not None:
        cmd_str += buildJobDependencyString(job.dependencyNameString)
    if job.jobName is not None:
        cmd_str += " -J %s " % job.jobName

    # mkstemp returns an already-open descriptor; wrap that descriptor
    # instead of opening the path a second time, so it is closed again.
    cmd_fd, cmd_file = tempfile.mkstemp()
    cmd_out = os.fdopen(cmd_fd, 'w')
    try:
        cmd_out.write(job.cmd_str_from_user)
    finally:
        cmd_out.close()
    cmd_str += " < " + cmd_file + " "

    if debug:
        print(" >>> Farming via " + cmd_str)
    return cmd_str
def allJobsAreNamed(jobs):
    """Return True when no job in *jobs* has a ``jobName`` of None.

    An empty collection counts as "all named".
    """
    # jobName is the value FarmJob.getJobName returns.
    unnamed = [job for job in jobs if job.jobName is None]
    return not unnamed
def buildJobDependencyString(depJobs):
    """Return the ``-w`` clause expressing a dependency on *depJobs*.

    *depJobs* is either a string, treated as a job-name prefix and given a
    trailing wildcard, or a collection of jobs.  When every job in the
    collection is named, a single condition on their shared name pattern is
    used; otherwise one condition per job id, joined with "&&".
    """
    option = " -w \" %s \" "

    if type(depJobs) is str:
        # a name prefix: depend on everything matching it
        return option % (" ended( %s *) " % depJobs)

    if allJobsAreNamed(depJobs):
        # we are all formally named
        return option % (" ended( %s ) " % jobNameWildcard(depJobs))

    conditions = [" ended( %s ) " % dep.jobID for dep in depJobs]
    return option % " && ".join(conditions)
def cmd(cmd_str_from_user, farm_queue=False, output_head=None, just_print_commands=False, outputFile=None, waitID=None, jobName=None, die_on_fail=False):
    """Run a command locally, or submit it to a farm queue with bsub.

    farm_queue: when true, the name of the queue to submit to; otherwise the
        command is run locally with os.system.
    output_head / outputFile: select the bsub ``-o`` file -- outputFile if
        set, else output_head + ".stdout" if set, else "%J.lsf.output".
    waitID: when not None, added as a ``-w "ended(...)"`` condition.
    jobName: when not None, added as ``-J``.
    just_print_commands: when true (or when a module-level
        ``justPrintCommands`` exists and is true) the command is only
        printed and -1 is returned.
    die_on_fail: any value other than None -- including the default False --
        makes a non-zero status of a local command terminate the process.

    Returns -1 in print-only mode, the submitted job id (a string) for farm
    submissions, or the os.system status as an int for local runs.

    Raises RuntimeError when the submission output contains no job id.
    Raises SystemExit (status 1) when a local command fails.

    NOTE(review): the user command is wrapped in single quotes without
    escaping, so a command containing a single quote will break the bsub
    line.
    """
    if farm_queue:
        if outputFile is not None:
            farm_stdout = outputFile
        elif output_head is not None:
            farm_stdout = output_head + ".stdout"
        else:
            # No embedded spaces: the name is pasted unquoted into the shell
            # command, and LSF expands "%J" in -o to the job id.
            farm_stdout = "%J.lsf.output"

        cmd_str = " bsub -q " + farm_queue + " -o " + farm_stdout
        if waitID is not None:
            cmd_str += " -w \" ended( %s ) \" " % (str(waitID))
        if jobName is not None:
            cmd_str += " -J %s " % (jobName)
        cmd_str += " ' " + cmd_str_from_user + " ' "
        print(" >>> Farming via " + cmd_str)
    else:
        cmd_str = cmd_str_from_user
        print(" >>> Executing " + cmd_str)

    # globals() is a plain dict, so the optional flag has to be looked up by
    # key; .get() also covers the "flag not defined" case.
    if just_print_commands or globals().get("justPrintCommands"):
        return -1

    if farm_queue:
        # With shell=True only the first list item is the command; per the
        # subprocess documentation further items become arguments of the
        # shell itself.  universal_newlines makes the output text.
        result = subprocess.Popen([cmd_str, " "], shell=True, stdout=subprocess.PIPE, universal_newlines=True).communicate()[0]
        submitted = re.search(r'Job <(\d+)> is submitted to queue', result)
        if submitted is None:
            raise RuntimeError("Could not find a job id in the submission output: %r" % (result,))
        return submitted.group(1)

    # Actually execute the command if we're not just in debugging output mode
    status = os.system(cmd_str)
    print(" ".join([" <<< Exit code: ", str(status), " \n "]))
    if die_on_fail is not None and status != 0:
        print(" ### Failed with exit code " + str(status) + " while executing command " + cmd_str_from_user)
        # Exit non-zero so that callers of this script see the failure.
        sys.exit(1)
    return int(status)
# ------------------------------------------------------------------------------------------
#
# Unit testing!
#
# ------------------------------------------------------------------------------------------
class TestFarmCommands(unittest.TestCase):
    # Smoke tests for FarmJob / executeJobs.  Every call passes
    # just_print_commands=True, and the tests make no assertions.

    def setUp(self):
        # visual separator between the output of consecutive tests
        print(' ')
        print(100 * ' - ')

    def testMakingJob1(self):
        # a bare job with no farm queue
        print(' testMakingJob: ')
        onlyJob = FarmJob(" foo ")
        executeJobs([onlyJob], just_print_commands=True)

    def testMakingJob2(self):
        # a job with every optional field supplied, sent to a queue
        print(' testMakingJob2: ')
        fullJob = FarmJob(
            " bar ",
            jobName=" barJobs ",
            outputHead=" outputRoot ",
            outputFile=" bar.log ",
            dependencies=[],
            dieOnFail=True,
        )
        executeJobs([fullJob], " gsa ", just_print_commands=True)

    def testDepJobs1(self):
        # an unnamed job depending on two unnamed jobs
        print(' testDepJobs1: ')
        first = FarmJob(" job1 ")
        second = FarmJob(" job2 ")
        third = FarmJob(" job3 ", dependencies=[first, second])
        executeJobs([first, second, third], " gsa ", just_print_commands=True)

    def testDepJobs2(self):
        # a three-level chain of named jobs; only the last one is passed in
        print(' testDepJobs2: ')
        fooA = FarmJob(" job1 ", jobName=" foojob1 ")
        fooB = FarmJob(" job2 ", jobName=" foojob2 ")
        barA = FarmJob(" barjob1 ", jobName=" barjob1 ", dependencies=[fooA])
        barB = FarmJob(" barjob2 ", jobName=" barjob2 ", dependencies=[fooA, fooB])
        baz = FarmJob(" baz1 ", jobName=" baz1 ", dependencies=[barA, barB])
        executeJobs([baz], " gsa ", just_print_commands=True)
# Run the unit tests when this file is executed as a script.  The comparison
# string must be exactly "__main__" (no padding) or the guard never fires.
if __name__ == '__main__':
    unittest.main()