From ce5b11317b8f8cdc2393b468f7102585ea046109 Mon Sep 17 00:00:00 2001 From: kshakir Date: Tue, 25 Jan 2011 20:28:54 +0000 Subject: [PATCH] Moved some shutdown logic from the LSF job runner into the QGraph. Because of Java's type erasure JobManagers must provide runtime access to the runner class to shutdown. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5076 348d0f76-0448-11de-a6fe-93d51630548a --- .../queue/engine/InProcessJobManager.scala | 2 + .../sting/queue/engine/JobManager.scala | 17 ++++++- .../sting/queue/engine/Lsf706JobManager.scala | 4 +- .../sting/queue/engine/Lsf706JobRunner.scala | 42 +++++++---------- .../sting/queue/engine/QGraph.scala | 46 ++++++++++++++----- .../sting/queue/engine/ShellJobManager.scala | 2 + 6 files changed, 75 insertions(+), 38 deletions(-) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/InProcessJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/InProcessJobManager.scala index 310881897..b327dec7d 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/InProcessJobManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/InProcessJobManager.scala @@ -3,5 +3,7 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.InProcessFunction class InProcessJobManager extends JobManager[InProcessFunction, InProcessRunner] { + def runnerType = classOf[InProcessRunner] + def functionType = classOf[InProcessFunction] def create(function: InProcessFunction) = new InProcessRunner(function) } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala index 9d48b5490..a55e7822b 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala @@ -6,6 +6,21 @@ import org.broadinstitute.sting.queue.function.QFunction * Creates and stops JobRunners */ trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] { + /** The class type of the runner. Available at runtime even after erasure. */ + def functionType: Class[TFunction] + + /** The class type of the functions processed by the runner. Available at runtime even after erasure. */ + def runnerType: Class[TRunner] + + /** Creates a new runner. + * @param function Function for the runner. + */ def create(function: TFunction): TRunner - def tryStop(runners: List[JobRunner[_]]) = {} + + /** + * Stops a list of functions. + * @param runner Runners to stop. + */ + def tryStop(runners: List[TRunner]) { + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala index 7b8a3286f..de85b66b2 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala @@ -6,6 +6,8 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction * Creates and stops Lsf706JobRunners */ class Lsf706JobManager extends JobManager[CommandLineFunction, Lsf706JobRunner] { + def runnerType = classOf[Lsf706JobRunner] + def functionType = classOf[CommandLineFunction] def create(function: CommandLineFunction) = new Lsf706JobRunner(function) - override def tryStop(runners: List[JobRunner[_]]) = Lsf706JobRunner.tryStop(runners) + override def tryStop(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala index c9933a2ed..61dc04ae7 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala @@ -182,32 +182,24 @@ object Lsf706JobRunner extends Logging { * Tries to stop any running jobs. * @param runners Runners to stop. */ - def tryStop(runners: List[JobRunner[_]]) = { - val lsfJobRunners = runners.filter(_.isInstanceOf[Lsf706JobRunner]).map(_.asInstanceOf[Lsf706JobRunner]) - if (lsfJobRunners.size > 0) { - for (jobRunners <- lsfJobRunners.filterNot(_.jobId < 0).grouped(10)) { - try { - val njobs = jobRunners.size - val signalJobs = new signalBulkJobs - signalJobs.jobs = { - val p = new Memory(8 * njobs) - p.write(0, jobRunners.map(_.jobId).toArray, 0, njobs) - p - } - signalJobs.njobs = njobs - signalJobs.signal = 9 + def tryStop(runners: List[Lsf706JobRunner]) { + for (jobRunners <- runners.filterNot(_.jobId < 0).grouped(10)) { + try { + val njobs = jobRunners.size + val signalJobs = new signalBulkJobs + signalJobs.jobs = { + val jobIds = new Memory(8 * njobs) + jobIds.write(0, jobRunners.map(_.jobId).toArray, 0, njobs) + jobIds + } + signalJobs.njobs = njobs + signalJobs.signal = 9 - if (LibBat.lsb_killbulkjobs(signalJobs) < 0) - throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed")) - } catch { - case e => - logger.error("Unable to kill all jobs.", e) - } - try { - jobRunners.foreach(_.removeTemporaryFiles()) - } catch { - case e => /* ignore */ - } + if (LibBat.lsb_killbulkjobs(signalJobs) < 0) + throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed")) + } catch { + case e => + logger.error("Unable to kill all jobs.", e) } } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index d0838849c..99b14b49b 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -177,14 +177,14 @@ class QGraph extends Logging { case f: FunctionEdge => this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING case _ => false - }.map(_.asInstanceOf[FunctionEdge]).toList.sortWith(compare(_,_)) + }.toList.asInstanceOf[List[FunctionEdge]].sortWith(compare(_,_)) } private def getRunningJobs = { jobGraph.edgeSet.filter{ case f: FunctionEdge => f.status == RunnerStatus.RUNNING case _ => false - }.map(_.asInstanceOf[FunctionEdge]).toList.sortWith(compare(_,_)) + }.toList.asInstanceOf[List[FunctionEdge]].sortWith(compare(_,_)) } /** @@ -238,7 +238,14 @@ class QGraph extends Logging { */ private def dryRunJobs() = { updateGraphStatus(false) - traverseFunctions(edge => logEdge(edge)) + var readyJobs = getReadyJobs + while (!shuttingDown && readyJobs.size > 0) { + readyJobs.foreach(edge => { + logEdge(edge) + edge.markAsDone + }) + readyJobs = getReadyJobs + } } private def logEdge(edge: FunctionEdge) = { @@ -390,7 +397,7 @@ class QGraph extends Logging { private def checkRetryJobs(failed: List[FunctionEdge]) = { if (settings.retries > 0) { for (failedJob <- failed) { - if (failedJob.retries < settings.retries) { + if (failedJob.function.jobRestartable && failedJob.retries < settings.retries) { failedJob.retries += 1 failedJob.resetToPending(true) logger.info("Reset for retry attempt %d of %d: %s".format( @@ -660,10 +667,10 @@ class QGraph extends Logging { */ private def foreachFunction(f: (FunctionEdge) => Unit) = { jobGraph.edgeSet.toList - .filter(_.isInstanceOf[FunctionEdge]) - .map(_.asInstanceOf[FunctionEdge]) - .sortWith(compare(_,_)) - .foreach(f(_)) + .filter(_.isInstanceOf[FunctionEdge]) + .asInstanceOf[List[FunctionEdge]] + .sortWith(compare(_,_)) + .foreach(f(_)) } private def compare(f1: FunctionEdge, f2: FunctionEdge): Boolean = @@ -759,8 +766,25 @@ class QGraph extends Logging { */ def shutdown() { shuttingDown = true - val runningJobs = getRunningJobs - if (commandLineManager != null && !runningJobs.isEmpty) - commandLineManager.tryStop(runningJobs.map(_.runner)) + val runners = getRunningJobs.map(_.runner) + val manager = commandLineManager.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]] + if (manager != null) { + val managerRunners = runners + .filter(runner => manager.runnerType.isAssignableFrom(runner.getClass)) + .asInstanceOf[List[JobRunner[QFunction]]] + if (managerRunners.size > 0) + try { + manager.tryStop(managerRunners) + } catch { + case e => /* ignore */ + } + } + runners.foreach(runner => + try { + runner.removeTemporaryFiles() + } catch { + case e => /* ignore */ + } + ) } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala index 32fe8f18d..3561d44cf 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala @@ -3,5 +3,7 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.CommandLineFunction class ShellJobManager extends JobManager[CommandLineFunction, ShellJobRunner] { + def runnerType = classOf[ShellJobRunner] + def functionType = classOf[CommandLineFunction] def create(function: CommandLineFunction) = new ShellJobRunner(function) }