import os.path import sys from optparse import OptionParser from itertools import * from xml.etree.cElementTree import * import gzip import datetime import re MISSING_VALUE = "NA" RUN_REPORT_LIST = "GATK-run-reports" RUN_REPORT = "GATK-run-report" def main(): global OPTIONS usage = "usage: %prog [options] mode file1 ... fileN" parser = OptionParser(usage=usage) parser.add_option("-v", "--verbose", dest="verbose", action='store_true', default=False, help="If provided, verbose progress will be enabled") parser.add_option("", "--overwrite", dest="overwrite", action='store_true', default=False, help="If provided, archive mode will overwrite destination file, if it exists (DANGEROUS)") parser.add_option("-o", "--o", dest="output", type='string', default=None, help="if provided, output will go here instead of stdout") parser.add_option("", "--no-dev", dest="noDev", action='store_true', default=False, help="if provided, only records not coming from a dev version of GATK will be included") parser.add_option("-E", "", dest="exception_selection", type='choice', choices=['all', 'user', 'sting'], default='all', help="if provided, will only emit records matching of the provided class [default %default]") parser.add_option("", "--max_days", dest="maxDays", type='int', default=None, help="if provided, only records generated within X days of today will be included") parser.add_option("-D", "--delete_while_archiving", dest="reallyDeleteInArchiveMode", action='store_true', default=False, help="if provided, we'll actually delete records when running in archive mode") (OPTIONS, args) = parser.parse_args() if len(args) == 0: parser.error("Requires at least GATKRunReport xml to analyze") stage = args[0] files = resolveFiles(args[1:]) # open up the output file if OPTIONS.output != None: if stage == "archive" and os.path.exists(OPTIONS.output) and not OPTIONS.overwrite: raise "archive output file already exists, aborting!", OPTIONS.output out = openFile(OPTIONS.output,'w') else: out = sys.stdout handler = getHandler(stage)(stage, out) handler.initialize(files) # parse all of the incoming files counter = 0 for report in readReports(files): # todo -- add matching here handler.processRecord(report) counter += 1 report.clear() handler.finalize(files) if OPTIONS.output != None: out.close() print 'Processed records:', counter # # Stage HANDLERS # class StageHandler: def __init__(self, name, out): self.name = name self.out = out def getName(self): return self.name def initialize(self, args): pass # print 'initialize' def processRecord(self, record): pass # print 'processing record', record def finalize(self, args): pass # print 'Finalize' # a map from stage strings -> function to handle record HANDLERS = dict() def addHandler(name, handler): HANDLERS[name] = handler def getHandler(stage): return HANDLERS[stage] def eltIsException(elt): return elt.tag == "exception" def parseException(elt): msgElt = elt.find("message") msgText = "MISSING" userException = "NA" if msgElt != None: msgText = msgElt.text stackTrace = elt.find("stacktrace").find("string").text if elt.find("is-user-exception") != None: #print elt.find("is-user-exception") userException = elt.find("is-user-exception").text return msgText, stackTrace, userException def javaExceptionFile(javaException): m = re.search("\((.*\.java:.*)\)", javaException) if m != None: return m.group(1) else: javaException class RecordDecoder: def __init__(self): self.fields = list() self.formatters = dict() def id(elt): return elt.text def toString(elt): return '%s' % elt.text def formatExceptionMsg(elt): return '%s' % parseException(elt)[0] def formatExceptionAt(elt): return '%s' % parseException(elt)[1] def formatExceptionAtBrief(elt): return '%s' % javaExceptionFile(parseException(elt)[1]) def formatExceptionUser(elt): return '%s' % parseException(elt)[2] def add(names, func): for name in names: addComplex(name, [name], [func]) def addComplex(key, fields, funcs): self.fields.extend(fields) self.formatters[key] = zip(fields, funcs) add(["id", "walker-name", "svn-version", "phone-home-type"], id) add(["start-time", "end-time"], toString) add(["run-time", "java-tmp-directory", "working-directory", "user-name", "host-name"], id) add(["java", "machine"], toString) add(["max-memory", "total-memory", "iterations", "reads"], id) addComplex("exception", ["exception-msg", "exception-at", "exception-at-brief", "is-user-exception"], [formatExceptionMsg, formatExceptionAt, formatExceptionAtBrief, formatExceptionUser]) # add(["command-line"], toString) def decode(self, report): bindings = dict() for elt in report: if elt.tag in self.formatters: fieldFormats = self.formatters[elt.tag] # we actually care about this tag for field, formatter in fieldFormats: bindings[field] = formatter(elt) # add missing data for field in self.fields: if field not in bindings: bindings[field] = MISSING_VALUE return bindings # def class RecordAsTable(StageHandler): def __init__(self, name, out): StageHandler.__init__(self, name, out) def initialize(self, args): self.decoder = RecordDecoder() print >> self.out, "\t".join(self.decoder.fields) def processRecord(self, record): parsed = self.decoder.decode(record) def oneField(field): val = MISSING_VALUE if field in parsed: val = parsed[field] if val == None: if OPTIONS.verbose: print >> sys.stderr, 'field', field, 'is missing in', parsed['id'] else: val = val.replace('"',"'") # if val.find("\t") != -1: # if OPTIONS.verbose: print >> sys.stderr, 'Warning -- val', val, 'contains tabs, droping field', field # raise Error #val = "value contained tabs, dropped" if val.find(" ") != -1: val = "\"" + val + "\"" return val try: print >> self.out, "\t".join([ oneField(field) for field in self.decoder.fields ]) except: #print 'Failed to convert to table ', parsed pass addHandler('table', RecordAsTable) class CountRecords(StageHandler): def __init__(self, name, out): StageHandler.__init__(self, name, out) def initialize(self, args): self.counter = 0 def processRecord(self, record): self.counter += 1 addHandler('count', CountRecords) class RecordAsXML(StageHandler): def __init__(self, name, out): StageHandler.__init__(self, name, out) def initialize(self, args): print >> self.out, "<%s>" % RUN_REPORT_LIST def processRecord(self, record): print >> self.out, tostring(record) def finalize(self, args): print >> self.out, "" % RUN_REPORT_LIST addHandler('xml', RecordAsXML) class Archive(RecordAsXML): def __init__(self, name, out): RecordAsXML.__init__(self, name, out) def finalize(self, args): RecordAsXML.finalize(self, args) for arg in args: if OPTIONS.verbose: print 'Deleting file: ', arg if OPTIONS.reallyDeleteInArchiveMode: os.remove(arg) print 'Deleted', len(args), 'files' addHandler('archive', Archive) class ExceptionReport(StageHandler): #FIELDS = ["Msg", "At", "SVN.versions", "Walkers", 'Occurrences', 'IDs'] def __init__(self, name, out): StageHandler.__init__(self, name, out) self.exceptions = [] def initialize(self, args): self.decoder = RecordDecoder() #print >> self.out, "\t".join(self.FIELDS) def processRecord(self, record): for elt in record: if eltIsException(elt): self.exceptions.append(self.decoder.decode(record)) break def finalize(self, args): commonExceptions = list() def addToCommons(ex): for common in commonExceptions: if common.equals(ex): common.update(ex) return commonExceptions.append(CommonException(ex)) for ex in self.exceptions: addToCommons(ex) commonExceptions = sorted(commonExceptions, None, lambda x: x.counts) for common in commonExceptions: msg, at, svns, walkers, counts, ids, duration, users, userError = common.toStrings() if not matchesExceptionSelection(userError): continue print >> self.out, ''.join(['*'] * 80) print >> self.out, 'Exception :', msg print >> self.out, ' is-user-exception? :', userError print >> self.out, ' at :', at print >> self.out, ' walkers :', walkers print >> self.out, ' svns :', svns print >> self.out, ' duration :', duration print >> self.out, ' occurrences :', counts print >> self.out, ' users :', users print >> self.out, ' ids :', ids def matchesExceptionSelection(userError): if OPTIONS.exception_selection == "all": return True elif OPTIONS.exception_selection == "user" and userError == "true": return True elif OPTIONS.exception_selection == "sting" and userError == "false": return True return False class CommonException: MAX_SET_ITEMS_TO_SHOW = 5 def __init__(self, ex): self.msgs = set([ex['exception-msg']]) self.at = ex['exception-at'] self.svns = set([ex['svn-version']]) self.users = set([ex['user-name']]) self.userError = ex['is-user-exception'] self.counts = 1 self.times = set([decodeTime(ex['end-time'])]) self.walkers = set([ex['walker-name']]) self.ids = set([ex['id']]) def equals(self, ex): return self.at == ex['exception-at'] def update(self, ex): self.msgs.add(ex['exception-msg']) self.svns.add(ex['svn-version']) self.users.add(ex['user-name']) self.counts += 1 self.walkers.add(ex['walker-name']) self.times.add(decodeTime(ex['end-time'])) self.ids.add(ex['id']) def bestExample(self, examples): def takeShorter(x, y): if len(y) < len(x): return y else: return x return reduce(takeShorter, examples) def setString(self, s): if len(s) > self.MAX_SET_ITEMS_TO_SHOW: s = [x for x in s][0:self.MAX_SET_ITEMS_TO_SHOW] + ["..."] return ','.join(s) def duration(self): x = sorted(filter(lambda x: x != "ND", self.times)) if len(x) >= 2: return "-".join(map(lambda x: x.strftime("%m/%d/%y"), [x[0], x[-1]])) elif len(x) == 1: return x[0] else: return "ND" def toStrings(self): return [self.bestExample(self.msgs), self.at, self.setString(self.svns), self.setString(self.walkers), self.counts, self.setString(self.ids), self.duration(), self.setString(self.users), self.userError] addHandler('exceptions', ExceptionReport) class SummaryReport(StageHandler): #FIELDS = ["Msg", "At", "SVN.versions", "Walkers", 'Occurrences', 'IDs'] def __init__(self, name, out): StageHandler.__init__(self, name, out) self.reports = [] def initialize(self, args): self.decoder = RecordDecoder() #print >> self.out, "\t".join(self.FIELDS) def processRecord(self, record): self.reports.append(self.decoder.decode(record)) def finalize(self, args): print >> self.out, 'GATK run summary for :', datetime.datetime.today() print >> self.out, ' number of runs :', len(self.reports) print >> self.out, ' number of StingExceptions :', len(filter(isStingException, self.reports)) print >> self.out, ' number of UserExceptions :', len(filter(isUserException, self.reports)) print >> self.out, ' users :', ', '.join(set(map(userID, self.reports))) def userID(rec): return rec['user-name'] def isStingException(rec): return rec['exception-at'] != "NA" and rec['is-user-exception'] == "false" def isUserException(rec): return rec['exception-at'] != "NA" and rec['is-user-exception'] == "true" addHandler('summary', SummaryReport) # # utilities # def openFile(filename, mode='r'): if ( filename.endswith(".gz") ): return gzip.open(filename, mode) else: return open(filename, mode) def resolveFiles(paths): allFiles = list() def resolve1(path): if not os.path.exists(path): raise Exception("Path doesn't exist: " + path) elif os.path.isfile(path): allFiles.append(path) else: def one(arg, dirname, files): #print dirname, files #print dirname allFiles.extend(map( lambda x: os.path.join(path, x), files )) #print files os.path.walk(path, one, None) map( resolve1, paths ) return allFiles def decodeTime(time): if time == "ND": return "ND" else: return datetime.datetime.strptime(time.split()[0], "%Y/%m/%d") #return datetime.datetime.strptime(time, "%Y/%m/%d %H.%M.%S") def eltTagEquals(elt, tag, value): if elt == None: return False msgElt = elt.find(tag) found = msgElt != None and msgElt.text == value #print 'finding', tag, 'in', elt, msgElt, msgElt.text, found return found def passesFilters(elt): if OPTIONS.noDev and eltTagEquals(elt.find('argument-collection'),'phone-home-type','DEV'): #print 'skipping', elt return False if OPTIONS.maxDays != None: now = datetime.datetime.today() now = datetime.datetime(now.year, now.month, now.day) # 2010/08/31 15.38.00 eltTime = decodeTime(elt.find('end-time').text) diff = now - eltTime #print eltTime, now, diff, diff.days if diff.days > OPTIONS.maxDays: return False return True def readReportsSlow(files): #print files for file in files: if OPTIONS.verbose: print 'Reading file', file input = openFile(file) try: tree = ElementTree(file=input) except: print "Ignoring excepting file", file continue elem = tree.getroot() if elem.tag == RUN_REPORT_LIST: counter = 0 for sub in elem: if passesFilters(sub): counter += 1 if counter % 1000 == 0: print 'Returning', counter yield sub else: if passesFilters(elem): yield elem def readReports(files): #print files for file in files: if OPTIONS.verbose: print 'Reading file', file input = openFile(file) try: counter = 0 for event, elem in iterparse(input): if elem.tag == RUN_REPORT: if passesFilters(elem): counter += 1 #if counter % 1000 == 0: print 'Returning', counter yield elem except: print "Ignoring excepting file", file continue if __name__ == "__main__": main()