diff --git a/build.xml b/build.xml
index 642291a04..b6ae61e7f 100644
--- a/build.xml
+++ b/build.xml
@@ -96,6 +96,7 @@
+
-
+
+
+
+
@@ -154,6 +159,12 @@
+
+
+
+
+
+
@@ -260,6 +271,7 @@
+
@@ -530,6 +542,7 @@
+
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobManager.scala
new file mode 100644
index 000000000..d2eedb60f
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobManager.scala
@@ -0,0 +1,34 @@
+/*
+* Copyright (c) 2011, The Broad Institute
+*
+* Permission is hereby granted, free of charge, to any person
+* obtaining a copy of this software and associated documentation
+* files (the "Software"), to deal in the Software without
+* restriction, including without limitation the rights to use,
+* copy, modify, merge, publish, distribute, sublicense, and/or sell
+* copies of the Software, and to permit persons to whom the
+* Software is furnished to do so, subject to the following
+* conditions:
+*
+* The above copyright notice and this permission notice shall be
+* included in all copies or substantial portions of the Software.
+* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+* OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+package org.broadinstitute.sting.queue.engine
+
+import org.broadinstitute.sting.queue.function.CommandLineFunction
+
+/**
+ * Creates and stops CommandLineJobRunners
+ */
+trait CommandLineJobManager[TRunner <: CommandLineJobRunner] extends JobManager[CommandLineFunction, TRunner] {
+ def functionType = classOf[CommandLineFunction]
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala
index 553116ce0..2fbfab5ec 100755
--- a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala
@@ -1,3 +1,27 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.CommandLineFunction
@@ -10,15 +34,13 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
/** A generated exec shell script. */
- protected var exec: File = _
+ protected var jobScript: File = _
/** Which directory to use for the job status files. */
protected def jobStatusDir = function.jobTempDir
- /**
- * Writes the function command line to an exec file.
- */
- protected def writeExec() {
+ override def init() {
+ super.init()
var exec = new StringBuilder
var dirs = Set.empty[File]
@@ -31,11 +53,11 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
}
exec.append(function.commandLine)
- this.exec = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir)
+ this.jobScript = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir)
}
- override def removeTemporaryFiles() {
- super.removeTemporaryFiles()
- IOUtils.tryDelete(exec)
+ override def cleanup() {
+ super.cleanup()
+ IOUtils.tryDelete(jobScript)
}
}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLinePluginManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLinePluginManager.scala
new file mode 100644
index 000000000..4fb6d1ee1
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLinePluginManager.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.engine
+
+import org.broadinstitute.sting.utils.classloader.PluginManager
+
+class CommandLinePluginManager extends
+ PluginManager[CommandLineJobManager[CommandLineJobRunner]](
+ classOf[CommandLineJobManager[CommandLineJobRunner]], "JobManager", "JobManager") {
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala
index 53a555aa6..68bc7ae61 100644
--- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala
@@ -52,12 +52,13 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
function.deleteOutputs()
function.mkOutputDirectories()
+ runner.init()
runner.start()
} catch {
case e =>
currentStatus = RunnerStatus.FAILED
try {
- runner.removeTemporaryFiles()
+ runner.cleanup()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
@@ -78,7 +79,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
if (currentStatus == RunnerStatus.FAILED) {
try {
- runner.removeTemporaryFiles()
+ runner.cleanup()
function.failOutputs.foreach(_.createNewFile())
} catch {
case _ => /* ignore errors in the error handler */
@@ -87,7 +88,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
tailError()
} else if (currentStatus == RunnerStatus.DONE) {
try {
- runner.removeTemporaryFiles()
+ runner.cleanup()
function.doneOutputs.foreach(_.createNewFile())
} catch {
case _ => /* ignore errors in the done handler */
@@ -98,7 +99,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
case e =>
currentStatus = RunnerStatus.FAILED
try {
- runner.removeTemporaryFiles()
+ runner.cleanup()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala
index b8a4dd5a7..d2be4939a 100644
--- a/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala
@@ -1,3 +1,27 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.QFunction
@@ -21,13 +45,13 @@ trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] {
* Updates the status on a list of functions.
* @param runners Runners to update.
*/
- def updateStatus(runners: List[TRunner]) {
+ def updateStatus(runners: Set[TRunner]) {
}
/**
* Stops a list of functions.
* @param runners Runners to stop.
*/
- def tryStop(runners: List[TRunner]) {
+ def tryStop(runners: Set[TRunner]) {
}
}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala
index 138326750..4b4d44988 100644
--- a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala
@@ -1,3 +1,27 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.QFunction
@@ -6,6 +30,13 @@ import org.broadinstitute.sting.queue.function.QFunction
* Base interface for job runners.
*/
trait JobRunner[TFunction <: QFunction] {
+
+ /**
+ * Initializes this job.
+ */
+ def init() {
+ }
+
/**
* Runs the function.
* After the function returns the status of the function should
@@ -27,9 +58,10 @@ trait JobRunner[TFunction <: QFunction] {
def function: TFunction
/**
- * Removes all temporary files used for this job.
+ * Cleans up after the function is run.
+ * For example removing all temporary files.
*/
- def removeTemporaryFiles() {
+ def cleanup() {
}
/**
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala
deleted file mode 100644
index b0809ddd2..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.broadinstitute.sting.queue.engine
-
-import org.broadinstitute.sting.queue.function.CommandLineFunction
-
-/**
- * Creates and stops Lsf706JobRunners
- */
-class Lsf706JobManager extends JobManager[CommandLineFunction, Lsf706JobRunner] {
- def runnerType = classOf[Lsf706JobRunner]
- def functionType = classOf[CommandLineFunction]
- def create(function: CommandLineFunction) = new Lsf706JobRunner(function)
-
- override def updateStatus(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) }
- override def tryStop(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
index 51df7267d..7210deefc 100755
--- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
@@ -1,3 +1,27 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.queue.engine
import org.jgrapht.traverse.TopologicalOrderIterator
@@ -10,10 +34,10 @@ import java.io.File
import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent}
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction}
-import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction}
import org.apache.commons.lang.StringUtils
import org.broadinstitute.sting.queue.util._
import collection.immutable.{TreeSet, TreeMap}
+import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction, GatherFunction, ScatterGatherableFunction}
/**
* The internal dependency tracker between sets of function input and output files.
@@ -39,9 +63,9 @@ class QGraph extends Logging {
private val nl = "%n".format()
+ private val commandLinePluginManager = new CommandLinePluginManager
+ private var commandLineManager: CommandLineJobManager[CommandLineJobRunner] = _
private val inProcessManager = new InProcessJobManager
- private var commandLineManager: JobManager[CommandLineFunction, _<:JobRunner[CommandLineFunction]] = _
-
private def managers = List[Any](inProcessManager, commandLineManager)
/**
@@ -82,6 +106,10 @@ class QGraph extends Logging {
logStatus()
} else if (this.dryRun) {
dryRunJobs()
+ if (running && isReady) {
+ logger.info("Dry run completed successfully!")
+ logger.info("Re-run with \"-run\" to execute the functions.")
+ }
} else if (isReady) {
logger.info("Running jobs.")
runJobs()
@@ -90,11 +118,6 @@ class QGraph extends Logging {
if (numMissingValues > 0) {
logger.error("Total missing values: " + numMissingValues)
}
-
- if (running && isReady && this.dryRun) {
- logger.info("Dry run completed successfully!")
- logger.info("Re-run with \"-run\" to execute the functions.")
- }
}
}
}
@@ -106,7 +129,7 @@ class QGraph extends Logging {
renderToDot(settings.dotFile)
validate()
- if (running && numMissingValues == 0 && settings.bsubAllJobs) {
+ if (running && numMissingValues == 0) {
logger.info("Generating scatter gather jobs.")
val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge))
@@ -302,10 +325,11 @@ class QGraph extends Logging {
*/
private def runJobs() {
try {
- if (settings.bsubAllJobs)
- commandLineManager = new Lsf706JobManager
- else
- commandLineManager = new ShellJobManager
+ if (settings.bsub)
+ settings.jobRunner = "Lsf706"
+ else if (settings.jobRunner == null)
+ settings.jobRunner = "Shell"
+ commandLineManager = commandLinePluginManager.createByName(settings.jobRunner)
if (settings.startFromScratch) {
logger.info("Removing outputs from previous runs.")
@@ -529,15 +553,20 @@ class QGraph extends Logging {
* Tracks analysis status.
*/
private class AnalysisStatus(val analysisName: String) {
- var status = RunnerStatus.PENDING
- var scatter = new ScatterGatherStatus
- var gather = new ScatterGatherStatus
+ val jobs = new GroupStatus
+ val scatter = new GroupStatus
+ val gather = new GroupStatus
+
+ def total = jobs.total + scatter.total + gather.total
+ def done = jobs.done + scatter.done + gather.done
+ def failed = jobs.failed + scatter.failed + gather.failed
+ def skipped = jobs.skipped + scatter.skipped + gather.skipped
}
/**
- * Tracks scatter gather status.
+ * Tracks status of a group of jobs.
*/
- private class ScatterGatherStatus {
+ private class GroupStatus {
var total = 0
var done = 0
var failed = 0
@@ -574,30 +603,33 @@ class QGraph extends Logging {
})
statuses.foreach(status => {
- val sgTotal = status.scatter.total + status.gather.total
- val sgDone = status.scatter.done + status.gather.done
- val sgFailed = status.scatter.failed + status.gather.failed
- val sgSkipped = status.scatter.skipped + status.gather.skipped
+ val total = status.total
+ val done = status.done
+ val failed = status.failed
+ val skipped = status.skipped
+ val jobsTotal = status.jobs.total
+ val jobsDone = status.jobs.done
val gatherTotal = status.gather.total
val gatherDone = status.gather.done
- if (sgTotal > 0) {
- var sgStatus = RunnerStatus.PENDING
- if (sgFailed > 0)
- sgStatus = RunnerStatus.FAILED
- else if (gatherDone == gatherTotal)
- sgStatus = RunnerStatus.DONE
- else if (sgDone + sgSkipped == sgTotal)
- sgStatus = RunnerStatus.SKIPPED
- else if (sgDone > 0)
- sgStatus = RunnerStatus.RUNNING
- status.status = sgStatus
- }
- var info = ("%-" + maxWidth + "s [%s]")
- .format(status.analysisName, StringUtils.center(status.status.toString, 7))
+ var summaryStatus = RunnerStatus.PENDING
+ if (failed > 0)
+ summaryStatus = RunnerStatus.FAILED
+ else if (gatherDone == gatherTotal && jobsDone == jobsTotal)
+ summaryStatus = RunnerStatus.DONE
+ else if (done + skipped == total)
+ summaryStatus = RunnerStatus.SKIPPED
+ else if (done > 0)
+ summaryStatus = RunnerStatus.RUNNING
+
+ var info = ("%-" + maxWidth + "s %7s")
+ .format(status.analysisName, "[" + summaryStatus.toString + "]")
+ if (status.jobs.total > 1) {
+ info += formatGroupStatus(status.jobs)
+ }
if (status.scatter.total + status.gather.total > 1) {
- info += formatSGStatus(status.scatter, "s")
- info += formatSGStatus(status.gather, "g")
+ info += formatGroupStatus(status.scatter, "s:")
+ info += formatGroupStatus(status.gather, "g:")
}
statusFunc(info)
})
@@ -607,21 +639,23 @@ class QGraph extends Logging {
* Updates a status map with scatter/gather status information (e.g. counts)
*/
private def updateAnalysisStatus(stats: AnalysisStatus, edge: FunctionEdge) {
- if (edge.function.isInstanceOf[GatherFunction]) {
- updateSGStatus(stats.gather, edge)
+ if (edge.function.isInstanceOf[ScatterFunction]) {
+ updateGroupStatus(stats.scatter, edge)
} else if (edge.function.isInstanceOf[CloneFunction]) {
- updateSGStatus(stats.scatter, edge)
+ updateGroupStatus(stats.scatter, edge)
+ } else if (edge.function.isInstanceOf[GatherFunction]) {
+ updateGroupStatus(stats.gather, edge)
} else {
- stats.status = edge.status
+ updateGroupStatus(stats.jobs, edge)
}
}
- private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) {
- stats.total += 1
+ private def updateGroupStatus(groupStatus: GroupStatus, edge: FunctionEdge) {
+ groupStatus.total += 1
edge.status match {
- case RunnerStatus.DONE => stats.done += 1
- case RunnerStatus.FAILED => stats.failed += 1
- case RunnerStatus.SKIPPED => stats.skipped += 1
+ case RunnerStatus.DONE => groupStatus.done += 1
+ case RunnerStatus.FAILED => groupStatus.failed += 1
+ case RunnerStatus.SKIPPED => groupStatus.skipped += 1
/* can't tell the difference between pending and running right now! */
case RunnerStatus.PENDING =>
case RunnerStatus.RUNNING =>
@@ -631,8 +665,8 @@ class QGraph extends Logging {
/**
* Formats a status into nice strings
*/
- private def formatSGStatus(stats: ScatterGatherStatus, prefix: String) = {
- " %s:%dt/%dd/%df".format(
+ private def formatGroupStatus(stats: GroupStatus, prefix: String = "") = {
+ " %s%dt/%dd/%df".format(
prefix, stats.total, stats.done, stats.failed)
}
@@ -815,7 +849,7 @@ class QGraph extends Logging {
.asInstanceOf[Set[JobRunner[QFunction]]]
if (managerRunners.size > 0)
try {
- manager.updateStatus(managerRunners.toList)
+ manager.updateStatus(managerRunners)
} catch {
case e => /* ignore */
}
@@ -846,13 +880,13 @@ class QGraph extends Logging {
.asInstanceOf[Set[JobRunner[QFunction]]]
if (managerRunners.size > 0)
try {
- manager.tryStop(managerRunners.toList)
+ manager.tryStop(managerRunners)
} catch {
case e => /* ignore */
}
for (runner <- managerRunners) {
try {
- runner.removeTemporaryFiles()
+ runner.cleanup()
} catch {
case e => /* ignore */
}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala
index 0056e1a2b..650599377 100644
--- a/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala
@@ -1,3 +1,27 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.queue.engine
import java.io.File
@@ -12,8 +36,11 @@ class QGraphSettings {
@ArgumentCollection
val qSettings = new QSettings
- @Argument(fullName="bsub_all_jobs", shortName="bsub", doc="Use bsub to submit jobs", required=false)
- var bsubAllJobs = false
+ @Argument(fullName="job_runner", shortName="jobRunner", doc="Use the specified job runner to dispatch command line jobs", required=false)
+ var jobRunner: String = _
+
+ @Argument(fullName="bsub", shortName="bsub", doc="Equivalent to -jobRunner Lsf706", required=false)
+ var bsub = false
@Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. Without this flag set only performs a dry run.", required=false)
var run = false
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala
deleted file mode 100644
index 3561d44cf..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobManager.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.broadinstitute.sting.queue.engine
-
-import org.broadinstitute.sting.queue.function.CommandLineFunction
-
-class ShellJobManager extends JobManager[CommandLineFunction, ShellJobRunner] {
- def runnerType = classOf[ShellJobRunner]
- def functionType = classOf[CommandLineFunction]
- def create(function: CommandLineFunction) = new ShellJobRunner(function)
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala
deleted file mode 100755
index 5aebebb66..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.broadinstitute.sting.queue.engine
-
-import org.broadinstitute.sting.queue.function.CommandLineFunction
-import org.broadinstitute.sting.queue.util.ShellJob
-
-/**
- * Runs jobs one at a time locally
- */
-class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner {
- private var runStatus: RunnerStatus.Value = _
-
- /**
- * Runs the function on the local shell.
- * @param function Command to run.
- */
- def start() {
- val job = new ShellJob
-
- job.workingDir = function.commandDirectory
- job.outputFile = function.jobOutputFile
- job.errorFile = function.jobErrorFile
-
- writeExec()
- job.shellScript = exec
-
- // Allow advanced users to update the job.
- updateJobRun(job)
-
- runStatus = RunnerStatus.RUNNING
- job.run()
- runStatus = RunnerStatus.DONE
- }
-
- def status = runStatus
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobManager.scala
new file mode 100644
index 000000000..78bd2cc78
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobManager.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.engine.gridengine
+
+import org.broadinstitute.sting.queue.engine.CommandLineJobManager
+import org.broadinstitute.sting.queue.function.CommandLineFunction
+
+class GridEngineJobManager extends CommandLineJobManager[GridEngineJobRunner] {
+ def runnerType = classOf[GridEngineJobRunner]
+ def create(function: CommandLineFunction) = new GridEngineJobRunner(function)
+
+ override def updateStatus(runners: Set[GridEngineJobRunner]) = { GridEngineJobRunner.updateStatus(runners) }
+ override def tryStop(runners: Set[GridEngineJobRunner]) { GridEngineJobRunner.tryStop(runners) }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala
new file mode 100644
index 000000000..39870e786
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.engine.gridengine
+
+import org.broadinstitute.sting.queue.util.Logging
+import org.broadinstitute.sting.queue.function.CommandLineFunction
+import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
+
+class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging {
+ // Run the static initializer for GridEngineJobRunner
+ GridEngineJobRunner
+
+ def start() = {
+ // TODO: Copy settings from function to GridEngine syntax.
+ /*
+ val gridEngineJob = new ...
+
+ // Set the display name to 4000 characters of the description (or whatever the GE max is)
+ gridEngineJob.displayName = function.description.take(4000)
+
+ // Set the output file for stdout
+ gridEngineJob.outputFile = function.jobOutputFile.getPath
+
+ // Set the current working directory
+ gridEngineJob.workingDirectory = function.commandDirectory.getPath
+
+ // If the error file is set specify the separate output for stderr
+ if (function.jobErrorFile != null) {
+ gridEngineJob.errFile = function.jobErrorFile.getPath
+ }
+
+ // If a project name is set specify the project name
+ if (function.jobProject != null) {
+ gridEngineJob.projectName = function.jobProject
+ }
+
+ // If the job queue is set specify the job queue
+ if (function.jobQueue != null) {
+ gridEngineJob.queue = function.jobQueue
+ }
+
+ // If the memory limit is set (GB) specify the memory limit
+ if (function.memoryLimit.isDefined) {
+ gridEngineJob.jobMemoryLimit = function.memoryLimit.get + "GB"
+ }
+
+ // If the priority is set (user specified Int) specify the priority
+ if (function.jobPriority.isDefined) {
+ gridEngineJob.jobPriority = function.jobPriority.get
+ }
+
+ // Instead of running the function.commandLine, run "sh "
+ gridEngineJob.command = "sh " + jobScript
+
+ // Store the status so it can be returned in the status method.
+ myStatus = RunnerStatus.RUNNING
+
+ // Start the job and store the id so it can be killed in tryStop
+ myJobId = gridEngineJob.start()
+ */
+
+ logger.warn("TODO: implement Grid Engine support")
+ }
+
+ // TODO: Return the latest status: RUNNING, FAILED, or DONE
+ def status = throw new RuntimeException("TODO: Grid Engine return status such as: " + RunnerStatus.FAILED)
+}
+
+object GridEngineJobRunner extends Logging {
+ initGridEngine()
+
+ /**
+ * Initialize the Grid Engine library.
+ */
+ private def initGridEngine() {
+ // TODO: Init
+ logger.warn("TODO Grid Engine: Initialize here.")
+ }
+
+ /**
+ * Updates the status of a list of jobs.
+ * @param runners Runners to update.
+ */
+ def updateStatus(runners: Set[GridEngineJobRunner]) {
+ // TODO: Bulk update. If not possible this method can be removed here and in GridEngineJobManager.
+ }
+
+ /**
+ * Tries to stop any running jobs.
+ * @param runners Runners to stop.
+ */
+ def tryStop(runners: Set[GridEngineJobRunner]) {
+ // TODO: Stop runners. SIGTERM(15) is preferred to SIGKILL(9).
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala
new file mode 100644
index 000000000..c0fff9125
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.engine.lsf
+
+import org.broadinstitute.sting.queue.function.CommandLineFunction
+import org.broadinstitute.sting.queue.engine.CommandLineJobManager
+
+/**
+ * Creates and stops Lsf706JobRunners
+ */
+class Lsf706JobManager extends CommandLineJobManager[Lsf706JobRunner] {
+ def runnerType = classOf[Lsf706JobRunner]
+ def create(function: CommandLineFunction) = new Lsf706JobRunner(function)
+
+ override def updateStatus(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) }
+ override def tryStop(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala
similarity index 77%
rename from scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala
rename to scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala
index d4bead82d..b7783c902 100644
--- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala
@@ -1,6 +1,29 @@
-package org.broadinstitute.sting.queue.engine
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.engine.lsf
-import java.io.File
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.queue.QException
@@ -10,6 +33,7 @@ import org.broadinstitute.sting.jna.clibrary.LibC
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit}
import com.sun.jna.ptr.IntByReference
import com.sun.jna.{StringArray, NativeLong}
+import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
/**
* Runs jobs on an LSF compute cluster.
@@ -38,50 +62,55 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS)
request.rLimits(i) = LibLsf.DEFAULT_RLIMIT;
+ // Set the display name of the job to the first 4000 characters
+ request.jobName = function.description.take(4000)
+ request.options |= LibBat.SUB_JOB_NAME
+
+ // Set the output file for stdout
request.outFile = function.jobOutputFile.getPath
request.options |= LibBat.SUB_OUT_FILE
+ // Set the current working directory
+ request.cwd = function.commandDirectory.getPath
+ request.options3 |= LibBat.SUB3_CWD
+
+ // If the error file is set specify the separate output for stderr
if (function.jobErrorFile != null) {
request.errFile = function.jobErrorFile.getPath
request.options |= LibBat.SUB_ERR_FILE
}
+ // If a project name is set specify the project name
if (function.jobProject != null) {
request.projectName = function.jobProject
request.options |= LibBat.SUB_PROJECT_NAME
}
+ // If the job queue is set specify the job queue
if (function.jobQueue != null) {
request.queue = function.jobQueue
request.options |= LibBat.SUB_QUEUE
}
- if (IOUtils.absolute(new File(".")) != function.commandDirectory) {
- request.cwd = function.commandDirectory.getPath
- request.options3 |= LibBat.SUB3_CWD
- }
-
+ // If the memory limit is set (GB) specify the memory limit
if (function.memoryLimit.isDefined) {
request.resReq = "rusage[mem=" + function.memoryLimit.get + "]"
request.options |= LibBat.SUB_RES_REQ
}
- if (function.description != null) {
- request.jobName = function.description.take(1000)
- request.options |= LibBat.SUB_JOB_NAME
- }
-
+ // If the priority is set (user specified Int) specify the priority
if (function.jobPriority.isDefined) {
request.userPriority = function.jobPriority.get
request.options2 |= LibBat.SUB2_JOB_PRIORITY
}
+ // LSF specific: get the max runtime for the jobQueue and pass it for this job
request.rLimits(LibLsf.LSF_RLIMIT_RUN) = Lsf706JobRunner.getRlimitRun(function.jobQueue)
- writeExec()
- request.command = "sh " + exec
+ // Run the command as sh
+ request.command = "sh " + jobScript
- // Allow advanced users to update the request.
+ // Allow advanced users to update the request via QFunction.updateJobRun()
updateJobRun(request)
updateStatus(RunnerStatus.RUNNING)
@@ -113,7 +142,7 @@ object Lsf706JobRunner extends Logging {
/** Amount of time a job can go without status before giving up. */
private val unknownStatusMaxSeconds = 5 * 60
- init()
+ initLsf()
/** The name of the default queue. */
private var defaultQueue: String = _
@@ -124,20 +153,80 @@ object Lsf706JobRunner extends Logging {
/**
* Initialize the Lsf library.
*/
- private def init() = {
+ private def initLsf() {
lsfLibLock.synchronized {
if (LibBat.lsb_init("Queue") < 0)
throw new QException(LibBat.lsb_sperror("lsb_init() failed"))
}
}
+ /**
+ * Bulk updates job statuses.
+ * @param runners Runners to update.
+ */
+ def updateStatus(runners: Set[Lsf706JobRunner]) {
+ var updatedRunners = Set.empty[Lsf706JobRunner]
+
+ Lsf706JobRunner.lsfLibLock.synchronized {
+ val result = LibBat.lsb_openjobinfo(0L, null, null, null, null, LibBat.ALL_JOB)
+ if (result < 0) {
+ logger.error(LibBat.lsb_sperror("Unable to check LSF job info"))
+ } else {
+ try {
+ val more = new IntByReference(result)
+ while (more.getValue > 0) {
+ val jobInfo = LibBat.lsb_readjobinfo(more)
+ if (jobInfo == null) {
+ logger.error(LibBat.lsb_sperror("Unable to read LSF job info"))
+ more.setValue(0)
+ } else {
+ runners.find(runner => runner.jobId == jobInfo.jobId) match {
+ case Some(runner) =>
+ updateRunnerStatus(runner, jobInfo)
+ updatedRunners += runner
+ case None => /* not our job */
+ }
+ }
+ }
+ } finally {
+ LibBat.lsb_closejobinfo()
+ }
+ }
+ }
+
+ for (runner <- runners.diff(updatedRunners)) {
+ checkUnknownStatus(runner)
+ }
+ }
+
+ /**
+ * Tries to stop any running jobs.
+ * @param runners Runners to stop.
+ */
+ def tryStop(runners: Set[Lsf706JobRunner]) {
+ lsfLibLock.synchronized {
+ // lsb_killbulkjobs does not seem to forward SIGTERM,
+ // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one.
+ for (runner <- runners.filterNot(_.jobId < 0)) {
+ try {
+ if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0)
+ logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId))
+ } catch {
+ case e =>
+ logger.error("Unable to kill job " + runner.jobId, e)
+ }
+ }
+ }
+ }
+
+
/**
* Returns the run limit in seconds for the queue.
* If the queue name is null returns the length of the default queue.
* @param queue Name of the queue or null for the default queue.
* @return the run limit in seconds for the queue.
*/
- def getRlimitRun(queue: String) = {
+ private def getRlimitRun(queue: String) = {
lsfLibLock.synchronized {
if (queue == null) {
if (defaultQueue != null) {
@@ -171,64 +260,6 @@ object Lsf706JobRunner extends Logging {
}
}
- /**
- * Updates the status of a list of jobs.
- */
- def updateStatus(runners: List[Lsf706JobRunner]) {
- var updatedRunners = List.empty[Lsf706JobRunner]
-
- Lsf706JobRunner.lsfLibLock.synchronized {
- val result = LibBat.lsb_openjobinfo(0L, null, null, null, null, LibBat.ALL_JOB)
- if (result < 0) {
- logger.error(LibBat.lsb_sperror("Unable to check LSF job info"))
- } else {
- try {
- val more = new IntByReference(result)
- while (more.getValue > 0) {
- val jobInfo = LibBat.lsb_readjobinfo(more)
- if (jobInfo == null) {
- logger.error(LibBat.lsb_sperror("Unable to read LSF job info"))
- more.setValue(0)
- } else {
- runners.find(runner => runner.jobId == jobInfo.jobId) match {
- case Some(runner) =>
- updateRunnerStatus(runner, jobInfo)
- updatedRunners :+= runner
- case None => /* not our job */
- }
- }
- }
- } finally {
- LibBat.lsb_closejobinfo()
- }
- }
- }
-
- for (runner <- runners.diff(updatedRunners)) {
- checkUnknownStatus(runner)
- }
- }
-
- /**
- * Tries to stop any running jobs.
- * @param runners Runners to stop.
- */
- def tryStop(runners: List[Lsf706JobRunner]) {
- lsfLibLock.synchronized {
- // lsb_killbulkjobs does not seem to forward SIGTERM,
- // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one.
- for (runner <- runners.filterNot(_.jobId < 0)) {
- try {
- if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0)
- logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId))
- } catch {
- case e =>
- logger.error("Unable to kill job " + runner.jobId, e)
- }
- }
- }
- }
-
private def updateRunnerStatus(runner: Lsf706JobRunner, jobInfo: LibBat.jobInfoEnt) {
val jobStatus = jobInfo.status
val exitStatus = jobInfo.exitStatus
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobManager.scala
new file mode 100644
index 000000000..c5c8d719c
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobManager.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.engine.shell
+
+import org.broadinstitute.sting.queue.function.CommandLineFunction
+import org.broadinstitute.sting.queue.engine.CommandLineJobManager
+
+class ShellJobManager extends CommandLineJobManager[ShellJobRunner] {
+ def runnerType = classOf[ShellJobRunner]
+ def create(function: CommandLineFunction) = new ShellJobRunner(function)
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala
new file mode 100755
index 000000000..603511a30
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2011, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.engine.shell
+
+import org.broadinstitute.sting.queue.function.CommandLineFunction
+import org.broadinstitute.sting.queue.util.ShellJob
+import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
+
+/**
+ * Runs jobs one at a time locally
+ */
+class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner {
+ private var runStatus: RunnerStatus.Value = _
+
+ /**
+ * Runs the function on the local shell.
+ * @param function Command to run.
+ */
+ def start() {
+ val job = new ShellJob
+
+ job.workingDir = function.commandDirectory
+ job.outputFile = function.jobOutputFile
+ job.errorFile = function.jobErrorFile
+
+ job.shellScript = jobScript
+
+ // Allow advanced users to update the job.
+ updateJobRun(job)
+
+ runStatus = RunnerStatus.RUNNING
+ job.run()
+ runStatus = RunnerStatus.DONE
+ }
+
+ def status = runStatus
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala
index 707ae8f13..8ba81ea73 100644
--- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala
+++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala
@@ -14,10 +14,8 @@ import org.broadinstitute.sting.queue.util.{Logging, CollectionUtils, IOUtils, R
* Inputs are matched to other outputs by using .equals()
*/
trait QFunction extends Logging {
- /**
- * Analysis function name
- */
- var analysisName: String = _
+ /** A short description of this step in the graph */
+ var analysisName: String = ""
/** Prefix for automatic job name creation */
var jobNamePrefix: String = _