Memory limits changed from Int to Double.

Updated LSF calls to read memory units from config along with tweaks to select hosts.
Moved some common code from GridEngine and LSF to super classes.
This commit is contained in:
Khalid Shakir 2011-07-21 22:57:18 -04:00
parent 15610ce0c3
commit 59eb1f4663
13 changed files with 198 additions and 174 deletions

View File

@ -34,7 +34,6 @@ import org.testng.annotations.Test;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*;
import javax.jws.soap.SOAPBinding;
import java.io.File;
/**
@ -55,25 +54,25 @@ public class LibBatIntegrationTest extends BaseTest {
@Test
public void testReadConfEnv() {
LibLsf.config_param[] unitsParam = (LibLsf.config_param[]) new LibLsf.config_param().toArray(4);
LibLsf.config_param[] configParams = (LibLsf.config_param[]) new LibLsf.config_param().toArray(4);
unitsParam[0].paramName = "LSF_UNIT_FOR_LIMITS";
unitsParam[1].paramName = "LSF_CONFDIR";
unitsParam[2].paramName = "MADE_UP_PARAMETER";
configParams[0].paramName = "LSF_UNIT_FOR_LIMITS";
configParams[1].paramName = "LSF_CONFDIR";
configParams[2].paramName = "MADE_UP_PARAMETER";
Structure.autoWrite(unitsParam);
Structure.autoWrite(configParams);
if (LibLsf.ls_readconfenv(unitsParam[0], null) != 0) {
if (LibLsf.ls_readconfenv(configParams[0], null) != 0) {
Assert.fail(LibLsf.ls_sysmsg());
}
Structure.autoRead(unitsParam);
Structure.autoRead(configParams);
System.out.println("LSF_UNIT_FOR_LIMITS: " + unitsParam[0].paramValue);
Assert.assertNotNull(unitsParam[1].paramValue);
Assert.assertNull(unitsParam[2].paramValue);
Assert.assertNull(unitsParam[3].paramName);
Assert.assertNull(unitsParam[3].paramValue);
System.out.println("LSF_UNIT_FOR_LIMITS: " + configParams[0].paramValue);
Assert.assertNotNull(configParams[1].paramValue);
Assert.assertNull(configParams[2].paramValue);
Assert.assertNull(configParams[3].paramName);
Assert.assertNull(configParams[3].paramValue);
}
@Test

View File

@ -45,7 +45,7 @@ class QSettings {
var jobPriority: Option[Int] = None
@Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false)
var memoryLimit: Option[Int] = None
var memoryLimit: Option[Double] = None
@Argument(fullName="run_directory", shortName="runDir", doc="Root directory to run functions from.", required=false)
var runDirectory = new File(".")

View File

@ -33,12 +33,29 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
*/
trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
/** The string representation of the identifier of the running job. */
def jobIdString: String = null
/** A generated exec shell script. */
protected var jobScript: File = _
/** Which directory to use for the job status files. */
protected def jobStatusDir = function.jobTempDir
/** Amount of time a job can go without status before giving up. */
private val unknownStatusMaxSeconds = 5 * 60
/** Last known status */
protected var lastStatus: RunnerStatus.Value = _
/** The last time the status was updated */
protected var lastStatusUpdate: Long = _
final override def status = this.lastStatus
def residentRequestMB: Option[Double] = function.memoryLimit.map(_ * 1024)
def residentLimitMB: Option[Double] = residentRequestMB.map( _ * 1.2 )
override def init() {
super.init()
var exec = new StringBuilder
@ -53,7 +70,21 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
}
exec.append(function.commandLine)
this.jobScript = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir)
this.jobScript = IOUtils.writeTempFile(exec.toString(), ".exec", "", jobStatusDir)
}
protected def updateStatus(updatedStatus: RunnerStatus.Value) {
this.lastStatus = updatedStatus
this.lastStatusUpdate = System.currentTimeMillis
}
override def checkUnknownStatus() {
val unknownStatusMillis = (System.currentTimeMillis - lastStatusUpdate)
if (unknownStatusMillis > (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
updateStatus(RunnerStatus.FAILED)
logger.error("Unable to read status for %0.2f minutes: job id %d: %s".format(unknownStatusMillis/(60 * 1000D), jobIdString, function.description))
}
}
override def cleanup() {

View File

@ -44,9 +44,9 @@ trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] {
/**
* Updates the status on a list of functions.
* @param runners Runners to update.
* @return runners which were updated.
*/
def updateStatus(runners: Set[TRunner]) {
}
def updateStatus(runners: Set[TRunner]): Set[TRunner] = Set.empty
/**
* Stops a list of functions.

View File

@ -52,6 +52,11 @@ trait JobRunner[TFunction <: QFunction] {
*/
def status: RunnerStatus.Value
/**
* Checks if the status has been unknown for an extended period of time.
*/
def checkUnknownStatus() {}
/**
* Returns the function to be run.
*/

View File

@ -1005,7 +1005,10 @@ class QGraph extends Logging {
.asInstanceOf[Set[JobRunner[QFunction]]]
if (managerRunners.size > 0)
try {
manager.updateStatus(managerRunners)
val updatedRunners = manager.updateStatus(managerRunners)
for (runner <- managerRunners.diff(updatedRunners)) {
runner.checkUnknownStatus()
}
} catch {
case e => /* ignore */
}

View File

@ -40,12 +40,7 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine
/** Job Id of the currently executing job. */
private var jobId: String = _
/** Last known status */
private var lastStatus: RunnerStatus.Value = _
/** The last time the status was updated */
protected var lastStatusUpdate: Long = _
override def jobIdString = jobId
def start() {
GridEngineJobRunner.gridEngineSession.synchronized {
@ -82,11 +77,14 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine
nativeSpecString += " -q " + function.jobQueue
}
// If the memory limit is set (GB) specify the memory limit
if (function.memoryLimit.isDefined) {
val memAvl: String = function.memoryLimit.get + "G"
val memMax: String = (function.memoryLimit.get * 1.2 * 1024).ceil.toInt + "M"
nativeSpecString += " -l mem_free=" + memAvl + ",h_rss=" + memMax
// If the resident set size is requested pass on the memory request
if (residentRequestMB.isDefined) {
nativeSpecString += " -l mem_free=%dM".format(residentRequestMB.get.ceil.toInt)
}
// If the resident set size limit is defined specify the memory limit
if (residentLimitMB.isDefined) {
nativeSpecString += " -l h_rss=%dM".format(residentLimitMB.get.ceil.toInt)
}
// If the priority is set (user specified Int) specify the priority
@ -121,21 +119,11 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine
logger.info("Submitted Grid Engine job id: " + jobId)
}
}
def status = this.lastStatus
private def updateStatus(updatedStatus: RunnerStatus.Value) {
this.lastStatus = updatedStatus
this.lastStatusUpdate = System.currentTimeMillis
}
}
object GridEngineJobRunner extends Logging {
private val gridEngineSession = SessionFactory.getFactory.getSession
/** Amount of time a job can go without status before giving up. */
private val unknownStatusMaxSeconds = 5 * 60
initGridEngine()
/**
@ -156,16 +144,14 @@ object GridEngineJobRunner extends Logging {
/**
* Updates the status of a list of jobs.
* @param runners Runners to update.
* @return runners which were updated.
*/
def updateStatus(runners: Set[GridEngineJobRunner]) {
def updateStatus(runners: Set[GridEngineJobRunner]) = {
var updatedRunners = Set.empty[GridEngineJobRunner]
gridEngineSession.synchronized {
runners.foreach(runner => if (updateRunnerStatus(runner)) {updatedRunners += runner})
}
for (runner <- runners.diff(updatedRunners)) {
checkUnknownStatus(runner)
}
updatedRunners
}
/**
@ -219,20 +205,11 @@ object GridEngineJobRunner extends Logging {
logger.warn("Unable to determine status of Grid Engine job id " + runner.jobId, de)
}
Option(returnStatus) match {
case Some(returnStatus) =>
runner.updateStatus(returnStatus)
return true
case None => return false
}
}
private def checkUnknownStatus(runner: GridEngineJobRunner) {
val unknownStatusSeconds = (System.currentTimeMillis - runner.lastStatusUpdate)
if (unknownStatusSeconds > (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runner.updateStatus(RunnerStatus.FAILED)
logger.error("Unable to read Grid Engine status for %d minutes: job id %d: %s".format(unknownStatusSeconds/60, runner.jobId, runner.function.description))
if (returnStatus != null) {
runner.updateStatus(returnStatus)
true
} else {
false
}
}

View File

@ -34,6 +34,6 @@ class Lsf706JobManager extends CommandLineJobManager[Lsf706JobRunner] {
def runnerType = classOf[Lsf706JobRunner]
def create(function: CommandLineFunction) = new Lsf706JobRunner(function)
override def updateStatus(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) }
override def updateStatus(runners: Set[Lsf706JobRunner]) = { Lsf706JobRunner.updateStatus(runners) }
override def tryStop(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) }
}

View File

@ -32,8 +32,8 @@ import org.broadinstitute.sting.utils.Utils
import org.broadinstitute.sting.jna.clibrary.LibC
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit}
import com.sun.jna.ptr.IntByReference
import com.sun.jna.{StringArray, NativeLong}
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
import com.sun.jna.{Structure, StringArray, NativeLong}
/**
* Runs jobs on an LSF compute cluster.
@ -45,12 +45,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
/** Job Id of the currently executing job. */
private var jobId = -1L
/** Last known status */
private var lastStatus: RunnerStatus.Value = _
/** The last time the status was updated */
protected var lastStatusUpdate: Long = _
override def jobIdString = jobId.toString
/**
* Dispatches the function on the LSF cluster.
@ -85,12 +80,19 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
request.options |= LibBat.SUB_QUEUE
}
// If the memory limit is set (GB) specify the memory limit
if (function.memoryLimit.isDefined) {
request.resReq = "rusage[mem=" + function.memoryLimit.get + "]"
// If the resident set size is requested pass on the memory request
if (residentRequestMB.isDefined) {
val memInUnits = Lsf706JobRunner.convertUnits(residentRequestMB.get)
request.resReq = "select[mem>%1$d] rusage[mem=%1$d]".format(memInUnits)
request.options |= LibBat.SUB_RES_REQ
}
// If the resident set size limit is defined specify the memory limit
if (residentLimitMB.isDefined) {
val memInUnits = Lsf706JobRunner.convertUnits(residentLimitMB.get)
request.rLimits(LibLsf.LSF_RLIMIT_RSS) = memInUnits
}
// If the priority is set (user specified Int) specify the priority
if (function.jobPriority.isDefined) {
request.userPriority = function.jobPriority.get
@ -122,11 +124,13 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
}
}
def status = this.lastStatus
private def updateStatus(updatedStatus: RunnerStatus.Value) {
this.lastStatus = updatedStatus
this.lastStatusUpdate = System.currentTimeMillis
override def checkUnknownStatus() {
// TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs.
// Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod
// LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct)
// LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist)
logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(jobId))
super.checkUnknownStatus()
}
}
@ -137,17 +141,8 @@ object Lsf706JobRunner extends Logging {
/** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. */
private val retryExpiredSeconds = 5 * 60
/** Amount of time a job can go without status before giving up. */
private val unknownStatusMaxSeconds = 5 * 60
initLsf()
/** The name of the default queue. */
private var defaultQueue: String = _
/** The run limits for each queue. */
private var queueRlimitRun = Map.empty[String,Int]
/**
* Initialize the Lsf library.
*/
@ -161,8 +156,9 @@ object Lsf706JobRunner extends Logging {
/**
* Bulk updates job statuses.
* @param runners Runners to update.
* @return runners which were updated.
*/
def updateStatus(runners: Set[Lsf706JobRunner]) {
def updateStatus(runners: Set[Lsf706JobRunner]) = {
var updatedRunners = Set.empty[Lsf706JobRunner]
Lsf706JobRunner.lsfLibLock.synchronized {
@ -192,70 +188,7 @@ object Lsf706JobRunner extends Logging {
}
}
for (runner <- runners.diff(updatedRunners)) {
checkUnknownStatus(runner)
}
}
/**
* Tries to stop any running jobs.
* @param runners Runners to stop.
*/
def tryStop(runners: Set[Lsf706JobRunner]) {
lsfLibLock.synchronized {
// lsb_killbulkjobs does not seem to forward SIGTERM,
// only SIGKILL, so send the Ctrl-C (SIGTERM) one by one.
for (runner <- runners.filterNot(_.jobId < 0)) {
try {
if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0)
logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId))
} catch {
case e =>
logger.error("Unable to kill job " + runner.jobId, e)
}
}
}
}
/**
* Returns the run limit in seconds for the queue.
* If the queue name is null returns the length of the default queue.
* @param queue Name of the queue or null for the default queue.
* @return the run limit in seconds for the queue.
*/
private def getRlimitRun(queue: String) = {
lsfLibLock.synchronized {
if (queue == null) {
if (defaultQueue != null) {
queueRlimitRun(defaultQueue)
} else {
// Get the info on the default queue.
val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue"))
defaultQueue = queueInfo.queue
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += defaultQueue -> limit
limit
}
} else {
queueRlimitRun.get(queue) match {
case Some(limit) => limit
case None =>
// Cache miss. Go get the run limits from LSF.
val queues = new StringArray(Array[String](queue))
val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue))
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += queue -> limit
limit
}
}
}
updatedRunners
}
private def updateRunnerStatus(runner: Lsf706JobRunner, jobInfo: LibBat.jobInfoEnt) {
@ -280,20 +213,6 @@ object Lsf706JobRunner extends Logging {
)
}
private def checkUnknownStatus(runner: Lsf706JobRunner) {
// TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs.
// Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod
// LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct)
// LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist)
logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(runner.jobId))
val unknownStatusMillis = (System.currentTimeMillis - runner.lastStatusUpdate)
if (unknownStatusMillis > (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runner.updateStatus(RunnerStatus.FAILED)
logger.error("Unable to read LSF status for %0.2f minutes: job id %d: %s".format(unknownStatusMillis/(60 * 1000D), runner.jobId, runner.function.description))
}
}
/**
* Returns true if LSF is expected to retry running the function.
* @param exitInfo The reason the job exited.
@ -309,4 +228,86 @@ object Lsf706JobRunner extends Logging {
}
}
}
/**
* Tries to stop any running jobs.
* @param runners Runners to stop.
*/
def tryStop(runners: Set[Lsf706JobRunner]) {
lsfLibLock.synchronized {
// lsb_killbulkjobs does not seem to forward SIGTERM,
// only SIGKILL, so send the Ctrl-C (SIGTERM) one by one.
for (runner <- runners.filterNot(_.jobId < 0)) {
try {
if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0)
logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId))
} catch {
case e =>
logger.error("Unable to kill job " + runner.jobId, e)
}
}
}
}
/** The name of the default queue. */
private lazy val defaultQueue: String = {
lsfLibLock.synchronized {
val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue"))
queueInfo.queue
}
}
/** The run limits for each queue. */
private var queueRlimitRun = Map.empty[String,Int]
/**
* Returns the run limit in seconds for the queue.
* If the queue name is null returns the length of the default queue.
* @param queue Name of the queue or null for the default queue.
* @return the run limit in seconds for the queue.
*/
private def getRlimitRun(queueName: String) = {
lsfLibLock.synchronized {
val queue = if (queueName == null) defaultQueue else queueName
queueRlimitRun.get(queue) match {
case Some(limit) => limit
case None =>
// Cache miss. Go get the run limits from LSF.
val queues = new StringArray(Array(queue))
val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue))
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += queue -> limit
limit
}
}
}
private lazy val unitDivisor: Double = {
lsfLibLock.synchronized {
val unitsParam: Array[LibLsf.config_param] = new LibLsf.config_param().toArray(2).asInstanceOf[Array[LibLsf.config_param]]
unitsParam(0).paramName = "LSF_UNIT_FOR_LIMITS"
Structure.autoWrite(unitsParam.asInstanceOf[Array[Structure]])
if (LibLsf.ls_readconfenv(unitsParam(0), null) != 0)
throw new QException(LibBat.lsb_sperror("ls_readconfenv() failed"))
Structure.autoRead(unitsParam.asInstanceOf[Array[Structure]])
unitsParam(0).paramValue match {
case "MB" => 1D
case "GB" => 1024D
case "TB" => 1024D * 1024
case "PB" => 1024D * 1024 * 1024
case "EB" => 1024D * 1024 * 1024 * 1024
case null => 1D
}
}
}
private def convertUnits(mb: Double) = (mb / unitDivisor).ceil.toInt
}

View File

@ -50,10 +50,10 @@ class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRu
// Allow advanced users to update the job.
updateJobRun(job)
runStatus = RunnerStatus.RUNNING
updateStatus(RunnerStatus.RUNNING)
job.run()
runStatus = RunnerStatus.DONE
updateStatus(RunnerStatus.FAILED)
}
def status = runStatus
override def checkUnknownStatus() {}
}

View File

@ -9,7 +9,7 @@ trait CommandLineFunction extends QFunction with Logging {
def commandLine: String
/** Upper memory limit */
var memoryLimit: Option[Int] = None
var memoryLimit: Option[Double] = None
/** Job project to run the command */
var jobProject: String = _
@ -56,7 +56,7 @@ trait CommandLineFunction extends QFunction with Logging {
if (memoryLimit.isEmpty)
memoryLimit = qSettings.memoryLimit
super.freezeFieldValues
super.freezeFieldValues()
}
/**

View File

@ -47,7 +47,7 @@ trait JavaCommandLineFunction extends CommandLineFunction {
/**
* Memory limit for the java executable, or if None will use the default memoryLimit.
*/
var javaMemoryLimit: Option[Int] = None
var javaMemoryLimit: Option[Double] = None
/**
* Returns the java executable to run.
@ -61,8 +61,8 @@ trait JavaCommandLineFunction extends CommandLineFunction {
null
}
override def freezeFieldValues = {
super.freezeFieldValues
override def freezeFieldValues() {
super.freezeFieldValues()
if (javaMemoryLimit.isEmpty && memoryLimit.isDefined)
javaMemoryLimit = memoryLimit
@ -72,7 +72,7 @@ trait JavaCommandLineFunction extends CommandLineFunction {
}
def javaOpts = "%s -Djava.io.tmpdir=%s"
.format(optional(" -Xmx", javaMemoryLimit, "g"), jobTempDir)
.format(optional(" -Xmx", javaMemoryLimit.map(gb => (gb * 1024).ceil.toInt), "m"), jobTempDir)
def commandLine = "java%s %s"
.format(javaOpts, javaExecutable)

View File

@ -29,7 +29,7 @@ import org.broadinstitute.sting.queue.pipeline.{PipelineTest, PipelineTestSpec}
class HelloWorldPipelineTest {
@Test
def testHelloWorld {
def testHelloWorld() {
val spec = new PipelineTestSpec
spec.name = "HelloWorld"
spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala"
@ -37,15 +37,23 @@ class HelloWorldPipelineTest {
}
@Test
def testHelloWorldWithPrefix {
def testHelloWorldWithPrefix() {
val spec = new PipelineTestSpec
spec.name = "HelloWorldWithPrefix"
spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -jobPrefix HelloWorld"
PipelineTest.executeTest(spec)
}
@Test
def testHelloWorldWithMemoryLimit() {
val spec = new PipelineTestSpec
spec.name = "HelloWorldWithPrefix"
spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -memLimit 1.25"
PipelineTest.executeTest(spec)
}
@Test(enabled=false)
def testHelloWorldWithPriority {
def testHelloWorldWithPriority() {
val spec = new PipelineTestSpec
spec.name = "HelloWorldWithPriority"
spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -jobPriority 100"