From f1f9bd6dcc5b2cb07c191efa9e74732919556452 Mon Sep 17 00:00:00 2001 From: kshakir Date: Wed, 23 Feb 2011 18:59:08 +0000 Subject: [PATCH] Due to recent LSF hiccups put a very brief (.5-2min) retry around getting status. Can't wait too long because statuses are archived an hour after exit. TODO: Switch to bulk status checks and add status archive lookups. Sending SIGTERM(15) instead of SIGKILL(9) to allow for graceful termination of child process. Printing out the name of the QScripts in the compile error text. Added a pipelineretry -PR pass through for the MFCP and MFCPTest. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5295 348d0f76-0448-11de-a6fe-93d51630548a --- .../playground/MultiFullCallingPipeline.scala | 4 +++ .../sting/queue/QScriptManager.scala | 4 +-- .../sting/queue/engine/Lsf706JobRunner.scala | 34 ++++++++----------- .../MultiFullCallingPipelineTest.scala | 2 +- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/scala/qscript/playground/MultiFullCallingPipeline.scala b/scala/qscript/playground/MultiFullCallingPipeline.scala index 2650f5b5c..0f61747eb 100644 --- a/scala/qscript/playground/MultiFullCallingPipeline.scala +++ b/scala/qscript/playground/MultiFullCallingPipeline.scala @@ -28,6 +28,9 @@ class MultiFullCallingPipeline extends QScript { @Argument(doc="pipeline priority", shortName="PP", required = false) var pipelinePriority: Option[Int] = None + @Argument(doc="pipeline retry", shortName="PR", required = false) + var pipelineRetry: Option[Int] = None + @Argument(doc="run with -tearScript", shortName="TS") var runWithTearScript = false @@ -85,6 +88,7 @@ class MultiFullCallingPipeline extends QScript { optional(" -jobQueue ", qscript.pipelineJobQueue) + optional(" -shortJobQueue ", qscript.pipelineShortQueue) + optional(" -jobPriority ", qscript.pipelinePriority) + + optional(" -retry ", qscript.pipelineRetry) + optional(" -tearScript ", tearScript) + " -S %s --gatkjar %s -jobProject %s -jobPrefix %s -Y %s -bsub -run" .format(pipelineScript, gatkJar, yamlName, yamlName, yamlFile) diff --git a/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala b/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala index 6af376810..a70276bef 100644 --- a/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala @@ -40,8 +40,8 @@ class QScriptManager() extends Logging { reporter.printSummary() if (reporter.hasErrors) { - val msg = "Compile failed with %d error%s".format( - reporter.ERROR.count, plural(reporter.ERROR.count)) + val msg = "Compile of %s failed with %d error%s".format( + scripts.mkString(", "), reporter.ERROR.count, plural(reporter.ERROR.count)) throw new QException(msg) } else if (reporter.WARNING.count > 0) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala index 70fdf2d48..1107c1b25 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala @@ -8,9 +8,9 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.jna.clibrary.LibC import java.util.Date -import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit} +import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} import com.sun.jna.ptr.IntByReference -import com.sun.jna.{StringArray, NativeLong, Memory} +import com.sun.jna.{StringArray, NativeLong} /** * Runs jobs on an LSF compute cluster. @@ -107,9 +107,12 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR var exitInfo = 0 var endTime: NativeLong = null - val result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) - if (result < 0) - throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId)) + var result = 0 + Retry.attempt(() => { + result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) + if (result < 0) + throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId)) + }, 0.5, 1, 2) try { if (result > 1) throw new QException(LibBat.lsb_sperror("Recieved " + result + " LSF results for job id: " + jobId)) @@ -179,6 +182,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR object Lsf706JobRunner extends Logging { private val lsfLibLock = new Object + private val SIGTERM = 15 init() @@ -244,23 +248,15 @@ object Lsf706JobRunner extends Logging { */ def tryStop(runners: List[Lsf706JobRunner]) { lsfLibLock.synchronized { - for (jobRunners <- runners.filterNot(_.jobId < 0).grouped(10)) { + // lsb_killbulkjobs does not seem to forward SIGTERM, + // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. + for (jobRunner <- runners.filterNot(_.jobId < 0)) { try { - val njobs = jobRunners.size - val signalJobs = new signalBulkJobs - signalJobs.jobs = { - val jobIds = new Memory(8 * njobs) - jobIds.write(0, jobRunners.map(_.jobId).toArray, 0, njobs) - jobIds - } - signalJobs.njobs = njobs - signalJobs.signal = 9 - - if (LibBat.lsb_killbulkjobs(signalJobs) < 0) - throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed")) + if (LibBat.lsb_signaljob(jobRunner.jobId, SIGTERM) < 0) + logger.error(LibBat.lsb_sperror("Unable to kill job " + jobRunner.jobId)) } catch { case e => - logger.error("Unable to kill all jobs.", e) + logger.error("Unable to kill job " + jobRunner.jobId, e) } } } diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala index 987f393f8..2d81a6955 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala @@ -67,7 +67,7 @@ class MultiFullCallingPipelineTest { // Run the pipeline with the expected inputs. val pipelineCommand = ("-retry 1 -BS 3 -PP 100 -S scala/qscript/playground/MultiFullCallingPipeline.scala" + - " -jobProject %s -YL %s -PJQ %s -stingHome %s") + " -jobProject %s -YL %s -PJQ %s -PR 2 -stingHome %s") .format(projectName, yamlList, dataset.pipelineJobQueue, PipelineTest.currentStingDir) val pipelineSpec = new PipelineTestSpec