From ac9c335cd26f9866e741410bedb559c694f2b705 Mon Sep 17 00:00:00 2001 From: chartl Date: Wed, 31 Mar 2010 19:53:13 +0000 Subject: [PATCH] This is a python job dispatcher I've been using, which builds on Mark's FarmJob utility, and an example script of how I'm using it. Basically I wrote it to smartly break up analysis over an interval list, given a maximum number of bases per job, a list of available queues, and a limit on each queue. It handles going over these limits in three ways: 1) [default]: Fail - No jobs are actually spawned 2) Space - User provides a string of the form A:B:C where A is the number of days to wait before scheduling jobs over the queue limits; B the number of hours, C the number of minutes. Exceeding the queue limits again will increment the spacing by another A:B:C 3) Stop-Resume - Spawns the maximum number of jobs, and writes a file describing the next job, and a hash code of the remaining jobs. The next time the script is run, it spawns the next set of jobs starting with what's written in the file. If the hash code (and thus the command string) changes between runs, the dispatcher fails fast. The base job dispatcher class is also capable of dealing with dependencies if it is used correctly. 
# ---------------------------------------------------------------------------
# python/JobDispatcher.py
#
# LSF job dispatcher built on the FarmJob utility.  Splits an analysis over
# an interval list (max bases per job), a list of queues and per-queue job
# limits.  Exceeding the total limit is handled by one of three policies:
#   "fail"   (default) raise JobDispatchError and spawn nothing,
#   "space"  delay the extra jobs by a user-supplied "D:H:M" increment,
#   "resume" spawn up to the limit and record a resume file for the next run.
# ---------------------------------------------------------------------------
import os
import sys           # kept from the original file
import subprocess
import time
import re
import unittest      # kept from the original file
import tempfile      # kept from the original file
import zlib
import functools

MAX_UNNAMED_DEPENDENCIES = 0


def _stableStringHash(text):
    """Return an integer hash of text that is stable across interpreter runs.

    str.__hash__ is randomized per process (PYTHONHASHSEED), which would
    invalidate the hash codes the stop/resume protocol persists to disk,
    so crc32 is used instead.
    """
    return zlib.crc32(text.encode("utf-8")) & 0xffffffff


class FarmJob:
    """A single LSF job: a command string plus scheduling metadata."""

    def __init__(self, cmd_str_from_user, jobName, projectName, delayTime=None,
                 jobsDependingOnThis=None, outputFile=None, dependencies=None,
                 dependencyNameString=None, dieOnFail=False, usingFiles=None):
        self.cmd_str_from_user = cmd_str_from_user
        self.jobName = jobName
        self.projectName = projectName
        self.jobsDependingOnThis = jobsDependingOnThis
        self.outputFile = outputFile
        self.dieOnFail = dieOnFail
        self.delayTime = delayTime
        # Normalize dependencies to a list (None -> [], scalar -> [scalar]).
        # BUG FIX: the original used a mutable default argument ([]).
        if dependencies is None:
            self.dependencies = []
        elif type(dependencies) != list:
            self.dependencies = [dependencies]
        else:
            self.dependencies = dependencies
        self.dependencyNameString = dependencyNameString
        # Extra protection for stop/resume spawning: files this job reads.
        # BUG FIX: mutable default argument here too.
        self.filesToUse = usingFiles if usingFiles is not None else []

        if len(self.dependencies) > MAX_UNNAMED_DEPENDENCIES:
            # Dependencies may be FarmJob objects or plain name strings
            # (buildDependencyString / compareDependence use name strings);
            # treat a bare string as its own name.  BUG FIX: the original
            # called FarmJob.getJobName on strings and raised with the
            # undefined name `depName`.
            depNames = [getattr(d, "jobName", d) for d in self.dependencies]
            named = [n for n in depNames if n]
            if len(named) > 1 and len(self.dependencies) != len(named):
                # there are some unnamed and some named deps
                raise Exception("Bad job names -- some are named and some are unnamed", depNames)

        self.jobID = None            # None indicates currently unscheduled
        self.executionString = None  # currently unscheduled
        self.executed = False
        self.jobStatus = None

    def __hash__(self):
        # Stable across processes so resume files stay valid between runs.
        return _stableStringHash(self.cmd_str_from_user)

    def setDelay(self, delayString):
        """Set the "D:H:M" delay used for LSF's -b (begin time) option."""
        self.delayTime = delayString

    def getJobName(self):
        return self.jobName

    def getJobIDString(self):
        """Best available identifier: name, else LSF job id, else 'UNNAMED'."""
        if self.jobName is None:
            if self.jobID is None:
                return "UNNAMED"
            return str(self.jobID)
        return self.getJobName()

    def __str__(self):
        # Dependencies may be FarmJob objects or plain name strings.
        depIds = ','.join(d.getJobIDString() if isinstance(d, FarmJob) else str(d)
                          for d in self.dependencies)
        return "[JOB: name=%s id=%s depending on (%s) with cmd=%s]" % (
            self.getJobName(), self.jobID, depIds, self.cmd_str_from_user)

    def getNumberOfDependencies(self):
        return len(self.dependencies)

    def getNumberOfDependers(self):
        """Number of jobs that depend on this one (0 when unknown)."""
        if self.jobsDependingOnThis is None:
            return 0
        # BUG FIX: the original referenced the bare name jobsDependingOnThis.
        return len(self.jobsDependingOnThis)


def compareDependence(job1, job2):
    """cmp-style comparator ordering jobs for dispatch (by dependency)."""
    if job1.getJobName() in job2.dependencies:
        return 1
    elif job2.getJobName() in job1.dependencies:
        return -1
    else:
        return job1.getNumberOfDependers() - job2.getNumberOfDependers()


class JobDispatchError(Exception):
    """Raised when dispatching cannot proceed (limits, bad resume file...)."""

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


def delayToGlobalTime(delayStr):
    """Convert a relative "D:H:M" delay into an absolute "Y:M:D:H:M" time
    string suitable for LSF's -b option."""
    dayHrMin = delayStr.split(":")
    additionalSecs = 24 * 60 * 60 * int(dayHrMin[0]) + 60 * 60 * int(dayHrMin[1]) + 60 * int(dayHrMin[2])
    futureTime = time.localtime(time.time() + additionalSecs)
    return ":".join([str(futureTime[0]), str(futureTime[1]), str(futureTime[2]),
                     str(futureTime[3]), str(futureTime[4])])


def buildDependencyString(jobList):
    """Build LSF's -w expression, e.g. 'ended("a") && ended("b")', from a
    list of job names."""
    paddedList = ['ended("' + name + '")' for name in jobList]
    return '\'' + " && ".join(paddedList) + '\''


def buildSubmitString(farmJob, queue):
    """Build the full bsub command line for farmJob on the given queue.

    farmJob.dependencies are expected to be job *name* strings here.
    """
    submitStr = "bsub -q " + queue
    if farmJob.jobName is not None:
        submitStr += " -J " + farmJob.jobName
    if farmJob.projectName is not None:
        submitStr += " -P " + farmJob.projectName
    if farmJob.outputFile is not None:
        submitStr += " -o " + farmJob.outputFile
    if farmJob.delayTime is not None:
        submitStr += " -b " + delayToGlobalTime(farmJob.delayTime)
    if farmJob.dependencies != []:
        submitStr += " -w " + buildDependencyString(farmJob.dependencies)
    submitStr += " " + farmJob.cmd_str_from_user
    return submitStr


def writeResumeFile(jobList, start, max, filepath, specialHash=None):
    """Record the index at which dispatching should resume (start+max), a
    hash of the remaining jobs, and the next job's command string.

    jobList here is the *remaining* (already-shrunk) job list; the recorded
    index is relative to the full list regenerated on the next run.
    """
    print("Writing start was: " + str(start + max))
    try:
        output = open(filepath, 'w')
        printToFile = True
    except IOError:
        print("Unable to open resume file, " + filepath + " dumping to stdout")
        printToFile = False
    hashFunction = specialHash if specialHash is not None else hash
    hashVal = 0
    for i in range(start, len(jobList)):
        hashVal = hashVal ^ hashFunction(jobList[i])
    if printToFile:
        try:
            output.write(str(start + max) + "\n")
            output.write(str(hashVal) + "\n")
            output.write(jobList[start].cmd_str_from_user)
        finally:
            output.close()  # BUG FIX: the handle was leaked
    else:
        # BUG FIX: the fallback printed `start` while the file got start+max.
        print(str(start + max))
        print(str(hashVal))
        print(jobList[start].cmd_str_from_user)


def writeResumeFileFinal(filepath):
    """Mark the resume file as complete: every job has been spawned."""
    output = open(filepath, 'w')
    try:
        output.write("ALL_JOBS_HAVE_BEEN_SPAWNED")
    finally:
        output.close()


def checkResumeFile(filepath, jobs, specialHash=None):
    """Return the index into the full job list at which to resume.

    Returns 0 when there is no resume file yet.  Raises JobDispatchError
    when all jobs were already spawned, or when the hash of jobs[start:]
    does not match the recorded hash (i.e. the command strings changed
    between runs).
    """
    if filepath is None or not os.path.exists(filepath):
        return 0
    resume = open(filepath)
    try:
        firstLine = resume.readline()
        if firstLine.strip() == "ALL_JOBS_HAVE_BEEN_SPAWNED":
            raise JobDispatchError("All jobs for this project were spawned.")
        start = int(firstLine)
        print("reading start was " + str(start))
        hashValToMatch = int(resume.readline())
    finally:
        resume.close()  # BUG FIX: handle leaked on the raise path
    hashFunction = hash if specialHash is None else specialHash
    hashVal = 0
    for i in range(start, len(jobs)):
        hashVal = hashVal ^ hashFunction(jobs[i])
    if hashVal == hashValToMatch:
        return start
    str1 = "The commands for remaining jobs hashed to " + str(hashVal) + " but previous hash was " + str(hashValToMatch) + "."
    str2 = "Please check the resume file " + filepath + " to see that the job command has not changed."
    raise JobDispatchError(str1 + " " + str2)


def hashJobAndIntervals(farmJob):
    """Hash a job together with the intervals in its first attached file.

    Used as the specialHash for GATK stop/resume dispatching, so a change
    to the per-job interval list also invalidates the resume file.
    """
    fjhash = hash(farmJob)
    interval_file = farmJob.filesToUse
    intervals = list()
    with open(interval_file[0]) as handle:
        for line in handle:
            if not line.startswith("@"):  # skip SAM-style header lines
                spline = line.strip().split()
                try:
                    intervals.append(Interval(spline[0], int(spline[1]), int(spline[2])))
                except IndexError:
                    print(line)
                    raise IndexError("List index out of range")
    inthash = 0
    for interval in intervals:
        inthash = inthash ^ hash(interval)
    return fjhash ^ inthash


class JobDispatcher:
    """Dispatches FarmJobs to LSF queues, respecting per-queue job limits.

    exceed_total_limit_action is one of "fail", "space" or "resume"; for
    "space" action_string is a "D:H:M" spacing increment, for "resume" it
    is the resume-file path.
    """

    def __init__(self, lsf_queues=None, queue_limits=None,
                 exceed_total_limit_action="fail", print_only=False,
                 action_string=None):
        # BUG FIX: the original used mutable default arguments; the
        # effective defaults (["long"], {"long": 500}) are unchanged.
        self.queues = lsf_queues if lsf_queues is not None else ["long"]
        self.limits = queue_limits if queue_limits is not None else dict([["long", 500]])
        self.action = exceed_total_limit_action
        self.action_string = action_string
        self.spawned = list()
        if len(self.limits) > 0:
            self.maxJobs = sum(self.limits.values())
        else:
            self.maxJobs = -1  # -1 means "no limit"
        self.print_only = print_only

    # external accessor, sorts by dependency and ensures user hasn't exceeded the set limits
    def dispatchAll(self, farmJobs):
        # cmp_to_key keeps the old cmp-style comparator working on Python 3.
        farmJobs.sort(key=functools.cmp_to_key(compareDependence))
        if self.maxJobs > -1 and len(farmJobs) > self.maxJobs:
            if self.action == "space" and self.action_string is not None:
                self._dispatchWithSpacing(farmJobs)
            elif self.action == "resume" and self.action_string is not None:
                self._dispatchWithStopResume(farmJobs)
            else:
                raise JobDispatchError("Number of jobs to dispatch, " + str(len(farmJobs)) + ", exceeds maximum (" + str(self.maxJobs) + ").")
        else:
            self._dispatchAll(farmJobs)

    # internal accessor, loops over queues and dispatches jobs up to the limit
    def _dispatchAll(self, farmJobs):
        for queue in self.queues:
            dispatchedToQueue = 0
            while dispatchedToQueue < self.limits[queue] and len(farmJobs) > 0:
                farmJob = farmJobs.pop(0)
                self._dispatch(farmJob, queue)
                self.spawned.append(farmJob)
                dispatchedToQueue += 1

    # internal dispatch accessor; job limits dealt with at this point
    def _dispatch(self, job, queue):
        if self.print_only:
            print(buildSubmitString(job, queue))
        else:
            # BUG FIX: pass the command as a string with shell=True (the
            # original passed a list, which only worked by accident).
            lsf_response = subprocess.Popen(buildSubmitString(job, queue),
                                            shell=True,
                                            stdout=subprocess.PIPE).communicate()[0]
            if isinstance(lsf_response, bytes):
                lsf_response = lsf_response.decode("utf-8", "replace")
            p = re.compile(r'Job <(\d+)> is submitted to queue')
            m = p.match(lsf_response)
            # BUG FIX: guard against a non-matching bsub response instead of
            # crashing with AttributeError; jobID stays None on failure.
            if m is not None:
                job.jobID = m.group(1)
            print(lsf_response)

    # spaces jobs out by using the delay command to LSF
    def _dispatchWithSpacing(self, farmJobs):
        dayHrMin = self.action_string.split(":")
        days = 0
        hours = 0
        mins = 0
        dispatchBuffer = list()
        while len(farmJobs) > 0:
            nextJob = farmJobs.pop(0)
            nextJob.setDelay(":".join([str(days), str(hours), str(mins)]))
            dispatchBuffer.append(nextJob)
            # flush a full batch (or the final partial batch), then bump the
            # delay by another spacing increment for the next batch
            if len(dispatchBuffer) >= self.maxJobs or len(farmJobs) == 0:
                self._dispatchAll(dispatchBuffer)
                days += int(dayHrMin[0])
                hours += int(dayHrMin[1])
                mins += int(dayHrMin[2])
                dispatchBuffer = list()

    # dispatches jobs up to the limit, outputs a file describing where to resume again
    # if the file already exists, resumes at that job
    # file also contains a hashcode for the remaining jobs -- if these don't match the
    # hashcode calculated from farmJobs, dispatching is aborted via an exception
    def _dispatchWithStopResume(self, farmJobs, jobHash=None):
        startAt = checkResumeFile(self.action_string, farmJobs, jobHash)
        dispatchBuffer = list()
        while len(dispatchBuffer) < self.maxJobs and len(farmJobs) > startAt:
            dispatchBuffer.append(farmJobs.pop(startAt))
        self._dispatchAll(dispatchBuffer)
        if len(farmJobs) > startAt:
            writeResumeFile(farmJobs, startAt, self.maxJobs, self.action_string, jobHash)
        else:
            writeResumeFileFinal(self.action_string)


class Interval:
    """A genomic interval: chromosome plus [start, stop) coordinates."""

    def __init__(self, chrom, start, stop):
        self.chromosome = chrom
        self.start = start
        self.stop = stop

    def size(self):
        return self.stop - self.start

    def bedFormat(self):
        return "\t".join([self.chromosome, str(self.start), str(self.stop), "+", "target_whatever"])

    def __str__(self):
        return self.chromosome + ":" + str(self.start) + "-" + str(self.stop)

    def __hash__(self):
        # Stable across processes (see _stableStringHash) so interval hashes
        # persisted in resume files remain valid.
        return _stableStringHash(self.__str__())


class GATKDispatcher(JobDispatcher):
    """JobDispatcher specialized for GATK walkers: builds the java command
    line and splits work over an interval list."""

    def __init__(self, jarfile, memory, walker, args, output_directory,
                 reference=None, bams=None, intervals=None, queues=None,
                 limits=None, print_only=False, action="fail", delay=None):
        self.jarfile = jarfile
        self.memory = memory
        self.walker = walker
        self.args = args
        self.reference = reference
        self.bams = bams
        self.intervals = intervals
        # NOTE(review): paths below assume output_directory ends with "/" --
        # this matches the original convention; confirm with callers.
        self.outputDir = output_directory
        self.project = "GSA_GATK_Analysis"
        if action == "resume":
            action_string = self.outputDir + "GATKDispatcher/resumeJobs.txt"
        elif action == "space":
            action_string = delay
        else:
            action_string = None
        # BUG FIX: mutable default arguments replaced by None sentinels; the
        # effective defaults (["long"], {"long": 500}) are unchanged.
        JobDispatcher.__init__(self,
                               queues if queues is not None else ["long"],
                               limits if limits is not None else dict([["long", 500]]),
                               action, print_only, action_string)
        self.check()
        self.baseCommand = "java -Xmx" + self.memory + " -jar " + self.jarfile + " -T " + self.walker + " " + args

    def check(self):
        """Validate the input files; raises JobDispatchError on a bad path."""
        if not os.path.exists(self.jarfile):
            raise JobDispatchError("The provided GATK jarfile " + str(self.jarfile) + " does not exist")
        if self.intervals is not None and not os.path.exists(self.intervals):
            raise JobDispatchError("The provided interval list file, " + str(self.intervals) + " does not exist.")
        if self.bams is not None and not os.path.exists(self.bams):
            raise JobDispatchError("The provided bam, or bam list, " + str(self.bams) + " does not exist.")
        if self.reference is None:
            print("Warning: No reference supplied to GATKDispatcher. Most analyses require a reference.")

    def setProject(self, newProject):
        self.project = newProject

    def addReadFilter(self, filter, filter_args=None):
        """Append a -rf read filter to the base command (no arguments yet)."""
        if filter_args is None:
            self.baseCommand += " -rf " + filter
        else:
            raise JobDispatchError("GATK Dispatcher does not yet support read filters with arguments (e.g. blacklisting)")

    # remove bam files, reference, intervals, read filters, etc
    def resetCommand(self):
        # BUG FIX: the original referenced the bare name `args` (NameError).
        self.baseCommand = "java -Xmx" + self.memory + " -jar " + self.jarfile + " -T " + self.walker + " " + self.args

    def dispatchByInterval(self, base_limit):
        """Split the interval list into jobs of at most base_limit bases each
        and dispatch them."""
        self.check()
        dispatchCommand = self.baseCommand
        # BUG FIX: only append -R when a reference was supplied; check()
        # treats a missing reference as a warning, not an error.
        if self.reference is not None:
            dispatchCommand += " -R " + self.reference
        if self.bams is not None:
            dispatchCommand += " -I " + self.bams
        job_number = 0
        bases_for_job = 0
        intervals_for_job = list()
        headerLines = list()
        farmJobs = list()
        if not os.path.exists(self.outputDir + "GATKDispatcher/"):
            os.mkdir(self.outputDir + "GATKDispatcher/")
        with open(self.intervals) as intervals:  # BUG FIX: handle was leaked
            for line in intervals:
                if line.startswith("@"):
                    headerLines.append(line)
                else:
                    spline = line.strip().split()
                    chrom = spline[0]
                    start = int(spline[1])
                    stop = int(spline[2])
                    interval = Interval(chrom, start, stop)
                    intervals_for_job.append(interval)
                    bases_for_job += interval.size()
                    if bases_for_job > base_limit:
                        farmJobs.append(self._buildIntervalJob(job_number, headerLines, intervals_for_job, dispatchCommand))
                        intervals_for_job = list()
                        bases_for_job = 0
                        job_number += 1
        if len(intervals_for_job) > 0:  # there's still some leftover
            farmJobs.append(self._buildIntervalJob(job_number, headerLines, intervals_for_job, dispatchCommand))
        self.dispatchAll_Interval(farmJobs)

    def dispatchAll_Interval(self, jobs):
        """Dispatch interval jobs; for "resume" use the interval-aware hash."""
        if self.action == "resume":
            self._dispatchWithStopResume(jobs, hashJobAndIntervals)
        else:
            self.dispatchAll(jobs)

    def _buildIntervalJob(self, num, header, intervals, command):
        """Write the per-job interval list and build the FarmJob for it."""
        job_dir = self.outputDir + "GATKDispatcher/dispatch" + str(num) + "/"
        cmd = self.appendOutput(command, num, job_dir)
        if not os.path.exists(job_dir):
            os.mkdir(job_dir)
        intervalPath = job_dir + "job" + str(num) + "_intervals.interval_list"
        intFile = open(intervalPath, 'w')
        try:
            intFile.write("".join(header))
            for interval in intervals:
                intFile.write(interval.bedFormat() + "\n")
        finally:
            intFile.close()
        cmd += " -L " + intervalPath
        job = FarmJob(cmd, self.project + "_job" + str(num), self.project, None,
                      None, job_dir + "bsub_out.txt", list(), None, False,
                      [intervalPath])
        return job

    def appendOutput(self, command, num, job_dir):
        """Append the walker-appropriate output argument to the command."""
        if self.walker == "UnifiedGenotyper":
            cmdToDispatch = command + " -varout " + job_dir + "job" + str(num) + "_calls.vcf"
        elif self.walker == "CoverageStatistics" or self.walker == "DepthOfCoverage":
            cmdToDispatch = command + " -o " + job_dir + "job" + str(num)
        elif self.walker == "CombineDuplicates" or self.walker == "TableRecalibration" or self.walker == "ClipReads":
            cmdToDispatch = command + " -o " + job_dir + "job" + str(num) + "_output.bam"
        else:
            cmdToDispatch = command + " -o " + job_dir + "job" + str(num) + ".txt"
        return cmdToDispatch


# ---------------------------------------------------------------------------
# python/JobDispatcherExample.py -- originally a separate file that imported
# this module.  BUG FIX: wrapped in a function behind a __main__ guard so
# importing the module no longer dispatches jobs as a side effect.
# ---------------------------------------------------------------------------
def _runExample():
    PRINT_ONLY = True
    QUEUES = ["gsa", "short", "long"]
    LIMITS = dict([["gsa", 50], ["short", 20], ["long", 50]])
    ACTION = "space"
    TIME_DIFF = "0:1:0"  # 1 hour
    RESULTS_DIR = "/humgen/gsa-hpprojects/dev/kiran/wholeExomeGeneCoverage/scratch/"
    GATK_JAR = "/humgen/gsa-scr1/chartl/sting/dist/GenomeAnalysisTK.jar"
    MEMORY = "2g"
    WALKER = "CoverageStatistics"
    REFERENCE = "/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta"
    BAM_FILES = RESULTS_DIR + "bam_files_use.list"
    INTERVAL_LIST = "/seq/references/HybSelOligos/whole_exome_agilent_designed_120/whole_exome_agilent_designed_120.targets.interval_list"
    MAX_BASES_PER_JOB = 100000
    ARGUMENTS = "-mmq 20 -mbq 10 -omitBaseOutput -dels -l INFO"

    dispatcher = GATKDispatcher(GATK_JAR, MEMORY, WALKER, ARGUMENTS,
                                RESULTS_DIR, REFERENCE, BAM_FILES,
                                INTERVAL_LIST, QUEUES, LIMITS, PRINT_ONLY,
                                ACTION, TIME_DIFF)
    dispatcher.setProject("GSA_WholeExome_CoverageByExon")
    dispatcher.dispatchByInterval(MAX_BASES_PER_JOB)


if __name__ == "__main__":
    _runExample()