JobRunInfo improvements
-- dry-run now adds some info, for testing -- InProcessRunner adds some, but not all, of the information we want
This commit is contained in:
parent
569e1a1089
commit
b8bc03bb42
|
|
@ -23,7 +23,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
||||||
*/
|
*/
|
||||||
var depth = -1
|
var depth = -1
|
||||||
|
|
||||||
var runInfo: JobRunInfo = JobRunInfo.default // todo: replace after testing with _
|
val myRunInfo: JobRunInfo = JobRunInfo.default // purely for dryRun testing
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes with the current status of the function.
|
* Initializes with the current status of the function.
|
||||||
|
|
@ -90,7 +90,6 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
||||||
tailError()
|
tailError()
|
||||||
} else if (currentStatus == RunnerStatus.DONE) {
|
} else if (currentStatus == RunnerStatus.DONE) {
|
||||||
try {
|
try {
|
||||||
runInfo = runner.getRunInfo
|
|
||||||
runner.cleanup()
|
runner.cleanup()
|
||||||
function.doneOutputs.foreach(_.createNewFile())
|
function.doneOutputs.foreach(_.createNewFile())
|
||||||
} catch {
|
} catch {
|
||||||
|
|
@ -182,4 +181,8 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
||||||
printWriter.close
|
printWriter.close
|
||||||
IOUtils.writeContents(functionErrorFile, stackTrace.toString)
|
IOUtils.writeContents(functionErrorFile, stackTrace.toString)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def getRunInfo = {
|
||||||
|
if ( runner == null ) myRunInfo else runner.getRunInfo
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package org.broadinstitute.sting.queue.engine
|
||||||
|
|
||||||
import org.broadinstitute.sting.queue.function.InProcessFunction
|
import org.broadinstitute.sting.queue.function.InProcessFunction
|
||||||
import org.broadinstitute.sting.queue.util.IOUtils
|
import org.broadinstitute.sting.queue.util.IOUtils
|
||||||
|
import java.util.Date
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs a function that executes in process and does not fork out an external process.
|
* Runs a function that executes in process and does not fork out an external process.
|
||||||
|
|
@ -10,8 +11,12 @@ class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProce
|
||||||
private var runStatus: RunnerStatus.Value = _
|
private var runStatus: RunnerStatus.Value = _
|
||||||
|
|
||||||
def start() = {
|
def start() = {
|
||||||
|
runInfo.startTime = new Date()
|
||||||
runStatus = RunnerStatus.RUNNING
|
runStatus = RunnerStatus.RUNNING
|
||||||
|
|
||||||
function.run()
|
function.run()
|
||||||
|
|
||||||
|
runInfo.doneTime = new Date()
|
||||||
val content = "%s%nDone.".format(function.description)
|
val content = "%s%nDone.".format(function.description)
|
||||||
IOUtils.writeContents(function.jobOutputFile, content)
|
IOUtils.writeContents(function.jobOutputFile, content)
|
||||||
runStatus = RunnerStatus.DONE
|
runStatus = RunnerStatus.DONE
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,13 @@ import java.util.Date
|
||||||
/**
|
/**
|
||||||
* Base class containing all of the information about a job run.
|
* Base class containing all of the information about a job run.
|
||||||
*/
|
*/
|
||||||
class JobRunInfo(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: String, status: RunnerStatus.Value) {
|
class JobRunInfo {
|
||||||
|
var startTime: Date = _
|
||||||
|
var doneTime: Date = _
|
||||||
|
var memUsedInGb: Int = -1
|
||||||
|
var hostName: String = "localhost"
|
||||||
|
var status: RunnerStatus.Value = RunnerStatus.DONE
|
||||||
|
|
||||||
def getStatus = status
|
def getStatus = status
|
||||||
def getStartTime = startTime
|
def getStartTime = startTime
|
||||||
def getDoneTime = doneTime
|
def getDoneTime = doneTime
|
||||||
|
|
@ -69,7 +75,5 @@ class JobRunInfo(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: St
|
||||||
}
|
}
|
||||||
|
|
||||||
object JobRunInfo {
|
object JobRunInfo {
|
||||||
def default = new JobRunInfo(new Date(), new Date(), 1, "localhost", RunnerStatus.DONE)
|
def default: JobRunInfo = new JobRunInfo()
|
||||||
def detailed(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: String) =
|
|
||||||
new JobRunInfo(startTime, doneTime, memUsedInGb, hostName, RunnerStatus.DONE)
|
|
||||||
}
|
}
|
||||||
|
|
@ -72,7 +72,8 @@ trait JobRunner[TFunction <: QFunction] {
|
||||||
/**
|
/**
|
||||||
* Must be overloaded
|
* Must be overloaded
|
||||||
*/
|
*/
|
||||||
def getRunInfo: JobRunInfo = JobRunInfo.default
|
val runInfo = JobRunInfo.default
|
||||||
|
def getRunInfo = runInfo
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calls back to a hook that an expert user can setup to modify a job.
|
* Calls back to a hook that an expert user can setup to modify a job.
|
||||||
|
|
|
||||||
|
|
@ -320,7 +320,9 @@ class QGraph extends Logging {
|
||||||
logger.debug("+++++++")
|
logger.debug("+++++++")
|
||||||
foreachFunction(readyJobs.toList, edge => {
|
foreachFunction(readyJobs.toList, edge => {
|
||||||
if (running) {
|
if (running) {
|
||||||
|
edge.myRunInfo.startTime = new Date()
|
||||||
logEdge(edge)
|
logEdge(edge)
|
||||||
|
edge.myRunInfo.doneTime = new Date()
|
||||||
edge.markAsDone
|
edge.markAsDone
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
@ -1038,7 +1040,7 @@ class QGraph extends Logging {
|
||||||
def isShutdown = !running
|
def isShutdown = !running
|
||||||
|
|
||||||
def getFunctionsAndStatus(functions: List[QFunction]): Map[QFunction, JobRunInfo] = {
|
def getFunctionsAndStatus(functions: List[QFunction]): Map[QFunction, JobRunInfo] = {
|
||||||
getFunctionEdges.map(edge => (edge.function, edge.runInfo)).toMap
|
getFunctionEdges.map(edge => (edge.function, edge.getRunInfo)).toMap
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue