From c03341aec1d5c3d9bb08fd97ea7ff3ef9df8a82d Mon Sep 17 00:00:00 2001 From: kshakir Date: Mon, 7 Mar 2011 23:52:48 +0000 Subject: [PATCH] JobRunner can be specified on the command line. -bsub is currently short form of -jobRunner Lsf706. Added an empty wrapper for a GridEngine job runner which is only activated when SGE_ROOT is detected. Refactored a bit more common code into CommandLineJobRunner / JobRunner / FunctionEdge. Status for analysisNames now includes the number of functions in state. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5399 348d0f76-0448-11de-a6fe-93d51630548a --- build.xml | 15 +- .../queue/engine/CommandLineJobManager.scala | 34 ++++ .../queue/engine/CommandLineJobRunner.scala | 40 +++- .../engine/CommandLinePluginManager.scala | 32 +++ .../sting/queue/engine/FunctionEdge.scala | 9 +- .../sting/queue/engine/JobManager.scala | 28 ++- .../sting/queue/engine/JobRunner.scala | 36 +++- .../sting/queue/engine/Lsf706JobManager.scala | 15 -- .../sting/queue/engine/QGraph.scala | 138 ++++++++----- .../sting/queue/engine/QGraphSettings.scala | 31 ++- .../sting/queue/engine/ShellJobManager.scala | 9 - .../sting/queue/engine/ShellJobRunner.scala | 35 ---- .../gridengine/GridEngineJobManager.scala | 36 ++++ .../gridengine/GridEngineJobRunner.scala | 117 +++++++++++ .../queue/engine/lsf/Lsf706JobManager.scala | 39 ++++ .../engine/{ => lsf}/Lsf706JobRunner.scala | 183 ++++++++++-------- .../queue/engine/shell/ShellJobManager.scala | 33 ++++ .../queue/engine/shell/ShellJobRunner.scala | 59 ++++++ .../sting/queue/function/QFunction.scala | 6 +- 19 files changed, 684 insertions(+), 211 deletions(-) create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobManager.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/CommandLinePluginManager.scala delete mode 100644 scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala delete mode 100644 
scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobManager.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala rename scala/src/org/broadinstitute/sting/queue/engine/{ => lsf}/Lsf706JobRunner.scala (77%) create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobManager.scala create mode 100755 scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala diff --git a/build.xml b/build.xml index 642291a04..b6ae61e7f 100644 --- a/build.xml +++ b/build.xml @@ -96,6 +96,7 @@ + - + + + + @@ -154,6 +159,12 @@ + + + + + + @@ -260,6 +271,7 @@ + @@ -530,6 +542,7 @@ + diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobManager.scala new file mode 100644 index 000000000..d2eedb60f --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobManager.scala @@ -0,0 +1,34 @@ +/* +* Copyright (c) 2011, The Broad Institute +* +* Permission is hereby granted, free of charge, to any person +* obtaining a copy of this software and associated documentation +* files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, +* copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following +* conditions: +* +* The above copyright notice and this permission notice shall be +* included in all copies or substantial portions of the Software. 
+* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +* OTHER DEALINGS IN THE SOFTWARE. +*/ + +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.queue.function.CommandLineFunction + +/** + * Creates and stops CommandLineJobRunners + */ +trait CommandLineJobManager[TRunner <: CommandLineJobRunner] extends JobManager[CommandLineFunction, TRunner] { + def functionType = classOf[CommandLineFunction] +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala index 553116ce0..2fbfab5ec 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala @@ -1,3 +1,27 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. 
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.CommandLineFunction @@ -10,15 +34,13 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils} trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { /** A generated exec shell script. */ - protected var exec: File = _ + protected var jobScript: File = _ /** Which directory to use for the job status files. */ protected def jobStatusDir = function.jobTempDir - /** - * Writes the function command line to an exec file. 
- */ - protected def writeExec() { + override def init() { + super.init() var exec = new StringBuilder var dirs = Set.empty[File] @@ -31,11 +53,11 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { } exec.append(function.commandLine) - this.exec = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir) + this.jobScript = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir) } - override def removeTemporaryFiles() { - super.removeTemporaryFiles() - IOUtils.tryDelete(exec) + override def cleanup() { + super.cleanup() + IOUtils.tryDelete(jobScript) } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLinePluginManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLinePluginManager.scala new file mode 100644 index 000000000..4fb6d1ee1 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLinePluginManager.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.utils.classloader.PluginManager + +class CommandLinePluginManager extends + PluginManager[CommandLineJobManager[CommandLineJobRunner]]( + classOf[CommandLineJobManager[CommandLineJobRunner]], "JobManager", "JobManager") { +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index 53a555aa6..68bc7ae61 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -52,12 +52,13 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod function.deleteOutputs() function.mkOutputDirectories() + runner.init() runner.start() } catch { case e => currentStatus = RunnerStatus.FAILED try { - runner.removeTemporaryFiles() + runner.cleanup() function.failOutputs.foreach(_.createNewFile()) writeStackTrace(e) } catch { @@ -78,7 +79,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod if (currentStatus == RunnerStatus.FAILED) { try { - runner.removeTemporaryFiles() + runner.cleanup() function.failOutputs.foreach(_.createNewFile()) } catch { case _ => /* ignore errors in the error handler */ @@ -87,7 +88,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod tailError() } else if (currentStatus == RunnerStatus.DONE) { try { - runner.removeTemporaryFiles() + runner.cleanup() function.doneOutputs.foreach(_.createNewFile()) } catch { case _ => /* ignore errors in the done handler */ @@ -98,7 +99,7 @@ class FunctionEdge(val function: QFunction, val 
inputs: QNode, val outputs: QNod case e => currentStatus = RunnerStatus.FAILED try { - runner.removeTemporaryFiles() + runner.cleanup() function.failOutputs.foreach(_.createNewFile()) writeStackTrace(e) } catch { diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala index b8a4dd5a7..d2be4939a 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala @@ -1,3 +1,27 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.QFunction @@ -21,13 +45,13 @@ trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] { * Updates the status on a list of functions. * @param runners Runners to update. 
*/ - def updateStatus(runners: List[TRunner]) { + def updateStatus(runners: Set[TRunner]) { } /** * Stops a list of functions. * @param runners Runners to stop. */ - def tryStop(runners: List[TRunner]) { + def tryStop(runners: Set[TRunner]) { } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index 138326750..4b4d44988 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -1,3 +1,27 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.QFunction @@ -6,6 +30,13 @@ import org.broadinstitute.sting.queue.function.QFunction * Base interface for job runners. 
*/ trait JobRunner[TFunction <: QFunction] { + + /** + * Initializes this job. + */ + def init() { + } + /** * Runs the function. * After the function returns the status of the function should @@ -27,9 +58,10 @@ trait JobRunner[TFunction <: QFunction] { def function: TFunction /** - * Removes all temporary files used for this job. + * Cleans up after the function is run. + * For example removing all temporary files. */ - def removeTemporaryFiles() { + def cleanup() { } /** diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala deleted file mode 100644 index b0809ddd2..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala +++ /dev/null @@ -1,15 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -import org.broadinstitute.sting.queue.function.CommandLineFunction - -/** - * Creates and stops Lsf706JobRunners - */ -class Lsf706JobManager extends JobManager[CommandLineFunction, Lsf706JobRunner] { - def runnerType = classOf[Lsf706JobRunner] - def functionType = classOf[CommandLineFunction] - def create(function: CommandLineFunction) = new Lsf706JobRunner(function) - - override def updateStatus(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) } - override def tryStop(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 51df7267d..7210deefc 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -1,3 +1,27 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, 
including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.engine import org.jgrapht.traverse.TopologicalOrderIterator @@ -10,10 +34,10 @@ import java.io.File import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} import org.broadinstitute.sting.queue.QException import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction} -import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction} import org.apache.commons.lang.StringUtils import org.broadinstitute.sting.queue.util._ import collection.immutable.{TreeSet, TreeMap} +import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction, GatherFunction, ScatterGatherableFunction} /** * The internal dependency tracker between sets of function input and output files. 
@@ -39,9 +63,9 @@ class QGraph extends Logging { private val nl = "%n".format() + private val commandLinePluginManager = new CommandLinePluginManager + private var commandLineManager: CommandLineJobManager[CommandLineJobRunner] = _ private val inProcessManager = new InProcessJobManager - private var commandLineManager: JobManager[CommandLineFunction, _<:JobRunner[CommandLineFunction]] = _ - private def managers = List[Any](inProcessManager, commandLineManager) /** @@ -82,6 +106,10 @@ class QGraph extends Logging { logStatus() } else if (this.dryRun) { dryRunJobs() + if (running && isReady) { + logger.info("Dry run completed successfully!") + logger.info("Re-run with \"-run\" to execute the functions.") + } } else if (isReady) { logger.info("Running jobs.") runJobs() @@ -90,11 +118,6 @@ class QGraph extends Logging { if (numMissingValues > 0) { logger.error("Total missing values: " + numMissingValues) } - - if (running && isReady && this.dryRun) { - logger.info("Dry run completed successfully!") - logger.info("Re-run with \"-run\" to execute the functions.") - } } } } @@ -106,7 +129,7 @@ class QGraph extends Logging { renderToDot(settings.dotFile) validate() - if (running && numMissingValues == 0 && settings.bsubAllJobs) { + if (running && numMissingValues == 0) { logger.info("Generating scatter gather jobs.") val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge)) @@ -302,10 +325,11 @@ class QGraph extends Logging { */ private def runJobs() { try { - if (settings.bsubAllJobs) - commandLineManager = new Lsf706JobManager - else - commandLineManager = new ShellJobManager + if (settings.bsub) + settings.jobRunner = "Lsf706" + else if (settings.jobRunner == null) + settings.jobRunner = "Shell" + commandLineManager = commandLinePluginManager.createByName(settings.jobRunner) if (settings.startFromScratch) { logger.info("Removing outputs from previous runs.") @@ -529,15 +553,20 @@ class QGraph extends Logging { * Tracks analysis status. 
*/ private class AnalysisStatus(val analysisName: String) { - var status = RunnerStatus.PENDING - var scatter = new ScatterGatherStatus - var gather = new ScatterGatherStatus + val jobs = new GroupStatus + val scatter = new GroupStatus + val gather = new GroupStatus + + def total = jobs.total + scatter.total + gather.total + def done = jobs.done + scatter.done + gather.done + def failed = jobs.failed + scatter.failed + gather.failed + def skipped = jobs.skipped + scatter.skipped + gather.skipped } /** - * Tracks scatter gather status. + * Tracks status of a group of jobs. */ - private class ScatterGatherStatus { + private class GroupStatus { var total = 0 var done = 0 var failed = 0 @@ -574,30 +603,33 @@ class QGraph extends Logging { }) statuses.foreach(status => { - val sgTotal = status.scatter.total + status.gather.total - val sgDone = status.scatter.done + status.gather.done - val sgFailed = status.scatter.failed + status.gather.failed - val sgSkipped = status.scatter.skipped + status.gather.skipped + val total = status.total + val done = status.done + val failed = status.failed + val skipped = status.skipped + val jobsTotal = status.jobs.total + val jobsDone = status.jobs.done val gatherTotal = status.gather.total val gatherDone = status.gather.done - if (sgTotal > 0) { - var sgStatus = RunnerStatus.PENDING - if (sgFailed > 0) - sgStatus = RunnerStatus.FAILED - else if (gatherDone == gatherTotal) - sgStatus = RunnerStatus.DONE - else if (sgDone + sgSkipped == sgTotal) - sgStatus = RunnerStatus.SKIPPED - else if (sgDone > 0) - sgStatus = RunnerStatus.RUNNING - status.status = sgStatus - } - var info = ("%-" + maxWidth + "s [%s]") - .format(status.analysisName, StringUtils.center(status.status.toString, 7)) + var summaryStatus = RunnerStatus.PENDING + if (failed > 0) + summaryStatus = RunnerStatus.FAILED + else if (gatherDone == gatherTotal && jobsDone == jobsTotal) + summaryStatus = RunnerStatus.DONE + else if (done + skipped == total) + summaryStatus = 
RunnerStatus.SKIPPED + else if (done > 0) + summaryStatus = RunnerStatus.RUNNING + + var info = ("%-" + maxWidth + "s %7s") + .format(status.analysisName, "[" + summaryStatus.toString + "]") + if (status.jobs.total > 1) { + info += formatGroupStatus(status.jobs) + } if (status.scatter.total + status.gather.total > 1) { - info += formatSGStatus(status.scatter, "s") - info += formatSGStatus(status.gather, "g") + info += formatGroupStatus(status.scatter, "s:") + info += formatGroupStatus(status.gather, "g:") } statusFunc(info) }) @@ -607,21 +639,23 @@ class QGraph extends Logging { * Updates a status map with scatter/gather status information (e.g. counts) */ private def updateAnalysisStatus(stats: AnalysisStatus, edge: FunctionEdge) { - if (edge.function.isInstanceOf[GatherFunction]) { - updateSGStatus(stats.gather, edge) + if (edge.function.isInstanceOf[ScatterFunction]) { + updateGroupStatus(stats.scatter, edge) } else if (edge.function.isInstanceOf[CloneFunction]) { - updateSGStatus(stats.scatter, edge) + updateGroupStatus(stats.scatter, edge) + } else if (edge.function.isInstanceOf[GatherFunction]) { + updateGroupStatus(stats.gather, edge) } else { - stats.status = edge.status + updateGroupStatus(stats.jobs, edge) } } - private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) { - stats.total += 1 + private def updateGroupStatus(groupStatus: GroupStatus, edge: FunctionEdge) { + groupStatus.total += 1 edge.status match { - case RunnerStatus.DONE => stats.done += 1 - case RunnerStatus.FAILED => stats.failed += 1 - case RunnerStatus.SKIPPED => stats.skipped += 1 + case RunnerStatus.DONE => groupStatus.done += 1 + case RunnerStatus.FAILED => groupStatus.failed += 1 + case RunnerStatus.SKIPPED => groupStatus.skipped += 1 /* can't tell the difference between pending and running right now! 
*/ case RunnerStatus.PENDING => case RunnerStatus.RUNNING => @@ -631,8 +665,8 @@ class QGraph extends Logging { /** * Formats a status into nice strings */ - private def formatSGStatus(stats: ScatterGatherStatus, prefix: String) = { - " %s:%dt/%dd/%df".format( + private def formatGroupStatus(stats: GroupStatus, prefix: String = "") = { + " %s%dt/%dd/%df".format( prefix, stats.total, stats.done, stats.failed) } @@ -815,7 +849,7 @@ class QGraph extends Logging { .asInstanceOf[Set[JobRunner[QFunction]]] if (managerRunners.size > 0) try { - manager.updateStatus(managerRunners.toList) + manager.updateStatus(managerRunners) } catch { case e => /* ignore */ } @@ -846,13 +880,13 @@ class QGraph extends Logging { .asInstanceOf[Set[JobRunner[QFunction]]] if (managerRunners.size > 0) try { - manager.tryStop(managerRunners.toList) + manager.tryStop(managerRunners) } catch { case e => /* ignore */ } for (runner <- managerRunners) { try { - runner.removeTemporaryFiles() + runner.cleanup() } catch { case e => /* ignore */ } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index 0056e1a2b..650599377 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -1,3 +1,27 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or 
substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.engine import java.io.File @@ -12,8 +36,11 @@ class QGraphSettings { @ArgumentCollection val qSettings = new QSettings - @Argument(fullName="bsub_all_jobs", shortName="bsub", doc="Use bsub to submit jobs", required=false) - var bsubAllJobs = false + @Argument(fullName="job_runner", shortName="jobRunner", doc="Use the specified job runner to dispatch command line jobs", required=false) + var jobRunner: String = _ + + @Argument(fullName="bsub", shortName="bsub", doc="Equivalent to -jobRunner Lsf706", required=false) + var bsub = false @Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. 
Without this flag set only performs a dry run.", required=false) var run = false diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala deleted file mode 100644 index 3561d44cf..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala +++ /dev/null @@ -1,9 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -import org.broadinstitute.sting.queue.function.CommandLineFunction - -class ShellJobManager extends JobManager[CommandLineFunction, ShellJobRunner] { - def runnerType = classOf[ShellJobRunner] - def functionType = classOf[CommandLineFunction] - def create(function: CommandLineFunction) = new ShellJobRunner(function) -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala deleted file mode 100755 index 5aebebb66..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala +++ /dev/null @@ -1,35 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -import org.broadinstitute.sting.queue.function.CommandLineFunction -import org.broadinstitute.sting.queue.util.ShellJob - -/** - * Runs jobs one at a time locally - */ -class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner { - private var runStatus: RunnerStatus.Value = _ - - /** - * Runs the function on the local shell. - * @param function Command to run. - */ - def start() { - val job = new ShellJob - - job.workingDir = function.commandDirectory - job.outputFile = function.jobOutputFile - job.errorFile = function.jobErrorFile - - writeExec() - job.shellScript = exec - - // Allow advanced users to update the job. 
- updateJobRun(job) - - runStatus = RunnerStatus.RUNNING - job.run() - runStatus = RunnerStatus.DONE - } - - def status = runStatus -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobManager.scala new file mode 100644 index 000000000..78bd2cc78 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobManager.scala @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +package org.broadinstitute.sting.queue.engine.gridengine + +import org.broadinstitute.sting.queue.engine.CommandLineJobManager +import org.broadinstitute.sting.queue.function.CommandLineFunction + +class GridEngineJobManager extends CommandLineJobManager[GridEngineJobRunner] { + def runnerType = classOf[GridEngineJobRunner] + def create(function: CommandLineFunction) = new GridEngineJobRunner(function) + + override def updateStatus(runners: Set[GridEngineJobRunner]) = { GridEngineJobRunner.updateStatus(runners) } + override def tryStop(runners: Set[GridEngineJobRunner]) { GridEngineJobRunner.tryStop(runners) } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala new file mode 100644 index 000000000..39870e786 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.engine.gridengine + +import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} + +class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging { + // Run the static initializer for GridEngineJobRunner + GridEngineJobRunner + + def start() = { + // TODO: Copy settings from function to GridEngine syntax. + /* + val gridEngineJob = new ... + + // Set the display name to 4000 characters of the description (or whatever the GE max is) + gridEngineJob.displayName = function.description.take(4000) + + // Set the output file for stdout + gridEngineJob.outputFile = function.jobOutputFile.getPath + + // Set the current working directory + gridEngineJob.workingDirectory = function.commandDirectory.getPath + + // If the error file is set specify the separate output for stderr + if (function.jobErrorFile != null) { + gridEngineJob.errFile = function.jobErrorFile.getPath + } + + // If a project name is set specify the project name + if (function.jobProject != null) { + gridEngineJob.projectName = function.jobProject + } + + // If the job queue is set specify the job queue + if (function.jobQueue != null) { + gridEngineJob.queue = function.jobQueue + } + + // If the memory limit is set (GB) specify the memory limit + if (function.memoryLimit.isDefined) { + gridEngineJob.jobMemoryLimit = function.memoryLimit.get + "GB" + } + + // If the priority is set (user specified Int) specify the priority + if (function.jobPriority.isDefined) { + gridEngineJob.jobPriority = 
function.jobPriority.get + } + + // Instead of running the function.commandLine, run "sh " + gridEngineJob.command = "sh " + jobScript + + // Store the status so it can be returned in the status method. + myStatus = RunnerStatus.RUNNING + + // Start the job and store the id so it can be killed in tryStop + myJobId = gridEngineJob.start() + */ + + logger.warn("TODO: implement Grid Engine support") + } + + // TODO: Return the latest status: RUNNING, FAILED, or DONE + def status = throw new RuntimeException("TODO: Grid Engine return status such as: " + RunnerStatus.FAILED) +} + +object GridEngineJobRunner extends Logging { + initGridEngine() + + /** + * Initialize the Grid Engine library. + */ + private def initGridEngine() { + // TODO: Init + logger.warn("TODO Grid Engine: Initialize here.") + } + + /** + * Updates the status of a list of jobs. + * @param runners Runners to update. + */ + def updateStatus(runners: Set[GridEngineJobRunner]) { + // TODO: Bulk update. If not possible this method can be removed here and in GridEngineJobManager. + } + + /** + * Tries to stop any running jobs. + * @param runners Runners to stop. + */ + def tryStop(runners: Set[GridEngineJobRunner]) { + // TODO: Stop runners. SIGTERM(15) is preferred to SIGKILL(9). 
+ } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala new file mode 100644 index 000000000..c0fff9125 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +package org.broadinstitute.sting.queue.engine.lsf + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.engine.CommandLineJobManager + +/** + * Creates and stops Lsf706JobRunners + */ +class Lsf706JobManager extends CommandLineJobManager[Lsf706JobRunner] { + def runnerType = classOf[Lsf706JobRunner] + def create(function: CommandLineFunction) = new Lsf706JobRunner(function) + + override def updateStatus(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) } + override def tryStop(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala similarity index 77% rename from scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala rename to scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index d4bead82d..b7783c902 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -1,6 +1,29 @@ -package org.broadinstitute.sting.queue.engine +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. 
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.engine.lsf -import java.io.File import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.queue.QException @@ -10,6 +33,7 @@ import org.broadinstitute.sting.jna.clibrary.LibC import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} import com.sun.jna.ptr.IntByReference import com.sun.jna.{StringArray, NativeLong} +import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} /** * Runs jobs on an LSF compute cluster. 
@@ -38,50 +62,55 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS) request.rLimits(i) = LibLsf.DEFAULT_RLIMIT; + // Set the display name of the job to the first 4000 characters + request.jobName = function.description.take(4000) + request.options |= LibBat.SUB_JOB_NAME + + // Set the output file for stdout request.outFile = function.jobOutputFile.getPath request.options |= LibBat.SUB_OUT_FILE + // Set the current working directory + request.cwd = function.commandDirectory.getPath + request.options3 |= LibBat.SUB3_CWD + + // If the error file is set specify the separate output for stderr if (function.jobErrorFile != null) { request.errFile = function.jobErrorFile.getPath request.options |= LibBat.SUB_ERR_FILE } + // If a project name is set specify the project name if (function.jobProject != null) { request.projectName = function.jobProject request.options |= LibBat.SUB_PROJECT_NAME } + // If the job queue is set specify the job queue if (function.jobQueue != null) { request.queue = function.jobQueue request.options |= LibBat.SUB_QUEUE } - if (IOUtils.absolute(new File(".")) != function.commandDirectory) { - request.cwd = function.commandDirectory.getPath - request.options3 |= LibBat.SUB3_CWD - } - + // If the memory limit is set (GB) specify the memory limit if (function.memoryLimit.isDefined) { request.resReq = "rusage[mem=" + function.memoryLimit.get + "]" request.options |= LibBat.SUB_RES_REQ } - if (function.description != null) { - request.jobName = function.description.take(1000) - request.options |= LibBat.SUB_JOB_NAME - } - + // If the priority is set (user specified Int) specify the priority if (function.jobPriority.isDefined) { request.userPriority = function.jobPriority.get request.options2 |= LibBat.SUB2_JOB_PRIORITY } + // LSF specific: get the max runtime for the jobQueue and pass it for this job request.rLimits(LibLsf.LSF_RLIMIT_RUN) = 
Lsf706JobRunner.getRlimitRun(function.jobQueue) - writeExec() - request.command = "sh " + exec + // Run the command as sh + request.command = "sh " + jobScript - // Allow advanced users to update the request. + // Allow advanced users to update the request via QFunction.updateJobRun() updateJobRun(request) updateStatus(RunnerStatus.RUNNING) @@ -113,7 +142,7 @@ object Lsf706JobRunner extends Logging { /** Amount of time a job can go without status before giving up. */ private val unknownStatusMaxSeconds = 5 * 60 - init() + initLsf() /** The name of the default queue. */ private var defaultQueue: String = _ @@ -124,20 +153,80 @@ object Lsf706JobRunner extends Logging { /** * Initialize the Lsf library. */ - private def init() = { + private def initLsf() { lsfLibLock.synchronized { if (LibBat.lsb_init("Queue") < 0) throw new QException(LibBat.lsb_sperror("lsb_init() failed")) } } + /** + * Bulk updates job statuses. + * @param runners Runners to update. + */ + def updateStatus(runners: Set[Lsf706JobRunner]) { + var updatedRunners = Set.empty[Lsf706JobRunner] + + Lsf706JobRunner.lsfLibLock.synchronized { + val result = LibBat.lsb_openjobinfo(0L, null, null, null, null, LibBat.ALL_JOB) + if (result < 0) { + logger.error(LibBat.lsb_sperror("Unable to check LSF job info")) + } else { + try { + val more = new IntByReference(result) + while (more.getValue > 0) { + val jobInfo = LibBat.lsb_readjobinfo(more) + if (jobInfo == null) { + logger.error(LibBat.lsb_sperror("Unable to read LSF job info")) + more.setValue(0) + } else { + runners.find(runner => runner.jobId == jobInfo.jobId) match { + case Some(runner) => + updateRunnerStatus(runner, jobInfo) + updatedRunners += runner + case None => /* not our job */ + } + } + } + } finally { + LibBat.lsb_closejobinfo() + } + } + } + + for (runner <- runners.diff(updatedRunners)) { + checkUnknownStatus(runner) + } + } + + /** + * Tries to stop any running jobs. + * @param runners Runners to stop. 
+ */ + def tryStop(runners: Set[Lsf706JobRunner]) { + lsfLibLock.synchronized { + // lsb_killbulkjobs does not seem to forward SIGTERM, + // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. + for (runner <- runners.filterNot(_.jobId < 0)) { + try { + if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0) + logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId)) + } catch { + case e => + logger.error("Unable to kill job " + runner.jobId, e) + } + } + } + } + + /** * Returns the run limit in seconds for the queue. * If the queue name is null returns the length of the default queue. * @param queue Name of the queue or null for the default queue. * @return the run limit in seconds for the queue. */ - def getRlimitRun(queue: String) = { + private def getRlimitRun(queue: String) = { lsfLibLock.synchronized { if (queue == null) { if (defaultQueue != null) { @@ -171,64 +260,6 @@ object Lsf706JobRunner extends Logging { } } - /** - * Updates the status of a list of jobs. - */ - def updateStatus(runners: List[Lsf706JobRunner]) { - var updatedRunners = List.empty[Lsf706JobRunner] - - Lsf706JobRunner.lsfLibLock.synchronized { - val result = LibBat.lsb_openjobinfo(0L, null, null, null, null, LibBat.ALL_JOB) - if (result < 0) { - logger.error(LibBat.lsb_sperror("Unable to check LSF job info")) - } else { - try { - val more = new IntByReference(result) - while (more.getValue > 0) { - val jobInfo = LibBat.lsb_readjobinfo(more) - if (jobInfo == null) { - logger.error(LibBat.lsb_sperror("Unable to read LSF job info")) - more.setValue(0) - } else { - runners.find(runner => runner.jobId == jobInfo.jobId) match { - case Some(runner) => - updateRunnerStatus(runner, jobInfo) - updatedRunners :+= runner - case None => /* not our job */ - } - } - } - } finally { - LibBat.lsb_closejobinfo() - } - } - } - - for (runner <- runners.diff(updatedRunners)) { - checkUnknownStatus(runner) - } - } - - /** - * Tries to stop any running jobs. - * @param runners Runners to stop. 
- */ - def tryStop(runners: List[Lsf706JobRunner]) { - lsfLibLock.synchronized { - // lsb_killbulkjobs does not seem to forward SIGTERM, - // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. - for (runner <- runners.filterNot(_.jobId < 0)) { - try { - if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0) - logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId)) - } catch { - case e => - logger.error("Unable to kill job " + runner.jobId, e) - } - } - } - } - private def updateRunnerStatus(runner: Lsf706JobRunner, jobInfo: LibBat.jobInfoEnt) { val jobStatus = jobInfo.status val exitStatus = jobInfo.exitStatus diff --git a/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobManager.scala new file mode 100644 index 000000000..c5c8d719c --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobManager.scala @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.engine.shell + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.engine.CommandLineJobManager + +class ShellJobManager extends CommandLineJobManager[ShellJobRunner] { + def runnerType = classOf[ShellJobRunner] + def create(function: CommandLineFunction) = new ShellJobRunner(function) +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala new file mode 100755 index 000000000..603511a30 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.engine.shell + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.util.ShellJob +import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} + +/** + * Runs jobs one at a time locally + */ +class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner { + private var runStatus: RunnerStatus.Value = _ + + /** + * Runs the function on the local shell. + * @param function Command to run. + */ + def start() { + val job = new ShellJob + + job.workingDir = function.commandDirectory + job.outputFile = function.jobOutputFile + job.errorFile = function.jobErrorFile + + job.shellScript = jobScript + + // Allow advanced users to update the job. + updateJobRun(job) + + runStatus = RunnerStatus.RUNNING + job.run() + runStatus = RunnerStatus.DONE + } + + def status = runStatus +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 707ae8f13..8ba81ea73 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -14,10 +14,8 @@ import org.broadinstitute.sting.queue.util.{Logging, CollectionUtils, IOUtils, R * Inputs are matched to other outputs by using .equals() */ trait QFunction extends Logging { - /** - * Analysis function name - */ - var analysisName: String = _ + /** A short description of this step in the graph */ + var analysisName: String = "" /** Prefix for automatic job name creation */ var jobNamePrefix: String = _