diff --git a/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/scala/src/org/broadinstitute/sting/queue/QSettings.scala index e001f8665..71970a36b 100644 --- a/scala/src/org/broadinstitute/sting/queue/QSettings.scala +++ b/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -36,7 +36,7 @@ class QSettings { var jobNamePrefix: String = QSettings.processNamePrefix @Argument(fullName="job_project", shortName="jobProject", doc="Default project for compute farm jobs.", required=false) - var jobProject: String = "Queue" + var jobProject: String = _ @Argument(fullName="job_queue", shortName="jobQueue", doc="Default queue for compute farm jobs.", required=false) var jobQueue: String = _ diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index eb31796b2..bfcc4d48c 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -353,6 +353,8 @@ class QGraph extends Logging { try { if (settings.bsub) settings.jobRunner = "Lsf706" + else if (settings.qsub) + settings.jobRunner = "GridEngine" else if (settings.jobRunner == null) settings.jobRunner = "Shell" commandLineManager = commandLinePluginManager.createByName(settings.jobRunner) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index be4dea51a..6ece600dd 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -42,6 +42,9 @@ class QGraphSettings { @Argument(fullName="bsub", shortName="bsub", doc="Equivalent to -jobRunner Lsf706", required=false) var bsub = false + @Argument(fullName="qsub", shortName="qsub", doc="Equivalent to -jobRunner GridEngine", required=false) + var qsub = false + @Argument(fullName="status",shortName="status",doc="Get status of jobs for the qscript",required=false) var getStatus = false diff --git a/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala index 39870e786..9918669fa 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala @@ -24,79 +24,129 @@ package org.broadinstitute.sting.queue.engine.gridengine -import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.QException +import org.broadinstitute.sting.queue.util.{Logging,Retry} import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} +import org.ggf.drmaa.{DrmaaException,JobInfo,JobTemplate,Session,SessionFactory} +import java.util.Collections +/** + * Runs jobs on a Grid Engine compute cluster. + */ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging { // Run the static initializer for GridEngineJobRunner GridEngineJobRunner - def start() = { - // TODO: Copy settings from function to GridEngine syntax. - /* - val gridEngineJob = new ... + /** Job Id of the currently executing job. */ + private var jobId: String = _ - // Set the display name to 4000 characters of the description (or whatever the GE max is) - gridEngineJob.displayName = function.description.take(4000) + /** Last known status */ + private var lastStatus: RunnerStatus.Value = _ - // Set the output file for stdout - gridEngineJob.outputFile = function.jobOutputFile.getPath + /** The last time the status was updated */ + protected var lastStatusUpdate: Long = _ - // Set the current working directory - gridEngineJob.workingDirectory = function.commandDirectory.getPath + def start() { + GridEngineJobRunner.gridEngineSession.synchronized { + val gridEngineJob: JobTemplate = GridEngineJobRunner.gridEngineSession.createJobTemplate - // If the error file is set specify the separate output for stderr - if (function.jobErrorFile != null) { - gridEngineJob.errFile = function.jobErrorFile.getPath + // Force the remote environment to inherit local environment settings + var nativeSpecString: String = "-V" + + // Set the display name to 1024 characters of the description + gridEngineJob.setJobName(function.description.take(1024)) + + // Set the output file for stdout + gridEngineJob.setOutputPath(":" + function.jobOutputFile.getPath) + + // Set the current working directory + gridEngineJob.setWorkingDirectory(function.commandDirectory.getPath) + + // If the error file is set specify the separate output for stderr + // Otherwise join with stdout + if (Option(function.jobErrorFile) != None) { + gridEngineJob.setErrorPath(":" + function.jobErrorFile.getPath) + } else { + gridEngineJob.setJoinFiles(true) + } + + // If a project name is set specify the project name + if (Option(function.jobProject) != None) { + nativeSpecString += " -P " + function.jobProject + } + + // If the job queue is set specify the job queue + if (Option(function.jobQueue) != None) { + nativeSpecString += " -q " + function.jobQueue + } + + // If the memory limit is set (GB) specify the memory limit + if (function.memoryLimit.isDefined) { + val memLim: String = function.memoryLimit.get + "G" + nativeSpecString += " -l mem_free=" + memLim + ",h_vmem=" + memLim + } + + // If the priority is set (user specified Int) specify the priority + if (function.jobPriority.isDefined) { + nativeSpecString += " -p " + function.jobPriority.get + } + + gridEngineJob.setNativeSpecification(nativeSpecString) + + // Instead of running the function.commandLine, run "sh " + gridEngineJob.setRemoteCommand("sh") + gridEngineJob.setArgs(Collections.singletonList(jobScript.toString)) + + // Allow advanced users to update the request via QFunction.updateJobRun() + updateJobRun(gridEngineJob) + + updateStatus(RunnerStatus.PENDING) + + // Start the job and store the id so it can be killed in tryStop + try { + Retry.attempt(() => { + try { + jobId = GridEngineJobRunner.gridEngineSession.runJob(gridEngineJob) + } catch { + case de: DrmaaException => throw new QException("Unable to submit job: " + de.getLocalizedMessage) + } + }, 1, 5, 10) + } finally { + // Prevent memory leaks + GridEngineJobRunner.gridEngineSession.deleteJobTemplate(gridEngineJob) + } + logger.info("Submitted Grid Engine job id: " + jobId) } - - // If a project name is set specify the project name - if (function.jobProject != null) { - gridEngineJob.projectName = function.jobProject - } - - // If the job queue is set specify the job queue - if (function.jobQueue != null) { - gridEngineJob.queue = function.jobQueue - } - - // If the memory limit is set (GB) specify the memory limit - if (function.memoryLimit.isDefined) { - gridEngineJob.jobMemoryLimit = function.memoryLimit.get + "GB" - } - - // If the priority is set (user specified Int) specify the priority - if (function.jobPriority.isDefined) { - gridEngineJob.jobPriority = function.jobPriority.get - } - - // Instead of running the function.commandLine, run "sh " - gridEngineJob.command = "sh " + jobScript - - // Store the status so it can be returned in the status method. - myStatus = RunnerStatus.RUNNING - - // Start the job and store the id so it can be killed in tryStop - myJobId = gridEngineJob.start() - */ - - logger.warn("TODO: implement Grid Engine support") } - // TODO: Return the latest status: RUNNING, FAILED, or DONE - def status = throw new RuntimeException("TODO: Grid Engine return status such as: " + RunnerStatus.FAILED) + def status = this.lastStatus + + private def updateStatus(updatedStatus: RunnerStatus.Value) { + this.lastStatus = updatedStatus + this.lastStatusUpdate = System.currentTimeMillis + } } object GridEngineJobRunner extends Logging { + private val gridEngineSession = SessionFactory.getFactory.getSession + + /** Amount of time a job can go without status before giving up. */ + private val unknownStatusMaxSeconds = 5 * 60 + initGridEngine() /** * Initialize the Grid Engine library. */ private def initGridEngine() { - // TODO: Init - logger.warn("TODO Grid Engine: Initialize here.") + gridEngineSession.synchronized { + try { + gridEngineSession.init("") + } catch { + case de: DrmaaException => throw new QException("init() failed", de) + } + } } /** @@ -104,7 +154,14 @@ object GridEngineJobRunner extends Logging { * @param runners Runners to update. */ def updateStatus(runners: Set[GridEngineJobRunner]) { - // TODO: Bulk update. If not possible this method can be removed here and in GridEngineJobManager. + var updatedRunners = Set.empty[GridEngineJobRunner] + gridEngineSession.synchronized { + runners.foreach(runner => if (updateRunnerStatus(runner)) {updatedRunners += runner}) + } + + for (runner <- runners.diff(updatedRunners)) { + checkUnknownStatus(runner) + } } /** @@ -112,6 +169,70 @@ object GridEngineJobRunner extends Logging { * @param runners Runners to stop. */ def tryStop(runners: Set[GridEngineJobRunner]) { - // TODO: Stop runners. SIGTERM(15) is preferred to SIGKILL(9). + // Stop runners. SIGTERM(15) is preferred to SIGKILL(9). + // Only way to send SIGTERM is for the Sys Admin set the terminate_method + // resource of the designated queue to SIGTERM + gridEngineSession.synchronized { + for (runner <- runners.filterNot(runner => Option(runner.jobId) == None)) { + try { + gridEngineSession.control(runner.jobId, Session.TERMINATE) + } catch { + case e => + logger.error("Unable to kill job " + runner.jobId, e) + } + } + gridEngineSession.exit() + } + } + + private def updateRunnerStatus(runner: GridEngineJobRunner): Boolean = { + var returnStatus: RunnerStatus.Value = null + + try { + val jobStatus = gridEngineSession.getJobProgramStatus(runner.jobId); + jobStatus match { + case Session.QUEUED_ACTIVE => returnStatus = RunnerStatus.PENDING + case Session.DONE => + val jobInfo: JobInfo = gridEngineSession.wait(runner.jobId, Session.TIMEOUT_NO_WAIT) + if ((jobInfo.hasExited && jobInfo.getExitStatus > 0) + || jobInfo.hasSignaled + || jobInfo.wasAborted) + returnStatus = RunnerStatus.FAILED + else + returnStatus = RunnerStatus.DONE + case Session.FAILED => returnStatus = RunnerStatus.FAILED + case Session.UNDETERMINED => logger.warn("Unable to determine status of Grid Engine job id " + runner.jobId) + case _ => returnStatus = RunnerStatus.RUNNING + } + } catch { + // getJobProgramStatus will throw an exception once wait has run, as the + // job will be reaped. If the status is currently DONE or FAILED, return + // the status. + case de: DrmaaException => + if (runner.lastStatus == RunnerStatus.DONE || runner.lastStatus == RunnerStatus.FAILED) + returnStatus = runner.lastStatus + else + logger.warn("Unable to determine status of Grid Engine job id " + runner.jobId, de) + } + + Option(returnStatus) match { + case Some(returnStatus) => + runner.updateStatus(returnStatus) + return true + case None => return false + } + } + + private def checkUnknownStatus(runner: GridEngineJobRunner) { + val unknownStatusSeconds = (System.currentTimeMillis - runner.lastStatusUpdate) + if (unknownStatusSeconds > (unknownStatusMaxSeconds * 1000L)) { + // Unknown status has been returned for a while now. + runner.updateStatus(RunnerStatus.FAILED) + logger.error("Unable to read Grid Engine status for %d minutes: job id %d: %s".format(unknownStatusSeconds/60, runner.jobId, runner.function.description)) + } + } + + override def finalize() { + gridEngineSession.exit() } }