From a7d6946b22e3689f658361be0b1a79d63775847a Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 25 Aug 2011 13:13:55 -0400 Subject: [PATCH] Refactored QJobReport and QFunction, which is now automatically tracked -- All QFunctions, including sg ones, are tracked -- Removed memory information --- .../sting/queue/engine/JobRunInfo.scala | 6 +- .../queue/engine/lsf/Lsf706JobRunner.scala | 1 - .../sting/queue/function/QFunction.scala | 14 ++- .../sting/queue/util/QJobReport.scala | 105 +++++++++++------- 4 files changed, 75 insertions(+), 51 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala index 563a32486..c99cbb1fc 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala @@ -57,7 +57,6 @@ import java.text.SimpleDateFormat class JobRunInfo { var startTime: Date = _ var doneTime: Date = _ - var memUsedInGb: Int = -1 var status: RunnerStatus.Value = RunnerStatus.DONE def getStatus = status @@ -67,10 +66,9 @@ class JobRunInfo { def getFormattedStartTime = formatTime(getStartTime) def getFormattedDoneTime = formatTime(getDoneTime) - val formatter = new SimpleDateFormat("dd.MM.yy/H:mm:ss:SSS"); + val formatter = new SimpleDateFormat("yy-MM-dd H:mm:ss:SSS"); private def formatTime(d: Date) = if ( d != null ) formatter.format(d) else "null" - def getMemoryUsedInGb = memUsedInGb def isFilledIn = startTime != null def getRuntimeInMs: Long = { @@ -81,7 +79,7 @@ class JobRunInfo { } override def toString: String = - "started %s ended %s runtime %s using %d Gb memory".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs, getMemoryUsedInGb) + "started %s ended %s runtime %s".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs) } object JobRunInfo { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index b31a66150..02066c74f 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -275,7 +275,6 @@ object Lsf706JobRunner extends Logging { def updateRunInfo() { runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue) runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue) - runner.getRunInfo.memUsedInGb = jobInfo.runRusage.mem // todo -- ask khalid about units here } runner.updateStatus( diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 7048b6413..c905581fa 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -30,14 +30,14 @@ import org.broadinstitute.sting.commandline._ import org.broadinstitute.sting.queue.{QException, QSettings} import collection.JavaConversions._ import org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction -import org.broadinstitute.sting.queue.util.{Logging, CollectionUtils, IOUtils, ReflectionUtils} +import org.broadinstitute.sting.queue.util._ /** * The base interface for all functions in Queue. * Inputs and outputs are specified as Sets of values. * Inputs are matched to other outputs by using .equals() */ -trait QFunction extends Logging { +trait QFunction extends Logging with QJobReport { /** A short description of this step in the graph */ var analysisName: String = "" @@ -83,11 +83,17 @@ trait QFunction extends Logging { */ var deleteIntermediateOutputs = true + // ------------------------------------------------------- + // + // job run information + // + // ------------------------------------------------------- + /** * Copies settings from this function to another function. * @param function QFunction to copy values to. */ - def copySettingsTo(function: QFunction) { + override def copySettingsTo(function: QFunction) { function.analysisName = this.analysisName function.jobName = this.jobName function.qSettings = this.qSettings @@ -99,6 +105,8 @@ trait QFunction extends Logging { function.updateJobRun = this.updateJobRun function.isIntermediate = this.isIntermediate function.deleteIntermediateOutputs = this.deleteIntermediateOutputs + function.reportGroup = this.reportGroup + function.reportFeatures = this.reportFeatures } /** File to redirect any output. Defaults to .out */ diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala index be0c2a5fc..3e393e4c0 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala @@ -28,72 +28,91 @@ import org.broadinstitute.sting.gatk.report.{GATKReportTable, GATKReport} import org.broadinstitute.sting.utils.exceptions.UserException import org.broadinstitute.sting.queue.engine.JobRunInfo import java.io.{FileOutputStream, PrintStream, File} +import org.broadinstitute.sting.queue.function.scattergather.{GathererFunction, ScatterFunction} /** * A mixin to add Job info to the class */ -trait QJobReport extends QFunction { - private var group: String = _ - private var features: Map[String, String] = null +// todo -- need to enforce QFunction to have copySettingTo work +trait QJobReport extends Logging { + self: QFunction => - def getGroup = group - def isEnabled = group != null - def getFeatureNames: List[String] = features.keys.toList - def getFeatures = features - def get(key: String): String = { - features.get(key) match { + // todo -- might make more sense to mix in the variables + protected var reportGroup: String = null + protected var reportFeatures: Map[String, String] = Map() + + def includeInReport = getReportGroup != null + def setRunInfo(info: JobRunInfo) { + logger.info("info " + info) + reportFeatures = reportFeatures ++ Map( + "analysisName" -> self.analysisName, + "jobName" -> QJobReport.workAroundSameJobNames(this), + "intermediate" -> self.isIntermediate, + "startTime" -> info.getStartTime.getTime, + "doneTime" -> info.getDoneTime.getTime, + "formattedStartTime" -> info.getFormattedStartTime, + "formattedDoneTime" -> info.getFormattedDoneTime, + "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") + } + + def getReportGroup = reportGroup + def getReportFeatures = reportFeatures + + def getReportFeatureNames: List[String] = getReportFeatures.keys.toList + def getReportFeature(key: String): String = { + getReportFeatures.get(key) match { case Some(x) => x case None => throw new RuntimeException("Get called with key %s but no value was found".format(key)) } } - def getName: String = features.get("jobName").get - private def addRunInfo(info: JobRunInfo) { - logger.info("info " + info) - features = features ++ Map( - "analysisName" -> this.analysisName, - "jobName" -> this.jobName, - "intermediate" -> this.isIntermediate, - "startTime" -> info.getFormattedStartTime, - "doneTime" -> info.getFormattedDoneTime, - "memUsedInGb" -> info.getMemoryUsedInGb, - "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") + def getReportName: String = getReportFeature("jobName") + + def configureJobReport(group: String) { + this.reportGroup = group } - def setJobLogging(group: String) { - this.group = group + def configureJobReport(group: String, features: Map[String, Any]) { + this.reportGroup = group + this.reportFeatures = features.mapValues(_.toString) } - def setJobLogging(group: String, features: Map[String, Any]) { - this.group = group - this.features = features.mapValues(_.toString) + // copy the QJobReport information -- todo : what's the best way to do this? + override def copySettingsTo(function: QFunction) { + self.copySettingsTo(function) + function.reportGroup = this.reportGroup + function.reportFeatures = this.reportFeatures } } object QJobReport { + // todo -- fixme to have a unique name for Scatter/gather jobs as well + var seenCounter = 1 + var seenNames = Set[String]() + def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) { - val jobs = jobsRaw.filter(_._2.isFilledIn) - val jobLogs: List[QJobReport] = jobLoggingSublist(jobs.keys.toList) - jobLogs.foreach((job: QJobReport) => job.addRunInfo(jobs.get(job).get)) + val jobs = jobsRaw.filter(_._2.isFilledIn).filter(_._1.includeInReport) + jobs foreach {case (qf, info) => qf.setRunInfo(info)} val stream = new PrintStream(new FileOutputStream(dest)) - printJobLogging(jobLogs, stream) + printJobLogging(jobs.keys.toList, stream) stream.close() } - private def jobLoggingSublist(l: List[QFunction]): List[QJobReport] = { - def asJogLogging(qf: QFunction): QJobReport = qf match { - case x: QJobReport => x - case _ => null + def workAroundSameJobNames(func: QFunction):String = { + if ( seenNames.apply(func.jobName) ) { + seenCounter += 1 + "%s_%d".format(func.jobName, seenCounter) + } else { + seenNames += func.jobName + func.jobName } - - l.map(asJogLogging).filter(_ != null) } /** * Prints the JobLogging logs to a GATKReport. First splits up the * logs by group, and for each group generates a GATKReportTable */ - private def printJobLogging(logs: List[QJobReport], stream: PrintStream) { + private def printJobLogging(logs: List[QFunction], stream: PrintStream) { // create the report val report: GATKReport = new GATKReport @@ -108,25 +127,25 @@ object QJobReport { keys.foreach(table.addColumn(_, 0)) for (log <- groupLogs) { for ( key <- keys ) - table.set(log.getName, key, log.get(key)) + table.set(log.getReportName, key, log.getReportFeature(key)) } } report.print(stream) } - private def groupLogs(logs: List[QJobReport]): Map[String, List[QJobReport]] = { - logs.groupBy(_.getGroup) + private def groupLogs(logs: List[QFunction]): Map[String, List[QFunction]] = { + logs.groupBy(_.getReportGroup) } - private def logKeys(logs: List[QJobReport]): Set[String] = { + private def logKeys(logs: List[QFunction]): Set[String] = { // the keys should be the same for each log, but we will check that - val keys = Set[String](logs(0).getFeatureNames : _*) + val keys = Set[String](logs(0).getReportFeatureNames : _*) for ( log <- logs ) - if ( keys.sameElements(Set(log.getFeatureNames)) ) + if ( keys.sameElements(Set(log.getReportFeatureNames)) ) throw new UserException(("All JobLogging jobs in the same group must have the same set of features. " + - "We found one with %s and another with %s").format(keys, log.getFeatureNames)) + "We found one with %s and another with %s").format(keys, log.getReportFeatureNames)) keys }