Moved some shutdown logic from the LSF job runner into the QGraph.

Because of Java's type erasure JobManagers must provide runtime access to the runner class to shutdown.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5076 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-01-25 20:28:54 +00:00
parent 6ac888d26a
commit ce5b11317b
6 changed files with 75 additions and 38 deletions

View File

@ -3,5 +3,7 @@ package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.InProcessFunction
class InProcessJobManager extends JobManager[InProcessFunction, InProcessRunner] {
def runnerType = classOf[InProcessRunner]
def functionType = classOf[InProcessFunction]
def create(function: InProcessFunction) = new InProcessRunner(function)
}

View File

@ -6,6 +6,21 @@ import org.broadinstitute.sting.queue.function.QFunction
* Creates and stops JobRunners
*/
trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] {
/** The class type of the runner. Available at runtime even after erasure. */
def functionType: Class[TFunction]
/** The class type of the functions processed by the runner. Available at runtime even after erasure. */
def runnerType: Class[TRunner]
/** Creates a new runner.
* @param function Function for the runner.
*/
def create(function: TFunction): TRunner
def tryStop(runners: List[JobRunner[_]]) = {}
/**
* Stops a list of functions.
* @param runner Runners to stop.
*/
def tryStop(runners: List[TRunner]) {
}
}

View File

@ -6,6 +6,8 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction
* Creates and stops Lsf706JobRunners
*/
class Lsf706JobManager extends JobManager[CommandLineFunction, Lsf706JobRunner] {
def runnerType = classOf[Lsf706JobRunner]
def functionType = classOf[CommandLineFunction]
def create(function: CommandLineFunction) = new Lsf706JobRunner(function)
override def tryStop(runners: List[JobRunner[_]]) = Lsf706JobRunner.tryStop(runners)
override def tryStop(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) }
}

View File

@ -182,32 +182,24 @@ object Lsf706JobRunner extends Logging {
* Tries to stop any running jobs.
* @param runners Runners to stop.
*/
def tryStop(runners: List[JobRunner[_]]) = {
val lsfJobRunners = runners.filter(_.isInstanceOf[Lsf706JobRunner]).map(_.asInstanceOf[Lsf706JobRunner])
if (lsfJobRunners.size > 0) {
for (jobRunners <- lsfJobRunners.filterNot(_.jobId < 0).grouped(10)) {
try {
val njobs = jobRunners.size
val signalJobs = new signalBulkJobs
signalJobs.jobs = {
val p = new Memory(8 * njobs)
p.write(0, jobRunners.map(_.jobId).toArray, 0, njobs)
p
}
signalJobs.njobs = njobs
signalJobs.signal = 9
def tryStop(runners: List[Lsf706JobRunner]) {
for (jobRunners <- runners.filterNot(_.jobId < 0).grouped(10)) {
try {
val njobs = jobRunners.size
val signalJobs = new signalBulkJobs
signalJobs.jobs = {
val jobIds = new Memory(8 * njobs)
jobIds.write(0, jobRunners.map(_.jobId).toArray, 0, njobs)
jobIds
}
signalJobs.njobs = njobs
signalJobs.signal = 9
if (LibBat.lsb_killbulkjobs(signalJobs) < 0)
throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed"))
} catch {
case e =>
logger.error("Unable to kill all jobs.", e)
}
try {
jobRunners.foreach(_.removeTemporaryFiles())
} catch {
case e => /* ignore */
}
if (LibBat.lsb_killbulkjobs(signalJobs) < 0)
throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed"))
} catch {
case e =>
logger.error("Unable to kill all jobs.", e)
}
}
}

View File

@ -177,14 +177,14 @@ class QGraph extends Logging {
case f: FunctionEdge =>
this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING
case _ => false
}.map(_.asInstanceOf[FunctionEdge]).toList.sortWith(compare(_,_))
}.toList.asInstanceOf[List[FunctionEdge]].sortWith(compare(_,_))
}
private def getRunningJobs = {
jobGraph.edgeSet.filter{
case f: FunctionEdge => f.status == RunnerStatus.RUNNING
case _ => false
}.map(_.asInstanceOf[FunctionEdge]).toList.sortWith(compare(_,_))
}.toList.asInstanceOf[List[FunctionEdge]].sortWith(compare(_,_))
}
/**
@ -238,7 +238,14 @@ class QGraph extends Logging {
*/
private def dryRunJobs() = {
updateGraphStatus(false)
traverseFunctions(edge => logEdge(edge))
var readyJobs = getReadyJobs
while (!shuttingDown && readyJobs.size > 0) {
readyJobs.foreach(edge => {
logEdge(edge)
edge.markAsDone
})
readyJobs = getReadyJobs
}
}
private def logEdge(edge: FunctionEdge) = {
@ -390,7 +397,7 @@ class QGraph extends Logging {
private def checkRetryJobs(failed: List[FunctionEdge]) = {
if (settings.retries > 0) {
for (failedJob <- failed) {
if (failedJob.retries < settings.retries) {
if (failedJob.function.jobRestartable && failedJob.retries < settings.retries) {
failedJob.retries += 1
failedJob.resetToPending(true)
logger.info("Reset for retry attempt %d of %d: %s".format(
@ -660,10 +667,10 @@ class QGraph extends Logging {
*/
private def foreachFunction(f: (FunctionEdge) => Unit) = {
jobGraph.edgeSet.toList
.filter(_.isInstanceOf[FunctionEdge])
.map(_.asInstanceOf[FunctionEdge])
.sortWith(compare(_,_))
.foreach(f(_))
.filter(_.isInstanceOf[FunctionEdge])
.asInstanceOf[List[FunctionEdge]]
.sortWith(compare(_,_))
.foreach(f(_))
}
private def compare(f1: FunctionEdge, f2: FunctionEdge): Boolean =
@ -759,8 +766,25 @@ class QGraph extends Logging {
*/
def shutdown() {
shuttingDown = true
val runningJobs = getRunningJobs
if (commandLineManager != null && !runningJobs.isEmpty)
commandLineManager.tryStop(runningJobs.map(_.runner))
val runners = getRunningJobs.map(_.runner)
val manager = commandLineManager.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]]
if (manager != null) {
val managerRunners = runners
.filter(runner => manager.runnerType.isAssignableFrom(runner.getClass))
.asInstanceOf[List[JobRunner[QFunction]]]
if (managerRunners.size > 0)
try {
manager.tryStop(managerRunners)
} catch {
case e => /* ignore */
}
}
runners.foreach(runner =>
try {
runner.removeTemporaryFiles()
} catch {
case e => /* ignore */
}
)
}
}

View File

@ -3,5 +3,7 @@ package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.CommandLineFunction
class ShellJobManager extends JobManager[CommandLineFunction, ShellJobRunner] {
def runnerType = classOf[ShellJobRunner]
def functionType = classOf[CommandLineFunction]
def create(function: CommandLineFunction) = new ShellJobRunner(function)
}