From cad6722cf6a4944b1c7cc2de21e65de63a338910 Mon Sep 17 00:00:00 2001 From: kshakir Date: Thu, 14 Apr 2011 14:55:35 +0000 Subject: [PATCH] Emailing on function start. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5637 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/queue/engine/QGraph.scala | 41 ++++++++++++++++--- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 061aa6854..c13283a1e 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -370,6 +370,7 @@ class QGraph extends Logging { runningJobs = Set.empty[FunctionEdge] var lastRunningCheck = System.currentTimeMillis var logNextStatusCounts = true + var startedJobsToEmail = Set.empty[FunctionEdge] while (running && readyJobs.size + runningJobs.size > 0) { @@ -387,6 +388,7 @@ class QGraph extends Logging { } runningJobs ++= startedJobs + startedJobsToEmail ++= startedJobs statusCounts.pending -= startedJobs.size statusCounts.running += startedJobs.size @@ -396,6 +398,11 @@ class QGraph extends Logging { deleteCleanup(lastRunningCheck) + if (running && startedJobs.size > 0 && !readyRunningCheck(lastRunningCheck)) { + emailStartedJobs(startedJobsToEmail) + startedJobsToEmail = Set.empty[FunctionEdge] + } + if (readyJobs.size == 0 && runningJobs.size > 0) Thread.sleep(nextRunningCheck(lastRunningCheck)) @@ -411,6 +418,8 @@ class QGraph extends Logging { runningJobs --= doneJobs runningJobs --= failedJobs + startedJobsToEmail &~= failedJobs + addCleanup(doneJobs) statusCounts.running -= doneJobs.size @@ -593,6 +602,17 @@ class QGraph extends Logging { } } + private def emailStartedJobs(started: Set[FunctionEdge]) { + if (settings.statusEmailTo.size > 0) { + val emailMessage = new EmailMessage + emailMessage.from = settings.statusEmailFrom + emailMessage.to = settings.statusEmailTo + emailMessage.subject = "Queue function: Started: " + settings.qSettings.jobNamePrefix + addStartedFunctions(emailMessage, started.toList) + emailMessage.trySend(settings.qSettings.emailSettings) + } + } + private def emailFailedJobs(failed: Set[FunctionEdge]) { if (settings.statusEmailTo.size > 0) { val emailMessage = new EmailMessage @@ -645,6 +665,17 @@ class QGraph extends Logging { } } + private def addStartedFunctions(emailMessage: EmailMessage, started: List[FunctionEdge]) { + if (emailMessage.body == null) + emailMessage.body = "" + emailMessage.body += """ + |Started functions: + | + |%s + |""".stripMargin.trim.format( + started.map(edge => emailDescription(edge)).mkString(nl+nl)) + } + private def addFailedFunctions(emailMessage: EmailMessage, failed: List[FunctionEdge]) { val logs = failed.flatMap(edge => logFiles(edge)) @@ -658,18 +689,18 @@ class QGraph extends Logging { |Logs: |%s%n |""".stripMargin.trim.format( - failed.map(edge => failedDescription(edge)).mkString(nl+nl), + failed.map(edge => emailDescription(edge)).mkString(nl+nl), logs.map(_.getAbsolutePath).mkString(nl)) emailMessage.attachments = logs } - private def failedDescription(failed: FunctionEdge) = { + private def emailDescription(edge: FunctionEdge) = { val description = new StringBuilder if (settings.retries > 0) - description.append("Attempt %d of %d.%n".format(failed.retries + 1, settings.retries + 1)) - description.append(failed.function.description) - description.toString + description.append("Attempt %d of %d.%n".format(edge.retries + 1, settings.retries + 1)) + description.append(edge.function.description) + description.toString() } private def logFiles(edge: FunctionEdge) = {