diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 7210deefc..abf4cbe09 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -60,6 +60,7 @@ class QGraph extends Logging { private var running = true private val runningLock = new Object private var runningJobs = Set.empty[FunctionEdge] + private var intermediatesJobs = Set.empty[FunctionEdge] private val nl = "%n".format() @@ -68,6 +69,14 @@ class QGraph extends Logging { private val inProcessManager = new InProcessJobManager private def managers = List[Any](inProcessManager, commandLineManager) + private class StatusCounts { + var pending = 0 + var running = 0 + var failed = 0 + var done = 0 + } + private val statusCounts = new StatusCounts + /** * Adds a QScript created CommandLineFunction to the graph. * @param command Function to add to the graph. @@ -192,6 +201,27 @@ class QGraph extends Logging { previous } + /** + * Walks down the graph looking for the next function edges. + * @param edge Graph edge to examine for the next functions. + * @return A list of the next function edges. + */ + private def nextFunctions(edge: QEdge): List[FunctionEdge] = { + var next = List.empty[FunctionEdge] + val target = this.jobGraph.getEdgeTarget(edge) + for (outgoingEdge <- this.jobGraph.outgoingEdgesOf(target)) { + outgoingEdge match { + + // Stop recursing when we find a function edge and return it + case functionEdge: FunctionEdge => next :+= functionEdge + + // For any other type of edge find the jobs following the edge + case edge: QEdge => next ++= nextFunctions(edge) + } + } + next + } + /** * Fills in the graph using mapping functions, then removes out of date * jobs, then cleans up mapping functions and nodes that aren't need. 
@@ -341,34 +371,57 @@ class QGraph extends Logging { readyJobs ++= getReadyJobs() runningJobs = Set.empty[FunctionEdge] var lastRunningCheck = System.currentTimeMillis + var logNextStatusCounts = true while (running && readyJobs.size + runningJobs.size > 0) { - while (running && readyJobs.size > 0 && nextRunningCheck(lastRunningCheck) > 0) { + var startedJobs = Set.empty[FunctionEdge] + var doneJobs = Set.empty[FunctionEdge] + var failedJobs = Set.empty[FunctionEdge] + + while (running && readyJobs.size > 0 && !readyRunningCheck(lastRunningCheck)) { val edge = readyJobs.head edge.runner = newRunner(edge.function) edge.start() - runningJobs += edge + startedJobs += edge readyJobs -= edge + logNextStatusCounts = true } + runningJobs ++= startedJobs + statusCounts.pending -= startedJobs.size + statusCounts.running += startedJobs.size + + if (logNextStatusCounts) + logStatusCounts + logNextStatusCounts = false + + deleteDoneIntermediates(lastRunningCheck) + if (readyJobs.size == 0 && runningJobs.size > 0) Thread.sleep(nextRunningCheck(lastRunningCheck)) lastRunningCheck = System.currentTimeMillis updateStatus() - var doneJobs = List.empty[FunctionEdge] - var failedJobs = List.empty[FunctionEdge] - runningJobs.foreach(edge => edge.status match { - case RunnerStatus.DONE => doneJobs :+= edge - case RunnerStatus.FAILED => failedJobs :+= edge + case RunnerStatus.DONE => doneJobs += edge + case RunnerStatus.FAILED => failedJobs += edge case RunnerStatus.RUNNING => /* do nothing while still running */ }) runningJobs --= doneJobs runningJobs --= failedJobs + if (!settings.keepIntermediates) + intermediatesJobs ++= doneJobs.filter(_.function.isIntermediate) + + statusCounts.running -= doneJobs.size + statusCounts.running -= failedJobs.size + statusCounts.done += doneJobs.size + statusCounts.failed += failedJobs.size + + if (doneJobs.size > 0 || failedJobs.size > 0) + logNextStatusCounts = true if (running && failedJobs.size > 0) { emailFailedJobs(failedJobs) @@ -378,7 +431,8 
@@ class QGraph extends Logging { readyJobs ++= getReadyJobs() } - deleteIntermediateOutputs() + logStatusCounts + deleteDoneIntermediates(-1) } catch { case e => logger.error("Uncaught error running jobs.", e) @@ -388,7 +442,16 @@ class QGraph extends Logging { } } - private def nextRunningCheck(lastRunningCheck: Long) = 0L max ((30 * 1000L) - (System.currentTimeMillis - lastRunningCheck)) + private def readyRunningCheck(lastRunningCheck: Long) = + lastRunningCheck > 0 && nextRunningCheck(lastRunningCheck) <= 0 + + private def nextRunningCheck(lastRunningCheck: Long) = + 0L max ((30 * 1000L) - (System.currentTimeMillis - lastRunningCheck)) + + private def logStatusCounts { + logger.info("%d Pend, %d Run, %d Fail, %d Done".format( + statusCounts.pending, statusCounts.running, statusCounts.failed, statusCounts.done)) + } /** * Updates the status of edges in the graph. @@ -396,11 +459,12 @@ class QGraph extends Logging { */ private def updateGraphStatus(cleanOutputs: Boolean) { traverseFunctions(edge => checkDone(edge, cleanOutputs)) + traverseFunctions(edge => recheckDone(edge)) } /** - * Checks if an edge is done or if it's an intermediate edge if it can be skipped. - * This function may modify previous edges if it discovers that the edge passed in + * First pass that checks if an edge is done or if it's an intermediate edge if it can be skipped. + * This function may modify the status of previous edges if it discovers that the edge passed in * is dependent jobs that were previously marked as skipped. * @param edge Edge to check to see if it's done or can be skipped. * @param cleanOutputs If true will delete outputs when setting edges to pending. @@ -422,6 +486,53 @@ class QGraph extends Logging { } } + /** + * Second pass which + * a) Updates the status counts based on the function statuses + * b) Checks if the edge is a completed intermediate edge then adds it to the set of candidates for cleanup + * @param edge Edge to check to see if it's done or skipped. 
+ */ + private def recheckDone(edge: FunctionEdge) { + edge.status match { + case RunnerStatus.PENDING => statusCounts.pending += 1 + case RunnerStatus.FAILED => statusCounts.failed += 1 + case RunnerStatus.DONE => statusCounts.done += 1 + case RunnerStatus.SKIPPED => statusCounts.done += 1 + } + + if (edge.status == RunnerStatus.DONE || edge.status == RunnerStatus.SKIPPED) { + logger.debug("Already done: " + edge.function.description) + if (!settings.keepIntermediates && edge.function.isIntermediate) + intermediatesJobs += edge + } + } + + /** + * Continues deleting the outputs of intermediate jobs that are no longer needed until it's time to recheck running status. + * @param lastRunningCheck The last time the status was checked. + */ + private def deleteDoneIntermediates(lastRunningCheck: Long) { + var doneJobs = Set.empty[FunctionEdge] + + for (edge <- intermediatesJobs) { + val nextDone = nextFunctions(edge).forall(next => { + val status = next.status + (status == RunnerStatus.DONE || status == RunnerStatus.SKIPPED) + }) + + if (nextDone) + doneJobs += edge + } + + for (edge <- doneJobs) { + if (running && !readyRunningCheck(lastRunningCheck)) { + logger.debug("Deleting intermediates:" + edge.function.description) + edge.function.deleteOutputs() + intermediatesJobs -= edge + } + } + } + /** * Returns the graph depth for the function. * @param edge Function edge to get the edge for. 
@@ -464,18 +575,18 @@ class QGraph extends Logging { } } - private def emailFailedJobs(failed: List[FunctionEdge]) { + private def emailFailedJobs(failed: Set[FunctionEdge]) { if (settings.statusEmailTo.size > 0) { val emailMessage = new EmailMessage emailMessage.from = settings.statusEmailFrom emailMessage.to = settings.statusEmailTo emailMessage.subject = "Queue function: Failure: " + settings.qSettings.jobNamePrefix - addFailedFunctions(emailMessage, failed) + addFailedFunctions(emailMessage, failed.toList) emailMessage.trySend(settings.qSettings.emailSettings) } } - private def checkRetryJobs(failed: List[FunctionEdge]) { + private def checkRetryJobs(failed: Set[FunctionEdge]) { if (settings.retries > 0) { for (failedJob <- failed) { if (failedJob.function.jobRestartable && failedJob.retries < settings.retries) { @@ -483,6 +594,8 @@ class QGraph extends Logging { failedJob.resetToPending(true) logger.info("Reset for retry attempt %d of %d: %s".format( failedJob.retries, settings.retries, failedJob.function.description)) + statusCounts.failed -= 1 + statusCounts.pending += 1 } else { logger.info("Giving up after retrying %d times: %s".format( settings.retries, failedJob.function.description)) @@ -783,18 +896,6 @@ class QGraph extends Logging { iterator.foreach(_ => {}) } - private def deleteIntermediateOutputs() { - if (running && !settings.keepIntermediates && success) { - logger.info("Deleting intermediate files.") - traverseFunctions(edge => { - if (edge.function.isIntermediate && edge.function.deleteIntermediateOutputs) { - logger.debug("Deleting intermediates:" + edge.function.description) - edge.function.deleteOutputs() - } - }) - } - } - /** * Outputs the graph to a .dot file. * http://en.wikipedia.org/wiki/DOT_language