better farm commands, and simple utility to convert ucsc repeat masks to interval lists

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3040 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2010-03-19 13:11:06 +00:00
parent 03480c955c
commit 08d9ae403d
2 changed files with 405 additions and 0 deletions

View File

@ -0,0 +1,259 @@
#!/usr/bin/env python
import os
import sys
import subprocess
import re
import unittest
import tempfile
# maximum number of unnamed jobs allowed sa dependencies
MAX_UNNAMED_DEPENDENCIES = 10
class FarmJob:
def __init__( self, cmd_str_from_user, jobName = None, outputHead = None, outputFile = None, dependencies = [], dependencyNameString = None, dieOnFail = False):
self.cmd_str_from_user = cmd_str_from_user
self.jobName = jobName
self.outputHead = outputHead
self.outputFile = outputFile
self.dieOnFail = dieOnFail
self.dependencies = dependencies
if self.dependencies == None:
self.dependencies = []
elif type(self.dependencies) != list:
self.dependencies = [self.dependencies]
self.dependencyNameString = dependencyNameString
if len(self.dependencies) > MAX_UNNAMED_DEPENDENCIES:
depNames = map(FarmJob.getJobName, self.dependencies)
if len(filter(None, depNames)) > 1 and len(self.dependencies) != len(filter(None, depNames)):
# there are some unnamed and some named deps
raise Exception("Bad job names -- some are named and some are unnamed", depName)
self.jobID = None # None indicates currently unscheduled
self.executionString = None # currently unscheduled
self.executed = False
self.jobStatus = None
def getJobName(self):
return self.jobName
def getJobIDString(self):
if self.jobName == None:
if self.jobID == None:
return "UNNAMED"
else:
return str(self.jobID)
else:
return self.getJobName()
def __str__(self):
return "[JOB: name=%s id=%s depending on (%s) with cmd=%s]" % (self.getJobName(), self.jobID, ','.join(map(FarmJob.getJobIDString, self.dependencies)), self.cmd_str_from_user)
# def __repr__(self):
# return self.__str__()
def longestCommonPrefix(strings):
#print 'LCP', strings
if strings == []:
return ""
else:
l = map(len, strings)
shortestLen = min(l)
#print '*arg', l, shortestLen
for i in range(shortestLen):
c = strings[0][i]
if not all(map(lambda s: c == s[i], strings)):
shortestLen = i
break
return strings[0][0:shortestLen]
def jobNameWildcard(jobs):
if len(jobs) == 1:
return jobs[0].getJobName()
else:
return longestCommonPrefix(map(FarmJob.getJobName, jobs)) + "*"
def executeJobs(allJobs, farm_queue = None, just_print_commands = False, debug = True):
for job in allJobs:
if type(job) == list:
# convenience for lists of lists
map( lambda x: executeJob(x, farm_queue, just_print_commands, debug), job)
else:
print 'Preparing to execute', job
if not job.executed:
# schedule the dependents
executeJobs(job.dependencies, farm_queue, just_print_commands, debug = debug)
executeJob(job, farm_queue, just_print_commands, debug = debug)
print 'Executed', job
justPrintJobIDCounter = 1
def executeJob(job, farm_queue = None, just_print_commands = False, debug = True, die_on_fail = True):
global justPrintJobIDCounter
job.executed = True
# build my execution string
job.executionString = buildExecutionString(job, farm_queue, debug = debug)
if just_print_commands or (globals().has_key("justPrintCommands") and globals().justPrintCommands):
job.jobID = justPrintJobIDCounter
justPrintJobIDCounter += 1
elif farm_queue:
print 'job.executionString', job.executionString
result = subprocess.Popen([job.executionString, ""], shell=True, stdout=subprocess.PIPE).communicate()[0]
p = re.compile('Job <(\d+)> is submitted to queue')
job.jobID = p.match(result).group(1)
else:
# Actually execute the command if we're not just in debugging output mode
status = os.system(job.executionString)
if not farm_queue:
print "<<< Exit code:", status,"\n"
if die_on_fail != None and status != 0:
print "### Failed with exit code "+str(status)+" while executing command "+job.cmd_str_from_user
sys.exit()
job.jobStatus = int(status)
def buildExecutionString(job, farm_queue = None, debug = True):
if farm_queue != None:
if job.outputFile != None:
farm_stdout = job.outputFile
elif job.outputHead != None:
farm_stdout = job.outputHead + ".stdout"
else:
#farm_stdout = None
farm_stdout = "%J.lsf.output"
cmd_str = "bsub -q " + farm_queue
if farm_stdout != None:
cmd_str += " -o " + farm_stdout
# fixme
if job.dependencies != []:
cmd_str += buildJobDependencyString(job.dependencies)
if job.dependencyNameString != None:
cmd_str += buildJobDependencyString(job.dependencyNameString)
if job.jobName != None:
cmd_str += " -J %s" % job.jobName
cmd_out, cmd_file = tempfile.mkstemp()
cmd_out = open(cmd_file, 'w')
cmd_out.write(job.cmd_str_from_user)
cmd_out.close()
cmd_str += " < " + cmd_file + ""
#cmd_str += " '" + job.cmd_str_from_user + "'"
if debug: print ">>> Farming via "+cmd_str
else:
cmd_str = job.cmd_str_from_user
if debug: print ">>> Executing "+cmd_str
return cmd_str
def allJobsAreNamed(jobs):
#print map(lambda x: x != None, map(FarmJob.getJobName, jobs))
return all(map(lambda x: x != None, map(FarmJob.getJobName, jobs)))
def buildJobDependencyString(depJobs):
if type(depJobs) == str:
# we are all formally named
depString = "ended(%s*)" % depJobs
elif allJobsAreNamed(depJobs):
# we are all formally named
depString = "ended(%s)" % jobNameWildcard(depJobs)
else:
depString = "&&".join(map(lambda x: "ended(%s)" % x.jobID, depJobs))
return " -w \"%s\"" % depString
def cmd(cmd_str_from_user, farm_queue=False, output_head=None, just_print_commands=False, outputFile = None, waitID = None, jobName = None, die_on_fail = False):
"""if farm_queue != False, submits to queue, other
die_on_fail_msg: if != None, die on command failure (non-zero return) and show die_on_fail_msg"""
if farm_queue:
if outputFile <> None:
farm_stdout = outputFile
elif output_head <> None:
farm_stdout = output_head+".stdout"
else:
#farm_stdout = None
farm_stdout = "%J.lsf.output"
cmd_str = "bsub -q "+farm_queue
if farm_stdout <> None:
cmd_str += " -o " + farm_stdout
if waitID <> None:
cmd_str += " -w \"ended(%s)\"" % (str(waitID))
if jobName <> None:
cmd_str += " -J %s" % (jobName)
cmd_str += " '"+cmd_str_from_user + "'"
print ">>> Farming via "+cmd_str
else:
cmd_str = cmd_str_from_user
print ">>> Executing "+cmd_str
if just_print_commands or (globals().has_key("justPrintCommands") and globals().justPrintCommands):
return -1
elif farm_queue:
result = subprocess.Popen([cmd_str, ""], shell=True, stdout=subprocess.PIPE).communicate()[0]
p = re.compile('Job <(\d+)> is submitted to queue')
jobid = p.match(result).group(1)
return jobid
else:
# Actually execute the command if we're not just in debugging output mode
status = os.system(cmd_str)
if not farm_queue:
print "<<< Exit code:", status,"\n"
if die_on_fail != None and status != 0:
print "### Failed with exit code "+str(status)+" while executing command "+cmd_str_from_user
sys.exit()
return int(status)
# ------------------------------------------------------------------------------------------
#
# Unit testing!
#
# ------------------------------------------------------------------------------------------
class TestFarmCommands(unittest.TestCase):
def setUp(self):
print ''
print '-' * 100
def testMakingJob1(self):
print 'testMakingJob:'
job = FarmJob("foo")
executeJobs([job], just_print_commands = True)
def testMakingJob2(self):
print 'testMakingJob2:'
job = FarmJob("bar", jobName = "barJobs", outputHead = "outputRoot", outputFile = "bar.log", dependencies = [], dieOnFail = True)
executeJobs([job], "gsa", just_print_commands = True)
def testDepJobs1(self):
print 'testDepJobs1:'
job1 = FarmJob("job1")
job2 = FarmJob("job2")
job3 = FarmJob("job3", dependencies = [job1, job2])
executeJobs([job1, job2, job3], "gsa", just_print_commands = True)
def testDepJobs2(self):
print 'testDepJobs2:'
foojob1 = FarmJob("job1", jobName = "foojob1")
foojob2 = FarmJob("job2", jobName = "foojob2")
barjob1 = FarmJob("barjob1", jobName = "barjob1", dependencies = [foojob1])
barjob2 = FarmJob("barjob2", jobName = "barjob2", dependencies = [foojob1, foojob2])
bazjob1 = FarmJob("baz1", jobName = "baz1", dependencies = [barjob1, barjob2])
executeJobs([bazjob1], "gsa", just_print_commands = True)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,146 @@
import farm_commands
import os.path
import sys
from optparse import OptionParser
from datetime import date
import glob
import operator
import faiReader
import math
import shutil
IS_OFFSET = True
CHR_OFFSET = 5
START_OFFSET = 6
END_OFFSET = 7
TYPE_OFFSET = 11
class RepeatInfo:
def __init__(self, name):
self.name = name
self.count = 0
self.coveredBases = 0
def badChr(excludes, chr):
if excludes == None:
return False
for exclude in excludes:
if chr.find(exclude) != -1:
return True
return False
def main():
global OPTIONS
usage = "usage: %prog stage [options]"
parser = OptionParser(usage=usage)
# parser.add_option("-q", "--farm", dest="farmQueue",
# type="string", default=None,
# help="Farm queue to send processing jobs to")
parser.add_option("", "--header", dest="header",
type='string', default=None,
help="interval_list file")
parser.add_option("-o", "--output", dest="output",
type='string', default=None,
help="output interval_list filename")
parser.add_option("-r", "--ref", dest="ref",
type='string', default=None,
help="reference name -- either hg18 or b36")
parser.add_option("-x", "--exclude", dest="excludes",
action="append", type='string',
help="If provided, only run pipeline for this sample")
parser.add_option("-m", "--maxRecords", dest="maxRecords",
type='int', default=None,
help="If provided, max. number of records to process")
parser.add_option("-s", "--skip", dest="skip",
type='int', default=None,
help="If provided, only process every skip records")
parser.add_option("", "--excludeChr", dest="excludeChrs",
action="append", type='string',
help="If provided, don't include chr matching this string")
(OPTIONS, args) = parser.parse_args()
if len(args) != 1:
parser.error("incorrect number of arguments")
repeatFile = args[0]
# open the output file
out = open(OPTIONS.output, 'w')
# write out the header
for line in open(OPTIONS.header):
out.write(line)
info = dict()
nRecords = 1
i = 1
for line in open(repeatFile):
if OPTIONS.maxRecords != None and nRecords > OPTIONS.maxRecords:
break
if len(line) == 0 or line[0] == '#':
continue
parts = line.split()
type = parts[TYPE_OFFSET]
start = int(parts[START_OFFSET]) + 1
end = int(parts[END_OFFSET]) + 1
chr = parts[CHR_OFFSET]
if OPTIONS.ref == 'b36':
chr = chr.replace('chrM', 'chrMT')
chr = chr.replace('chr', '')
if (OPTIONS.excludes == None or type not in OPTIONS.excludes) and not badChr(OPTIONS.excludeChrs, chr):
name = 'repeat_' + str(i) + '_' + type
strand = '+'
if OPTIONS.skip == None or i % OPTIONS.skip == 0:
print >> out, '\t'.join(map(str, [chr, start, end, strand, name]))
nRecords += 1
i += 1
if type not in info:
info[type] = RepeatInfo(type)
typeInfo = info[type]
typeInfo.count += 1
typeInfo.coveredBases += end - start
out.close()
for typeInfo in info.values():
print "%20s\t%20d\t%20d" % ( typeInfo.name, typeInfo.count, typeInfo.coveredBases )
if __name__ == "__main__":
main()
# java -Xmx4096m -jar /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/dist/GenomeAnalysisTK.jar -T VCFCombine -R /humgen/gsa-hpprojects/1kg/reference/human_b36_both.fasta -B GATK,VCF,ceu.trio.gatk.ug.filtered.vcf -B glfTrio,VCF,/humgen/gsa-hpprojects/1kg/1kg_pilot2/currentBestProjectCalls/CEU_1kg_pilot2.vcf -O test.vcf -type UNION -priority GATK,glfTrio -l INFO -A
# java -ea -Xmx4096m -jar /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/dist/GenomeAnalysisTK.jar -l INFO -R /humgen/gsa-hpprojects/1kg/reference/human_b36_both.fasta -T VariantEval -D /humgen/gsa-scr1/GATK_Data/dbsnp_129_b36.rod -B eval,VCF,test.vcf -B hapmap-chip,GFF,../../hapmap3Genotypes/NA12878.b36.gff --sampleName NA12878 -vcfInfoSelector set=gatk-filtered
# if ( $1 == 3.1 ) then
# cat ceu.trio.calls.allTechs.mmq10_mbq10_q200.filtered.vcf | cut -f 1-10 > ceu.trio.calls.allTechs.mmq10_mbq10_q200.NA12878only.filtered.vcf
# endif
#
# if ( $1 == 4 ) then
# foreach callset ( NA12878.allTechs.mmq10_mbq10_q200 ceu.trio.calls.allTechs.mmq10_mbq10_q200.NA12878only )
# java -Xmx4096m -jar /home/radon01/depristo/dev/GenomeAnalysisTKStable/trunk/dist/GenomeAnalysisTK.jar -T CallsetConcordance -R /humgen/gsa-hpprojects/1kg/reference/human_b36_both.fasta -B GATK,VCF,$callset.filtered.vcf -B glfTrio,VCF,CEU_1kg_pilot2.na12878.vcf -CT SimpleVenn -CO ${callset}_v_CEU_1kg_pilot2.filtered.vcf -l INFO
# cat ${callset}_v_CEU_1kg_pilot2.filtered.vcf | awk '$1 ~ "#" || $8 ~ "callset2_only"' > ${callset}_v_CEU_1kg_pilot2.filtered.CEU_1kg_pilot2Unique.vcf
# cat ${callset}_v_CEU_1kg_pilot2.filtered.vcf | awk '$1 ~ "#" || $8 ~ "callset1_only"' > ${callset}_v_CEU_1kg_pilot2.filtered.${callset}Unique.vcf
# cat ${callset}_v_CEU_1kg_pilot2.filtered.vcf | awk '$1 ~ "#" || $8 ~ "concordant"' > ${callset}_v_CEU_1kg_pilot2.filtered.concordant.vcf
# end
# endif
#
# if ( $1 == 5 ) then
# mkdir /humgen/gsa-scr1/pub/1000Pilot2_010710
#
# foreach file ( NA12878.allTechs.mmq10_mbq10_q200.filtered.vcf NA12878.SLX.mmq10_mbq10_q50.filtered.vcf NA12891.calls.mmq10_mbq10_q50.filtered.vcf NA12892.calls.mmq10_mbq10_q50.filtered.vcf ceu.trio.calls.allTechs.mmq10_mbq10_q200.filtered.vcf )
# echo $file
# cp $file /humgen/gsa-scr1/pub/1000Pilot2_010710
# end
#
# endif