Sending a version of per job status messages.

In addition to outputs, inputs are passed to QStatusMessenger.done()
CloneFunction.cloneIndex has a new CloneFunction.cloneCount companion useful for display purposes.
This commit is contained in:
kshakir 2012-10-23 12:34:26 -04:00
parent 5fac5bf12e
commit 8dfa24df7b
6 changed files with 54 additions and 12 deletions

View File

@ -123,6 +123,8 @@ class QCommandLine extends CommandLineProgram with Logging {
commandPlugin.statusMessenger.started()
}
qGraph.messengers = allCommandPlugins.filter(_.statusMessenger != null).map(_.statusMessenger).toSeq
// TODO: Default command plugin argument?
val remoteFileConverter = (
for (commandPlugin <- allCommandPlugins if (commandPlugin.remoteFileConverter != null))
@ -178,14 +180,17 @@ class QCommandLine extends CommandLineProgram with Logging {
qGraph.logFailed()
for (commandPlugin <- allCommandPlugins)
if (commandPlugin.statusMessenger != null)
commandPlugin.statusMessenger.exit("Done with errors")
commandPlugin.statusMessenger.exit("Done with errors: %s".format(qGraph.formattedStatusCounts))
1
} else {
if (settings.run) {
allQScripts.foreach(_.pushOutputs())
for (commandPlugin <- allCommandPlugins)
if (commandPlugin.statusMessenger != null)
commandPlugin.statusMessenger.done(allQScripts.map(_.remoteOutputs))
if (commandPlugin.statusMessenger != null) {
val allInputs = allQScripts.map(_.remoteInputs)
val allOutputs = allQScripts.map(_.remoteOutputs)
commandPlugin.statusMessenger.done(allInputs, allOutputs)
}
}
0
}

View File

@ -149,8 +149,17 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon
* List out the remote outputs
* @return the RemoteFile outputs by argument source
*/
def remoteOutputs: Map[ArgumentSource, Seq[RemoteFile]] =
outputFields.map(field => (field -> filterRemoteFiles(ClassFieldCache.getFieldFiles(this, field)))).filter(tuple => !tuple._2.isEmpty).toMap
def remoteInputs: Map[ArgumentSource, Seq[RemoteFile]] = remoteFieldMap(inputFields)
/**
* List out the remote outputs
* @return the RemoteFile outputs by argument source
*/
def remoteOutputs: Map[ArgumentSource, Seq[RemoteFile]] = remoteFieldMap(outputFields)
private def remoteFieldMap(fields: Seq[ArgumentSource]): Map[ArgumentSource, Seq[RemoteFile]] = {
fields.map(field => (field -> filterRemoteFiles(ClassFieldCache.getFieldFiles(this, field)))).filter(tuple => !tuple._2.isEmpty).toMap
}
private def filterRemoteFiles(fields: Seq[File]): Seq[RemoteFile] =
fields.filter(field => field != null && field.isInstanceOf[RemoteFile]).map(_.asInstanceOf[RemoteFile])

View File

@ -47,6 +47,7 @@ import java.io.{OutputStreamWriter, File}
*/
class QGraph extends Logging {
var settings: QGraphSettings = _
var messengers: Seq[QStatusMessenger] = Nil
private def dryRun = !settings.run
private var numMissingValues = 0
@ -95,7 +96,7 @@ class QGraph extends Logging {
* The settings aren't necessarily available until after this QGraph object has been constructed, so
* this function must be called once the QGraphSettings have been filled in.
*
* @param settings
* @param settings QGraphSettings
*/
def initializeWithSettings(settings: QGraphSettings) {
this.settings = settings
@ -430,6 +431,7 @@ class QGraph extends Logging {
val edge = readyJobs.head
edge.runner = newRunner(edge.function)
edge.start()
messengers.foreach(_.started(jobShortName(edge.function)))
startedJobs += edge
readyJobs -= edge
logNextStatusCounts = true
@ -465,8 +467,14 @@ class QGraph extends Logging {
updateStatus()
runningJobs.foreach(edge => edge.status match {
case RunnerStatus.DONE => doneJobs += edge
case RunnerStatus.FAILED => failedJobs += edge
case RunnerStatus.DONE => {
doneJobs += edge
messengers.foreach(_.done(jobShortName(edge.function)))
}
case RunnerStatus.FAILED => {
failedJobs += edge
messengers.foreach(_.exit(jobShortName(edge.function), edge.function.jobErrorLines.mkString("%n".format())))
}
case RunnerStatus.RUNNING => /* do nothing while still running */
})
@ -493,7 +501,7 @@ class QGraph extends Logging {
// incremental
if ( logNextStatusCounts && INCREMENTAL_JOBS_REPORT ) {
logger.info("Writing incremental jobs reports...")
writeJobsReport(false)
writeJobsReport(plot = false)
}
readyJobs ++= getReadyJobs
@ -516,9 +524,13 @@ class QGraph extends Logging {
private def nextRunningCheck(lastRunningCheck: Long) =
((30 * 1000L) - (System.currentTimeMillis - lastRunningCheck))
def formattedStatusCounts: String = {
"%d Pend, %d Run, %d Fail, %d Done".format(
statusCounts.pending, statusCounts.running, statusCounts.failed, statusCounts.done)
}
private def logStatusCounts() {
logger.info("%d Pend, %d Run, %d Fail, %d Done".format(
statusCounts.pending, statusCounts.running, statusCounts.failed, statusCounts.done))
logger.info(formattedStatusCounts)
}
/**
@ -533,6 +545,16 @@ class QGraph extends Logging {
traverseFunctions(edge => recheckDone(edge))
}
// TODO: Yet another field to add (with overloads) to QFunction?
private def jobShortName(function: QFunction): String = {
var name = function.analysisName
if (function.isInstanceOf[CloneFunction]) {
val cloneFunction = function.asInstanceOf[CloneFunction]
name += " %d of %d".format(cloneFunction.cloneIndex, cloneFunction.cloneCount)
}
name
}
/**
* First pass that checks if an edge is done or if it's an intermediate edge if it can be skipped.
* This function may modify the status of previous edges if it discovers that the edge passed in

View File

@ -8,6 +8,10 @@ import org.broadinstitute.sting.queue.util.RemoteFile
*/
trait QStatusMessenger {
def started()
def done(files: Seq[Map[ArgumentSource, Seq[RemoteFile]]])
def done(inputs: Seq[Map[ArgumentSource, Seq[RemoteFile]]], outputs: Seq[Map[ArgumentSource, Seq[RemoteFile]]])
def exit(message: String)
def started(job: String)
def done(job: String)
def exit(job: String, message: String)
}

View File

@ -38,6 +38,7 @@ object CloneFunction {
class CloneFunction extends CommandLineFunction {
var originalFunction: ScatterGatherableFunction = _
var cloneIndex: Int = _
var cloneCount: Int = _
private var overriddenFields = Map.empty[ArgumentSource, Any]
private var withScatterPartCount = 0

View File

@ -176,6 +176,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
cloneFunction.originalFunction = this
cloneFunction.analysisName = this.analysisName
cloneFunction.cloneIndex = i
cloneFunction.cloneCount = numClones
cloneFunction.commandDirectory = this.scatterGatherTempDir(dirFormat.format(i))
cloneFunction.jobOutputFile = if (IOUtils.isSpecialFile(this.jobOutputFile)) this.jobOutputFile else new File(this.jobOutputFile.getName)
if (this.jobErrorFile != null)