"""Utilities for dispatching batches of jobs to an LSF compute farm.

FarmJob models a single ``bsub`` submission; JobDispatcher (and its GATK /
gene-list specializations) schedule collections of jobs while respecting
per-queue limits, optionally spacing submissions out over time or stopping
and resuming via an on-disk resume file.
"""

import functools
import os
import re
import subprocess
import sys
import tempfile
import time
import unittest

import RefseqLibrary

# Any job with more dependencies than this must have them consistently named.
MAX_UNNAMED_DEPENDENCIES = 0


class FarmJob:
    """A single farm (LSF) job: the command to run plus scheduling metadata."""

    def __init__(self, cmd_str_from_user, jobName, projectName, delayTime=None,
                 jobsDependingOnThis=None, outputFile=None, dependencies=None,
                 dependencyNameString=None, dieOnFail=False, usingFiles=None):
        """Create a job.

        cmd_str_from_user -- the shell command to submit
        jobName / projectName -- LSF -J / -P values (either may be None)
        delayTime -- "day:hour:minute" delay string, or None
        jobsDependingOnThis -- jobs downstream of this one, or None
        outputFile -- LSF -o target, or None
        dependencies -- a single dependency, a list of them, or None
        usingFiles -- files this job reads; provides additional protection
                      for stop-resume spawning (kept as self.filesToUse)
        """
        self.cmd_str_from_user = cmd_str_from_user
        self.jobName = jobName
        self.projectName = projectName
        self.jobsDependingOnThis = jobsDependingOnThis
        self.outputFile = outputFile
        self.dieOnFail = dieOnFail
        self.delayTime = delayTime
        # Normalize dependencies: None -> [], a single item -> [item].
        # None sentinels replace the original mutable default arguments
        # (shared-list pitfall) while keeping the same call signature.
        if dependencies is None:
            self.dependencies = []
        elif type(dependencies) != list:
            self.dependencies = [dependencies]
        else:
            self.dependencies = dependencies
        self.dependencyNameString = dependencyNameString
        self.filesToUse = usingFiles if usingFiles is not None else []
        if len(self.dependencies) > MAX_UNNAMED_DEPENDENCIES:
            # NOTE(review): this check treats dependencies as FarmJob objects,
            # but compareDependence/buildDependencyString treat them as name
            # strings -- confirm which convention callers actually use.
            depNames = [FarmJob.getJobName(dep) for dep in self.dependencies]
            named = [name for name in depNames if name]
            if len(named) > 1 and len(self.dependencies) != len(named):
                # there are some unnamed and some named deps
                # BUG FIX: original raised with the undefined name 'depName';
                # also len(filter(...)) is invalid on Python 3.
                raise Exception("Bad job names -- some are named and some are unnamed", depNames)
        self.jobID = None  # None indicates currently unscheduled
        self.executionString = None  # currently unscheduled
        self.executed = False
        self.jobStatus = None

    def __hash__(self):
        # Jobs hash by their command string (used by the resume-file checksum).
        return self.cmd_str_from_user.__hash__()

    def setDelay(self, delayString):
        self.delayTime = delayString

    def getJobName(self):
        return self.jobName

    def getJobIDString(self):
        """Best available identifier: name, else LSF job id, else UNNAMED."""
        if self.jobName is None:
            if self.jobID is None:
                return "UNNAMED"
            return str(self.jobID)
        return self.getJobName()

    def __str__(self):
        depIDs = ','.join(dep.getJobIDString() for dep in self.dependencies)
        return "[JOB: name=%s id=%s depending on (%s) with cmd=%s]" % (
            self.getJobName(), self.jobID, depIDs, self.cmd_str_from_user)

    def getNumberOfDependencies(self):
        return len(self.dependencies)

    def getNumberOfDependers(self):
        # BUG FIX: original read the bare name 'jobsDependingOnThis'
        # (NameError) instead of the instance attribute.
        if self.jobsDependingOnThis is None:
            return 0
        return len(self.jobsDependingOnThis)


def compareDependence(job1, job2):
    """cmp-style comparator: a job sorts after anything it depends on.

    Ties are broken so that jobs with more dependers sort later.
    NOTE(review): the membership tests compare job *names* against the
    dependencies list, so this expects dependencies to hold name strings.
    """
    if job1.getJobName() in job2.dependencies:
        return 1
    elif job2.getJobName() in job1.dependencies:
        return -1
    return job1.getNumberOfDependers() - job2.getNumberOfDependers()


class JobDispatchError(Exception):
    """Raised when jobs cannot be (or must not be) dispatched."""

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


def delayToGlobalTime(delayStr):
    """Convert a relative "day:hour:minute" delay into an absolute
    "year:month:day:hour:minute" local timestamp (for LSF's -b option)."""
    dayHrMin = delayStr.split(":")
    additionalSecs = (24 * 60 * 60 * int(dayHrMin[0])
                      + 60 * 60 * int(dayHrMin[1])
                      + 60 * int(dayHrMin[2]))
    futureTime = time.localtime(time.time() + additionalSecs)
    return ":".join(str(field) for field in futureTime[0:5])


def buildDependencyString(jobList):
    """Build the quoted bsub -w expression: 'ended("a") && ended("b")'."""
    clauses = ['ended("' + name + '")' for name in jobList]
    return "'" + " && ".join(clauses) + "'"


def buildSubmitString(farmJob, queue):
    """Render the full bsub command line for farmJob on the given queue."""
    submitStr = "bsub -q " + queue
    if farmJob.jobName is not None:
        submitStr += " -J " + farmJob.jobName
    if farmJob.projectName is not None:
        submitStr += " -P " + farmJob.projectName
    if farmJob.outputFile is not None:
        submitStr += " -o " + farmJob.outputFile
    if farmJob.delayTime is not None:
        submitStr += " -b " + delayToGlobalTime(farmJob.delayTime)
    if farmJob.dependencies != []:
        submitStr += " -w " + buildDependencyString(farmJob.dependencies)
    return submitStr + " " + farmJob.cmd_str_from_user


def writeResumeFile(jobList, start, max, filepath, specialHash=None):
    """Record where dispatching should resume after a stop.

    Writes the next start index (start + max), an XOR checksum of the
    not-yet-dispatched jobs (jobList[start:]), and the first remaining
    command.  Falls back to stdout if filepath cannot be written.

    Note: by the time this is called the just-dispatched jobs have been
    popped from jobList, so hashing from 'start' here matches
    checkResumeFile hashing from 'start + max' over the full list later.
    """
    print("Writing start was: " + str(start + max))
    hashFunction = hash if specialHash is None else specialHash
    hashVal = 0
    for i in range(start, len(jobList)):
        hashVal = hashVal ^ hashFunction(jobList[i])
    try:
        # BUG FIX: original never closed the output file handle.
        with open(filepath, 'w') as output:
            output.write(str(start + max) + "\n")
            output.write(str(hashVal) + "\n")
            output.write(jobList[start].cmd_str_from_user)
    except IOError:
        print("Unable to open resume file, " + filepath + " dumping to stdout")
        # BUG FIX: the fallback previously printed 'start' while the file
        # branch wrote 'start + max'; both now emit the true resume index.
        print(str(start + max))
        print(str(hashVal))
        print(jobList[start].cmd_str_from_user)


def writeResumeFileFinal(filepath):
    """Mark the resume file as complete: every job has been spawned."""
    with open(filepath, 'w') as output:
        output.write("ALL_JOBS_HAVE_BEEN_SPAWNED")


def checkResumeFile(filepath, jobs, specialHash=None):
    """Return the index at which dispatching should resume.

    Returns 0 when no resume file exists.  Raises JobDispatchError if all
    jobs were already spawned, or if the checksum of jobs[start:] does not
    match the one recorded when the resume file was written (the job list
    changed since the previous run).
    """
    if filepath is None or not os.path.exists(filepath):
        return 0
    with open(filepath) as resumeFile:
        firstLine = resumeFile.readline()
        if firstLine == "ALL_JOBS_HAVE_BEEN_SPAWNED":
            raise JobDispatchError("All jobs for this project were spawned.")
        start = int(firstLine)
        print("reading start was " + str(start))
        hashValToMatch = int(resumeFile.readline())
    hashFunction = hash if specialHash is None else specialHash
    hashVal = 0
    for i in range(start, len(jobs)):
        hashVal = hashVal ^ hashFunction(jobs[i])
    if hashVal == hashValToMatch:
        return start
    str1 = ("The commands for remaining jobs hashed to " + str(hashVal) +
            " but previous hash was " + str(hashValToMatch) + ".")
    str2 = ("Please check the resume file " + filepath +
            " to see that the job command has not changed.")
    raise JobDispatchError(str1 + " " + str2)


def hashJobAndIntervals(farmJob):
    """Hash a job together with the intervals in its first attached file.

    Used as the 'specialHash' for stop-resume dispatching of interval jobs
    so the checksum also changes if the interval file contents change.
    Lines starting with '@' are treated as header lines and skipped.
    """
    fjhash = farmJob.__hash__()
    inthash = 0
    # BUG FIX: original iterated an open() result without closing it.
    with open(farmJob.filesToUse[0]) as intervalFile:
        for line in intervalFile:
            if line.startswith("@"):
                continue
            spline = line.strip().split()
            try:
                interval = Interval(spline[0], int(spline[1]), int(spline[2]))
            except IndexError:
                print(line)
                raise IndexError("List index out of range")
            inthash = inthash ^ interval.__hash__()
    return fjhash ^ inthash


class JobDispatcher:
    """Dispatches FarmJobs to LSF queues, enforcing per-queue job limits.

    When the total job count exceeds the combined limits, the configured
    action decides what happens: "fail" (raise), "space" (delay batches
    via bsub -b), or "resume" (dispatch one batch and record a resume file).
    """

    def __init__(self, lsf_queues=None, queue_limits=None,
                 exceed_total_limit_action="fail", print_only=False,
                 action_string=None):
        # None sentinels replace the original mutable default arguments;
        # the effective defaults (["long"], {"long": 500}) are unchanged.
        self.queues = lsf_queues if lsf_queues is not None else ["long"]
        self.limits = queue_limits if queue_limits is not None else {"long": 500}
        self.action = exceed_total_limit_action
        self.action_string = action_string
        self.spawned = list()
        if len(self.limits) > 0:
            self.maxJobs = sum(self.limits.values())
        else:
            self.maxJobs = -1  # -1 means "no limit configured"
        self.print_only = print_only
        self.startDays = 0
        self.startHours = 0
        self.startMins = 0

    def setInitialDelay(self, delay):
        """Set the "day:hour:minute" delay applied to the first spaced batch."""
        dhm = delay.split(":")
        self.startDays = int(dhm[0])
        self.startHours = int(dhm[1])
        self.startMins = int(dhm[2])

    # external accessor: sorts by dependency and ensures the user hasn't
    # exceeded the configured limits
    def dispatchAll(self, farmJobs):
        # BUG FIX: list.sort() no longer accepts a bare cmp function on
        # Python 3; wrap the comparator with functools.cmp_to_key.
        farmJobs.sort(key=functools.cmp_to_key(compareDependence))
        if self.maxJobs > -1 and len(farmJobs) > self.maxJobs:
            if self.action == "space" and self.action_string is not None:
                self._dispatchWithSpacing(farmJobs)
            elif self.action == "resume" and self.action_string is not None:
                self._dispatchWithStopResume(farmJobs)
            else:
                raise JobDispatchError("Number of jobs to dispatch, " + str(len(farmJobs)) +
                                       ", exceeds maximum (" + str(self.maxJobs) + ").")
        else:
            self._dispatchAll(farmJobs)

    # internal accessor: loops over queues and dispatches jobs up to each limit
    def _dispatchAll(self, farmJobs):
        for queue in self.queues:
            dispatchedToQueue = 0
            while dispatchedToQueue < self.limits[queue] and len(farmJobs) > 0:
                farmJob = farmJobs.pop(0)
                self._dispatch(farmJob, queue)
                self.spawned.append(farmJob)
                dispatchedToQueue = 1 + dispatchedToQueue

    # internal dispatch accessor; job limits have been dealt with by now
    def _dispatch(self, job, queue):
        if self.print_only:
            print(buildSubmitString(job, queue))
            return
        lsf_response = subprocess.Popen(buildSubmitString(job, queue), shell=True,
                                        stdout=subprocess.PIPE).communicate()[0]
        # BUG FIX: subprocess pipes yield bytes on Python 3; decode before
        # matching with a str pattern.
        if isinstance(lsf_response, bytes):
            lsf_response = lsf_response.decode()
        match = re.compile(r'Job <(\d+)> is submitted to queue').match(lsf_response)
        if match is None:
            # BUG FIX: original dereferenced a failed match (AttributeError);
            # raise a meaningful error instead.
            raise JobDispatchError("Unrecognized bsub response: " + lsf_response)
        job.jobID = match.group(1)
        print(lsf_response)

    # spaces jobs out by using LSF's delayed-start (-b) option
    def _dispatchWithSpacing(self, farmJobs):
        dayHrMin = self.action_string.split(":")
        days = self.startDays
        hours = self.startHours
        mins = self.startMins
        dispatchBuffer = list()
        while len(farmJobs) > 0:
            nextJob = farmJobs.pop(0)
            nextJob.setDelay(":".join([str(days), str(hours), str(mins)]))
            dispatchBuffer.append(nextJob)
            # flush a full batch (or the final partial batch), then push the
            # start time of the next batch out by the configured increment
            if len(dispatchBuffer) >= self.maxJobs or len(farmJobs) == 0:
                self._dispatchAll(dispatchBuffer)
                days += int(dayHrMin[0])
                hours += int(dayHrMin[1])
                mins += int(dayHrMin[2])
                dispatchBuffer = list()

    # dispatches jobs up to the limit and writes a file describing where to
    # resume; if the file already exists, resumes at the recorded job.  The
    # file also holds a checksum of the remaining jobs -- if it doesn't match
    # the checksum computed from farmJobs, dispatching is aborted via an
    # exception (see checkResumeFile).
    def _dispatchWithStopResume(self, farmJobs, jobHash=None):
        startAt = checkResumeFile(self.action_string, farmJobs, jobHash)
        dispatchBuffer = list()
        while len(dispatchBuffer) < self.maxJobs and len(farmJobs) > startAt:
            dispatchBuffer.append(farmJobs.pop(startAt))
        self._dispatchAll(dispatchBuffer)
        if len(farmJobs) > startAt:
            writeResumeFile(farmJobs, startAt, self.maxJobs, self.action_string, jobHash)
        else:
            writeResumeFileFinal(self.action_string)


class Interval:
    """A genomic interval: chromosome plus start/stop coordinates."""

    def __init__(self, chrom, start, stop):
        self.chromosome = chrom
        self.start = start
        self.stop = stop

    def size(self):
        return self.stop - self.start

    def bedFormat(self):
        # chrom, start, stop, strand, name -- tab separated
        return "\t".join([self.chromosome, str(self.start), str(self.stop),
                          "+", "target_whatever"])

    def __str__(self):
        return self.chromosome + ":" + str(self.start) + "-" + str(self.stop)

    def __hash__(self):
        return self.__str__().__hash__()


class GATKDispatcher(JobDispatcher):
    """JobDispatcher specialization that shards a GATK walker run across
    the farm, one job per chunk of the interval list."""

    def __init__(self, jarfile, memory, walker, args, output_directory,
                 reference=None, bams=None, intervals=None, queues=None,
                 limits=None, print_only=False, action="fail", delay=None):
        # None sentinels for the original mutable defaults (see JobDispatcher).
        if queues is None:
            queues = ["long"]
        if limits is None:
            limits = {"long": 500}
        self.jarfile = jarfile
        self.memory = memory
        self.walker = walker
        self.args = args
        self.reference = reference
        self.bams = bams
        self.intervals = intervals
        self.outputDir = output_directory
        self.project = "GSA_GATK_Analysis"
        if action == "resume":
            action_string = self.outputDir + "GATKDispatcher/resumeJobs.txt"
        elif action == "space":
            action_string = delay
        else:
            action_string = None
        JobDispatcher.__init__(self, queues, limits, action, print_only, action_string)
        self.check()
        self.baseCommand = ("java -Xmx" + self.memory + " -jar " + self.jarfile +
                            " -T " + self.walker + " " + args)

    def check(self):
        """Validate that the jar, interval list, and bam inputs exist."""
        if not os.path.exists(self.jarfile):
            raise JobDispatchError("The provided GATK jarfile " + str(self.jarfile) + " does not exist")
        if self.intervals is not None and not os.path.exists(self.intervals):
            raise JobDispatchError("The provided interval list file, " + str(self.intervals) + " does not exist.")
        if self.bams is not None and not os.path.exists(self.bams):
            raise JobDispatchError("The provided bam, or bam list, " + str(self.bams) + " does not exist.")
        if self.reference is None:
            print("Warning: No reference supplied to GATKDispatcher. Most analyses require a reference.")

    def setProject(self, newProject):
        self.project = newProject

    def addReadFilter(self, filter, filter_args=None):
        """Append a GATK read filter (-rf) to the base command."""
        if filter_args is None:
            self.baseCommand += " -rf " + filter
        else:
            raise JobDispatchError("GATK Dispatcher does not yet support read filters with arguments (e.g. blacklisting)")

    # remove bam files, reference, intervals, read filters, etc.
    def resetCommand(self):
        # BUG FIX: original referenced the bare name 'args' (NameError)
        # instead of self.args.
        self.baseCommand = ("java -Xmx" + self.memory + " -jar " + self.jarfile +
                            " -T " + self.walker + " " + self.args)

    def dispatchByInterval(self, base_limit):
        """Split self.intervals into chunks of roughly base_limit bases and
        submit one GATK job per chunk."""
        self.check()
        dispatchCommand = self.baseCommand + " -R " + self.reference
        if self.bams is not None:
            dispatchCommand += " -I " + self.bams
        job_number = 0
        bases_for_job = 0
        intervals_for_job = list()
        headerLines = list()
        farmJobs = list()
        if not os.path.exists(self.outputDir + "GATKDispatcher/"):
            os.mkdir(self.outputDir + "GATKDispatcher/")
        # BUG FIX: original never closed the interval-list file handle.
        with open(self.intervals) as intervalFile:
            for line in intervalFile:
                if line.startswith("@"):
                    headerLines.append(line)
                    continue
                spline = line.strip().split()
                interval = Interval(spline[0], int(spline[1]), int(spline[2]))
                intervals_for_job.append(interval)
                bases_for_job += interval.size()
                if bases_for_job > base_limit:
                    farmJobs.append(self._buildIntervalJob(job_number, headerLines,
                                                           intervals_for_job, dispatchCommand))
                    intervals_for_job = list()
                    bases_for_job = 0
                    job_number += 1
        if len(intervals_for_job) > 0:  # there's still some leftover
            farmJobs.append(self._buildIntervalJob(job_number, headerLines,
                                                   intervals_for_job, dispatchCommand))
        self.dispatchAll_Interval(farmJobs)

    def dispatchAll_Interval(self, jobs):
        # Interval jobs checksum over their interval files too, so stop-resume
        # detects interval changes as well as command changes.
        if self.action == "resume":
            self._dispatchWithStopResume(jobs, hashJobAndIntervals)
        else:
            self.dispatchAll(jobs)

    def _buildIntervalJob(self, num, header, intervals, command):
        """Write the per-job interval file under its own dispatch directory
        and return the FarmJob that runs 'command' restricted (-L) to it."""
        job_dir = self.outputDir + "GATKDispatcher/dispatch" + str(num) + "/"
        cmd = self.appendOutput(command, num, job_dir)
        if not os.path.exists(job_dir):
            os.mkdir(job_dir)
        intervalPath = job_dir + "job" + str(num) + "_intervals.interval_list"
        with open(intervalPath, 'w') as intFile:
            intFile.write("".join(header))
            for interval in intervals:
                intFile.write(interval.bedFormat() + "\n")
        cmd += " -L " + intervalPath
        return FarmJob(cmd, self.project + "_job" + str(num), self.project, None, None,
                       job_dir + "bsub_out.txt", list(), None, False, [intervalPath])

    def appendOutput(self, command, num, job_dir):
        """Append the walker-appropriate output flag and extension to command."""
        if self.walker == "UnifiedGenotyper":
            return command + " -varout " + job_dir + "job" + str(num) + "_calls.vcf"
        if self.walker == "CoverageStatistics" or self.walker == "DepthOfCoverage":
            return command + " -o " + job_dir + "job" + str(num)
        if self.walker == "CombineDuplicates" or self.walker == "TableRecalibration" or self.walker == "ClipReads":
            return command + " -o " + job_dir + "job" + str(num) + "_output.bam"
        return command + " -o " + job_dir + "job" + str(num) + ".txt"


class GeneDispatcher(GATKDispatcher):
    """GATKDispatcher that submits one job per gene (exon intervals taken
    from RefseqLibrary), spacing submissions out over time."""

    def __init__(self, geneNames, jarfile, memory, walker, args, output_directory,
                 reference=None, bams=None, queues=None, limits=None,
                 print_only=False, delay="0:1:0"):
        if queues is None:
            queues = ["long"]
        if limits is None:
            limits = {"long": 500}
        # BUG FIX: original called JobDispatcher.GATKDispatcher.__init__,
        # which is an AttributeError; call the direct superclass initializer.
        GATKDispatcher.__init__(self, jarfile, memory, walker, args, output_directory,
                                reference, bams, None, queues, limits, print_only,
                                "space", delay)
        self.genes = RefseqLibrary.getRefseqGenes(geneNames)

    def dispatchByInterval(self, base_limit):
        raise JobDispatchError("Dispatch by interval not permitted from GeneDispatcher")

    def dispatchByGene(self):
        """Submit one job per gene, restricted to that gene's exon intervals."""
        dispatchCommand = self.baseCommand + " -R " + self.reference
        farmJobs = list()
        jobNumber = ""
        headerLines = RefseqLibrary.getIntervalHeaderLines()
        if not os.path.exists(self.outputDir + "GATKDispatcher/"):
            os.mkdir(self.outputDir + "GATKDispatcher/")
        if self.bams is not None:
            dispatchCommand += " -I " + self.bams
        for gene in self.genes:
            jobNumber = "_" + gene.getGeneName()
            intervals = gene.getExonIntervals()
            farmJobs.append(self._buildIntervalJob(jobNumber, headerLines,
                                                   intervals, dispatchCommand))
        self.dispatchAll_Interval(farmJobs)