From 948b2fca20b25febe2aafcbd98db394392bea72b Mon Sep 17 00:00:00 2001 From: Francesco Date: Wed, 26 Jun 2013 10:29:45 +0200 Subject: [PATCH 1/4] added PbsEngine plugin into engine folders, to be called in Queue with -jobRunner PbsEngine; the plugin is written modifying the existing GridEngine plugin, used as a template Signed-off-by: Khalid Shakir --- .../pbsengine/PbsEngineJobManager.scala | 24 +++++ .../engine/pbsengine/PbsEngineJobRunner.scala | 91 +++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobManager.scala create mode 100644 public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobRunner.scala diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobManager.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobManager.scala new file mode 100644 index 000000000..4a1dda78f --- /dev/null +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobManager.scala @@ -0,0 +1,24 @@ +/* +* PBS Engine Job Manager +* this plugin has been developed modifying the Grid Engine plugin, +* used as a template: thanks to the author for providing the base +* of this work +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package org.broadinstitute.sting.queue.engine.pbsengine + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.engine.drmaa.DrmaaJobManager + +class PbsEngineJobManager extends DrmaaJobManager { + override def create(function: CommandLineFunction) = new PbsEngineJobRunner(session, function) +} diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobRunner.scala new file mode 100644 index 000000000..fb00fe21f --- /dev/null +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobRunner.scala @@ -0,0 +1,91 @@ +/* +* PBS Engine Job Runner +* this plugin has been developed modifying the Grid Engine plugin, +* used as a template: thanks to the author for providing the base +* of this work +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package org.broadinstitute.sting.queue.engine.pbsengine + +import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.engine.drmaa.DrmaaJobRunner +import org.ggf.drmaa.Session + +/** + * Runs jobs on a PBS/Torque Engine compute cluster. + * NB - THIS FILE HAS BEEN MODIFIED from the original code + * of the GridEngine package + */ +class PbsEngineJobRunner(session: Session, function: CommandLineFunction) extends DrmaaJobRunner(session, function) with Logging { + // Pbs Engine disallows certain characters from being in job names. + // This replaces all illegal characters with underscores + protected override val jobNameFilter = """[\n\t\r/:,@\\*?]""" + protected override val minRunnerPriority = -1023 + protected override val maxRunnerPriority = 0 + + override protected def functionNativeSpec = { + + // create nativeSpec variable + var nativeSpec: String = "" + + // If a project name is set specify the project name + if (function.jobProject != null) + nativeSpec += " -P " + function.jobProject + + // If the job queue is set specify the job queue + if (function.jobQueue != null) + nativeSpec += " -q " + function.jobQueue + else + nativeSpec += " -q normal " + + // If the resident set size is requested pass on the memory request + // mem_free is the standard, but may also be virtual_free or even not available + + if (function.qSettings.residentRequestParameter != null && function.residentRequest.isDefined) + nativeSpec += " -l %s=%dM".format(function.qSettings.residentRequestParameter, function.residentRequest.map(_ * 1024).get.ceil.toInt) + + // If the resident set size limit is defined specify the memory limit + if (function.residentLimit.isDefined) + nativeSpec += " -l mem=%dM".format(function.residentLimit.map(_ * 1024).get.ceil.toInt) + + // If more than 1 core is requested, set the proper request + // the cores will be requested as part of a single node + + if ( function.nCoresRequest.getOrElse(1) > 1 ) { + if ( function.qSettings.dontRequestMultipleCores ) + logger.warn("Sending multicore job %s to farm without requesting appropriate number of cores (%d)".format( + function.shortDescription, function.nCoresRequest.get)) + else + nativeSpec += " -l nodes=1:ppn=%d".format(function.qSettings.parallelEnvironmentName, function.nCoresRequest.get) + } + + // Pass on any job resource requests + // NB: blank because resource requests in PBS can be preceded by different + // arguments, i.e. -l but also -o or -j if they are not exactly "resources" strictly speaking + // therefore the user will add them in the request, i.e. -jobResReq "-j oe" + // but this will allow more flexibility in setting the options for PBS jobs on different Clusters + + nativeSpec += function.jobResourceRequests.map(" " + _).mkString + + // Pass on any job environment names + nativeSpec += function.jobEnvironmentNames.map(" " + _).mkString + + // If the priority is set specify the priority + val priority = functionPriority + if (priority.isDefined) + nativeSpec += " -p " + priority.get + + logger.debug("Native spec is: %s".format(nativeSpec)) + (nativeSpec + " " + super.functionNativeSpec).trim() + } +} From acf90ca0276bd953d0f6ae1c5e136b8ff3c3439f Mon Sep 17 00:00:00 2001 From: Francesco Date: Thu, 27 Jun 2013 11:08:59 +0200 Subject: [PATCH 2/4] corrected number of arguments passed to PbsEngineJobRunner when requesting multiple cores Signed-off-by: Khalid Shakir --- .../sting/queue/engine/pbsengine/PbsEngineJobRunner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobRunner.scala index fb00fe21f..366e4cc29 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/pbsengine/PbsEngineJobRunner.scala @@ -66,7 +66,7 @@ class PbsEngineJobRunner(session: Session, function: CommandLineFunction) extend logger.warn("Sending multicore job %s to farm without requesting appropriate number of cores (%d)".format( function.shortDescription, function.nCoresRequest.get)) else - nativeSpec += " -l nodes=1:ppn=%d".format(function.qSettings.parallelEnvironmentName, function.nCoresRequest.get) + nativeSpec += " -l nodes=1:ppn=%d".format(function.nCoresRequest.get) } // Pass on any job resource requests From ec206eccfca28d7dbbabdf351794b21653c355e9 Mon Sep 17 00:00:00 2001 From: Khalid Shakir Date: Thu, 27 Jun 2013 15:07:59 -0400 Subject: [PATCH 3/4] Switch "all" test pipeline job runners to mean the job runners that run at The Broad. --- .../sting/queue/pipeline/PipelineTest.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/public/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala b/public/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala index e990b5233..e9a288117 100644 --- a/public/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala +++ b/public/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala @@ -45,11 +45,14 @@ object PipelineTest extends BaseTest with Logging { private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" private val md5DB = new MD5DB() - final val allJobRunners = { - val commandLinePluginManager = new CommandLinePluginManager - commandLinePluginManager.getPlugins.map(commandLinePluginManager.getName(_)).toSeq - } + /** + * All the job runners configured to run PipelineTests at The Broad. + */ + final val allJobRunners = Seq("Lsf706", "GridEngine", "Shell") + /** + * The default job runners to run. + */ final val defaultJobRunners = Seq("Lsf706", "GridEngine") /**