Refactored QJobReport and QFunction; every QFunction is now tracked automatically
-- All QFunctions, including scatter/gather (sg) ones, are tracked -- Removed memory usage information
This commit is contained in:
parent
08f6c3eea9
commit
a7d6946b22
|
|
@ -57,7 +57,6 @@ import java.text.SimpleDateFormat
|
||||||
class JobRunInfo {
|
class JobRunInfo {
|
||||||
var startTime: Date = _
|
var startTime: Date = _
|
||||||
var doneTime: Date = _
|
var doneTime: Date = _
|
||||||
var memUsedInGb: Int = -1
|
|
||||||
var status: RunnerStatus.Value = RunnerStatus.DONE
|
var status: RunnerStatus.Value = RunnerStatus.DONE
|
||||||
|
|
||||||
def getStatus = status
|
def getStatus = status
|
||||||
|
|
@ -67,10 +66,9 @@ class JobRunInfo {
|
||||||
def getFormattedStartTime = formatTime(getStartTime)
|
def getFormattedStartTime = formatTime(getStartTime)
|
||||||
def getFormattedDoneTime = formatTime(getDoneTime)
|
def getFormattedDoneTime = formatTime(getDoneTime)
|
||||||
|
|
||||||
val formatter = new SimpleDateFormat("dd.MM.yy/H:mm:ss:SSS");
|
val formatter = new SimpleDateFormat("yy-MM-dd H:mm:ss:SSS");
|
||||||
private def formatTime(d: Date) = if ( d != null ) formatter.format(d) else "null"
|
private def formatTime(d: Date) = if ( d != null ) formatter.format(d) else "null"
|
||||||
|
|
||||||
def getMemoryUsedInGb = memUsedInGb
|
|
||||||
def isFilledIn = startTime != null
|
def isFilledIn = startTime != null
|
||||||
|
|
||||||
def getRuntimeInMs: Long = {
|
def getRuntimeInMs: Long = {
|
||||||
|
|
@ -81,7 +79,7 @@ class JobRunInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
override def toString: String =
|
override def toString: String =
|
||||||
"started %s ended %s runtime %s using %d Gb memory".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs, getMemoryUsedInGb)
|
"started %s ended %s runtime %s".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs)
|
||||||
}
|
}
|
||||||
|
|
||||||
object JobRunInfo {
|
object JobRunInfo {
|
||||||
|
|
|
||||||
|
|
@ -275,7 +275,6 @@ object Lsf706JobRunner extends Logging {
|
||||||
def updateRunInfo() {
|
def updateRunInfo() {
|
||||||
runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue)
|
runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue)
|
||||||
runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue)
|
runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue)
|
||||||
runner.getRunInfo.memUsedInGb = jobInfo.runRusage.mem // todo -- ask khalid about units here
|
|
||||||
}
|
}
|
||||||
|
|
||||||
runner.updateStatus(
|
runner.updateStatus(
|
||||||
|
|
|
||||||
|
|
@ -30,14 +30,14 @@ import org.broadinstitute.sting.commandline._
|
||||||
import org.broadinstitute.sting.queue.{QException, QSettings}
|
import org.broadinstitute.sting.queue.{QException, QSettings}
|
||||||
import collection.JavaConversions._
|
import collection.JavaConversions._
|
||||||
import org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction
|
import org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction
|
||||||
import org.broadinstitute.sting.queue.util.{Logging, CollectionUtils, IOUtils, ReflectionUtils}
|
import org.broadinstitute.sting.queue.util._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The base interface for all functions in Queue.
|
* The base interface for all functions in Queue.
|
||||||
* Inputs and outputs are specified as Sets of values.
|
* Inputs and outputs are specified as Sets of values.
|
||||||
* Inputs are matched to other outputs by using .equals()
|
* Inputs are matched to other outputs by using .equals()
|
||||||
*/
|
*/
|
||||||
trait QFunction extends Logging {
|
trait QFunction extends Logging with QJobReport {
|
||||||
/** A short description of this step in the graph */
|
/** A short description of this step in the graph */
|
||||||
var analysisName: String = "<function>"
|
var analysisName: String = "<function>"
|
||||||
|
|
||||||
|
|
@ -83,11 +83,17 @@ trait QFunction extends Logging {
|
||||||
*/
|
*/
|
||||||
var deleteIntermediateOutputs = true
|
var deleteIntermediateOutputs = true
|
||||||
|
|
||||||
|
// -------------------------------------------------------
|
||||||
|
//
|
||||||
|
// job run information
|
||||||
|
//
|
||||||
|
// -------------------------------------------------------
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Copies settings from this function to another function.
|
* Copies settings from this function to another function.
|
||||||
* @param function QFunction to copy values to.
|
* @param function QFunction to copy values to.
|
||||||
*/
|
*/
|
||||||
def copySettingsTo(function: QFunction) {
|
override def copySettingsTo(function: QFunction) {
|
||||||
function.analysisName = this.analysisName
|
function.analysisName = this.analysisName
|
||||||
function.jobName = this.jobName
|
function.jobName = this.jobName
|
||||||
function.qSettings = this.qSettings
|
function.qSettings = this.qSettings
|
||||||
|
|
@ -99,6 +105,8 @@ trait QFunction extends Logging {
|
||||||
function.updateJobRun = this.updateJobRun
|
function.updateJobRun = this.updateJobRun
|
||||||
function.isIntermediate = this.isIntermediate
|
function.isIntermediate = this.isIntermediate
|
||||||
function.deleteIntermediateOutputs = this.deleteIntermediateOutputs
|
function.deleteIntermediateOutputs = this.deleteIntermediateOutputs
|
||||||
|
function.reportGroup = this.reportGroup
|
||||||
|
function.reportFeatures = this.reportFeatures
|
||||||
}
|
}
|
||||||
|
|
||||||
/** File to redirect any output. Defaults to <jobName>.out */
|
/** File to redirect any output. Defaults to <jobName>.out */
|
||||||
|
|
|
||||||
|
|
@ -28,72 +28,91 @@ import org.broadinstitute.sting.gatk.report.{GATKReportTable, GATKReport}
|
||||||
import org.broadinstitute.sting.utils.exceptions.UserException
|
import org.broadinstitute.sting.utils.exceptions.UserException
|
||||||
import org.broadinstitute.sting.queue.engine.JobRunInfo
|
import org.broadinstitute.sting.queue.engine.JobRunInfo
|
||||||
import java.io.{FileOutputStream, PrintStream, File}
|
import java.io.{FileOutputStream, PrintStream, File}
|
||||||
|
import org.broadinstitute.sting.queue.function.scattergather.{GathererFunction, ScatterFunction}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mixin to add Job info to the class
|
* A mixin to add Job info to the class
|
||||||
*/
|
*/
|
||||||
trait QJobReport extends QFunction {
|
// todo -- need to enforce QFunction to have copySettingsTo work
|
||||||
private var group: String = _
|
trait QJobReport extends Logging {
|
||||||
private var features: Map[String, String] = null
|
self: QFunction =>
|
||||||
|
|
||||||
def getGroup = group
|
// todo -- might make more sense to mix in the variables
|
||||||
def isEnabled = group != null
|
protected var reportGroup: String = null
|
||||||
def getFeatureNames: List[String] = features.keys.toList
|
protected var reportFeatures: Map[String, String] = Map()
|
||||||
def getFeatures = features
|
|
||||||
def get(key: String): String = {
|
def includeInReport = getReportGroup != null
|
||||||
features.get(key) match {
|
def setRunInfo(info: JobRunInfo) {
|
||||||
|
logger.info("info " + info)
|
||||||
|
reportFeatures = reportFeatures ++ Map(
|
||||||
|
"analysisName" -> self.analysisName,
|
||||||
|
"jobName" -> QJobReport.workAroundSameJobNames(this),
|
||||||
|
"intermediate" -> self.isIntermediate,
|
||||||
|
"startTime" -> info.getStartTime.getTime,
|
||||||
|
"doneTime" -> info.getDoneTime.getTime,
|
||||||
|
"formattedStartTime" -> info.getFormattedStartTime,
|
||||||
|
"formattedDoneTime" -> info.getFormattedDoneTime,
|
||||||
|
"runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null")
|
||||||
|
}
|
||||||
|
|
||||||
|
def getReportGroup = reportGroup
|
||||||
|
def getReportFeatures = reportFeatures
|
||||||
|
|
||||||
|
def getReportFeatureNames: List[String] = getReportFeatures.keys.toList
|
||||||
|
def getReportFeature(key: String): String = {
|
||||||
|
getReportFeatures.get(key) match {
|
||||||
case Some(x) => x
|
case Some(x) => x
|
||||||
case None => throw new RuntimeException("Get called with key %s but no value was found".format(key))
|
case None => throw new RuntimeException("Get called with key %s but no value was found".format(key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def getName: String = features.get("jobName").get
|
|
||||||
|
|
||||||
private def addRunInfo(info: JobRunInfo) {
|
def getReportName: String = getReportFeature("jobName")
|
||||||
logger.info("info " + info)
|
|
||||||
features = features ++ Map(
|
def configureJobReport(group: String) {
|
||||||
"analysisName" -> this.analysisName,
|
this.reportGroup = group
|
||||||
"jobName" -> this.jobName,
|
|
||||||
"intermediate" -> this.isIntermediate,
|
|
||||||
"startTime" -> info.getFormattedStartTime,
|
|
||||||
"doneTime" -> info.getFormattedDoneTime,
|
|
||||||
"memUsedInGb" -> info.getMemoryUsedInGb,
|
|
||||||
"runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def setJobLogging(group: String) {
|
def configureJobReport(group: String, features: Map[String, Any]) {
|
||||||
this.group = group
|
this.reportGroup = group
|
||||||
|
this.reportFeatures = features.mapValues(_.toString)
|
||||||
}
|
}
|
||||||
|
|
||||||
def setJobLogging(group: String, features: Map[String, Any]) {
|
// copy the QJobReport information -- todo : what's the best way to do this?
|
||||||
this.group = group
|
override def copySettingsTo(function: QFunction) {
|
||||||
this.features = features.mapValues(_.toString)
|
self.copySettingsTo(function)
|
||||||
|
function.reportGroup = this.reportGroup
|
||||||
|
function.reportFeatures = this.reportFeatures
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object QJobReport {
|
object QJobReport {
|
||||||
|
// todo -- fixme to have a unique name for Scatter/gather jobs as well
|
||||||
|
var seenCounter = 1
|
||||||
|
var seenNames = Set[String]()
|
||||||
|
|
||||||
def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) {
|
def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) {
|
||||||
val jobs = jobsRaw.filter(_._2.isFilledIn)
|
val jobs = jobsRaw.filter(_._2.isFilledIn).filter(_._1.includeInReport)
|
||||||
val jobLogs: List[QJobReport] = jobLoggingSublist(jobs.keys.toList)
|
jobs foreach {case (qf, info) => qf.setRunInfo(info)}
|
||||||
jobLogs.foreach((job: QJobReport) => job.addRunInfo(jobs.get(job).get))
|
|
||||||
val stream = new PrintStream(new FileOutputStream(dest))
|
val stream = new PrintStream(new FileOutputStream(dest))
|
||||||
printJobLogging(jobLogs, stream)
|
printJobLogging(jobs.keys.toList, stream)
|
||||||
stream.close()
|
stream.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
private def jobLoggingSublist(l: List[QFunction]): List[QJobReport] = {
|
def workAroundSameJobNames(func: QFunction):String = {
|
||||||
def asJogLogging(qf: QFunction): QJobReport = qf match {
|
if ( seenNames.apply(func.jobName) ) {
|
||||||
case x: QJobReport => x
|
seenCounter += 1
|
||||||
case _ => null
|
"%s_%d".format(func.jobName, seenCounter)
|
||||||
|
} else {
|
||||||
|
seenNames += func.jobName
|
||||||
|
func.jobName
|
||||||
}
|
}
|
||||||
|
|
||||||
l.map(asJogLogging).filter(_ != null)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prints the JobLogging logs to a GATKReport. First splits up the
|
* Prints the JobLogging logs to a GATKReport. First splits up the
|
||||||
* logs by group, and for each group generates a GATKReportTable
|
* logs by group, and for each group generates a GATKReportTable
|
||||||
*/
|
*/
|
||||||
private def printJobLogging(logs: List[QJobReport], stream: PrintStream) {
|
private def printJobLogging(logs: List[QFunction], stream: PrintStream) {
|
||||||
// create the report
|
// create the report
|
||||||
val report: GATKReport = new GATKReport
|
val report: GATKReport = new GATKReport
|
||||||
|
|
||||||
|
|
@ -108,25 +127,25 @@ object QJobReport {
|
||||||
keys.foreach(table.addColumn(_, 0))
|
keys.foreach(table.addColumn(_, 0))
|
||||||
for (log <- groupLogs) {
|
for (log <- groupLogs) {
|
||||||
for ( key <- keys )
|
for ( key <- keys )
|
||||||
table.set(log.getName, key, log.get(key))
|
table.set(log.getReportName, key, log.getReportFeature(key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
report.print(stream)
|
report.print(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def groupLogs(logs: List[QJobReport]): Map[String, List[QJobReport]] = {
|
private def groupLogs(logs: List[QFunction]): Map[String, List[QFunction]] = {
|
||||||
logs.groupBy(_.getGroup)
|
logs.groupBy(_.getReportGroup)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def logKeys(logs: List[QJobReport]): Set[String] = {
|
private def logKeys(logs: List[QFunction]): Set[String] = {
|
||||||
// the keys should be the same for each log, but we will check that
|
// the keys should be the same for each log, but we will check that
|
||||||
val keys = Set[String](logs(0).getFeatureNames : _*)
|
val keys = Set[String](logs(0).getReportFeatureNames : _*)
|
||||||
|
|
||||||
for ( log <- logs )
|
for ( log <- logs )
|
||||||
if ( keys.sameElements(Set(log.getFeatureNames)) )
|
if ( keys.sameElements(Set(log.getReportFeatureNames)) )
|
||||||
throw new UserException(("All JobLogging jobs in the same group must have the same set of features. " +
|
throw new UserException(("All JobLogging jobs in the same group must have the same set of features. " +
|
||||||
"We found one with %s and another with %s").format(keys, log.getFeatureNames))
|
"We found one with %s and another with %s").format(keys, log.getReportFeatureNames))
|
||||||
|
|
||||||
keys
|
keys
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue