Merge pull request #314 from broadinstitute/ks_francesco_pbs_patch

Ks francesco pbs patch
This commit is contained in:
droazen 2013-07-01 12:39:38 -07:00
commit 2964ebaa4e
3 changed files with 122 additions and 4 deletions

View File

@ -0,0 +1,24 @@
/*
* PBS Engine Job Manager
* this plugin has been developed modifying the Grid Engine plugin,
* used as a template: thanks to the author for providing the base
* of this work
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.engine.pbsengine
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.engine.drmaa.DrmaaJobManager
class PbsEngineJobManager extends DrmaaJobManager {
override def create(function: CommandLineFunction) = new PbsEngineJobRunner(session, function)
}

View File

@ -0,0 +1,91 @@
/*
* PBS Engine Job Runner
* this plugin has been developed modifying the Grid Engine plugin,
* used as a template: thanks to the author for providing the base
* of this work
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.engine.pbsengine
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.engine.drmaa.DrmaaJobRunner
import org.ggf.drmaa.Session
/**
* Runs jobs on a PBS/Torque Engine compute cluster.
* NB - THIS FILE HAS BEEN MODIFIED from the original code
* of the GridEngine package
*/
class PbsEngineJobRunner(session: Session, function: CommandLineFunction) extends DrmaaJobRunner(session, function) with Logging {
// Pbs Engine disallows certain characters from being in job names.
// This replaces all illegal characters with underscores
protected override val jobNameFilter = """[\n\t\r/:,@\\*?]"""
protected override val minRunnerPriority = -1023
protected override val maxRunnerPriority = 0
override protected def functionNativeSpec = {
// create nativeSpec variable
var nativeSpec: String = ""
// If a project name is set specify the project name
if (function.jobProject != null)
nativeSpec += " -P " + function.jobProject
// If the job queue is set specify the job queue
if (function.jobQueue != null)
nativeSpec += " -q " + function.jobQueue
else
nativeSpec += " -q normal "
// If the resident set size is requested pass on the memory request
// mem_free is the standard, but may also be virtual_free or even not available
if (function.qSettings.residentRequestParameter != null && function.residentRequest.isDefined)
nativeSpec += " -l %s=%dM".format(function.qSettings.residentRequestParameter, function.residentRequest.map(_ * 1024).get.ceil.toInt)
// If the resident set size limit is defined specify the memory limit
if (function.residentLimit.isDefined)
nativeSpec += " -l mem=%dM".format(function.residentLimit.map(_ * 1024).get.ceil.toInt)
// If more than 1 core is requested, set the proper request
// the cores will be requested as part of a single node
if ( function.nCoresRequest.getOrElse(1) > 1 ) {
if ( function.qSettings.dontRequestMultipleCores )
logger.warn("Sending multicore job %s to farm without requesting appropriate number of cores (%d)".format(
function.shortDescription, function.nCoresRequest.get))
else
nativeSpec += " -l nodes=1:ppn=%d".format(function.nCoresRequest.get)
}
// Pass on any job resource requests
// NB: blank because resource requests in PBS can be preceded by different
// arguments, i.e. -l but also -o or -j if they are not exactly "resources" strictly speaking
// therefore the user will add them in the request, i.e. -jobResReq "-j oe"
// but this will allow more flexibility in setting the options for PBS jobs on different Clusters
nativeSpec += function.jobResourceRequests.map(" " + _).mkString
// Pass on any job environment names
nativeSpec += function.jobEnvironmentNames.map(" " + _).mkString
// If the priority is set specify the priority
val priority = functionPriority
if (priority.isDefined)
nativeSpec += " -p " + priority.get
logger.debug("Native spec is: %s".format(nativeSpec))
(nativeSpec + " " + super.functionNativeSpec).trim()
}
}

View File

@ -45,11 +45,14 @@ object PipelineTest extends BaseTest with Logging {
private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/"
private val md5DB = new MD5DB()
final val allJobRunners = {
val commandLinePluginManager = new CommandLinePluginManager
commandLinePluginManager.getPlugins.map(commandLinePluginManager.getName(_)).toSeq
}
/**
* All the job runners configured to run PipelineTests at The Broad.
*/
final val allJobRunners = Seq("Lsf706", "GridEngine", "Shell")
/**
* The default job runners to run.
*/
final val defaultJobRunners = Seq("Lsf706", "GridEngine")
/**