From 59eb1f46634d60ccc3c1442fc5243a1df09016b2 Mon Sep 17 00:00:00 2001 From: Khalid Shakir Date: Thu, 21 Jul 2011 22:57:18 -0400 Subject: [PATCH] Memory limits changed from Int to Double. Updated LSF calls to read memory units from config along with tweaks to select hosts. Moved some common code from GridEngine and LSF to super classes. --- .../jna/lsf/v7_0_6/LibBatIntegrationTest.java | 25 +-- .../sting/queue/QSettings.scala | 2 +- .../queue/engine/CommandLineJobRunner.scala | 33 ++- .../sting/queue/engine/JobManager.scala | 4 +- .../sting/queue/engine/JobRunner.scala | 5 + .../sting/queue/engine/QGraph.scala | 5 +- .../gridengine/GridEngineJobRunner.scala | 57 ++--- .../queue/engine/lsf/Lsf706JobManager.scala | 2 +- .../queue/engine/lsf/Lsf706JobRunner.scala | 207 +++++++++--------- .../queue/engine/shell/ShellJobRunner.scala | 6 +- .../queue/function/CommandLineFunction.scala | 4 +- .../function/JavaCommandLineFunction.scala | 8 +- .../examples/HelloWorldPipelineTest.scala | 14 +- 13 files changed, 198 insertions(+), 174 deletions(-) diff --git a/public/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java b/public/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java index aa6303a6f..77db34cbc 100644 --- a/public/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java @@ -34,7 +34,6 @@ import org.testng.annotations.Test; import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*; -import javax.jws.soap.SOAPBinding; import java.io.File; /** @@ -55,25 +54,25 @@ public class LibBatIntegrationTest extends BaseTest { @Test public void testReadConfEnv() { - LibLsf.config_param[] unitsParam = (LibLsf.config_param[]) new LibLsf.config_param().toArray(4); + LibLsf.config_param[] configParams = (LibLsf.config_param[]) new LibLsf.config_param().toArray(4); - 
unitsParam[0].paramName = "LSF_UNIT_FOR_LIMITS"; - unitsParam[1].paramName = "LSF_CONFDIR"; - unitsParam[2].paramName = "MADE_UP_PARAMETER"; + configParams[0].paramName = "LSF_UNIT_FOR_LIMITS"; + configParams[1].paramName = "LSF_CONFDIR"; + configParams[2].paramName = "MADE_UP_PARAMETER"; - Structure.autoWrite(unitsParam); + Structure.autoWrite(configParams); - if (LibLsf.ls_readconfenv(unitsParam[0], null) != 0) { + if (LibLsf.ls_readconfenv(configParams[0], null) != 0) { Assert.fail(LibLsf.ls_sysmsg()); } - Structure.autoRead(unitsParam); + Structure.autoRead(configParams); - System.out.println("LSF_UNIT_FOR_LIMITS: " + unitsParam[0].paramValue); - Assert.assertNotNull(unitsParam[1].paramValue); - Assert.assertNull(unitsParam[2].paramValue); - Assert.assertNull(unitsParam[3].paramName); - Assert.assertNull(unitsParam[3].paramValue); + System.out.println("LSF_UNIT_FOR_LIMITS: " + configParams[0].paramValue); + Assert.assertNotNull(configParams[1].paramValue); + Assert.assertNull(configParams[2].paramValue); + Assert.assertNull(configParams[3].paramName); + Assert.assertNull(configParams[3].paramValue); } @Test diff --git a/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala index 71970a36b..05c1a1775 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -45,7 +45,7 @@ class QSettings { var jobPriority: Option[Int] = None @Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false) - var memoryLimit: Option[Int] = None + var memoryLimit: Option[Double] = None @Argument(fullName="run_directory", shortName="runDir", doc="Root directory to run functions from.", required=false) var runDirectory = new File(".") diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala 
b/public/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala index 2fbfab5ec..2e3108136 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala @@ -33,12 +33,29 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils} */ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { + /** The string representation of the identifier of the running job. */ + def jobIdString: String = null + /** A generated exec shell script. */ protected var jobScript: File = _ /** Which directory to use for the job status files. */ protected def jobStatusDir = function.jobTempDir + /** Amount of time a job can go without status before giving up. */ + private val unknownStatusMaxSeconds = 5 * 60 + + /** Last known status */ + protected var lastStatus: RunnerStatus.Value = _ + + /** The last time the status was updated */ + protected var lastStatusUpdate: Long = _ + + final override def status = this.lastStatus + + def residentRequestMB: Option[Double] = function.memoryLimit.map(_ * 1024) + def residentLimitMB: Option[Double] = residentRequestMB.map( _ * 1.2 ) + override def init() { super.init() var exec = new StringBuilder @@ -53,7 +70,21 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { } exec.append(function.commandLine) - this.jobScript = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir) + this.jobScript = IOUtils.writeTempFile(exec.toString(), ".exec", "", jobStatusDir) + } + + protected def updateStatus(updatedStatus: RunnerStatus.Value) { + this.lastStatus = updatedStatus + this.lastStatusUpdate = System.currentTimeMillis + } + + override def checkUnknownStatus() { + val unknownStatusMillis = (System.currentTimeMillis - lastStatusUpdate) + if (unknownStatusMillis > (unknownStatusMaxSeconds * 1000L)) { + // Unknown status has been returned for a while now. 
+ updateStatus(RunnerStatus.FAILED) + logger.error("Unable to read status for %.2f minutes: job id %s: %s".format(unknownStatusMillis/(60 * 1000D), jobIdString, function.description)) + } } override def cleanup() { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala index d2be4939a..30187f7e2 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala @@ -44,9 +44,9 @@ trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] { /** * Updates the status on a list of functions. * @param runners Runners to update. + * @return runners which were updated. */ - def updateStatus(runners: Set[TRunner]) { - } + def updateStatus(runners: Set[TRunner]): Set[TRunner] = Set.empty /** * Stops a list of functions. diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index 4b4d44988..de5fbde05 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -52,6 +52,11 @@ trait JobRunner[TFunction <: QFunction] { */ def status: RunnerStatus.Value + /** + * Checks if the status has been unknown for an extended period of time. + */ + def checkUnknownStatus() {} + /** * Returns the function to be run. 
*/ diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 8ed3f84c1..a52e9c561 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -1005,7 +1005,10 @@ class QGraph extends Logging { .asInstanceOf[Set[JobRunner[QFunction]]] if (managerRunners.size > 0) try { - manager.updateStatus(managerRunners) + val updatedRunners = manager.updateStatus(managerRunners) + for (runner <- managerRunners.diff(updatedRunners)) { + runner.checkUnknownStatus() + } } catch { case e => /* ignore */ } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala index 82edf6221..8c639b5bb 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala @@ -40,12 +40,7 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine /** Job Id of the currently executing job. 
*/ private var jobId: String = _ - - /** Last known status */ - private var lastStatus: RunnerStatus.Value = _ - - /** The last time the status was updated */ - protected var lastStatusUpdate: Long = _ + override def jobIdString = jobId def start() { GridEngineJobRunner.gridEngineSession.synchronized { @@ -82,11 +77,14 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine nativeSpecString += " -q " + function.jobQueue } - // If the memory limit is set (GB) specify the memory limit - if (function.memoryLimit.isDefined) { - val memAvl: String = function.memoryLimit.get + "G" - val memMax: String = (function.memoryLimit.get * 1.2 * 1024).ceil.toInt + "M" - nativeSpecString += " -l mem_free=" + memAvl + ",h_rss=" + memMax + // If the resident set size is requested pass on the memory request + if (residentRequestMB.isDefined) { + nativeSpecString += " -l mem_free=%dM".format(residentRequestMB.get.ceil.toInt) + } + + // If the resident set size limit is defined specify the memory limit + if (residentLimitMB.isDefined) { + nativeSpecString += " -l h_rss=%dM".format(residentLimitMB.get.ceil.toInt) } // If the priority is set (user specified Int) specify the priority @@ -121,21 +119,11 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine logger.info("Submitted Grid Engine job id: " + jobId) } } - - def status = this.lastStatus - - private def updateStatus(updatedStatus: RunnerStatus.Value) { - this.lastStatus = updatedStatus - this.lastStatusUpdate = System.currentTimeMillis - } } object GridEngineJobRunner extends Logging { private val gridEngineSession = SessionFactory.getFactory.getSession - /** Amount of time a job can go without status before giving up. */ - private val unknownStatusMaxSeconds = 5 * 60 - initGridEngine() /** @@ -156,16 +144,14 @@ object GridEngineJobRunner extends Logging { /** * Updates the status of a list of jobs. * @param runners Runners to update. + * @return runners which were updated. 
*/ - def updateStatus(runners: Set[GridEngineJobRunner]) { + def updateStatus(runners: Set[GridEngineJobRunner]) = { var updatedRunners = Set.empty[GridEngineJobRunner] gridEngineSession.synchronized { runners.foreach(runner => if (updateRunnerStatus(runner)) {updatedRunners += runner}) } - - for (runner <- runners.diff(updatedRunners)) { - checkUnknownStatus(runner) - } + updatedRunners } /** @@ -219,20 +205,11 @@ object GridEngineJobRunner extends Logging { logger.warn("Unable to determine status of Grid Engine job id " + runner.jobId, de) } - Option(returnStatus) match { - case Some(returnStatus) => - runner.updateStatus(returnStatus) - return true - case None => return false - } - } - - private def checkUnknownStatus(runner: GridEngineJobRunner) { - val unknownStatusSeconds = (System.currentTimeMillis - runner.lastStatusUpdate) - if (unknownStatusSeconds > (unknownStatusMaxSeconds * 1000L)) { - // Unknown status has been returned for a while now. - runner.updateStatus(RunnerStatus.FAILED) - logger.error("Unable to read Grid Engine status for %d minutes: job id %d: %s".format(unknownStatusSeconds/60, runner.jobId, runner.function.description)) + if (returnStatus != null) { + runner.updateStatus(returnStatus) + true + } else { + false } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala index c0fff9125..23ddab619 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala @@ -34,6 +34,6 @@ class Lsf706JobManager extends CommandLineJobManager[Lsf706JobRunner] { def runnerType = classOf[Lsf706JobRunner] def create(function: CommandLineFunction) = new Lsf706JobRunner(function) - override def updateStatus(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) } + override def updateStatus(runners: 
Set[Lsf706JobRunner]) = { Lsf706JobRunner.updateStatus(runners) } override def tryStop(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index ac2f036b4..46dd08332 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -32,8 +32,8 @@ import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.jna.clibrary.LibC import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} import com.sun.jna.ptr.IntByReference -import com.sun.jna.{StringArray, NativeLong} import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} +import com.sun.jna.{Structure, StringArray, NativeLong} /** * Runs jobs on an LSF compute cluster. @@ -45,12 +45,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR /** Job Id of the currently executing job. */ private var jobId = -1L - - /** Last known status */ - private var lastStatus: RunnerStatus.Value = _ - - /** The last time the status was updated */ - protected var lastStatusUpdate: Long = _ + override def jobIdString = jobId.toString /** * Dispatches the function on the LSF cluster. 
@@ -85,12 +80,19 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR request.options |= LibBat.SUB_QUEUE } - // If the memory limit is set (GB) specify the memory limit - if (function.memoryLimit.isDefined) { - request.resReq = "rusage[mem=" + function.memoryLimit.get + "]" + // If the resident set size is requested pass on the memory request + if (residentRequestMB.isDefined) { + val memInUnits = Lsf706JobRunner.convertUnits(residentRequestMB.get) + request.resReq = "select[mem>%1$d] rusage[mem=%1$d]".format(memInUnits) request.options |= LibBat.SUB_RES_REQ } + // If the resident set size limit is defined specify the memory limit + if (residentLimitMB.isDefined) { + val memInUnits = Lsf706JobRunner.convertUnits(residentLimitMB.get) + request.rLimits(LibLsf.LSF_RLIMIT_RSS) = memInUnits + } + // If the priority is set (user specified Int) specify the priority if (function.jobPriority.isDefined) { request.userPriority = function.jobPriority.get @@ -122,11 +124,13 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR } } - def status = this.lastStatus - - private def updateStatus(updatedStatus: RunnerStatus.Value) { - this.lastStatus = updatedStatus - this.lastStatusUpdate = System.currentTimeMillis + override def checkUnknownStatus() { + // TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs. + // Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod + // LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct) + // LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist) + logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(jobId)) + super.checkUnknownStatus() } } @@ -137,17 +141,8 @@ object Lsf706JobRunner extends Logging { /** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. 
*/ private val retryExpiredSeconds = 5 * 60 - /** Amount of time a job can go without status before giving up. */ - private val unknownStatusMaxSeconds = 5 * 60 - initLsf() - /** The name of the default queue. */ - private var defaultQueue: String = _ - - /** The run limits for each queue. */ - private var queueRlimitRun = Map.empty[String,Int] - /** * Initialize the Lsf library. */ @@ -161,8 +156,9 @@ object Lsf706JobRunner extends Logging { /** * Bulk updates job statuses. * @param runners Runners to update. + * @return runners which were updated. */ - def updateStatus(runners: Set[Lsf706JobRunner]) { + def updateStatus(runners: Set[Lsf706JobRunner]) = { var updatedRunners = Set.empty[Lsf706JobRunner] Lsf706JobRunner.lsfLibLock.synchronized { @@ -192,70 +188,7 @@ object Lsf706JobRunner extends Logging { } } - for (runner <- runners.diff(updatedRunners)) { - checkUnknownStatus(runner) - } - } - - /** - * Tries to stop any running jobs. - * @param runners Runners to stop. - */ - def tryStop(runners: Set[Lsf706JobRunner]) { - lsfLibLock.synchronized { - // lsb_killbulkjobs does not seem to forward SIGTERM, - // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. - for (runner <- runners.filterNot(_.jobId < 0)) { - try { - if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0) - logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId)) - } catch { - case e => - logger.error("Unable to kill job " + runner.jobId, e) - } - } - } - } - - - /** - * Returns the run limit in seconds for the queue. - * If the queue name is null returns the length of the default queue. - * @param queue Name of the queue or null for the default queue. - * @return the run limit in seconds for the queue. - */ - private def getRlimitRun(queue: String) = { - lsfLibLock.synchronized { - if (queue == null) { - if (defaultQueue != null) { - queueRlimitRun(defaultQueue) - } else { - // Get the info on the default queue. 
- val numQueues = new IntByReference(1) - val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0) - if (queueInfo == null) - throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue")) - defaultQueue = queueInfo.queue - val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) - queueRlimitRun += defaultQueue -> limit - limit - } - } else { - queueRlimitRun.get(queue) match { - case Some(limit) => limit - case None => - // Cache miss. Go get the run limits from LSF. - val queues = new StringArray(Array[String](queue)) - val numQueues = new IntByReference(1) - val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0) - if (queueInfo == null) - throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue)) - val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) - queueRlimitRun += queue -> limit - limit - } - } - } + updatedRunners } private def updateRunnerStatus(runner: Lsf706JobRunner, jobInfo: LibBat.jobInfoEnt) { @@ -280,20 +213,6 @@ object Lsf706JobRunner extends Logging { ) } - private def checkUnknownStatus(runner: Lsf706JobRunner) { - // TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs. - // Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod - // LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct) - // LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist) - logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(runner.jobId)) - val unknownStatusMillis = (System.currentTimeMillis - runner.lastStatusUpdate) - if (unknownStatusMillis > (unknownStatusMaxSeconds * 1000L)) { - // Unknown status has been returned for a while now. 
- runner.updateStatus(RunnerStatus.FAILED) - logger.error("Unable to read LSF status for %0.2f minutes: job id %d: %s".format(unknownStatusMillis/(60 * 1000D), runner.jobId, runner.function.description)) - } - } - /** * Returns true if LSF is expected to retry running the function. * @param exitInfo The reason the job exited. @@ -309,4 +228,86 @@ object Lsf706JobRunner extends Logging { } } } + + /** + * Tries to stop any running jobs. + * @param runners Runners to stop. + */ + def tryStop(runners: Set[Lsf706JobRunner]) { + lsfLibLock.synchronized { + // lsb_killbulkjobs does not seem to forward SIGTERM, + // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. + for (runner <- runners.filterNot(_.jobId < 0)) { + try { + if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0) + logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId)) + } catch { + case e => + logger.error("Unable to kill job " + runner.jobId, e) + } + } + } + } + + /** The name of the default queue. */ + private lazy val defaultQueue: String = { + lsfLibLock.synchronized { + val numQueues = new IntByReference(1) + val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0) + if (queueInfo == null) + throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue")) + queueInfo.queue + } + } + + /** The run limits for each queue. */ + private var queueRlimitRun = Map.empty[String,Int] + + /** + * Returns the run limit in seconds for the queue. + * If the queue name is null returns the length of the default queue. + * @param queue Name of the queue or null for the default queue. + * @return the run limit in seconds for the queue. + */ + private def getRlimitRun(queueName: String) = { + lsfLibLock.synchronized { + val queue = if (queueName == null) defaultQueue else queueName + queueRlimitRun.get(queue) match { + case Some(limit) => limit + case None => + // Cache miss. Go get the run limits from LSF. 
+ val queues = new StringArray(Array(queue)) + val numQueues = new IntByReference(1) + val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0) + if (queueInfo == null) + throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue)) + val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) + queueRlimitRun += queue -> limit + limit + } + } + } + + private lazy val unitDivisor: Double = { + lsfLibLock.synchronized { + val unitsParam: Array[LibLsf.config_param] = new LibLsf.config_param().toArray(2).asInstanceOf[Array[LibLsf.config_param]] + unitsParam(0).paramName = "LSF_UNIT_FOR_LIMITS" + + Structure.autoWrite(unitsParam.asInstanceOf[Array[Structure]]) + if (LibLsf.ls_readconfenv(unitsParam(0), null) != 0) + throw new QException("ls_readconfenv() failed: " + LibLsf.ls_sysmsg()) + Structure.autoRead(unitsParam.asInstanceOf[Array[Structure]]) + + unitsParam(0).paramValue match { + case "MB" | null => 1D + case "GB" => 1024D + case "TB" => 1024D * 1024 + case "PB" => 1024D * 1024 * 1024 + case "EB" => 1024D * 1024 * 1024 * 1024 + case "KB" => 1D / 1024 + } + } + } + + private def convertUnits(mb: Double) = (mb / unitDivisor).ceil.toInt } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala index 603511a30..128d8773c 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala @@ -50,10 +50,10 @@ class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRu // Allow advanced users to update the job. 
updateJobRun(job) - runStatus = RunnerStatus.RUNNING + updateStatus(RunnerStatus.RUNNING) job.run() - runStatus = RunnerStatus.DONE + updateStatus(RunnerStatus.DONE) } - def status = runStatus + override def checkUnknownStatus() {} } diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 2b1abb2d0..c62fdcd7c 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -9,7 +9,7 @@ trait CommandLineFunction extends QFunction with Logging { def commandLine: String /** Upper memory limit */ - var memoryLimit: Option[Int] = None + var memoryLimit: Option[Double] = None /** Job project to run the command */ var jobProject: String = _ @@ -56,7 +56,7 @@ trait CommandLineFunction extends QFunction with Logging { if (memoryLimit.isEmpty) memoryLimit = qSettings.memoryLimit - super.freezeFieldValues + super.freezeFieldValues() } /** diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala index 72445442e..e8279f62b 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala @@ -47,7 +47,7 @@ trait JavaCommandLineFunction extends CommandLineFunction { /** * Memory limit for the java executable, or if None will use the default memoryLimit. */ - var javaMemoryLimit: Option[Int] = None + var javaMemoryLimit: Option[Double] = None /** * Returns the java executable to run. 
@@ -61,8 +61,8 @@ trait JavaCommandLineFunction extends CommandLineFunction { null } - override def freezeFieldValues = { - super.freezeFieldValues + override def freezeFieldValues() { + super.freezeFieldValues() if (javaMemoryLimit.isEmpty && memoryLimit.isDefined) javaMemoryLimit = memoryLimit @@ -72,7 +72,7 @@ trait JavaCommandLineFunction extends CommandLineFunction { } def javaOpts = "%s -Djava.io.tmpdir=%s" - .format(optional(" -Xmx", javaMemoryLimit, "g"), jobTempDir) + .format(optional(" -Xmx", javaMemoryLimit.map(gb => (gb * 1024).ceil.toInt), "m"), jobTempDir) def commandLine = "java%s %s" .format(javaOpts, javaExecutable) diff --git a/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala b/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala index 0871e769b..7c76823da 100644 --- a/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala +++ b/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala @@ -29,7 +29,7 @@ import org.broadinstitute.sting.queue.pipeline.{PipelineTest, PipelineTestSpec} class HelloWorldPipelineTest { @Test - def testHelloWorld { + def testHelloWorld() { val spec = new PipelineTestSpec spec.name = "HelloWorld" spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala" @@ -37,15 +37,23 @@ class HelloWorldPipelineTest { } @Test - def testHelloWorldWithPrefix { + def testHelloWorldWithPrefix() { val spec = new PipelineTestSpec spec.name = "HelloWorldWithPrefix" spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -jobPrefix HelloWorld" PipelineTest.executeTest(spec) } + @Test + def testHelloWorldWithMemoryLimit() { + val spec = new PipelineTestSpec + spec.name = "HelloWorldWithMemoryLimit" + spec.args = "-S 
public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -memLimit 1.25" + PipelineTest.executeTest(spec) + } + @Test(enabled=false) - def testHelloWorldWithPriority { + def testHelloWorldWithPriority() { val spec = new PipelineTestSpec spec.name = "HelloWorldWithPriority" spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -jobPriority 100"