diff --git a/scala/qscript/playground/MultiFullCallingPipeline.scala b/scala/qscript/playground/MultiFullCallingPipeline.scala index 2650f5b5c..0f61747eb 100644 --- a/scala/qscript/playground/MultiFullCallingPipeline.scala +++ b/scala/qscript/playground/MultiFullCallingPipeline.scala @@ -28,6 +28,9 @@ class MultiFullCallingPipeline extends QScript { @Argument(doc="pipeline priority", shortName="PP", required = false) var pipelinePriority: Option[Int] = None + @Argument(doc="pipeline retry", shortName="PR", required = false) + var pipelineRetry: Option[Int] = None + @Argument(doc="run with -tearScript", shortName="TS") var runWithTearScript = false @@ -85,6 +88,7 @@ class MultiFullCallingPipeline extends QScript { optional(" -jobQueue ", qscript.pipelineJobQueue) + optional(" -shortJobQueue ", qscript.pipelineShortQueue) + optional(" -jobPriority ", qscript.pipelinePriority) + + optional(" -retry ", qscript.pipelineRetry) + optional(" -tearScript ", tearScript) + " -S %s --gatkjar %s -jobProject %s -jobPrefix %s -Y %s -bsub -run" .format(pipelineScript, gatkJar, yamlName, yamlName, yamlFile) diff --git a/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala b/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala index 6af376810..a70276bef 100644 --- a/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala @@ -40,8 +40,8 @@ class QScriptManager() extends Logging { reporter.printSummary() if (reporter.hasErrors) { - val msg = "Compile failed with %d error%s".format( - reporter.ERROR.count, plural(reporter.ERROR.count)) + val msg = "Compile of %s failed with %d error%s".format( + scripts.mkString(", "), reporter.ERROR.count, plural(reporter.ERROR.count)) throw new QException(msg) } else if (reporter.WARNING.count > 0) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala index 70fdf2d48..1107c1b25 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala @@ -8,9 +8,9 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.jna.clibrary.LibC import java.util.Date -import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit} +import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} import com.sun.jna.ptr.IntByReference -import com.sun.jna.{StringArray, NativeLong, Memory} +import com.sun.jna.{StringArray, NativeLong} /** * Runs jobs on an LSF compute cluster. @@ -107,9 +107,12 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR var exitInfo = 0 var endTime: NativeLong = null - val result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) - if (result < 0) - throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId)) + var result = 0 + Retry.attempt(() => { + result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) + if (result < 0) + throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId)) + }, 0.5, 1, 2) try { if (result > 1) throw new QException(LibBat.lsb_sperror("Recieved " + result + " LSF results for job id: " + jobId)) @@ -179,6 +182,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR object Lsf706JobRunner extends Logging { private val lsfLibLock = new Object + private val SIGTERM = 15 init() @@ -244,23 +248,15 @@ object Lsf706JobRunner extends Logging { */ def tryStop(runners: List[Lsf706JobRunner]) { lsfLibLock.synchronized { - for (jobRunners <- runners.filterNot(_.jobId < 0).grouped(10)) { + // lsb_killbulkjobs does not seem to forward SIGTERM, + // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. + for (jobRunner <- runners.filterNot(_.jobId < 0)) { try { - val njobs = jobRunners.size - val signalJobs = new signalBulkJobs - signalJobs.jobs = { - val jobIds = new Memory(8 * njobs) - jobIds.write(0, jobRunners.map(_.jobId).toArray, 0, njobs) - jobIds - } - signalJobs.njobs = njobs - signalJobs.signal = 9 - - if (LibBat.lsb_killbulkjobs(signalJobs) < 0) - throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed")) + if (LibBat.lsb_signaljob(jobRunner.jobId, SIGTERM) < 0) + logger.error(LibBat.lsb_sperror("Unable to kill job " + jobRunner.jobId)) } catch { case e => - logger.error("Unable to kill all jobs.", e) + logger.error("Unable to kill job " + jobRunner.jobId, e) } } } diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala index 987f393f8..2d81a6955 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala @@ -67,7 +67,7 @@ class MultiFullCallingPipelineTest { // Run the pipeline with the expected inputs. val pipelineCommand = ("-retry 1 -BS 3 -PP 100 -S scala/qscript/playground/MultiFullCallingPipeline.scala" + - " -jobProject %s -YL %s -PJQ %s -stingHome %s") + " -jobProject %s -YL %s -PJQ %s -PR 2 -stingHome %s") .format(projectName, yamlList, dataset.pipelineJobQueue, PipelineTest.currentStingDir) val pipelineSpec = new PipelineTestSpec