Merge branch 'master' of ssh://nickel.broadinstitute.org/humgen/gsa-scr1/gsa-engineering/git/unstable

This commit is contained in:
Guillermo del Angel 2011-07-22 08:13:58 -04:00
commit 3d0853149b
13 changed files with 198 additions and 174 deletions

View File

@ -34,7 +34,6 @@ import org.testng.annotations.Test;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*;
import javax.jws.soap.SOAPBinding;
import java.io.File;
/**
@ -55,25 +54,25 @@ public class LibBatIntegrationTest extends BaseTest {
@Test
public void testReadConfEnv() {
LibLsf.config_param[] unitsParam = (LibLsf.config_param[]) new LibLsf.config_param().toArray(4);
LibLsf.config_param[] configParams = (LibLsf.config_param[]) new LibLsf.config_param().toArray(4);
unitsParam[0].paramName = "LSF_UNIT_FOR_LIMITS";
unitsParam[1].paramName = "LSF_CONFDIR";
unitsParam[2].paramName = "MADE_UP_PARAMETER";
configParams[0].paramName = "LSF_UNIT_FOR_LIMITS";
configParams[1].paramName = "LSF_CONFDIR";
configParams[2].paramName = "MADE_UP_PARAMETER";
Structure.autoWrite(unitsParam);
Structure.autoWrite(configParams);
if (LibLsf.ls_readconfenv(unitsParam[0], null) != 0) {
if (LibLsf.ls_readconfenv(configParams[0], null) != 0) {
Assert.fail(LibLsf.ls_sysmsg());
}
Structure.autoRead(unitsParam);
Structure.autoRead(configParams);
System.out.println("LSF_UNIT_FOR_LIMITS: " + unitsParam[0].paramValue);
Assert.assertNotNull(unitsParam[1].paramValue);
Assert.assertNull(unitsParam[2].paramValue);
Assert.assertNull(unitsParam[3].paramName);
Assert.assertNull(unitsParam[3].paramValue);
System.out.println("LSF_UNIT_FOR_LIMITS: " + configParams[0].paramValue);
Assert.assertNotNull(configParams[1].paramValue);
Assert.assertNull(configParams[2].paramValue);
Assert.assertNull(configParams[3].paramName);
Assert.assertNull(configParams[3].paramValue);
}
@Test

View File

@ -45,7 +45,7 @@ class QSettings {
var jobPriority: Option[Int] = None
@Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false)
var memoryLimit: Option[Int] = None
var memoryLimit: Option[Double] = None
@Argument(fullName="run_directory", shortName="runDir", doc="Root directory to run functions from.", required=false)
var runDirectory = new File(".")

View File

@ -33,12 +33,29 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
*/
trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
/** The string representation of the identifier of the running job. */
def jobIdString: String = null
/** A generated exec shell script. */
protected var jobScript: File = _
/** Which directory to use for the job status files. */
protected def jobStatusDir = function.jobTempDir
/** Amount of time a job can go without status before giving up. */
private val unknownStatusMaxSeconds = 5 * 60
/** Last known status */
protected var lastStatus: RunnerStatus.Value = _
/** The last time the status was updated */
protected var lastStatusUpdate: Long = _
final override def status = this.lastStatus
/** Resident memory to request, in megabytes: the function's memory limit (gigabytes) times 1024, when one is set. */
def residentRequestMB: Option[Double] = function.memoryLimit.map(gigabytes => gigabytes * 1024)
/** Resident memory limit, in megabytes: 1.2 times the requested amount, when a request is defined. */
def residentLimitMB: Option[Double] = for (requestMB <- residentRequestMB) yield requestMB * 1.2
override def init() {
super.init()
var exec = new StringBuilder
@ -53,7 +70,21 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
}
exec.append(function.commandLine)
this.jobScript = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir)
this.jobScript = IOUtils.writeTempFile(exec.toString(), ".exec", "", jobStatusDir)
}
/**
 * Stores a new last-known status and stamps it with the current time.
 * @param updatedStatus The status to record for this runner.
 */
protected def updateStatus(updatedStatus: RunnerStatus.Value): Unit = {
  lastStatus = updatedStatus
  lastStatusUpdate = System.currentTimeMillis()
}
/**
 * Marks the job as FAILED when no status update has been recorded for longer than
 * unknownStatusMaxSeconds, and logs how long the status has been unreadable.
 */
override def checkUnknownStatus() {
  val unknownStatusMillis = System.currentTimeMillis - lastStatusUpdate
  if (unknownStatusMillis > (unknownStatusMaxSeconds * 1000L)) {
    // Unknown status has been returned for a while now.
    updateStatus(RunnerStatus.FAILED)
    // "%.2f" instead of "%0.2f": java.util.Formatter rejects the zero flag when no width is given.
    // "%s" for the job id: jobIdString is a String (null by default), which "%d" cannot format.
    logger.error("Unable to read status for %.2f minutes: job id %s: %s".format(unknownStatusMillis / (60 * 1000D), jobIdString, function.description))
  }
}
override def cleanup() {

View File

@ -44,9 +44,9 @@ trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] {
/**
* Updates the status on a list of functions.
* @param runners Runners to update.
* @return runners which were updated.
*/
def updateStatus(runners: Set[TRunner]) {
}
def updateStatus(runners: Set[TRunner]): Set[TRunner] = Set.empty
/**
* Stops a list of functions.

View File

@ -52,6 +52,11 @@ trait JobRunner[TFunction <: QFunction] {
*/
def status: RunnerStatus.Value
/**
 * Hook invoked to check whether the status has been unknown for an extended period of time.
 * The default implementation does nothing.
 */
def checkUnknownStatus(): Unit = ()
/**
* Returns the function to be run.
*/

View File

@ -1005,7 +1005,10 @@ class QGraph extends Logging {
.asInstanceOf[Set[JobRunner[QFunction]]]
if (managerRunners.size > 0)
try {
manager.updateStatus(managerRunners)
val updatedRunners = manager.updateStatus(managerRunners)
for (runner <- managerRunners.diff(updatedRunners)) {
runner.checkUnknownStatus()
}
} catch {
case e => /* ignore */
}

View File

@ -40,12 +40,7 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine
/** Job Id of the currently executing job. */
private var jobId: String = _
/** Last known status */
private var lastStatus: RunnerStatus.Value = _
/** The last time the status was updated */
protected var lastStatusUpdate: Long = _
override def jobIdString = jobId
def start() {
GridEngineJobRunner.gridEngineSession.synchronized {
@ -82,11 +77,14 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine
nativeSpecString += " -q " + function.jobQueue
}
// If the memory limit is set (GB) specify the memory limit
if (function.memoryLimit.isDefined) {
val memAvl: String = function.memoryLimit.get + "G"
val memMax: String = (function.memoryLimit.get * 1.2 * 1024).ceil.toInt + "M"
nativeSpecString += " -l mem_free=" + memAvl + ",h_rss=" + memMax
// If the resident set size is requested pass on the memory request
if (residentRequestMB.isDefined) {
nativeSpecString += " -l mem_free=%dM".format(residentRequestMB.get.ceil.toInt)
}
// If the resident set size limit is defined specify the memory limit
if (residentLimitMB.isDefined) {
nativeSpecString += " -l h_rss=%dM".format(residentLimitMB.get.ceil.toInt)
}
// If the priority is set (user specified Int) specify the priority
@ -121,21 +119,11 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine
logger.info("Submitted Grid Engine job id: " + jobId)
}
}
def status = this.lastStatus
private def updateStatus(updatedStatus: RunnerStatus.Value) {
this.lastStatus = updatedStatus
this.lastStatusUpdate = System.currentTimeMillis
}
}
object GridEngineJobRunner extends Logging {
private val gridEngineSession = SessionFactory.getFactory.getSession
/** Amount of time a job can go without status before giving up. */
private val unknownStatusMaxSeconds = 5 * 60
initGridEngine()
/**
@ -156,16 +144,14 @@ object GridEngineJobRunner extends Logging {
/**
* Updates the status of a list of jobs.
* @param runners Runners to update.
* @return runners which were updated.
*/
def updateStatus(runners: Set[GridEngineJobRunner]) {
def updateStatus(runners: Set[GridEngineJobRunner]) = {
var updatedRunners = Set.empty[GridEngineJobRunner]
gridEngineSession.synchronized {
runners.foreach(runner => if (updateRunnerStatus(runner)) {updatedRunners += runner})
}
for (runner <- runners.diff(updatedRunners)) {
checkUnknownStatus(runner)
}
updatedRunners
}
/**
@ -219,20 +205,11 @@ object GridEngineJobRunner extends Logging {
logger.warn("Unable to determine status of Grid Engine job id " + runner.jobId, de)
}
Option(returnStatus) match {
case Some(returnStatus) =>
runner.updateStatus(returnStatus)
return true
case None => return false
}
}
private def checkUnknownStatus(runner: GridEngineJobRunner) {
val unknownStatusSeconds = (System.currentTimeMillis - runner.lastStatusUpdate)
if (unknownStatusSeconds > (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runner.updateStatus(RunnerStatus.FAILED)
logger.error("Unable to read Grid Engine status for %d minutes: job id %d: %s".format(unknownStatusSeconds/60, runner.jobId, runner.function.description))
if (returnStatus != null) {
runner.updateStatus(returnStatus)
true
} else {
false
}
}

View File

@ -34,6 +34,6 @@ class Lsf706JobManager extends CommandLineJobManager[Lsf706JobRunner] {
def runnerType = classOf[Lsf706JobRunner]
def create(function: CommandLineFunction) = new Lsf706JobRunner(function)
override def updateStatus(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) }
override def updateStatus(runners: Set[Lsf706JobRunner]) = { Lsf706JobRunner.updateStatus(runners) }
override def tryStop(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) }
}

View File

@ -32,8 +32,8 @@ import org.broadinstitute.sting.utils.Utils
import org.broadinstitute.sting.jna.clibrary.LibC
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit}
import com.sun.jna.ptr.IntByReference
import com.sun.jna.{StringArray, NativeLong}
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
import com.sun.jna.{Structure, StringArray, NativeLong}
/**
* Runs jobs on an LSF compute cluster.
@ -45,12 +45,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
/** Job Id of the currently executing job. */
private var jobId = -1L
/** Last known status */
private var lastStatus: RunnerStatus.Value = _
/** The last time the status was updated */
protected var lastStatusUpdate: Long = _
override def jobIdString = jobId.toString
/**
* Dispatches the function on the LSF cluster.
@ -85,12 +80,19 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
request.options |= LibBat.SUB_QUEUE
}
// If the memory limit is set (GB) specify the memory limit
if (function.memoryLimit.isDefined) {
request.resReq = "rusage[mem=" + function.memoryLimit.get + "]"
// If the resident set size is requested pass on the memory request
if (residentRequestMB.isDefined) {
val memInUnits = Lsf706JobRunner.convertUnits(residentRequestMB.get)
request.resReq = "select[mem>%1$d] rusage[mem=%1$d]".format(memInUnits)
request.options |= LibBat.SUB_RES_REQ
}
// If the resident set size limit is defined specify the memory limit
if (residentLimitMB.isDefined) {
val memInUnits = Lsf706JobRunner.convertUnits(residentLimitMB.get)
request.rLimits(LibLsf.LSF_RLIMIT_RSS) = memInUnits
}
// If the priority is set (user specified Int) specify the priority
if (function.jobPriority.isDefined) {
request.userPriority = function.jobPriority.get
@ -122,11 +124,13 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
}
}
def status = this.lastStatus
private def updateStatus(updatedStatus: RunnerStatus.Value) {
this.lastStatus = updatedStatus
this.lastStatusUpdate = System.currentTimeMillis
/**
 * Logs a debug placeholder line for this job id, then runs the inherited unknown-status check.
 */
override def checkUnknownStatus(): Unit = {
  // TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs.
  // Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod
  // LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct)
  // LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist)
  val placeholder = "Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(jobId)
  logger.debug(placeholder)
  super.checkUnknownStatus()
}
}
@ -137,17 +141,8 @@ object Lsf706JobRunner extends Logging {
/** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. */
private val retryExpiredSeconds = 5 * 60
/** Amount of time a job can go without status before giving up. */
private val unknownStatusMaxSeconds = 5 * 60
initLsf()
/** The name of the default queue. */
private var defaultQueue: String = _
/** The run limits for each queue. */
private var queueRlimitRun = Map.empty[String,Int]
/**
* Initialize the Lsf library.
*/
@ -161,8 +156,9 @@ object Lsf706JobRunner extends Logging {
/**
* Bulk updates job statuses.
* @param runners Runners to update.
* @return runners which were updated.
*/
def updateStatus(runners: Set[Lsf706JobRunner]) {
def updateStatus(runners: Set[Lsf706JobRunner]) = {
var updatedRunners = Set.empty[Lsf706JobRunner]
Lsf706JobRunner.lsfLibLock.synchronized {
@ -192,70 +188,7 @@ object Lsf706JobRunner extends Logging {
}
}
for (runner <- runners.diff(updatedRunners)) {
checkUnknownStatus(runner)
}
}
/**
* Tries to stop any running jobs.
* @param runners Runners to stop.
*/
def tryStop(runners: Set[Lsf706JobRunner]) {
lsfLibLock.synchronized {
// lsb_killbulkjobs does not seem to forward SIGTERM,
// only SIGKILL, so send the Ctrl-C (SIGTERM) one by one.
for (runner <- runners.filterNot(_.jobId < 0)) {
try {
if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0)
logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId))
} catch {
case e =>
logger.error("Unable to kill job " + runner.jobId, e)
}
}
}
}
/**
* Returns the run limit in seconds for the queue.
* If the queue name is null returns the length of the default queue.
* @param queue Name of the queue or null for the default queue.
* @return the run limit in seconds for the queue.
*/
private def getRlimitRun(queue: String) = {
lsfLibLock.synchronized {
if (queue == null) {
if (defaultQueue != null) {
queueRlimitRun(defaultQueue)
} else {
// Get the info on the default queue.
val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue"))
defaultQueue = queueInfo.queue
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += defaultQueue -> limit
limit
}
} else {
queueRlimitRun.get(queue) match {
case Some(limit) => limit
case None =>
// Cache miss. Go get the run limits from LSF.
val queues = new StringArray(Array[String](queue))
val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue))
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += queue -> limit
limit
}
}
}
updatedRunners
}
private def updateRunnerStatus(runner: Lsf706JobRunner, jobInfo: LibBat.jobInfoEnt) {
@ -280,20 +213,6 @@ object Lsf706JobRunner extends Logging {
)
}
private def checkUnknownStatus(runner: Lsf706JobRunner) {
// TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs.
// Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod
// LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct)
// LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist)
logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(runner.jobId))
val unknownStatusMillis = (System.currentTimeMillis - runner.lastStatusUpdate)
if (unknownStatusMillis > (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runner.updateStatus(RunnerStatus.FAILED)
logger.error("Unable to read LSF status for %0.2f minutes: job id %d: %s".format(unknownStatusMillis/(60 * 1000D), runner.jobId, runner.function.description))
}
}
/**
* Returns true if LSF is expected to retry running the function.
* @param exitInfo The reason the job exited.
@ -309,4 +228,86 @@ object Lsf706JobRunner extends Logging {
}
}
}
/**
 * Sends SIGTERM to every runner whose job id is non-negative.
 * Failures are logged and never propagated, so one bad job does not stop the rest.
 * @param runners Runners to stop.
 */
def tryStop(runners: Set[Lsf706JobRunner]): Unit = {
  lsfLibLock.synchronized {
    // lsb_killbulkjobs does not seem to forward SIGTERM,
    // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one.
    runners.filter(_.jobId >= 0).foreach { runner =>
      try {
        val signalResult = LibBat.lsb_signaljob(runner.jobId, SIGTERM)
        if (signalResult < 0)
          logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId))
      } catch {
        case e: Throwable =>
          logger.error("Unable to kill job " + runner.jobId, e)
      }
    }
  }
}
/** Name of the default LSF queue, fetched through lsb_queueinfo under the library lock on first access. */
private lazy val defaultQueue: String = lsfLibLock.synchronized {
  val queueCount = new IntByReference(1)
  Option(LibBat.lsb_queueinfo(null, queueCount, null, null, 0)) match {
    case Some(info) => info.queue
    case None =>
      throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue"))
  }
}
/** The run limits for each queue. */
private var queueRlimitRun = Map.empty[String,Int]
/**
 * Returns the run limit in seconds for the queue.
 * A null queue name selects the default queue. Limits are looked up from LSF once per
 * queue and then served from the queueRlimitRun cache.
 * @param queueName Name of the queue or null for the default queue.
 * @return the run limit in seconds for the queue.
 */
private def getRlimitRun(queueName: String) = lsfLibLock.synchronized {
  val queue = Option(queueName).getOrElse(defaultQueue)
  queueRlimitRun.getOrElse(queue, {
    // Cache miss. Go get the run limits from LSF.
    val requestedQueues = new StringArray(Array(queue))
    val queueCount = new IntByReference(1)
    val info = LibBat.lsb_queueinfo(requestedQueues, queueCount, null, null, 0)
    if (info == null)
      throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue))
    val runLimit = info.rLimits(LibLsf.LSF_RLIMIT_RUN)
    queueRlimitRun += queue -> runLimit
    runLimit
  })
}
/**
 * Divisor used to convert megabytes into the unit named by the LSF_UNIT_FOR_LIMITS
 * configuration parameter. An unset parameter is treated as megabytes.
 * Throws a QException if the parameter cannot be read or holds an unhandled value.
 */
private lazy val unitDivisor: Double = {
  lsfLibLock.synchronized {
    // NOTE(review): two entries are allocated but only the first is named; presumably the
    // trailing entry with a null name terminates the list for ls_readconfenv() - confirm.
    val unitsParam: Array[LibLsf.config_param] = new LibLsf.config_param().toArray(2).asInstanceOf[Array[LibLsf.config_param]]
    unitsParam(0).paramName = "LSF_UNIT_FOR_LIMITS"
    Structure.autoWrite(unitsParam.asInstanceOf[Array[Structure]])
    if (LibLsf.ls_readconfenv(unitsParam(0), null) != 0)
      throw new QException(LibBat.lsb_sperror("ls_readconfenv() failed"))
    Structure.autoRead(unitsParam.asInstanceOf[Array[Structure]])
    unitsParam(0).paramValue match {
      case "MB" => 1D
      case "GB" => 1024D
      case "TB" => 1024D * 1024
      case "PB" => 1024D * 1024 * 1024
      case "EB" => 1024D * 1024 * 1024 * 1024
      case null => 1D
      // Report unhandled values explicitly instead of failing with a bare MatchError.
      case other => throw new QException("Unsupported LSF_UNIT_FOR_LIMITS value: " + other)
    }
  }
}
/** Converts megabytes to the unit given by unitDivisor, rounded up to a whole number. */
private def convertUnits(mb: Double) = math.ceil(mb / unitDivisor).toInt
}

View File

@ -50,10 +50,10 @@ class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRu
// Allow advanced users to update the job.
updateJobRun(job)
runStatus = RunnerStatus.RUNNING
updateStatus(RunnerStatus.RUNNING)
job.run()
runStatus = RunnerStatus.DONE
// Reaching this line means job.run() returned, so record DONE (not FAILED),
// matching the runStatus = RunnerStatus.DONE assignment this call replaces.
updateStatus(RunnerStatus.DONE)
}
def status = runStatus
override def checkUnknownStatus() {}
}

View File

@ -9,7 +9,7 @@ trait CommandLineFunction extends QFunction with Logging {
def commandLine: String
/** Upper memory limit */
var memoryLimit: Option[Int] = None
var memoryLimit: Option[Double] = None
/** Job project to run the command */
var jobProject: String = _
@ -56,7 +56,7 @@ trait CommandLineFunction extends QFunction with Logging {
if (memoryLimit.isEmpty)
memoryLimit = qSettings.memoryLimit
super.freezeFieldValues
super.freezeFieldValues()
}
/**

View File

@ -47,7 +47,7 @@ trait JavaCommandLineFunction extends CommandLineFunction {
/**
* Memory limit for the java executable, or if None will use the default memoryLimit.
*/
var javaMemoryLimit: Option[Int] = None
var javaMemoryLimit: Option[Double] = None
/**
* Returns the java executable to run.
@ -61,8 +61,8 @@ trait JavaCommandLineFunction extends CommandLineFunction {
null
}
override def freezeFieldValues = {
super.freezeFieldValues
override def freezeFieldValues() {
super.freezeFieldValues()
if (javaMemoryLimit.isEmpty && memoryLimit.isDefined)
javaMemoryLimit = memoryLimit
@ -72,7 +72,7 @@ trait JavaCommandLineFunction extends CommandLineFunction {
}
def javaOpts = "%s -Djava.io.tmpdir=%s"
.format(optional(" -Xmx", javaMemoryLimit, "g"), jobTempDir)
.format(optional(" -Xmx", javaMemoryLimit.map(gb => (gb * 1024).ceil.toInt), "m"), jobTempDir)
def commandLine = "java%s %s"
.format(javaOpts, javaExecutable)

View File

@ -29,7 +29,7 @@ import org.broadinstitute.sting.queue.pipeline.{PipelineTest, PipelineTestSpec}
class HelloWorldPipelineTest {
@Test
def testHelloWorld {
def testHelloWorld() {
val spec = new PipelineTestSpec
spec.name = "HelloWorld"
spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala"
@ -37,15 +37,23 @@ class HelloWorldPipelineTest {
}
@Test
def testHelloWorldWithPrefix {
def testHelloWorldWithPrefix() {
val spec = new PipelineTestSpec
spec.name = "HelloWorldWithPrefix"
spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -jobPrefix HelloWorld"
PipelineTest.executeTest(spec)
}
/** Runs the HelloWorld example script with a fractional memory limit (-memLimit 1.25). */
@Test
def testHelloWorldWithMemoryLimit() {
  val spec = new PipelineTestSpec
  // Give this test its own name; "HelloWorldWithPrefix" is already used by testHelloWorldWithPrefix.
  spec.name = "HelloWorldWithMemoryLimit"
  spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -memLimit 1.25"
  PipelineTest.executeTest(spec)
}
@Test(enabled=false)
def testHelloWorldWithPriority {
def testHelloWorldWithPriority() {
val spec = new PipelineTestSpec
spec.name = "HelloWorldWithPriority"
spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -jobPriority 100"