From 8dfa24df7b9d163b2a5f46c3bd2d89173efa7b62 Mon Sep 17 00:00:00 2001 From: kshakir Date: Tue, 23 Oct 2012 12:34:26 -0400 Subject: [PATCH] Sending a version of per job status messages. In addition to outputs, inputs are passed to QStatusMessenger.done() CloneFunction.cloneIndex has a new CloneFunction.cloneCount companion useful for display purposes. --- .../sting/queue/QCommandLine.scala | 11 ++++-- .../broadinstitute/sting/queue/QScript.scala | 13 +++++-- .../sting/queue/engine/QGraph.scala | 34 +++++++++++++++---- .../sting/queue/engine/QStatusMessenger.scala | 6 +++- .../scattergather/CloneFunction.scala | 1 + .../ScatterGatherableFunction.scala | 1 + 6 files changed, 54 insertions(+), 12 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index 2afa66d9c..65abaf7be 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -123,6 +123,8 @@ class QCommandLine extends CommandLineProgram with Logging { commandPlugin.statusMessenger.started() } + qGraph.messengers = allCommandPlugins.filter(_.statusMessenger != null).map(_.statusMessenger).toSeq + // TODO: Default command plugin argument? val remoteFileConverter = ( for (commandPlugin <- allCommandPlugins if (commandPlugin.remoteFileConverter != null)) @@ -178,14 +180,17 @@ class QCommandLine extends CommandLineProgram with Logging { qGraph.logFailed() for (commandPlugin <- allCommandPlugins) if (commandPlugin.statusMessenger != null) - commandPlugin.statusMessenger.exit("Done with errors") + commandPlugin.statusMessenger.exit("Done with errors: %s".format(qGraph.formattedStatusCounts)) 1 } else { if (settings.run) { allQScripts.foreach(_.pushOutputs()) for (commandPlugin <- allCommandPlugins) - if (commandPlugin.statusMessenger != null) - commandPlugin.statusMessenger.done(allQScripts.map(_.remoteOutputs)) + if (commandPlugin.statusMessenger != null) { + val allInputs = allQScripts.map(_.remoteInputs) + val allOutputs = allQScripts.map(_.remoteOutputs) + commandPlugin.statusMessenger.done(allInputs, allOutputs) + } } 0 } diff --git a/public/scala/src/org/broadinstitute/sting/queue/QScript.scala b/public/scala/src/org/broadinstitute/sting/queue/QScript.scala index 3df61b1e3..8c834696c 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -149,8 +149,17 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon * List out the remote outputs * @return the RemoteFile outputs by argument source */ - def remoteOutputs: Map[ArgumentSource, Seq[RemoteFile]] = - outputFields.map(field => (field -> filterRemoteFiles(ClassFieldCache.getFieldFiles(this, field)))).filter(tuple => !tuple._2.isEmpty).toMap + def remoteInputs: Map[ArgumentSource, Seq[RemoteFile]] = remoteFieldMap(inputFields) + + /** + * List out the remote outputs + * @return the RemoteFile outputs by argument source + */ + def remoteOutputs: Map[ArgumentSource, Seq[RemoteFile]] = remoteFieldMap(outputFields) + + private def remoteFieldMap(fields: Seq[ArgumentSource]): Map[ArgumentSource, Seq[RemoteFile]] = { + fields.map(field => (field -> filterRemoteFiles(ClassFieldCache.getFieldFiles(this, field)))).filter(tuple => !tuple._2.isEmpty).toMap + } private def filterRemoteFiles(fields: Seq[File]): Seq[RemoteFile] = fields.filter(field => field != null && field.isInstanceOf[RemoteFile]).map(_.asInstanceOf[RemoteFile]) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 2c33596e1..4f7dd665d 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -47,6 +47,7 @@ import java.io.{OutputStreamWriter, File} */ class QGraph extends Logging { var settings: QGraphSettings = _ + var messengers: Seq[QStatusMessenger] = Nil private def dryRun = !settings.run private var numMissingValues = 0 @@ -95,7 +96,7 @@ class QGraph extends Logging { * The settings aren't necessarily available until after this QGraph object has been constructed, so * this function must be called once the QGraphSettings have been filled in. * - * @param settings + * @param settings QGraphSettings */ def initializeWithSettings(settings: QGraphSettings) { this.settings = settings @@ -430,6 +431,7 @@ class QGraph extends Logging { val edge = readyJobs.head edge.runner = newRunner(edge.function) edge.start() + messengers.foreach(_.started(jobShortName(edge.function))) startedJobs += edge readyJobs -= edge logNextStatusCounts = true @@ -465,8 +467,14 @@ class QGraph extends Logging { updateStatus() runningJobs.foreach(edge => edge.status match { - case RunnerStatus.DONE => doneJobs += edge - case RunnerStatus.FAILED => failedJobs += edge + case RunnerStatus.DONE => { + doneJobs += edge + messengers.foreach(_.done(jobShortName(edge.function))) + } + case RunnerStatus.FAILED => { + failedJobs += edge + messengers.foreach(_.exit(jobShortName(edge.function), edge.function.jobErrorLines.mkString("%n".format()))) + } case RunnerStatus.RUNNING => /* do nothing while still running */ }) @@ -493,7 +501,7 @@ class QGraph extends Logging { // incremental if ( logNextStatusCounts && INCREMENTAL_JOBS_REPORT ) { logger.info("Writing incremental jobs reports...") - writeJobsReport(false) + writeJobsReport(plot = false) } readyJobs ++= getReadyJobs @@ -516,9 +524,13 @@ class QGraph extends Logging { private def nextRunningCheck(lastRunningCheck: Long) = ((30 * 1000L) - (System.currentTimeMillis - lastRunningCheck)) + def formattedStatusCounts: String = { + "%d Pend, %d Run, %d Fail, %d Done".format( + statusCounts.pending, statusCounts.running, statusCounts.failed, statusCounts.done) + } + private def logStatusCounts() { - logger.info("%d Pend, %d Run, %d Fail, %d Done".format( - statusCounts.pending, statusCounts.running, statusCounts.failed, statusCounts.done)) + logger.info(formattedStatusCounts) } /** @@ -533,6 +545,16 @@ class QGraph extends Logging { traverseFunctions(edge => recheckDone(edge)) } + // TODO: Yet another field to add (with overloads) to QFunction? + private def jobShortName(function: QFunction): String = { + var name = function.analysisName + if (function.isInstanceOf[CloneFunction]) { + val cloneFunction = function.asInstanceOf[CloneFunction] + name += " %d of %d".format(cloneFunction.cloneIndex, cloneFunction.cloneCount) + } + name + } + /** * First pass that checks if an edge is done or if it's an intermediate edge if it can be skipped. * This function may modify the status of previous edges if it discovers that the edge passed in diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QStatusMessenger.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QStatusMessenger.scala index eeabe6d1d..c4151dafc 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QStatusMessenger.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QStatusMessenger.scala @@ -8,6 +8,10 @@ import org.broadinstitute.sting.queue.util.RemoteFile */ trait QStatusMessenger { def started() - def done(files: Seq[Map[ArgumentSource, Seq[RemoteFile]]]) + def done(inputs: Seq[Map[ArgumentSource, Seq[RemoteFile]]], outputs: Seq[Map[ArgumentSource, Seq[RemoteFile]]]) def exit(message: String) + + def started(job: String) + def done(job: String) + def exit(job: String, message: String) } diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala index 91cacbb71..861db3f80 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala @@ -38,6 +38,7 @@ object CloneFunction { class CloneFunction extends CommandLineFunction { var originalFunction: ScatterGatherableFunction = _ var cloneIndex: Int = _ + var cloneCount: Int = _ private var overriddenFields = Map.empty[ArgumentSource, Any] private var withScatterPartCount = 0 diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 5dd7d4c79..b00437f9f 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -176,6 +176,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { cloneFunction.originalFunction = this cloneFunction.analysisName = this.analysisName cloneFunction.cloneIndex = i + cloneFunction.cloneCount = numClones cloneFunction.commandDirectory = this.scatterGatherTempDir(dirFormat.format(i)) cloneFunction.jobOutputFile = if (IOUtils.isSpecialFile(this.jobOutputFile)) this.jobOutputFile else new File(this.jobOutputFile.getName) if (this.jobErrorFile != null)