diff --git a/ivy.xml b/ivy.xml
index 9bb536839..e46e02cb2 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -12,7 +12,10 @@
-
+
+
+
+
@@ -39,6 +42,7 @@
+
@@ -48,7 +52,8 @@
+
+
-
diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala
index bd361c610..f560225cd 100755
--- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala
+++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala
@@ -4,7 +4,7 @@ import java.io.File
import java.util.Arrays
import org.broadinstitute.sting.queue.engine.QGraph
import org.broadinstitute.sting.commandline._
-import org.broadinstitute.sting.queue.util.{ProcessController, Logging, ScalaCompoundArgumentTypeDescriptor}
+import org.broadinstitute.sting.queue.util._
/**
* Entry point of Queue. Compiles and runs QScripts passed in to the command line.
@@ -38,6 +38,12 @@ class QCommandLine extends CommandLineProgram with Logging {
@ArgumentCollection
private val qSettings = new QSettings
+ @Argument(fullName="statusEmailFrom", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false)
+ private var statusEmailFrom: String = System.getProperty("user.name") + "@" + SystemUtils.domainName
+
+ @Argument(fullName="statusEmailTo", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false)
+ private var statusEmailTo: List[String] = Nil
+
/**
* Takes the QScripts passed in, runs their script() methods, retrieves their generated
* functions, and then builds and runs a QGraph based on the dependencies.
@@ -52,6 +58,8 @@ class QCommandLine extends CommandLineProgram with Logging {
qGraph.expandedDotFile = expandedDotFile
qGraph.qSettings = qSettings
qGraph.debugMode = debugMode == true
+ qGraph.statusEmailFrom = statusEmailFrom
+ qGraph.statusEmailTo = statusEmailTo
val scripts = qScriptManager.createScripts()
for (script <- scripts) {
diff --git a/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/scala/src/org/broadinstitute/sting/queue/QSettings.scala
index 808a5ea81..5ae9a7c77 100644
--- a/scala/src/org/broadinstitute/sting/queue/QSettings.scala
+++ b/scala/src/org/broadinstitute/sting/queue/QSettings.scala
@@ -1,8 +1,8 @@
package org.broadinstitute.sting.queue
-import org.broadinstitute.sting.commandline.Argument
import java.io.File
-import java.lang.management.ManagementFactory
+import org.broadinstitute.sting.commandline.{ArgumentCollection, Argument}
+import org.broadinstitute.sting.queue.util.{SystemUtils, EmailSettings}
/**
* Default settings settable on the command line and passed to CommandLineFunctions.
@@ -22,18 +22,15 @@ class QSettings {
@Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false)
var memoryLimit: Option[Int] = None
+
+ @ArgumentCollection
+ val emailSettings = new EmailSettings
}
/**
* Default settings settable on the command line and passed to CommandLineFunctions.
*/
object QSettings {
-  /** A semi-unique job prefix using the host name and the process id. */
-  private val processNamePrefix = "Q-" + {
-    var prefix = ManagementFactory.getRuntimeMXBean.getName
-    val index = prefix.indexOf(".")
-    if (index >= 0)
-      prefix = prefix.substring(0, index)
-    prefix
-  }
+  /** A semi-unique job prefix using the host name and the process id, cut at the first '.' (total, unlike split(0)). */
+  private val processNamePrefix = "Q-" + SystemUtils.pidAtHost.takeWhile(_ != '.')
}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
index f8c27bf47..fc2e9aeee 100755
--- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
@@ -11,8 +11,8 @@ import java.io.File
import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent}
import org.broadinstitute.sting.queue.{QSettings, QException}
import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction}
-import org.broadinstitute.sting.queue.util.{JobExitException, LsfKillJob, Logging}
import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction}
+import org.broadinstitute.sting.queue.util.{EmailMessage, JobExitException, LsfKillJob, Logging}
/**
* The internal dependency tracker between sets of function input and output files.
@@ -25,6 +25,9 @@ class QGraph extends Logging {
var expandedDotFile: File = _
var qSettings: QSettings = _
var debugMode = false
+  var statusEmailFrom: String = _ // sender for status emails; only read when statusEmailTo is non-empty
+  var statusEmailTo: List[String] = Nil // never null, so the size checks before emailing are safe when unset
+
private val jobGraph = newGraph
/**
@@ -262,35 +265,51 @@ class QGraph extends Logging {
* Runs the jobs by traversing the graph.
*/
private def runJobs() = {
-    traverseFunctions(edge => {
-      val isDone = !this.startClean &&
-              edge.status == RunnerStatus.DONE &&
-              this.previousFunctions(edge).forall(_.status == RunnerStatus.DONE)
-      if (!isDone)
-        edge.resetToPending()
-    })
-
-    var readyJobs = getReadyJobs
-    var runningJobs = Set.empty[FunctionEdge]
-    while (readyJobs.size + runningJobs.size > 0) {
-      var exitedJobs = List.empty[FunctionEdge]
-      runningJobs.foreach(runner => {
-        if (runner.status != RunnerStatus.RUNNING)
-          exitedJobs :+= runner
-      })
-      exitedJobs.foreach(runner => runningJobs -= runner)
-
-      readyJobs.foreach(f => {
-        f.runner = newRunner(f.function)
-        f.runner.start()
-        if (f.status == RunnerStatus.RUNNING) {
-          runningJobs += f
-        }
+    try { // anything thrown while running is logged and then rethrown by the catch below
+      traverseFunctions(edge => { // reset every function that is not DONE with all-DONE predecessors (or all, when startClean)
+        val isDone = !this.startClean &&
+                edge.status == RunnerStatus.DONE &&
+                this.previousFunctions(edge).forall(_.status == RunnerStatus.DONE)
+        if (!isDone)
+          edge.resetToPending()
})
-    if (readyJobs.size == 0 && runningJobs.size > 0)
-      Thread.sleep(30000L)
-    readyJobs = getReadyJobs
+      var readyJobs = getReadyJobs
+      var runningJobs = Set.empty[FunctionEdge]
+      while (readyJobs.size + runningJobs.size > 0) { // loop until nothing is ready and nothing is running
+        var exitedJobs = List.empty[FunctionEdge]
+        var failedJobs = List.empty[FunctionEdge] // failures seen in this pass, emailed together below
+
+        runningJobs.foreach(runner => runner.status match { // NOTE(review): any status other than these three raises a MatchError - confirm none is reachable
+          case RunnerStatus.RUNNING => /* do nothing while still running */
+          case RunnerStatus.FAILED => exitedJobs :+= runner; failedJobs :+= runner
+          case RunnerStatus.DONE => exitedJobs :+= runner
+        })
+        exitedJobs.foreach(runner => runningJobs -= runner)
+
+        readyJobs.foreach(f => {
+          f.runner = newRunner(f.function)
+          f.runner.start()
+          f.status match { // status is read right after start(), so a job may already have failed or finished
+            case RunnerStatus.RUNNING => runningJobs += f
+            case RunnerStatus.FAILED => failedJobs :+= f
+            case RunnerStatus.DONE => /* do nothing and move on */
+          }
+        })
+
+        if (failedJobs.size > 0)
+          emailFailedJobs(failedJobs)
+
+        if (readyJobs.size == 0 && runningJobs.size > 0)
+          Thread.sleep(30000L) // nothing new was started: wait 30 seconds before checking the running jobs again
+        readyJobs = getReadyJobs
+      }
+
+      emailStatus() // final summary once no job is ready or running
+    } catch {
+      case e =>
+        logger.error("Uncaught error running jobs.", e)
+        throw e
}
}
@@ -308,6 +327,57 @@ class QGraph extends Logging {
}
}
+  /** Emails the descriptions and existing log files of the given failed jobs when status recipients are set. */
+  private def emailFailedJobs(jobs: List[FunctionEdge]) = {
+    if (statusEmailTo.size > 0) {
+      val emailMessage = new EmailMessage
+      emailMessage.from = statusEmailFrom
+      emailMessage.to = statusEmailTo
+      emailMessage.subject = "Queue function: Failure"
+      emailMessage.body = "Failed functions: %n%n%s%n"
+        .format(jobs.map(_.function.description).mkString("%n%n".format()))
+      emailMessage.attachments = jobs.flatMap(edge => logFiles(edge))
+      emailMessage.trySend(qSettings.emailSettings)
+    }
+  }
+
+  /** Emails the overall run status when status recipients are set, attaching the logs of any failed functions. */
+  private def emailStatus() = {
+    if (statusEmailTo.size > 0) {
+      var failedDescriptions = List.empty[String]
+      var failedLogs = List.empty[File]
+      foreachFunction(edge => edge.status match {
+        case RunnerStatus.FAILED =>
+          failedDescriptions :+= edge.function.description
+          failedLogs ++= logFiles(edge)
+        case _ => /* only failed functions are collected */
+      })
+      val summary = new EmailMessage
+      summary.from = statusEmailFrom
+      summary.to = statusEmailTo
+      summary.body = getStatus + "%n".format()
+      if (failedDescriptions.isEmpty) {
+        summary.subject = "Queue run: Success"
+      } else {
+        summary.subject = "Queue run: Failure"
+        summary.attachments = failedLogs
+      }
+      summary.trySend(qSettings.emailSettings)
+    }
+  }
+
+  /** Returns the existing job output and error files of a CommandLineFunction edge; empty for other functions. */
+  private def logFiles(edge: FunctionEdge) = {
+    // TODO: All functions should be writing error files, including InProcessFunctions
+    val candidates = edge.function match {
+      case clf: CommandLineFunction =>
+        List(clf.jobOutputFile, clf.jobErrorFile)
+      case _ => List.empty[File]
+    }
+    // Unset (null) files and files that do not exist on disk are dropped.
+    candidates.filter(file => file != null && file.exists)
+  }
+
/**
* Tracks analysis status.
*/
@@ -327,9 +397,26 @@ class QGraph extends Logging {
}
/**
- * Gets job statuses by traversing the graph and looking for status-related files
+ * Logs job statuses by traversing the graph and looking for status-related files
*/
private def logStatus = {
+    doStatus(line => logger.info(line)) // one info-level log entry per status line
}
+
+ /**
+ * Gets job statuses by traversing the graph and looking for status-related files
+ */
+  private def getStatus = {
+    val lineSeparator = "%n".format() // platform line separator
+    val text = new StringBuilder
+    doStatus(line => text.append(line).append(lineSeparator))
+    text.toString
+  }
+
+ /**
+   * Computes job statuses by traversing the graph and passes each formatted status line to statusFunc
+ */
+ private def doStatus(statusFunc: String => Unit) = {
var statuses = Map.empty[String, AnalysisStatus]
foreachFunction(edgeCLF => {
if (edgeCLF.function.analysisName != null) {
@@ -360,7 +447,7 @@ class QGraph extends Logging {
info += formatSGStatus(status.scatter, "s")
info += formatSGStatus(status.gather, "g")
}
- logger.info(info)
+ statusFunc(info)
})
}
diff --git a/scala/src/org/broadinstitute/sting/queue/util/EmailMessage.scala b/scala/src/org/broadinstitute/sting/queue/util/EmailMessage.scala
new file mode 100644
index 000000000..d950ad0ff
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/util/EmailMessage.scala
@@ -0,0 +1,115 @@
+package org.broadinstitute.sting.queue.util
+
+import org.apache.commons.mail.{MultiPartEmail, EmailAttachment}
+import java.io.{FileReader, File}
+import javax.mail.internet.InternetAddress
+import scala.collection.JavaConversions
+
+/**
+ * Encapsulates a message to be sent over email.
+ */
+class EmailMessage extends Logging {
+  var from: String = _ // sender address; always passed to setFrom when sending
+  var to: List[String] = Nil
+  var cc: List[String] = Nil
+  var bcc: List[String] = Nil
+  var subject: String = _ // optional; skipped when null
+  var body: String = _ // optional; skipped when null
+  var attachments: List[File] = Nil // attached by absolute path, named after the file
+
+  /**
+   * Sends the email and throws an exception if the email can't be sent.
+   * @param settings The server settings for the email.
+   */
+  def send(settings: EmailSettings) = {
+    val email = new MultiPartEmail
+
+    email.setHostName(settings.host)
+    email.setSmtpPort(settings.port)
+    email.setTLS(settings.tls)
+    if (settings.ssl) {
+      email.setSSL(true)
+      email.setSslSmtpPort(settings.port.toString)
+    }
+
+    // Authenticate when a username and a password from EITHER source are given; the file wins if both are.
+    if (settings.username != null && (settings.password != null || settings.passwordFile != null)) {
+      val password =
+        if (settings.passwordFile != null) {
+          val reader = new FileReader(settings.passwordFile)
+          try {
+            // Drop the trailing line terminator that editors and `echo` leave in the file.
+            org.apache.commons.io.IOUtils.toString(reader).stripLineEnd
+          } finally {
+            org.apache.commons.io.IOUtils.closeQuietly(reader)
+          }
+        } else {
+          settings.password
+        }
+      email.setAuthentication(settings.username, password)
+    }
+
+    email.setFrom(this.from)
+    if (this.subject != null)
+      email.setSubject(this.subject)
+    if (this.body != null)
+      email.setMsg(this.body)
+    if (this.to.size > 0)
+      email.setTo(convert(this.to))
+    if (this.cc.size > 0)
+      email.setCc(convert(this.cc))
+    if (this.bcc.size > 0)
+      email.setBcc(convert(this.bcc))
+
+    for (file <- this.attachments) {
+      val attachment = new EmailAttachment
+      attachment.setDisposition(EmailAttachment.ATTACHMENT)
+      attachment.setPath(file.getAbsolutePath)
+      attachment.setDescription(file.getAbsolutePath)
+      attachment.setName(file.getName)
+      email.attach(attachment)
+    }
+
+    email.send
+  }
+
+  /**
+   * Tries twice 30 seconds apart to send the email. Then logs the message if it can't be sent.
+   * @param settings The server settings for the email.
+   */
+  def trySend(settings: EmailSettings) = {
+    try {
+      Retry.attempt(() => send(settings), .5)
+    } catch {
+      case e => logger.error("Error sending message: %n%s".format(this.toString), e)
+    }
+  }
+
+  /**
+   * Converts the email addresses to a collection of InternetAddress which can bypass client side validation,
+   * specifically that the domain is specified.
+   * @param addresses List of email addresses.
+   * @return java.util.List of InternetAddress'es
+   */
+  private def convert(addresses: List[String]) = {
+    JavaConversions.asList(addresses.map(address => new InternetAddress(address, false)))
+  }
+
+  override def toString = { // plain-text rendering of the message; logged by trySend when sending fails
+    """|
+       |From: %s
+       |To: %s
+       |Cc: %s
+       |Bcc: %s
+       |Subject: %s
+       |
+       |%s
+       |
+       |Attachments:
+       |%s
+       |""".stripMargin.trim.format(
+      this.from, this.to.mkString(", "), this.cc.mkString(", "), this.bcc.mkString(", "),
+      this.subject, this.body,
+      this.attachments.map(_.getAbsolutePath).mkString("%n".format()))
+  }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/util/EmailSettings.scala b/scala/src/org/broadinstitute/sting/queue/util/EmailSettings.scala
new file mode 100644
index 000000000..f6cfd0cf6
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/util/EmailSettings.scala
@@ -0,0 +1,30 @@
+package org.broadinstitute.sting.queue.util
+
+import java.io.File
+import org.broadinstitute.sting.commandline.Argument
+
+/**
+ * Email server settings.
+ */
+class EmailSettings {
+  @Argument(doc="Email SMTP host. Defaults to localhost.", shortName="emailHost", fullName="emailSmtpHost", required=false)
+  var host = "localhost"
+
+  @Argument(doc="Email SMTP port, also used as the SSL port when SSL is enabled. Defaults to 25.", shortName="emailPort", fullName="emailSmtpPort", required=false)
+  var port = 25 // no SSL-specific default exists: EmailMessage.send passes this same value to setSslSmtpPort
+
+  @Argument(doc="Email should use TLS. Defaults to false.", shortName="emailTLS", fullName="emailUseTLS", required=false)
+  var tls = false
+
+  @Argument(doc="Email should use SSL. Defaults to false.", shortName="emailSSL", fullName="emailUseSSL", required=false)
+  var ssl = false
+
+  @Argument(doc="Email SMTP username. Defaults to none.", shortName="emailUser", fullName="emailUsername", required=false)
+  var username: String = _
+
+  @Argument(doc="Email SMTP password. Defaults to none. Not secure! See emailPassFile.", shortName="emailPass", fullName="emailPassword", required=false)
+  var password: String = _
+
+  @Argument(doc="Email SMTP password file. Defaults to none.", shortName="emailPassFile", fullName="emailPasswordFile", required=false)
+  var passwordFile: File = _
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/util/SystemUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/SystemUtils.scala
new file mode 100644
index 000000000..b4cd22909
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/util/SystemUtils.scala
@@ -0,0 +1,13 @@
+package org.broadinstitute.sting.queue.util
+
+import java.lang.management.ManagementFactory
+import java.net.InetAddress
+
+/**
+ * A collection of various system utilities.
+ */
+object SystemUtils {
+  val pidAtHost = ManagementFactory.getRuntimeMXBean.getName // name of the running JVM as reported by its RuntimeMXBean
+  lazy val hostName = InetAddress.getLocalHost.getCanonicalHostName // lazy: the lookup may throw UnknownHostException and must not break users of pidAtHost
+  lazy val domainName = hostName.split('.').takeRight(2).mkString(".") // last two dot-separated labels of hostName
+}
diff --git a/settings/ivysettings.xml b/settings/ivysettings.xml
index 244cd1075..167a01fb5 100644
--- a/settings/ivysettings.xml
+++ b/settings/ivysettings.xml
@@ -9,6 +9,7 @@
+
@@ -19,5 +20,7 @@
+
+