Added status email support with -statusTo. Will send emails on failure of an individual function or success/failure of the whole pipeline.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4496 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-10-14 15:58:52 +00:00
parent 2606e67cf1
commit 63e3848187
8 changed files with 301 additions and 43 deletions

View File

@ -12,7 +12,10 @@
<dependency org="net.sf" name="picard" rev="latest.integration"/>
<dependency org="edu.mit.broad" name="picard-private-parts" rev="latest.integration"/>
<dependency org="junit" name="junit" rev="4.4"/>
<dependency org="log4j" name="log4j" rev="1.2.15"/>
<dependency org="log4j" name="log4j" rev="1.2.15">
<!-- Don't include javax.mail here in default, only used in scala->default by commons-email -->
<exclude org="javax.mail" />
</dependency>
<dependency org="colt" name="colt" rev="1.2.0"/>
<dependency org="jboss" name="javassist" rev="3.7.ga"/>
<dependency org="org.simpleframework" name="simple-xml" rev="2.0.4"/>
@ -39,6 +42,7 @@
<!-- Scala dependencies -->
<dependency org="org.scala-lang" name="scala-compiler" rev="2.8.0" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-library" rev="2.8.0" conf="scala->default"/>
<dependency org="org.apache.commons" name="commons-email" rev="1.2"/>
<!-- findbug dependencies -->
<dependency org="net.sourceforge.findbugs" name="findbugs" rev="1.3.2" conf="findbugs->default"/>
@ -48,7 +52,8 @@
<dependency org="net.sourceforge.findbugs" name="jsr305" rev="1.3.2" conf="default"/>
<!-- Exclude dependencies on sun libraries where the downloads aren't available but included in the jvm. -->
<exclude org="javax.servlet" />
<exclude org="javax.jms" />
<exclude org="com.sun.*" />
<exclude org="javax.*" />
</dependencies>
</ivy-module>

View File

@ -4,7 +4,7 @@ import java.io.File
import java.util.Arrays
import org.broadinstitute.sting.queue.engine.QGraph
import org.broadinstitute.sting.commandline._
import org.broadinstitute.sting.queue.util.{ProcessController, Logging, ScalaCompoundArgumentTypeDescriptor}
import org.broadinstitute.sting.queue.util._
/**
* Entry point of Queue. Compiles and runs QScripts passed in to the command line.
@ -38,6 +38,12 @@ class QCommandLine extends CommandLineProgram with Logging {
@ArgumentCollection
private val qSettings = new QSettings
@Argument(fullName="statusEmailFrom", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false)
private var statusEmailFrom: String = System.getProperty("user.name") + "@" + SystemUtils.domainName
@Argument(fullName="statusEmailTo", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false)
private var statusEmailTo: List[String] = Nil
/**
* Takes the QScripts passed in, runs their script() methods, retrieves their generated
* functions, and then builds and runs a QGraph based on the dependencies.
@ -52,6 +58,8 @@ class QCommandLine extends CommandLineProgram with Logging {
qGraph.expandedDotFile = expandedDotFile
qGraph.qSettings = qSettings
qGraph.debugMode = debugMode == true
qGraph.statusEmailFrom = statusEmailFrom
qGraph.statusEmailTo = statusEmailTo
val scripts = qScriptManager.createScripts()
for (script <- scripts) {

View File

@ -1,8 +1,8 @@
package org.broadinstitute.sting.queue
import org.broadinstitute.sting.commandline.Argument
import java.io.File
import java.lang.management.ManagementFactory
import org.broadinstitute.sting.commandline.{ArgumentCollection, Argument}
import org.broadinstitute.sting.queue.util.{SystemUtils, EmailSettings}
/**
* Default settings settable on the command line and passed to CommandLineFunctions.
@ -22,18 +22,15 @@ class QSettings {
@Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false)
var memoryLimit: Option[Int] = None
@ArgumentCollection
val emailSettings = new EmailSettings
}
/**
* Default settings settable on the command line and passed to CommandLineFunctions.
*/
object QSettings {
/** A semi-unique job prefix using the host name and the process id. */
private val processNamePrefix = "Q-" + {
var prefix = ManagementFactory.getRuntimeMXBean.getName
val index = prefix.indexOf(".")
if (index >= 0)
prefix = prefix.substring(0, index)
prefix
}
/** A semi-unique job prefix using the host name and the process id. */
private val processNamePrefix = "Q-" + SystemUtils.pidAtHost.split('.')(0)
}

View File

@ -11,8 +11,8 @@ import java.io.File
import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent}
import org.broadinstitute.sting.queue.{QSettings, QException}
import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction}
import org.broadinstitute.sting.queue.util.{JobExitException, LsfKillJob, Logging}
import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction}
import org.broadinstitute.sting.queue.util.{EmailMessage, JobExitException, LsfKillJob, Logging}
/**
* The internal dependency tracker between sets of function input and output files.
@ -25,6 +25,9 @@ class QGraph extends Logging {
var expandedDotFile: File = _
var qSettings: QSettings = _
var debugMode = false
var statusEmailFrom: String = _
var statusEmailTo: List[String] = _
private val jobGraph = newGraph
/**
@ -262,35 +265,51 @@ class QGraph extends Logging {
* Runs the jobs by traversing the graph.
*/
private def runJobs() = {
traverseFunctions(edge => {
val isDone = !this.startClean &&
edge.status == RunnerStatus.DONE &&
this.previousFunctions(edge).forall(_.status == RunnerStatus.DONE)
if (!isDone)
edge.resetToPending()
})
var readyJobs = getReadyJobs
var runningJobs = Set.empty[FunctionEdge]
while (readyJobs.size + runningJobs.size > 0) {
var exitedJobs = List.empty[FunctionEdge]
runningJobs.foreach(runner => {
if (runner.status != RunnerStatus.RUNNING)
exitedJobs :+= runner
})
exitedJobs.foreach(runner => runningJobs -= runner)
readyJobs.foreach(f => {
f.runner = newRunner(f.function)
f.runner.start()
if (f.status == RunnerStatus.RUNNING) {
runningJobs += f
}
try {
traverseFunctions(edge => {
val isDone = !this.startClean &&
edge.status == RunnerStatus.DONE &&
this.previousFunctions(edge).forall(_.status == RunnerStatus.DONE)
if (!isDone)
edge.resetToPending()
})
if (readyJobs.size == 0 && runningJobs.size > 0)
Thread.sleep(30000L)
readyJobs = getReadyJobs
var readyJobs = getReadyJobs
var runningJobs = Set.empty[FunctionEdge]
while (readyJobs.size + runningJobs.size > 0) {
var exitedJobs = List.empty[FunctionEdge]
var failedJobs = List.empty[FunctionEdge]
runningJobs.foreach(runner => runner.status match {
case RunnerStatus.RUNNING => /* do nothing while still running */
case RunnerStatus.FAILED => exitedJobs :+= runner; failedJobs :+= runner
case RunnerStatus.DONE => exitedJobs :+= runner
})
exitedJobs.foreach(runner => runningJobs -= runner)
readyJobs.foreach(f => {
f.runner = newRunner(f.function)
f.runner.start()
f.status match {
case RunnerStatus.RUNNING => runningJobs += f
case RunnerStatus.FAILED => failedJobs :+= f
case RunnerStatus.DONE => /* do nothing and move on */
}
})
if (failedJobs.size > 0)
emailFailedJobs(failedJobs)
if (readyJobs.size == 0 && runningJobs.size > 0)
Thread.sleep(30000L)
readyJobs = getReadyJobs
}
emailStatus()
} catch {
case e =>
logger.error("Uncaught error running jobs.", e)
throw e
}
}
@ -308,6 +327,57 @@ class QGraph extends Logging {
}
}
/**
 * Emails the status recipients about functions that have just failed.
 * The body lists the description of each failed function and the existing
 * log files of those functions are attached. Does nothing when there are
 * no status recipients.
 * @param jobs The edges whose functions failed.
 */
private def emailFailedJobs(jobs: List[FunctionEdge]) = {
  // statusEmailTo is declared without an initial value, so it stays null until a caller assigns it.
  if (statusEmailTo != null && statusEmailTo.size > 0) {
    val emailMessage = new EmailMessage
    emailMessage.from = statusEmailFrom
    emailMessage.to = statusEmailTo
    emailMessage.subject = "Queue function: Failure"
    // The body is only the list of failed functions; the previous intermediate
    // assignment of getStatus was always overwritten here, so it has been dropped.
    emailMessage.body = "Failed functions: %n%n%s%n"
      .format(jobs.map(_.function.description).mkString("%n%n".format()))
    emailMessage.attachments = jobs.flatMap(edge => logFiles(edge))
    emailMessage.trySend(qSettings.emailSettings)
  }
}
/**
 * Emails the overall run status to the status recipients.
 * The subject reports success when no function has a FAILED status, otherwise
 * failure, in which case the existing log files of the failed functions are attached.
 * Does nothing when there are no status recipients.
 */
private def emailStatus() = {
  // statusEmailTo is declared without an initial value, so it stays null until a caller assigns it.
  // Without this guard a null list would raise an exception after all jobs have already finished.
  if (statusEmailTo != null && statusEmailTo.size > 0) {
    var failedFunctions = List.empty[String]
    var failedOutputs = List.empty[File]

    // Collect the description and log files of every failed function.
    foreachFunction(edge => {
      if (edge.status == RunnerStatus.FAILED) {
        failedFunctions :+= edge.function.description
        failedOutputs ++= logFiles(edge)
      }
    })

    val emailMessage = new EmailMessage
    emailMessage.from = statusEmailFrom
    emailMessage.to = statusEmailTo
    emailMessage.body = getStatus + "%n".format()
    if (failedFunctions.size == 0) {
      emailMessage.subject = "Queue run: Success"
    } else {
      emailMessage.subject = "Queue run: Failure"
      emailMessage.attachments = failedOutputs
    }
    emailMessage.trySend(qSettings.emailSettings)
  }
}
private def logFiles(edge: FunctionEdge) = {
  // TODO: All functions should be writing error files, including InProcessFunctions
  // Only command line functions expose output/error files; anything else contributes nothing.
  val candidates = edge.function match {
    case commandLine: CommandLineFunction =>
      List(commandLine.jobOutputFile, commandLine.jobErrorFile)
    case _ =>
      List.empty[File]
  }
  // Keep only the files that are set and actually present on disk.
  candidates.filter(candidate => candidate != null && candidate.exists)
}
/**
* Tracks analysis status.
*/
@ -327,9 +397,26 @@ class QGraph extends Logging {
}
/**
* Gets job statuses by traversing the graph and looking for status-related files
* Logs job statuses by traversing the graph and looking for status-related files
*/
private def logStatus =
  // Forward every status line produced by doStatus to the logger at info level.
  doStatus(line => logger.info(line))
/**
* Gets job statuses by traversing the graph and looking for status-related files
*/
private def getStatus = {
  // Platform line separator, obtained the same way as elsewhere in this class.
  val lineSeparator = "%n".format()
  val text = new StringBuilder
  // Each status line produced by doStatus is appended followed by a line separator.
  doStatus(line => text.append(line).append(lineSeparator))
  text.toString
}
/**
* Gets job statuses by traversing the graph and looking for status-related files
*/
private def doStatus(statusFunc: String => Unit) = {
var statuses = Map.empty[String, AnalysisStatus]
foreachFunction(edgeCLF => {
if (edgeCLF.function.analysisName != null) {
@ -360,7 +447,7 @@ class QGraph extends Logging {
info += formatSGStatus(status.scatter, "s")
info += formatSGStatus(status.gather, "g")
}
logger.info(info)
statusFunc(info)
})
}

View File

@ -0,0 +1,115 @@
package org.broadinstitute.sting.queue.util
import org.apache.commons.mail.{MultiPartEmail, EmailAttachment}
import java.io.{FileReader, File}
import javax.mail.internet.InternetAddress
import scala.collection.JavaConversions
/**
* Encapsulates a message to be sent over email.
*/
class EmailMessage extends Logging {
  /** Sender address; always passed to the underlying email by send. */
  var from: String = _
  /** Primary recipients; only set on the email when non-empty. */
  var to: List[String] = Nil
  /** Carbon copy recipients; only set on the email when non-empty. */
  var cc: List[String] = Nil
  /** Blind carbon copy recipients; only set on the email when non-empty. */
  var bcc: List[String] = Nil
  /** Subject line; skipped when null. */
  var subject: String = _
  /** Message text; skipped when null. */
  var body: String = _
  /** Files to attach, each attached under its file name. */
  var attachments: List[File] = Nil

  /**
   * Sends the email and throws an exception if the email can't be sent.
   * Authentication is configured when a username is given together with either
   * a password file or a password; the password file takes precedence.
   * @param settings The server settings for the email.
   * @return The value returned by the underlying email's send.
   */
  def send(settings: EmailSettings) = {
    val email = new MultiPartEmail

    email.setHostName(settings.host)
    email.setSmtpPort(settings.port)
    email.setTLS(settings.tls)
    if (settings.ssl) {
      email.setSSL(true)
      email.setSslSmtpPort(settings.port.toString)
    }

    // A username needs EITHER a password OR a password file. Requiring both (as before)
    // meant a plain password was never used and the fallback branch below was unreachable.
    if (settings.username != null && (settings.password != null || settings.passwordFile != null)) {
      val password = {
        if (settings.passwordFile != null) {
          val reader = new FileReader(settings.passwordFile)
          try {
            // Drop a single trailing line terminator, which most editors and shells
            // append to the file but which is not part of the password.
            org.apache.commons.io.IOUtils.toString(reader).stripLineEnd
          } finally {
            org.apache.commons.io.IOUtils.closeQuietly(reader)
          }
        } else {
          settings.password
        }
      }
      email.setAuthentication(settings.username, password)
    }

    email.setFrom(this.from)
    if (this.subject != null)
      email.setSubject(this.subject)
    if (this.body != null)
      email.setMsg(this.body)
    if (this.to.size > 0)
      email.setTo(convert(this.to))
    if (this.cc.size > 0)
      email.setCc(convert(this.cc))
    if (this.bcc.size > 0)
      email.setBcc(convert(this.bcc))

    for (file <- this.attachments) {
      val attachment = new EmailAttachment
      attachment.setDisposition(EmailAttachment.ATTACHMENT)
      attachment.setPath(file.getAbsolutePath)
      attachment.setDescription(file.getAbsolutePath)
      attachment.setName(file.getName)
      email.attach(attachment)
    }

    email.send
  }

  /**
   * Tries to send the email through Retry.attempt and, if that still fails, logs the
   * full message together with the error instead of propagating it.
   * NOTE(review): the .5 argument is presumably a wait in minutes, i.e. one retry
   * 30 seconds after the first attempt -- confirm against Retry.
   * @param settings The server settings for the email.
   */
  def trySend(settings: EmailSettings) = {
    try {
      Retry.attempt(() => send(settings), .5)
    } catch {
      case e => logger.error("Error sending message: %n%s".format(this.toString), e)
    }
  }

  /**
   * Converts the email addresses to a collection of InternetAddress created with strict
   * parsing disabled, which can bypass client side validation, specifically that the
   * domain is specified.
   * @param addresses List of email addresses.
   * @return java.util.List of InternetAddress'es
   */
  private def convert(addresses: List[String]) = {
    JavaConversions.asList(addresses.map(address => new InternetAddress(address, false)))
  }

  /**
   * Renders the headers, the body and the absolute paths of the attachments as text.
   * Used by trySend when logging a message that could not be sent.
   */
  override def toString = {
    """|
       |From: %s
       |To: %s
       |Cc: %s
       |Bcc: %s
       |Subject: %s
       |
       |%s
       |
       |Attachments:
       |%s
       |""".stripMargin.trim.format(
      this.from, this.to.mkString(", "),
      this.cc.mkString(", "), this.bcc.mkString(", "),
      this.subject, this.body,
      this.attachments.map(_.getAbsolutePath).mkString("%n".format()))
  }
}

View File

@ -0,0 +1,30 @@
package org.broadinstitute.sting.queue.util
import java.io.File
import org.broadinstitute.sting.commandline.Argument
/**
* Email server settings.
*/
class EmailSettings {
  /** SMTP server host name. */
  @Argument(doc="Email SMTP host. Defaults to localhost.", shortName="emailHost", fullName="emailSmtpHost", required=false)
  var host: String = "localhost"

  // The default is 25 regardless of the ssl flag, so the help text says exactly that
  // rather than promising a different default for SSL.
  /** SMTP server port. */
  @Argument(doc="Email SMTP port. Defaults to 25.", shortName="emailPort", fullName="emailSmtpPort", required=false)
  var port: Int = 25

  /** Whether TLS should be used. */
  @Argument(doc="Email should use TLS. Defaults to false.", shortName="emailTLS", fullName="emailUseTLS", required=false)
  var tls: Boolean = false

  /** Whether SSL should be used. */
  @Argument(doc="Email should use SSL. Defaults to false.", shortName="emailSSL", fullName="emailUseSSL", required=false)
  var ssl: Boolean = false

  /** SMTP user name; null when not provided. */
  @Argument(doc="Email SMTP username. Defaults to none.", shortName="emailUser", fullName="emailUsername", required=false)
  var username: String = _

  /** SMTP password given directly on the command line; null when not provided. */
  @Argument(doc="Email SMTP password. Defaults to none. Not secure! See emailPassFile.", shortName="emailPass", fullName="emailPassword", required=false)
  var password: String = _

  /** File containing the SMTP password; null when not provided. */
  @Argument(doc="Email SMTP password file. Defaults to none.", shortName="emailPassFile", fullName="emailPasswordFile", required=false)
  var passwordFile: File = _
}

View File

@ -0,0 +1,13 @@
package org.broadinstitute.sting.queue.util
import java.lang.management.ManagementFactory
import java.net.InetAddress
/**
* A collection of various system utilities.
*/
object SystemUtils {
  /** The name of the running JVM as reported by the runtime MX bean. */
  val pidAtHost: String = ManagementFactory.getRuntimeMXBean.getName

  /**
   * The canonical host name of the local host, or "localhost" when the local host
   * can't be resolved. Falling back keeps a lookup failure from aborting the
   * initialization of this object, and with it every class that reads from it.
   */
  val hostName: String =
    try {
      InetAddress.getLocalHost.getCanonicalHostName
    } catch {
      case e: java.net.UnknownHostException => "localhost"
    }

  /**
   * The last two dot separated labels of hostName. When hostName is a dotted IPv4
   * literal it is returned whole, since its last two numbers are not a domain.
   */
  val domainName: String = {
    val labels = hostName.split('.')
    val isIpV4Literal =
      labels.length == 4 && labels.forall(label => label.length > 0 && label.forall(_.isDigit))
    if (isIpV4Literal) hostName else labels.takeRight(2).mkString(".")
  }
}

View File

@ -9,6 +9,7 @@
<ibiblio name="libraries" m2compatible="true" />
<ibiblio name="libraries_with_inconsistent_poms" checkconsistency="false" m2compatible="true" />
<ibiblio name="reflections-repo" m2compatible="true" root="http://reflections.googlecode.com/svn/repo" />
<ibiblio name="java.net" m2compatible="false" root="http://download.java.net/maven/1/" pattern="[organisation]/jars/[artifact]-[revision].[ext]"/>
</resolvers>
<modules>
<module organisation="edu.mit.broad" resolver="projects" />
@ -19,5 +20,7 @@
<module organisation="gov.nist" module="Jama" resolver="projects" />
<!-- If colt fixes the version in the pom for 1.2.0 then this line can be removed. -->
<module organisation="colt" module="colt" resolver="libraries_with_inconsistent_poms" />
<module organisation="javax.mail" resolver="java.net" />
<module organisation="javax.activation" resolver="java.net" />
</modules>
</ivysettings>