added PbsEngine plugin into engine folders, to be called in Queue with -jobRunner PbsEngine; the plugin is written modifying the existing GridEngine plugin, used as a template

Signed-off-by: Khalid Shakir <kshakir@broadinstitute.org>
This commit is contained in:
Francesco 2013-06-26 10:29:45 +02:00 committed by Khalid Shakir
parent 4ec50caea2
commit 948b2fca20
2 changed files with 115 additions and 0 deletions

View File

@ -0,0 +1,24 @@
/*
* PBS Engine Job Manager
* this plugin has been developed modifying the Grid Engine plugin,
* used as a template: thanks to the author for providing the base
* of this work
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.engine.pbsengine
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.engine.drmaa.DrmaaJobManager
class PbsEngineJobManager extends DrmaaJobManager {
override def create(function: CommandLineFunction) = new PbsEngineJobRunner(session, function)
}

View File

@ -0,0 +1,91 @@
/*
* PBS Engine Job Runner
* this plugin has been developed modifying the Grid Engine plugin,
* used as a template: thanks to the author for providing the base
* of this work
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.engine.pbsengine
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.engine.drmaa.DrmaaJobRunner
import org.ggf.drmaa.Session
/**
* Runs jobs on a PBS/Torque Engine compute cluster.
* NB - THIS FILE HAS BEEN MODIFIED from the original code
* of the GridEngine package
*/
class PbsEngineJobRunner(session: Session, function: CommandLineFunction) extends DrmaaJobRunner(session, function) with Logging {
// Pbs Engine disallows certain characters from being in job names.
// This replaces all illegal characters with underscores
protected override val jobNameFilter = """[\n\t\r/:,@\\*?]"""
protected override val minRunnerPriority = -1023
protected override val maxRunnerPriority = 0
override protected def functionNativeSpec = {
// create nativeSpec variable
var nativeSpec: String = ""
// If a project name is set specify the project name
if (function.jobProject != null)
nativeSpec += " -P " + function.jobProject
// If the job queue is set specify the job queue
if (function.jobQueue != null)
nativeSpec += " -q " + function.jobQueue
else
nativeSpec += " -q normal "
// If the resident set size is requested pass on the memory request
// mem_free is the standard, but may also be virtual_free or even not available
if (function.qSettings.residentRequestParameter != null && function.residentRequest.isDefined)
nativeSpec += " -l %s=%dM".format(function.qSettings.residentRequestParameter, function.residentRequest.map(_ * 1024).get.ceil.toInt)
// If the resident set size limit is defined specify the memory limit
if (function.residentLimit.isDefined)
nativeSpec += " -l mem=%dM".format(function.residentLimit.map(_ * 1024).get.ceil.toInt)
// If more than 1 core is requested, set the proper request
// the cores will be requested as part of a single node
if ( function.nCoresRequest.getOrElse(1) > 1 ) {
if ( function.qSettings.dontRequestMultipleCores )
logger.warn("Sending multicore job %s to farm without requesting appropriate number of cores (%d)".format(
function.shortDescription, function.nCoresRequest.get))
else
nativeSpec += " -l nodes=1:ppn=%d".format(function.qSettings.parallelEnvironmentName, function.nCoresRequest.get)
}
// Pass on any job resource requests
// NB: blank because resource requests in PBS can be preceded by different
// arguments, i.e. -l but also -o or -j if they are not exactly "resources" strictly speaking
// therefore the user will add them in the request, i.e. -jobResReq "-j oe"
// but this will allow more flexibility in setting the options for PBS jobs on different Clusters
nativeSpec += function.jobResourceRequests.map(" " + _).mkString
// Pass on any job environment names
nativeSpec += function.jobEnvironmentNames.map(" " + _).mkString
// If the priority is set specify the priority
val priority = functionPriority
if (priority.isDefined)
nativeSpec += " -p " + priority.get
logger.debug("Native spec is: %s".format(nativeSpec))
(nativeSpec + " " + super.functionNativeSpec).trim()
}
}