Grid Engine backend to GATK-Queue, initial commit of implementation

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5788 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
dheiman 2011-05-11 13:21:45 +00:00
parent dfdef2d29b
commit 16db86e6cb
4 changed files with 179 additions and 53 deletions

View File

@ -36,7 +36,7 @@ class QSettings {
var jobNamePrefix: String = QSettings.processNamePrefix
@Argument(fullName="job_project", shortName="jobProject", doc="Default project for compute farm jobs.", required=false)
var jobProject: String = "Queue"
var jobProject: String = _
@Argument(fullName="job_queue", shortName="jobQueue", doc="Default queue for compute farm jobs.", required=false)
var jobQueue: String = _

View File

@ -353,6 +353,8 @@ class QGraph extends Logging {
try {
if (settings.bsub)
settings.jobRunner = "Lsf706"
else if (settings.qsub)
settings.jobRunner = "GridEngine"
else if (settings.jobRunner == null)
settings.jobRunner = "Shell"
commandLineManager = commandLinePluginManager.createByName(settings.jobRunner)

View File

@ -42,6 +42,9 @@ class QGraphSettings {
@Argument(fullName="bsub", shortName="bsub", doc="Equivalent to -jobRunner Lsf706", required=false)
var bsub = false
@Argument(fullName="qsub", shortName="qsub", doc="Equivalent to -jobRunner GridEngine", required=false)
var qsub = false
@Argument(fullName="status",shortName="status",doc="Get status of jobs for the qscript",required=false)
var getStatus = false

View File

@ -24,79 +24,129 @@
package org.broadinstitute.sting.queue.engine.gridengine
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.util.{Logging,Retry}
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
import org.ggf.drmaa.{DrmaaException,JobInfo,JobTemplate,Session,SessionFactory}
import java.util.Collections
/**
* Runs jobs on a Grid Engine compute cluster.
*/
class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging {
// Run the static initializer for GridEngineJobRunner
GridEngineJobRunner
def start() = {
// TODO: Copy settings from function to GridEngine syntax.
/*
val gridEngineJob = new ...
/** Job Id of the currently executing job. */
private var jobId: String = _
// Set the display name to 4000 characters of the description (or whatever the GE max is)
gridEngineJob.displayName = function.description.take(4000)
/** Last known status */
private var lastStatus: RunnerStatus.Value = _
// Set the output file for stdout
gridEngineJob.outputFile = function.jobOutputFile.getPath
/** The last time the status was updated */
protected var lastStatusUpdate: Long = _
// Set the current working directory
gridEngineJob.workingDirectory = function.commandDirectory.getPath
def start() {
GridEngineJobRunner.gridEngineSession.synchronized {
val gridEngineJob: JobTemplate = GridEngineJobRunner.gridEngineSession.createJobTemplate
// If the error file is set specify the separate output for stderr
if (function.jobErrorFile != null) {
gridEngineJob.errFile = function.jobErrorFile.getPath
// Force the remote environment to inherit local environment settings
var nativeSpecString: String = "-V"
// Set the display name to 1024 characters of the description
gridEngineJob.setJobName(function.description.take(1024))
// Set the output file for stdout
gridEngineJob.setOutputPath(":" + function.jobOutputFile.getPath)
// Set the current working directory
gridEngineJob.setWorkingDirectory(function.commandDirectory.getPath)
// If the error file is set specify the separate output for stderr
// Otherwise join with stdout
if (Option(function.jobErrorFile) != None) {
gridEngineJob.setErrorPath(":" + function.jobErrorFile.getPath)
} else {
gridEngineJob.setJoinFiles(true)
}
// If a project name is set specify the project name
if (Option(function.jobProject) != None) {
nativeSpecString += " -P " + function.jobProject
}
// If the job queue is set specify the job queue
if (Option(function.jobQueue) != None) {
nativeSpecString += " -q " + function.jobQueue
}
// If the memory limit is set (GB) specify the memory limit
if (function.memoryLimit.isDefined) {
val memLim: String = function.memoryLimit.get + "G"
nativeSpecString += " -l mem_free=" + memLim + ",h_vmem=" + memLim
}
// If the priority is set (user specified Int) specify the priority
if (function.jobPriority.isDefined) {
nativeSpecString += " -p " + function.jobPriority.get
}
gridEngineJob.setNativeSpecification(nativeSpecString)
// Instead of running the function.commandLine, run "sh <jobScript>"
gridEngineJob.setRemoteCommand("sh")
gridEngineJob.setArgs(Collections.singletonList(jobScript.toString))
// Allow advanced users to update the request via QFunction.updateJobRun()
updateJobRun(gridEngineJob)
updateStatus(RunnerStatus.PENDING)
// Start the job and store the id so it can be killed in tryStop
try {
Retry.attempt(() => {
try {
jobId = GridEngineJobRunner.gridEngineSession.runJob(gridEngineJob)
} catch {
case de: DrmaaException => throw new QException("Unable to submit job: " + de.getLocalizedMessage)
}
}, 1, 5, 10)
} finally {
// Prevent memory leaks
GridEngineJobRunner.gridEngineSession.deleteJobTemplate(gridEngineJob)
}
logger.info("Submitted Grid Engine job id: " + jobId)
}
// If a project name is set specify the project name
if (function.jobProject != null) {
gridEngineJob.projectName = function.jobProject
}
// If the job queue is set specify the job queue
if (function.jobQueue != null) {
gridEngineJob.queue = function.jobQueue
}
// If the memory limit is set (GB) specify the memory limit
if (function.memoryLimit.isDefined) {
gridEngineJob.jobMemoryLimit = function.memoryLimit.get + "GB"
}
// If the priority is set (user specified Int) specify the priority
if (function.jobPriority.isDefined) {
gridEngineJob.jobPriority = function.jobPriority.get
}
// Instead of running the function.commandLine, run "sh <jobScript>"
gridEngineJob.command = "sh " + jobScript
// Store the status so it can be returned in the status method.
myStatus = RunnerStatus.RUNNING
// Start the job and store the id so it can be killed in tryStop
myJobId = gridEngineJob.start()
*/
logger.warn("TODO: implement Grid Engine support")
}
// TODO: Return the latest status: RUNNING, FAILED, or DONE
def status = throw new RuntimeException("TODO: Grid Engine return status such as: " + RunnerStatus.FAILED)
def status = this.lastStatus
private def updateStatus(updatedStatus: RunnerStatus.Value) {
this.lastStatus = updatedStatus
this.lastStatusUpdate = System.currentTimeMillis
}
}
object GridEngineJobRunner extends Logging {
private val gridEngineSession = SessionFactory.getFactory.getSession
/** Amount of time a job can go without status before giving up. */
private val unknownStatusMaxSeconds = 5 * 60
initGridEngine()
/**
* Initialize the Grid Engine library.
*/
private def initGridEngine() {
// TODO: Init
logger.warn("TODO Grid Engine: Initialize here.")
gridEngineSession.synchronized {
try {
gridEngineSession.init("")
} catch {
case de: DrmaaException => throw new QException("init() failed", de)
}
}
}
/**
@ -104,7 +154,14 @@ object GridEngineJobRunner extends Logging {
* @param runners Runners to update.
*/
def updateStatus(runners: Set[GridEngineJobRunner]) {
// TODO: Bulk update. If not possible this method can be removed here and in GridEngineJobManager.
var updatedRunners = Set.empty[GridEngineJobRunner]
gridEngineSession.synchronized {
runners.foreach(runner => if (updateRunnerStatus(runner)) {updatedRunners += runner})
}
for (runner <- runners.diff(updatedRunners)) {
checkUnknownStatus(runner)
}
}
/**
@ -112,6 +169,70 @@ object GridEngineJobRunner extends Logging {
* @param runners Runners to stop.
*/
def tryStop(runners: Set[GridEngineJobRunner]) {
// TODO: Stop runners. SIGTERM(15) is preferred to SIGKILL(9).
// Stop runners. SIGTERM(15) is preferred to SIGKILL(9).
// Only way to send SIGTERM is for the Sys Admin set the terminate_method
// resource of the designated queue to SIGTERM
gridEngineSession.synchronized {
for (runner <- runners.filterNot(runner => Option(runner.jobId) == None)) {
try {
gridEngineSession.control(runner.jobId, Session.TERMINATE)
} catch {
case e =>
logger.error("Unable to kill job " + runner.jobId, e)
}
}
gridEngineSession.exit()
}
}
private def updateRunnerStatus(runner: GridEngineJobRunner): Boolean = {
var returnStatus: RunnerStatus.Value = null
try {
val jobStatus = gridEngineSession.getJobProgramStatus(runner.jobId);
jobStatus match {
case Session.QUEUED_ACTIVE => returnStatus = RunnerStatus.PENDING
case Session.DONE =>
val jobInfo: JobInfo = gridEngineSession.wait(runner.jobId, Session.TIMEOUT_NO_WAIT)
if ((jobInfo.hasExited && jobInfo.getExitStatus > 0)
|| jobInfo.hasSignaled
|| jobInfo.wasAborted)
returnStatus = RunnerStatus.FAILED
else
returnStatus = RunnerStatus.DONE
case Session.FAILED => returnStatus = RunnerStatus.FAILED
case Session.UNDETERMINED => logger.warn("Unable to determine status of Grid Engine job id " + runner.jobId)
case _ => returnStatus = RunnerStatus.RUNNING
}
} catch {
// getJobProgramStatus will throw an exception once wait has run, as the
// job will be reaped. If the status is currently DONE or FAILED, return
// the status.
case de: DrmaaException =>
if (runner.lastStatus == RunnerStatus.DONE || runner.lastStatus == RunnerStatus.FAILED)
returnStatus = runner.lastStatus
else
logger.warn("Unable to determine status of Grid Engine job id " + runner.jobId, de)
}
Option(returnStatus) match {
case Some(returnStatus) =>
runner.updateStatus(returnStatus)
return true
case None => return false
}
}
private def checkUnknownStatus(runner: GridEngineJobRunner) {
val unknownStatusSeconds = (System.currentTimeMillis - runner.lastStatusUpdate)
if (unknownStatusSeconds > (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runner.updateStatus(RunnerStatus.FAILED)
logger.error("Unable to read Grid Engine status for %d minutes: job id %d: %s".format(unknownStatusSeconds/60, runner.jobId, runner.function.description))
}
}
override def finalize() {
gridEngineSession.exit()
}
}