Emergency fix for Ryan:

- Catching errors when LSF fails and retrying.
- When LSF retries fail, catching the error, marking the job as failed, and no longer bkilling everything by exiting Queue.
- Caching function fields by class instead of each instance of a function saving a list of its fields.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4490 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-10-13 22:22:01 +00:00
parent 83b8676b69
commit 5ee12875fb
7 changed files with 147 additions and 16 deletions

View File

@ -10,6 +10,7 @@ import java.lang.String
import org.apache.log4j.Level
import scala.tools.nsc.util.{FakePos, NoPosition, Position}
import org.broadinstitute.sting.utils.classloader.{PackageUtils, PluginManager}
import org.broadinstitute.sting.queue.util.TextFormatUtils._
/**
* Plugin manager for QScripts which loads QScripts into the current class loader.
@ -81,13 +82,6 @@ object QScriptManager extends Logging {
}
}
/**
* Returns the string "s" if x is greater than 1.
* @param x Value to test.
* @return "s" if x is greater than one else "".
*/
private def plural(x: Int) = if (x > 1) "s" else ""
/**
* NSC (New Scala Compiler) reporter which logs to Log4J.
* Heavily based on scala/src/compiler/scala/tools/nsc/reporters/ConsoleReporter.scala

View File

@ -1,8 +1,8 @@
package org.broadinstitute.sting.queue.engine
import java.io.File
import org.broadinstitute.sting.queue.util.{IOUtils, LsfJob, Logging}
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util._
/**
* Runs jobs on an LSF compute cluster.
@ -45,6 +45,8 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with
if (!IOUtils.CURRENT_DIR.getCanonicalFile.equals(function.commandDirectory))
job.workingDir = function.commandDirectory
job.extraBsubArgs ++= function.extraArgs
if (function.jobRestartable)
job.extraBsubArgs :+= "-r"
@ -74,9 +76,19 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with
function.jobErrorFile.delete()
runStatus = RunnerStatus.RUNNING
job.run()
jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." + job.bsubJobId)).toString
logger.info("Submitted LSF job id: " + job.bsubJobId)
try {
Retry.attempt(() => job.run(), 1, 5, 10)
jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." + job.bsubJobId)).toString
logger.info("Submitted LSF job id: " + job.bsubJobId)
} catch {
case re: RetryException =>
removeTemporaryFiles()
runStatus = RunnerStatus.FAILED
case e =>
logger.error("Error trying to start job.", e)
removeTemporaryFiles()
runStatus = RunnerStatus.FAILED
}
}
/**

View File

@ -33,6 +33,9 @@ trait CommandLineFunction extends QFunction with Logging {
/** Job queue to run the command */
var jobQueue: String = _
/** Extra arguments to specify on the command line */
var extraArgs: List[String] = Nil
/** Temporary directory to write any files */
var jobTempDir: File = IOUtils.javaTempDir

View File

@ -85,14 +85,14 @@ trait QFunction {
def failOutputs = statusPaths.map(path => new File(path + ".fail"))
/** The complete list of fields on this CommandLineFunction. */
lazy val functionFields: List[ArgumentSource] = initFunctionFields
def functionFields = QFunction.classFields(this.getClass).functionFields
/** The @Input fields on this CommandLineFunction. */
lazy val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input]))
def inputFields = QFunction.classFields(this.getClass).inputFields
/** The @Output fields on this CommandLineFunction. */
lazy val outputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Output]))
def outputFields = QFunction.classFields(this.getClass).outputFields
/** The @Argument fields on this CommandLineFunction. */
lazy val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument]))
def argumentFields = QFunction.classFields(this.getClass).argumentFields
/**
* Called at most once, returns the list of fields for this function.
*/
@ -344,3 +344,40 @@ trait QFunction {
*/
private def invokeObj(source: ArgumentSource) = source.parentFields.foldLeft[AnyRef](this)(ReflectionUtils.getValue(_, _))
}
object QFunction {
/**
* The list of fields defined on a class
* @param clazz The class to lookup fields.
*/
private class ClassFields(clazz: Class[_]) {
/** The complete list of fields on this CommandLineFunction. */
val functionFields: List[ArgumentSource] = ParsingEngine.extractArgumentSources(clazz).toList
/** The @Input fields on this CommandLineFunction. */
val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input]))
/** The @Output fields on this CommandLineFunction. */
val outputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Output]))
/** The @Argument fields on this CommandLineFunction. */
val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument]))
}
/**
* The mapping from class to fields.
*/
private var classFieldsMap = Map.empty[Class[_], ClassFields]
/**
* Returns the fields for a class.
* @param clazz Class to retrieve fields for.
* @return the fields for the class.
*/
private def classFields(clazz: Class[_]) = {
classFieldsMap.get(clazz) match {
case Some(classFields) => classFields
case None =>
val classFields = new ClassFields(clazz)
classFieldsMap += clazz -> classFields
classFields
}
}
}

View File

@ -0,0 +1,45 @@
package org.broadinstitute.sting.queue.util
import org.broadinstitute.sting.queue.util.TextFormatUtils._
/**
* Utilities for retrying a function and waiting a number of minutes.
*/
object Retry extends Logging {
/**
* Attempt the function and return the value on success.
* Each time the function fails the stack trace is logged.
* @param f Function to run and return its value.
* @param wait The length in minutes to wait before each attempt.
* @throws RetryException when all retries are exhausted.
* @return The successful result of f.
*/
def attempt[A](f: () => A, wait: Double*): A = {
var count = 0
var success = false
val tries = wait.size + 1
var result = null.asInstanceOf[A]
while (!success && count < tries) {
try {
result = f()
success = true
} catch {
case e => {
count += 1
if (count < tries) {
val minutes = wait(count-1)
logger.error("Caught error during attempt %s of %s."
.format(count, tries), e)
logger.error("Retrying in %.1f minute%s.".format(minutes, plural(minutes)))
Thread.sleep((minutes * 1000 * 60).toLong)
} else {
logger.error("Caught error during attempt %s of %s. Giving up."
.format(count, tries), e)
throw new RetryException("Gave up after %s attempts.".format(tries), e)
}
}
}
}
result
}
}

View File

@ -0,0 +1,9 @@
package org.broadinstitute.sting.queue.util
import org.broadinstitute.sting.queue.QException
/**
* Thrown after giving up on retrying.
*/
class RetryException(private val message: String, private val throwable: Throwable)
extends QException(message, throwable) {}

View File

@ -0,0 +1,31 @@
package org.broadinstitute.sting.queue.util
object TextFormatUtils {
/**
* Returns the string "s" if x is greater than 1.
* @param x Value to test.
* @return "s" if x is greater than one else "".
*/
def plural(x: Int) = if (x > 1) "s" else ""
/**
* Returns the string "s" if x is greater than 1.
* @param x Value to test.
* @return "s" if x is greater than one else "".
*/
def plural(x: Long) = if (x > 1) "s" else ""
/**
* Returns the string "s" if x is not equal to 1.
* @param x Value to test.
* @return "s" if x is greater than one else "".
*/
def plural(x: Float) = if (x != 1) "s" else ""
/**
* Returns the string "s" if x is not equal to 1.
* @param x Value to test.
* @return "s" if x is greater than one else "".
*/
def plural(x: Double) = if (x != 1) "s" else ""
}