From 5ee12875fb80eb694750272173f90f0169755d8b Mon Sep 17 00:00:00 2001 From: kshakir Date: Wed, 13 Oct 2010 22:22:01 +0000 Subject: [PATCH] Emergency fix for Ryan: - Catching errors when LSF fails and retrying. - When LSF retries fail, catching the error, marking the job as failed, and no longer bkilling everything by exiting Queue. - Caching function fields by class instead of each instance of a function saving a list of its fields. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4490 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/queue/QScriptManager.scala | 8 +--- .../sting/queue/engine/LsfJobRunner.scala | 20 ++++++-- .../queue/function/CommandLineFunction.scala | 3 ++ .../sting/queue/function/QFunction.scala | 47 +++++++++++++++++-- .../sting/queue/util/Retry.scala | 45 ++++++++++++++++++ .../sting/queue/util/RetryException.scala | 9 ++++ .../sting/queue/util/TextFormatUtils.scala | 31 ++++++++++++ 7 files changed, 147 insertions(+), 16 deletions(-) create mode 100644 scala/src/org/broadinstitute/sting/queue/util/Retry.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/util/RetryException.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/util/TextFormatUtils.scala diff --git a/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala b/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala index 6630a1af3..eb8ea710a 100644 --- a/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala @@ -10,6 +10,7 @@ import java.lang.String import org.apache.log4j.Level import scala.tools.nsc.util.{FakePos, NoPosition, Position} import org.broadinstitute.sting.utils.classloader.{PackageUtils, PluginManager} +import org.broadinstitute.sting.queue.util.TextFormatUtils._ /** * Plugin manager for QScripts which loads QScripts into the current class loader. 
@@ -81,13 +82,6 @@ object QScriptManager extends Logging { } } - /** - * Returns the string "s" if x is greater than 1. - * @param x Value to test. - * @return "s" if x is greater than one else "". - */ - private def plural(x: Int) = if (x > 1) "s" else "" - /** * NSC (New Scala Compiler) reporter which logs to Log4J. * Heavily based on scala/src/compiler/scala/tools/nsc/reporters/ConsoleReporter.scala diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index 3a486993b..7c2910e69 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -1,8 +1,8 @@ package org.broadinstitute.sting.queue.engine import java.io.File -import org.broadinstitute.sting.queue.util.{IOUtils, LsfJob, Logging} import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.util._ /** * Runs jobs on an LSF compute cluster. @@ -45,6 +45,8 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with if (!IOUtils.CURRENT_DIR.getCanonicalFile.equals(function.commandDirectory)) job.workingDir = function.commandDirectory + job.extraBsubArgs ++= function.extraArgs + if (function.jobRestartable) job.extraBsubArgs :+= "-r" @@ -74,9 +76,19 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with function.jobErrorFile.delete() runStatus = RunnerStatus.RUNNING - job.run() - jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." + job.bsubJobId)).toString - logger.info("Submitted LSF job id: " + job.bsubJobId) + try { + Retry.attempt(() => job.run(), 1, 5, 10) + jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." 
+ job.bsubJobId)).toString + logger.info("Submitted LSF job id: " + job.bsubJobId) + } catch { + case re: RetryException => + removeTemporaryFiles() + runStatus = RunnerStatus.FAILED + case e => + logger.error("Error trying to start job.", e) + removeTemporaryFiles() + runStatus = RunnerStatus.FAILED + } } /** diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 2b9e5528f..987e8bbba 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -33,6 +33,9 @@ trait CommandLineFunction extends QFunction with Logging { /** Job queue to run the command */ var jobQueue: String = _ + /** Extra arguments to specify on the command line */ + var extraArgs: List[String] = Nil + /** Temporary directory to write any files */ var jobTempDir: File = IOUtils.javaTempDir diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 40e978c8f..41b9f9769 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -85,14 +85,14 @@ trait QFunction { def failOutputs = statusPaths.map(path => new File(path + ".fail")) /** The complete list of fields on this CommandLineFunction. */ - lazy val functionFields: List[ArgumentSource] = initFunctionFields + def functionFields = QFunction.classFields(this.getClass).functionFields /** The @Input fields on this CommandLineFunction. */ - lazy val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input])) + def inputFields = QFunction.classFields(this.getClass).inputFields /** The @Output fields on this CommandLineFunction. 
*/ - lazy val outputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Output])) + def outputFields = QFunction.classFields(this.getClass).outputFields /** The @Argument fields on this CommandLineFunction. */ - lazy val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument])) - + def argumentFields = QFunction.classFields(this.getClass).argumentFields + /** * Called at most once, returns the list of fields for this function. */ @@ -344,3 +344,40 @@ trait QFunction { */ private def invokeObj(source: ArgumentSource) = source.parentFields.foldLeft[AnyRef](this)(ReflectionUtils.getValue(_, _)) } + +object QFunction { + /** + * The list of fields defined on a class + * @param clazz The class to lookup fields. + */ + private class ClassFields(clazz: Class[_]) { + /** The complete list of fields on this CommandLineFunction. */ + val functionFields: List[ArgumentSource] = ParsingEngine.extractArgumentSources(clazz).toList + /** The @Input fields on this CommandLineFunction. */ + val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input])) + /** The @Output fields on this CommandLineFunction. */ + val outputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Output])) + /** The @Argument fields on this CommandLineFunction. */ + val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument])) + } + + /** + * The mapping from class to fields. + */ + private var classFieldsMap = Map.empty[Class[_], ClassFields] + + /** + * Returns the fields for a class. + * @param clazz Class to retrieve fields for. + * @return the fields for the class. 
+ */ + private def classFields(clazz: Class[_]) = { + classFieldsMap.get(clazz) match { + case Some(classFields) => classFields + case None => + val classFields = new ClassFields(clazz) + classFieldsMap += clazz -> classFields + classFields + } + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/util/Retry.scala b/scala/src/org/broadinstitute/sting/queue/util/Retry.scala new file mode 100644 index 000000000..06b6573fd --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/util/Retry.scala @@ -0,0 +1,45 @@ +package org.broadinstitute.sting.queue.util + +import org.broadinstitute.sting.queue.util.TextFormatUtils._ + +/** + * Utilities for retrying a function and waiting a number of minutes. + */ +object Retry extends Logging { + /** + * Attempt the function and return the value on success. + * Each time the function fails the stack trace is logged. + * @param f Function to run and return its value. + * @param wait The lengths in minutes to wait before each retry; the function is attempted at most wait.size + 1 times. + * @throws RetryException when all retries are exhausted. + * @return The successful result of f. + */ + def attempt[A](f: () => A, wait: Double*): A = { + var count = 0 + var success = false + val tries = wait.size + 1 + var result = null.asInstanceOf[A] + while (!success && count < tries) { + try { + result = f() + success = true + } catch { + case e => { + count += 1 + if (count < tries) { + val minutes = wait(count-1) + logger.error("Caught error during attempt %s of %s." + .format(count, tries), e) + logger.error("Retrying in %.1f minute%s.".format(minutes, plural(minutes))) + Thread.sleep((minutes * 1000 * 60).toLong) + } else { + logger.error("Caught error during attempt %s of %s. Giving up." 
+ .format(count, tries), e) + throw new RetryException("Gave up after %s attempts.".format(tries), e) + } + } + } + } + result + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/util/RetryException.scala b/scala/src/org/broadinstitute/sting/queue/util/RetryException.scala new file mode 100644 index 000000000..e4ae5e712 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/util/RetryException.scala @@ -0,0 +1,9 @@ +package org.broadinstitute.sting.queue.util + +import org.broadinstitute.sting.queue.QException + +/** + * Thrown after giving up on retrying. + */ +class RetryException(private val message: String, private val throwable: Throwable) + extends QException(message, throwable) {} diff --git a/scala/src/org/broadinstitute/sting/queue/util/TextFormatUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/TextFormatUtils.scala new file mode 100644 index 000000000..6980e29bd --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/util/TextFormatUtils.scala @@ -0,0 +1,31 @@ +package org.broadinstitute.sting.queue.util + +object TextFormatUtils { + /** + * Returns the string "s" if x is greater than 1. + * @param x Value to test. + * @return "s" if x is greater than one else "". + */ + def plural(x: Int) = if (x > 1) "s" else "" + + /** + * Returns the string "s" if x is greater than 1. + * @param x Value to test. + * @return "s" if x is greater than one else "". + */ + def plural(x: Long) = if (x > 1) "s" else "" + + /** + * Returns the string "s" if x is not equal to 1. + * @param x Value to test. + * @return "s" if x is not equal to one else "". + */ + def plural(x: Float) = if (x != 1) "s" else "" + + /** + * Returns the string "s" if x is not equal to 1. + * @param x Value to test. + * @return "s" if x is not equal to one else "". + */ + def plural(x: Double) = if (x != 1) "s" else "" +}