Printing out counts of functions as they are dispatched.

Deleting files from intermediate jobs as soon as all the dependent jobs are done.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5413 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-03-10 01:45:20 +00:00
parent ff7edc4493
commit 204582bcd5
1 changed files with 127 additions and 26 deletions

View File

@ -60,6 +60,7 @@ class QGraph extends Logging {
private var running = true
private val runningLock = new Object
private var runningJobs = Set.empty[FunctionEdge]
private var intermediatesJobs = Set.empty[FunctionEdge]
private val nl = "%n".format()
@ -68,6 +69,14 @@ class QGraph extends Logging {
private val inProcessManager = new InProcessJobManager
private def managers = List[Any](inProcessManager, commandLineManager)
private class StatusCounts {
var pending = 0
var running = 0
var failed = 0
var done = 0
}
private val statusCounts = new StatusCounts
/**
* Adds a QScript created CommandLineFunction to the graph.
* @param command Function to add to the graph.
@ -192,6 +201,27 @@ class QGraph extends Logging {
previous
}
/**
* Walks up the graph looking for the next function edges.
* @param edge Graph edge to examine for the next functions.
* @return A list of prior function edges.
*/
private def nextFunctions(edge: QEdge): List[FunctionEdge] = {
var next = List.empty[FunctionEdge]
val target = this.jobGraph.getEdgeTarget(edge)
for (outgoingEdge <- this.jobGraph.outgoingEdgesOf(target)) {
outgoingEdge match {
// Stop recursing when we find a function edge and return it
case functionEdge: FunctionEdge => next :+= functionEdge
// For any other type of edge find the jobs following the edge
case edge: QEdge => next ++= nextFunctions(edge)
}
}
next
}
/**
* Fills in the graph using mapping functions, then removes out of date
* jobs, then cleans up mapping functions and nodes that aren't need.
@ -341,34 +371,57 @@ class QGraph extends Logging {
readyJobs ++= getReadyJobs()
runningJobs = Set.empty[FunctionEdge]
var lastRunningCheck = System.currentTimeMillis
var logNextStatusCounts = true
while (running && readyJobs.size + runningJobs.size > 0) {
while (running && readyJobs.size > 0 && nextRunningCheck(lastRunningCheck) > 0) {
var startedJobs = Set.empty[FunctionEdge]
var doneJobs = Set.empty[FunctionEdge]
var failedJobs = Set.empty[FunctionEdge]
while (running && readyJobs.size > 0 && !readyRunningCheck(lastRunningCheck)) {
val edge = readyJobs.head
edge.runner = newRunner(edge.function)
edge.start()
runningJobs += edge
startedJobs += edge
readyJobs -= edge
logNextStatusCounts = true
}
runningJobs ++= startedJobs
statusCounts.pending -= startedJobs.size
statusCounts.running += startedJobs.size
if (logNextStatusCounts)
logStatusCounts
logNextStatusCounts = false
deleteDoneIntermediates(lastRunningCheck)
if (readyJobs.size == 0 && runningJobs.size > 0)
Thread.sleep(nextRunningCheck(lastRunningCheck))
lastRunningCheck = System.currentTimeMillis
updateStatus()
var doneJobs = List.empty[FunctionEdge]
var failedJobs = List.empty[FunctionEdge]
runningJobs.foreach(edge => edge.status match {
case RunnerStatus.DONE => doneJobs :+= edge
case RunnerStatus.FAILED => failedJobs :+= edge
case RunnerStatus.DONE => doneJobs += edge
case RunnerStatus.FAILED => failedJobs += edge
case RunnerStatus.RUNNING => /* do nothing while still running */
})
runningJobs --= doneJobs
runningJobs --= failedJobs
if (!settings.keepIntermediates)
intermediatesJobs ++= doneJobs.filter(_.function.isIntermediate)
statusCounts.running -= doneJobs.size
statusCounts.running -= failedJobs.size
statusCounts.done += doneJobs.size
statusCounts.failed += failedJobs.size
if (doneJobs.size > 0 || failedJobs.size > 0)
logNextStatusCounts = true
if (running && failedJobs.size > 0) {
emailFailedJobs(failedJobs)
@ -378,7 +431,8 @@ class QGraph extends Logging {
readyJobs ++= getReadyJobs()
}
deleteIntermediateOutputs()
logStatusCounts
deleteDoneIntermediates(-1)
} catch {
case e =>
logger.error("Uncaught error running jobs.", e)
@ -388,7 +442,16 @@ class QGraph extends Logging {
}
}
private def nextRunningCheck(lastRunningCheck: Long) = 0L max ((30 * 1000L) - (System.currentTimeMillis - lastRunningCheck))
private def readyRunningCheck(lastRunningCheck: Long) =
lastRunningCheck > 0 && nextRunningCheck(lastRunningCheck) <= 0
private def nextRunningCheck(lastRunningCheck: Long) =
0L max ((30 * 1000L) - (System.currentTimeMillis - lastRunningCheck))
private def logStatusCounts {
logger.info("%d Pend, %d Run, %d Fail, %d Done".format(
statusCounts.pending, statusCounts.running, statusCounts.failed, statusCounts.done))
}
/**
* Updates the status of edges in the graph.
@ -396,11 +459,12 @@ class QGraph extends Logging {
*/
private def updateGraphStatus(cleanOutputs: Boolean) {
traverseFunctions(edge => checkDone(edge, cleanOutputs))
traverseFunctions(edge => recheckDone(edge))
}
/**
* Checks if an edge is done or if it's an intermediate edge if it can be skipped.
* This function may modify previous edges if it discovers that the edge passed in
* First pass that checks if an edge is done or if it's an intermediate edge if it can be skipped.
* This function may modify the status of previous edges if it discovers that the edge passed in
* is dependent jobs that were previously marked as skipped.
* @param edge Edge to check to see if it's done or can be skipped.
* @param cleanOutputs If true will delete outputs when setting edges to pending.
@ -422,6 +486,53 @@ class QGraph extends Logging {
}
}
/**
* Second pass which
* a) Updates the status counts based on the function statuses
* b) Checks if the edge is a completed intermediate edge then adds it to the set of candidates for cleanup
* @param edge Edge to check to see if it's done or skipped.
*/
private def recheckDone(edge: FunctionEdge) {
edge.status match {
case RunnerStatus.PENDING => statusCounts.pending += 1
case RunnerStatus.FAILED => statusCounts.failed += 1
case RunnerStatus.DONE => statusCounts.done += 1
case RunnerStatus.SKIPPED => statusCounts.done += 1
}
if (edge.status == RunnerStatus.DONE || edge.status == RunnerStatus.SKIPPED) {
logger.debug("Already done: " + edge.function.description)
if (!settings.keepIntermediates && edge.function.isIntermediate)
intermediatesJobs += edge
}
}
/**
* Continues deleting the outputs of intermediate jobs that are no longer needed until it's time to recheck running status.
* @param lastRunningCheck The last time the status was checked.
*/
private def deleteDoneIntermediates(lastRunningCheck: Long) {
var doneJobs = Set.empty[FunctionEdge]
for (edge <- intermediatesJobs) {
val nextDone = nextFunctions(edge).forall(next => {
val status = next.status
(status == RunnerStatus.DONE || status == RunnerStatus.SKIPPED)
})
if (nextDone)
doneJobs += edge
}
for (edge <- doneJobs) {
if (running && !readyRunningCheck(lastRunningCheck)) {
logger.debug("Deleting intermediates:" + edge.function.description)
edge.function.deleteOutputs()
intermediatesJobs -= edge
}
}
}
/**
* Returns the graph depth for the function.
* @param edge Function edge to get the edge for.
@ -464,18 +575,18 @@ class QGraph extends Logging {
}
}
private def emailFailedJobs(failed: List[FunctionEdge]) {
private def emailFailedJobs(failed: Set[FunctionEdge]) {
if (settings.statusEmailTo.size > 0) {
val emailMessage = new EmailMessage
emailMessage.from = settings.statusEmailFrom
emailMessage.to = settings.statusEmailTo
emailMessage.subject = "Queue function: Failure: " + settings.qSettings.jobNamePrefix
addFailedFunctions(emailMessage, failed)
addFailedFunctions(emailMessage, failed.toList)
emailMessage.trySend(settings.qSettings.emailSettings)
}
}
private def checkRetryJobs(failed: List[FunctionEdge]) {
private def checkRetryJobs(failed: Set[FunctionEdge]) {
if (settings.retries > 0) {
for (failedJob <- failed) {
if (failedJob.function.jobRestartable && failedJob.retries < settings.retries) {
@ -483,6 +594,8 @@ class QGraph extends Logging {
failedJob.resetToPending(true)
logger.info("Reset for retry attempt %d of %d: %s".format(
failedJob.retries, settings.retries, failedJob.function.description))
statusCounts.failed -= 1
statusCounts.pending += 1
} else {
logger.info("Giving up after retrying %d times: %s".format(
settings.retries, failedJob.function.description))
@ -783,18 +896,6 @@ class QGraph extends Logging {
iterator.foreach(_ => {})
}
private def deleteIntermediateOutputs() {
if (running && !settings.keepIntermediates && success) {
logger.info("Deleting intermediate files.")
traverseFunctions(edge => {
if (edge.function.isIntermediate && edge.function.deleteIntermediateOutputs) {
logger.debug("Deleting intermediates:" + edge.function.description)
edge.function.deleteOutputs()
}
})
}
}
/**
* Outputs the graph to a .dot file.
* http://en.wikipedia.org/wiki/DOT_language