Fixes throughout for getting job information
-- no more hostname -- it's just not going to be important
This commit is contained in:
parent
4918519a58
commit
06e30a81d1
|
|
@ -1,8 +1,8 @@
|
||||||
package org.broadinstitute.sting.queue.engine
|
package org.broadinstitute.sting.queue.engine
|
||||||
|
|
||||||
import org.broadinstitute.sting.queue.function.InProcessFunction
|
import org.broadinstitute.sting.queue.function.InProcessFunction
|
||||||
import org.broadinstitute.sting.queue.util.IOUtils
|
|
||||||
import java.util.Date
|
import java.util.Date
|
||||||
|
import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs a function that executes in process and does not fork out an external process.
|
* Runs a function that executes in process and does not fork out an external process.
|
||||||
|
|
@ -11,12 +11,12 @@ class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProce
|
||||||
private var runStatus: RunnerStatus.Value = _
|
private var runStatus: RunnerStatus.Value = _
|
||||||
|
|
||||||
def start() = {
|
def start() = {
|
||||||
runInfo.startTime = new Date()
|
getRunInfo.startTime = new Date()
|
||||||
runStatus = RunnerStatus.RUNNING
|
runStatus = RunnerStatus.RUNNING
|
||||||
|
|
||||||
function.run()
|
function.run()
|
||||||
|
|
||||||
runInfo.doneTime = new Date()
|
getRunInfo.doneTime = new Date()
|
||||||
val content = "%s%nDone.".format(function.description)
|
val content = "%s%nDone.".format(function.description)
|
||||||
IOUtils.writeContents(function.jobOutputFile, content)
|
IOUtils.writeContents(function.jobOutputFile, content)
|
||||||
runStatus = RunnerStatus.DONE
|
runStatus = RunnerStatus.DONE
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@
|
||||||
package org.broadinstitute.sting.queue.engine
|
package org.broadinstitute.sting.queue.engine
|
||||||
|
|
||||||
import java.util.Date
|
import java.util.Date
|
||||||
|
import java.text.SimpleDateFormat
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2011, The Broad Institute
|
* Copyright (c) 2011, The Broad Institute
|
||||||
|
|
@ -57,14 +58,20 @@ class JobRunInfo {
|
||||||
var startTime: Date = _
|
var startTime: Date = _
|
||||||
var doneTime: Date = _
|
var doneTime: Date = _
|
||||||
var memUsedInGb: Int = -1
|
var memUsedInGb: Int = -1
|
||||||
var hostName: String = "localhost"
|
|
||||||
var status: RunnerStatus.Value = RunnerStatus.DONE
|
var status: RunnerStatus.Value = RunnerStatus.DONE
|
||||||
|
|
||||||
def getStatus = status
|
def getStatus = status
|
||||||
|
|
||||||
def getStartTime = startTime
|
def getStartTime = startTime
|
||||||
def getDoneTime = doneTime
|
def getDoneTime = doneTime
|
||||||
|
def getFormattedStartTime = formatTime(getStartTime)
|
||||||
|
def getFormattedDoneTime = formatTime(getDoneTime)
|
||||||
|
|
||||||
|
val formatter = new SimpleDateFormat("dd.MM.yy/H:mm:ss:SSS");
|
||||||
|
private def formatTime(d: Date) = if ( d != null ) formatter.format(d) else "null"
|
||||||
|
|
||||||
def getMemoryUsedInGb = memUsedInGb
|
def getMemoryUsedInGb = memUsedInGb
|
||||||
def getHostname = hostName
|
def isFilledIn = startTime != null
|
||||||
|
|
||||||
def getRuntimeInMs: Long = {
|
def getRuntimeInMs: Long = {
|
||||||
if ( getDoneTime != null && getStartTime != null )
|
if ( getDoneTime != null && getStartTime != null )
|
||||||
|
|
@ -74,7 +81,7 @@ class JobRunInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
override def toString: String =
|
override def toString: String =
|
||||||
"started %s ended %s runtime %s on host %s using %d Gb memory".format(getStartTime, getDoneTime, getRuntimeInMs, getHostname, getMemoryUsedInGb)
|
"started %s ended %s runtime %s using %d Gb memory".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs, getMemoryUsedInGb)
|
||||||
}
|
}
|
||||||
|
|
||||||
object JobRunInfo {
|
object JobRunInfo {
|
||||||
|
|
|
||||||
|
|
@ -70,7 +70,7 @@ class QGraphSettings {
|
||||||
var expandedDotFile: File = _
|
var expandedDotFile: File = _
|
||||||
|
|
||||||
@Argument(fullName="jobReport", shortName="jobReport", doc="File where we will write the Queue job report", required=false)
|
@Argument(fullName="jobReport", shortName="jobReport", doc="File where we will write the Queue job report", required=false)
|
||||||
var jobReportFile: File = new File("queue_job_report.gatkreport.txt")
|
var jobReportFile: File = new File("/dev/stdout")
|
||||||
|
|
||||||
@ArgumentCollection
|
@ArgumentCollection
|
||||||
val qSettings = new QSettings
|
val qSettings = new QSettings
|
||||||
|
|
|
||||||
|
|
@ -31,11 +31,12 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat}
|
||||||
import org.broadinstitute.sting.utils.Utils
|
import org.broadinstitute.sting.utils.Utils
|
||||||
import org.broadinstitute.sting.jna.clibrary.LibC
|
import org.broadinstitute.sting.jna.clibrary.LibC
|
||||||
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit}
|
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit}
|
||||||
import com.sun.jna.ptr.IntByReference
|
|
||||||
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
|
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
|
||||||
import com.sun.jna.{Structure, StringArray, NativeLong}
|
|
||||||
import java.util.regex.Pattern
|
import java.util.regex.Pattern
|
||||||
import java.lang.StringBuffer
|
import java.lang.StringBuffer
|
||||||
|
import java.util.Date
|
||||||
|
import com.sun.jna.{Pointer, Structure, StringArray, NativeLong}
|
||||||
|
import com.sun.jna.ptr.{PointerByReference, IntByReference}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs jobs on an LSF compute cluster.
|
* Runs jobs on an LSF compute cluster.
|
||||||
|
|
@ -271,12 +272,21 @@ object Lsf706JobRunner extends Logging {
|
||||||
|
|
||||||
logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo))
|
logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo))
|
||||||
|
|
||||||
|
def updateRunInfo() {
|
||||||
|
runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue)
|
||||||
|
runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue)
|
||||||
|
runner.getRunInfo.hostName = "unavailable" // TODO : exHosts
|
||||||
|
runner.getRunInfo.memUsedInGb = jobInfo.runRusage.mem
|
||||||
|
}
|
||||||
|
|
||||||
runner.updateStatus(
|
runner.updateStatus(
|
||||||
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
|
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
|
||||||
// Done successfully.
|
// Done successfully.
|
||||||
|
updateRunInfo()
|
||||||
RunnerStatus.DONE
|
RunnerStatus.DONE
|
||||||
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
|
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
|
||||||
// Exited function that (probably) won't be retried.
|
// Exited function that (probably) won't be retried.
|
||||||
|
updateRunInfo()
|
||||||
RunnerStatus.FAILED
|
RunnerStatus.FAILED
|
||||||
} else {
|
} else {
|
||||||
// Note that we still saw the job in the system.
|
// Note that we still saw the job in the system.
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ package org.broadinstitute.sting.queue.engine.shell
|
||||||
import org.broadinstitute.sting.queue.function.CommandLineFunction
|
import org.broadinstitute.sting.queue.function.CommandLineFunction
|
||||||
import org.broadinstitute.sting.queue.util.ShellJob
|
import org.broadinstitute.sting.queue.util.ShellJob
|
||||||
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
|
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
|
||||||
|
import java.util.Date
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs jobs one at a time locally
|
* Runs jobs one at a time locally
|
||||||
|
|
@ -50,8 +51,10 @@ class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRu
|
||||||
// Allow advanced users to update the job.
|
// Allow advanced users to update the job.
|
||||||
updateJobRun(job)
|
updateJobRun(job)
|
||||||
|
|
||||||
|
getRunInfo.startTime = new Date()
|
||||||
updateStatus(RunnerStatus.RUNNING)
|
updateStatus(RunnerStatus.RUNNING)
|
||||||
job.run()
|
job.run()
|
||||||
|
getRunInfo.doneTime = new Date()
|
||||||
updateStatus(RunnerStatus.DONE)
|
updateStatus(RunnerStatus.DONE)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -49,15 +49,15 @@ trait QJobReport extends QFunction {
|
||||||
def getName: String = features.get("jobName").get
|
def getName: String = features.get("jobName").get
|
||||||
|
|
||||||
private def addRunInfo(info: JobRunInfo) {
|
private def addRunInfo(info: JobRunInfo) {
|
||||||
|
logger.info("info " + info)
|
||||||
features = features ++ Map(
|
features = features ++ Map(
|
||||||
"analysisName" -> this.analysisName,
|
"analysisName" -> this.analysisName,
|
||||||
"jobName" -> this.jobName,
|
"jobName" -> this.jobName,
|
||||||
"intermediate" -> this.isIntermediate,
|
"intermediate" -> this.isIntermediate,
|
||||||
"startTime" -> info.getStartTime,
|
"startTime" -> info.getFormattedStartTime,
|
||||||
"doneTime" -> info.getDoneTime,
|
"doneTime" -> info.getFormattedDoneTime,
|
||||||
"memUsedInGb" -> info.getMemoryUsedInGb,
|
"memUsedInGb" -> info.getMemoryUsedInGb,
|
||||||
"runtime" -> info.getRuntimeInMs,
|
"runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null")
|
||||||
"hostName" -> info.getHostname).mapValues(_.toString)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def setJobLogging(group: String) {
|
def setJobLogging(group: String) {
|
||||||
|
|
@ -71,7 +71,8 @@ trait QJobReport extends QFunction {
|
||||||
}
|
}
|
||||||
|
|
||||||
object QJobReport {
|
object QJobReport {
|
||||||
def printReport(jobs: Map[QFunction, JobRunInfo], dest: File) {
|
def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) {
|
||||||
|
val jobs = jobsRaw.filter(_._2.isFilledIn)
|
||||||
val jobLogs: List[QJobReport] = jobLoggingSublist(jobs.keys.toList)
|
val jobLogs: List[QJobReport] = jobLoggingSublist(jobs.keys.toList)
|
||||||
jobLogs.foreach((job: QJobReport) => job.addRunInfo(jobs.get(job).get))
|
jobLogs.foreach((job: QJobReport) => job.addRunInfo(jobs.get(job).get))
|
||||||
val stream = new PrintStream(new FileOutputStream(dest))
|
val stream = new PrintStream(new FileOutputStream(dest))
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue