From ed4d8ddd0506c06603e939ca0ea5558f90ee54fe Mon Sep 17 00:00:00 2001 From: kshakir Date: Tue, 25 May 2010 22:52:29 +0000 Subject: [PATCH] Refactoring and sugar to give lower level access to the job graph. Will add more sugar / glue depending on how much of a graph the python generator outputs. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3435 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/queue/QArguments.scala | 56 ++++++--- .../sting/queue/QCommandLine.scala | 37 +++--- .../sting/queue/engine/Pipeline.scala | 97 ++-------------- .../sting/queue/engine/QFile.scala | 21 ++++ .../engine/graphing/ExplicitJobGrapher.scala | 27 +++++ .../queue/engine/graphing/JobGrapher.scala | 94 +++------------ .../engine/graphing/TreeJobGrapher.scala | 108 ++++++++++++++++++ .../scheduling/DispatchJobScheduler.scala | 22 ++-- .../queue/engine/scheduling/ExecEdge.scala | 37 +----- .../engine/scheduling/JobScheduler.scala | 46 +++++++- .../engine/scheduling/MapResourceNode.scala | 5 + .../engine/scheduling/ResourceEdge.scala | 5 +- .../engine/scheduling/ResourceNode.scala | 3 +- .../scheduling/SimpleJobScheduler.scala | 12 +- .../scheduling/TopologicalJobScheduler.scala | 12 +- .../sting/queue/util/ClasspathUtils.scala | 1 - .../sting/queue/util/Logging.scala | 2 +- .../sting/queue/util/ProcessUtils.scala | 6 +- 18 files changed, 324 insertions(+), 267 deletions(-) create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala diff --git a/scala/src/org/broadinstitute/sting/queue/QArguments.scala b/scala/src/org/broadinstitute/sting/queue/QArguments.scala index 2fef2a840..d17e7ab4a 100755 --- a/scala/src/org/broadinstitute/sting/queue/QArguments.scala +++ b/scala/src/org/broadinstitute/sting/queue/QArguments.scala @@ -1,20 +1,22 @@ package org.broadinstitute.sting.queue import collection.mutable.ListBuffer -import engine.Pipeline +import collection.JavaConversions._ import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.utils.text.XReadLines +import java.io.{FileInputStream, File} +import java.util.Properties -object QArguments { +class QArguments(args: Array[String]) { + var useBsub = false + var dryRun = false val scripts = new ListBuffer[String] + var inputPaths = List.empty[File] + var argMap = Map.empty[String, String] - def parseArgs(args: Array[String]) { - filterArgs(args) - } + parseArgs(args) - /** - * Pull out any args that are meant for QCommandLine - */ - private def filterArgs(args: Array[String]) = { + private def parseArgs(args: Array[String]) = { var filtered = new ListBuffer[String] filtered.appendAll(args) @@ -22,17 +24,15 @@ object QArguments { Logging.enableDebug if (isFlagged(filtered, "-dry")) - Pipeline.dryRun = true + dryRun = true if (isFlagged(filtered, "-bsub")) - Pipeline.useBsub = true + useBsub = true for (arg <- getArgs(filtered, "-P")) - Pipeline.addArg(arg) + addArg(arg) for (arg <- getArgs(filtered, "-I")) - Pipeline.addFile(arg) + addFile(arg) for (arg <- getArgs(filtered, "-S")) scripts.append(arg) - - filtered } private def isFlagged(filtered: ListBuffer[String], search: String) = { @@ -61,7 +61,30 @@ object QArguments { found } + def addArg(arg: String) = { + var file = new File(arg) + if (arg.contains("=") && !file.exists) { + val tokens = arg.split("=", 2) + argMap = argMap.updated(tokens(0), tokens(1)) + } else if (file.exists && arg.endsWith(".properties")) { + var props = new Properties + props.load(new FileInputStream(file)) + for ((name, value) <- props) + argMap = argMap.updated(name, value) + } + } + def addFile(arg: String): Unit = { + var file = new File(arg) + if (arg.endsWith(".list")) { + new XReadLines(file).iterator.foreach(addFile(_)) + } else { + inputPaths = inputPaths ::: List(file) + } + } +} + +object QArguments { def strip(filtered: ListBuffer[String], search: String) = { var index = 0 while (0 <= index && index < filtered.size) { @@ -70,4 +93,5 @@ object QArguments { filtered.remove(index, 2) } } - }} + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index b0c0bb224..c9e9ea694 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -1,9 +1,7 @@ package org.broadinstitute.sting.queue -import engine.Pipeline import tools.nsc.MainGenericRunner import org.broadinstitute.sting.queue.util.ClasspathUtils -import org.apache.log4j._ import collection.mutable.ListBuffer import org.broadinstitute.sting.queue.util.Logging @@ -11,40 +9,45 @@ object QCommandLine extends Application with Logging { var usage = """usage: java -jar Queue.jar [ -P name=value ] [ -P file.properties ] [ -I input.file ] [ -I input_files.list ] [ -bsub ] [ -dry ] [ -debug ] -S pipeline.scala""" override def main(args: Array[String]) = { - try { - QArguments.parseArgs(args.clone) + val qArgs: QArguments = try { + new QArguments(args) } catch { - case e: Exception => { + case exception => { println(usage) System.exit(-1) } + null } - var newArgs = new ListBuffer[String] - newArgs.appendAll(args) - - QArguments.strip(newArgs, "-S") - logger.debug("starting") - if (QArguments.scripts.size == 0) { + if (qArgs.scripts.size == 0) { println("Error: Missing script") println(usage) System.exit(-1) } - if (Pipeline.inputPaths.size == 0) { + // NOTE: Something in MainGenericRunner is exiting the VM. + if (qArgs.scripts.size != 1) { + println("Error: Only one script can be run at a time") + println(usage) + System.exit(-1) + } + + if (qArgs.inputPaths.size == 0) { println("Error: No inputs specified") println(usage) System.exit(-1) } - for (script <- QArguments.scripts) { - var clone = newArgs.clone - clone.prepend("-nocompdaemon", "-classpath", ClasspathUtils.manifestAwareClassPath, script) - MainGenericRunner.main(clone.toArray) - } + val newArgs = new ListBuffer[String] + newArgs.appendAll(args) + QArguments.strip(newArgs, "-S") + newArgs.prepend("-nocompdaemon", "-classpath", ClasspathUtils.manifestAwareClassPath, qArgs.scripts.head) + MainGenericRunner.main(newArgs.toArray) + + // NOTE: This line is not reached because something in MainGenericRunner is exiting the VM. logger.debug("exiting") } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala b/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala index 421f0e2ec..fff54849b 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala @@ -1,97 +1,16 @@ package org.broadinstitute.sting.queue.engine -import graphing.JobGrapher -import org.broadinstitute.sting.queue.util.Logging -import scheduling.{DispatchJobScheduler, SimpleJobScheduler} -import java.lang.String -import collection.immutable.List -import collection.JavaConversions._ -import java.util.Properties -import java.io.{File, FileInputStream} -import org.broadinstitute.sting.utils.text.XReadLines -import org.broadinstitute.sting.queue.{QException, QArguments} +import graphing.{ExplicitJobGrapher, TreeJobGrapher} +import scheduling.ResourceNode /** * Syntactic sugar for filling in a pipeline using a Scala script. */ -object Pipeline extends Logging -{ - var inputPaths = List.empty[File] - // TODO: Stop using globals and wrap in an execution environment. Will need when overriding values per command. - var useBsub = false - var dryRun = false - private var argMap = Map.empty[String, String] - private var rules = List.empty[QRule] +object Pipeline { + def addRule(rule: (Any, Any), commandString: String): Unit = TreeJobGrapher.addRule(rule, commandString) + def run(args: Array[String], sources: Any, targets: Any): Unit = TreeJobGrapher.run(args, sources, targets) - /** - * Sugar that allows addRule( inputs -> outputs, command ) - */ - def addRule(rule: (Any, Any), command: String): Unit = { - addRule(rule._1, rule._2, command) - } - - private def addRule(inputs: Any, outputs: Any, command: String): Unit = { - rules :::= List(new QRule(getFiles(inputs), getFiles(outputs), new QCommand(command))) - } - - def run(args: Array[String], inputs: Any, outputs: Any) = { - QArguments.parseArgs(args) - - var inputFiles = getFiles(inputs) - var outputFiles = getFiles(outputs) - - var grapher = new JobGrapher(inputPaths.map(_.getCanonicalPath), argMap, rules, inputFiles, outputFiles) - - val scheduler = useBsub match { - case false => new SimpleJobScheduler(grapher.jobs) - case true => new DispatchJobScheduler(grapher.jobs) - } - - scheduler.runJobs - } - - /** - * Parses files passed in various sugar forms into a List[QFile] - */ - private def getFiles(files: Any) : List[QFile] = { - files match { - case null => List.empty[QFile] - case Nil => List.empty[QFile] - case path: String => List(new QFile(path)) - case file: QFile => List(file) - // Any List or Tuple add the members to this list - case product: Product => { - var list = List.empty[QFile] - for (fileList <- product.productIterator.toList.map(getFiles(_))) { - list :::= fileList - } - list - } - case x => throw new QException("Unknown file type: " + x) - } - } - - def addArg(arg: String) = { - var file = new File(arg) - if (arg.contains("=") && !file.exists) { - val tokens = arg.split("=", 2) - argMap = argMap.updated(tokens(0), tokens(1)) - } else if (file.exists && arg.endsWith(".properties")) { - var props = new Properties - props.load(new FileInputStream(file)) - for ((name, value) <- props) - argMap = argMap.updated(name, value) - } - } - - def addFile(arg: String): Unit = { - var file = new File(arg) - if (arg.endsWith(".list")) { - for (line <- new XReadLines(file).iterator) { - addFile(line) - } - } else { - inputPaths = inputPaths ::: List(file) - } - } + def node() = ExplicitJobGrapher.node() + def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = ExplicitJobGrapher.addEdge(rule, commandString) + def run(): Unit = ExplicitJobGrapher.run() } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala b/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala index 89d0eb4bc..435cb0938 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala @@ -3,6 +3,7 @@ package org.broadinstitute.sting.queue.engine import org.apache.commons.lang.builder.{EqualsBuilder, HashCodeBuilder} import java.io.File import org.apache.commons.lang.StringUtils +import org.broadinstitute.sting.queue.QException /** * Represents a file extension along with several tags. @@ -20,3 +21,23 @@ class QFile(val fileType: String, val parts: String*) { def baseName(file: File): String = StringUtils.removeEnd(file.getCanonicalPath, extension) def fullName(baseName: String) = baseName + extension } + +object QFile { + def getFiles(files: Any) : List[QFile] = { + files match { + case null => List.empty[QFile] + case Nil => List.empty[QFile] + case path: String => List(new QFile(path)) + case file: QFile => List(file) + // Any List or Tuple add the members to this list + case product: Product => { + var list = List.empty[QFile] + for (fileList <- product.productIterator.toList.map(getFiles(_))) { + list :::= fileList + } + list + } + case x => throw new QException("Unknown file type: " + x) + } + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala new file mode 100644 index 000000000..8cbf94ca3 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala @@ -0,0 +1,27 @@ +package org.broadinstitute.sting.queue.engine.graphing + +import org.broadinstitute.sting.queue.engine.scheduling.{ResourceNode, ExecEdge} +import org.broadinstitute.sting.queue.engine.QCommand + +class ExplicitJobGrapher extends JobGrapher + +object ExplicitJobGrapher { + private val grapher = new ExplicitJobGrapher + + def node() = new ResourceNode + + def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = { + addEdge(rule._1, rule._2, new QCommand(commandString)) + } + + private def addEdge(source: ResourceNode, target: ResourceNode, command: QCommand) = { + val resourceEdge = new ExecEdge(command) + grapher.jobGraph.addVertex(source) + grapher.jobGraph.addVertex(target) + grapher.jobGraph.addEdge(source, target, resourceEdge) + } + + def run() = { + grapher.run() + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala index 23dedd7a5..4fc1c0cc5 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala @@ -1,87 +1,31 @@ package org.broadinstitute.sting.queue.engine.graphing -import org.broadinstitute.sting.queue.QException -import org.jgrapht.graph.SimpleDirectedGraph -import org.jgrapht.alg.BellmanFordShortestPath -import collection.JavaConversions._ -import org.broadinstitute.sting.queue.engine.{QCommand, QFile, QRule} -import org.broadinstitute.sting.queue.engine.scheduling.{ExecEdge, ResourceEdge, ResourceNode} +import org.broadinstitute.sting.queue.QArguments +import org.broadinstitute.sting.queue.engine.scheduling._ import org.broadinstitute.sting.queue.util.Logging +import org.jgrapht.graph.SimpleDirectedGraph -/** - * A basic job grapher. - * Limitiations: - * - Only walks along graphs with rules that have a single input and a single output. - */ -class JobGrapher( - private val inputFiles: List[String], - private val argMap: Map[String, String], - private val rules: List[QRule], - private val sourceFiles: List[QFile], - private val targetFiles: List[QFile]) extends Logging { +abstract class JobGrapher() extends Logging { + /** + * Jobs to be run. + * Can be populated adhoc or during createJobGraph() + */ + protected val jobGraph = new SimpleDirectedGraph[ResourceNode, ResourceEdge](classOf[ResourceEdge]) - private val modelGraph = new SimpleDirectedGraph[QFile, QCommand](classOf[QCommand]) - private val jobGraph = new SimpleDirectedGraph[ResourceNode, ResourceEdge](classOf[ResourceEdge]) + var qArgs: QArguments = _ - createModelGraph() - createJobGraph() - - def name = this.getClass.getName - def jobs = jobGraph - - private def createJobGraph() = { - var missingPaths = List.empty[Tuple2[QFile,QFile]] - for (sourceFile <- sourceFiles) { - for (targetFile <- targetFiles) { - var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, sourceFile, targetFile) - if (shortestPath == null) - missingPaths = missingPaths ::: List((sourceFile, targetFile)) - else - addPaths(shortestPath) - } - } - - for ((sourceFile, targetFile) <- missingPaths) { - logger.error(String.format("No command path found between %s --> %s", sourceFile, targetFile)) - } - - if (missingPaths.size > 0) - throw new QException("Not all inputs and outputs found in the pipeline graph") + def run() = { + createJobGraph() + val scheduler = createScheduler() + scheduler.runJobs } - private def createModelGraph() = { - for (rule <- rules) { - if (rule.inputs.size != 1 || (rule.outputs.size != 1)) - throw new QException(this.name + " can only process rules with a single input and a single output. " + - "inputs: " + rule.inputs + ", outputs: " + rule.outputs + ", command: " + rule.command) - var source = rule.inputs.head - var target = rule.outputs.head - modelGraph.addVertex(source) - modelGraph.addVertex(target) - modelGraph.addEdge(source, target, rule.command) - } - } + protected def createJobGraph() = {} - private def addPaths(shortestPath: java.util.List[QCommand]) { - for (inputFile <- inputFiles) - if (modelGraph.getEdgeSource(shortestPath.head).matchesFile(inputFile)) - addPath(shortestPath, inputFile) - } - - private def addPath(shortestPath: java.util.List[QCommand], inputFile: String) = { - var sourceFile = inputFile - for (command <- shortestPath) { - val source = modelGraph.getEdgeSource(command) - val target = modelGraph.getEdgeTarget(command) - val baseName = source.baseName(sourceFile) - val targetFile = target.fullName(baseName) - val resourceSource = new ResourceNode(Map(source.extension -> sourceFile)) - val resourceTarget = new ResourceNode(Map(target.extension -> targetFile)) - val resourceEdge = new ExecEdge(argMap, command) - jobGraph.addVertex(resourceSource) - jobGraph.addVertex(resourceTarget) - jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge) - sourceFile = targetFile + private def createScheduler() : JobScheduler = { + qArgs.useBsub match { + case false => new SimpleJobScheduler(jobGraph, qArgs) + case true => new DispatchJobScheduler(jobGraph, qArgs) } } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala new file mode 100644 index 000000000..ed8e2d05c --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala @@ -0,0 +1,108 @@ +package org.broadinstitute.sting.queue.engine.graphing + +import org.jgrapht.graph.SimpleDirectedGraph +import org.broadinstitute.sting.queue.engine.{QCommand, QFile, QRule} +import org.broadinstitute.sting.queue.engine.scheduling.{ExecEdge, MapResourceNode, ResourceEdge, ResourceNode} +import org.jgrapht.alg.BellmanFordShortestPath +import org.broadinstitute.sting.queue.{QArguments, QException} +import collection.mutable.ListBuffer +import collection.JavaConversions._ + +/** + * A basic job grapher. + * Limitiations: + * - Only walks along graphs with rules that have a single input and a single output. + */ +class TreeJobGrapher extends JobGrapher { + private val modelGraph = new SimpleDirectedGraph[QFile, QCommand](classOf[QCommand]) + + private val rules = new ListBuffer[QRule] + private var sourceFiles = List.empty[QFile] + private var targetFiles = List.empty[QFile] + + override protected def createJobGraph() = { + createModelGraph() + + var missingPaths = List.empty[Tuple2[QFile,QFile]] + for (sourceFile <- sourceFiles) { + for (targetFile <- targetFiles) { + var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, sourceFile, targetFile) + if (shortestPath == null) + missingPaths = missingPaths ::: List((sourceFile, targetFile)) + else + addPaths(shortestPath, qArgs) + } + } + + for ((sourceFile, targetFile) <- missingPaths) { + logger.error(String.format("No command path found between %s --> %s", sourceFile, targetFile)) + } + + if (missingPaths.size > 0) + throw new QException("Not all inputs and outputs found in the pipeline graph") + } + + private def createModelGraph() = { + for (rule <- rules) { + if (rule.inputs.size != 1 || (rule.outputs.size != 1)) + throw new QException(this.getClass.getName + " can only process rules with a single input and a single output. " + + "inputs: " + rule.inputs + ", outputs: " + rule.outputs + ", command: " + rule.command) + var source = rule.inputs.head + var target = rule.outputs.head + modelGraph.addVertex(source) + modelGraph.addVertex(target) + modelGraph.addEdge(source, target, rule.command) + } + } + + private def addPaths(shortestPath: java.util.List[QCommand], qArgs: QArguments) { + for (inputFile <- qArgs.inputPaths.map(_.getCanonicalPath)) + if (modelGraph.getEdgeSource(shortestPath.head).matchesFile(inputFile)) + addPath(shortestPath, inputFile) + } + + private def addPath(shortestPath: java.util.List[QCommand], inputFile: String) = { + var sourceFile = inputFile + for (command <- shortestPath) { + val source = modelGraph.getEdgeSource(command) + val target = modelGraph.getEdgeTarget(command) + val baseName = source.baseName(sourceFile) + val targetFile = target.fullName(baseName) + val resourceSource = new MapResourceNode(Map(source.extension -> sourceFile)) + val resourceTarget = new MapResourceNode(Map(target.extension -> targetFile)) + val resourceEdge = new ExecEdge(command) + jobGraph.addVertex(resourceSource) + jobGraph.addVertex(resourceTarget) + jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge) + sourceFile = targetFile + } + } +} + +/** + * Syntactic sugar for filling in a pipeline using a Scala script. + */ +object TreeJobGrapher { + private val grapher = new TreeJobGrapher + + /** + * Sugar that allows addRule( inputs -> outputs, command ) + */ + def addRule(rule: (Any, Any), commandString: String): Unit = { + val inputs = QFile.getFiles(rule._1) + val outputs = QFile.getFiles(rule._2) + val command = new QCommand(commandString) + addRule(inputs, outputs, command) + } + + private def addRule(inputs: List[QFile], outputs: List[QFile], command: QCommand): Unit = { + grapher.rules += new QRule(inputs, outputs, command) + } + + def run(args: Array[String], sources: Any, targets: Any) = { + grapher.qArgs = new QArguments(args) + grapher.sourceFiles = QFile.getFiles(sources) + grapher.targetFiles = QFile.getFiles(targets) + grapher.run() + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala index 99236657f..c3f30c6cc 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala @@ -1,18 +1,18 @@ package org.broadinstitute.sting.queue.engine.scheduling -import org.jgrapht.graph.SimpleDirectedGraph +import org.jgrapht.DirectedGraph import edu.mit.broad.core.lsf.LocalLsfJob import collection.JavaConversions._ import management.ManagementFactory -import org.broadinstitute.sting.queue.QException import java.io.File -import java.util.{ArrayList, Properties} -import org.broadinstitute.sting.queue.engine.Pipeline +import java.util.ArrayList +import org.broadinstitute.sting.queue.{QArguments, QException} /** * Dispatches jobs to LSF and then returns. */ -class DispatchJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends TopologicalJobScheduler(jobGraph) { +class DispatchJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments) + extends TopologicalJobScheduler(jobGraph, qArgs) { private var lsfJobs = Map.empty[ExecEdge, LocalLsfJob] private var lsfJobIndex = 0 private val jvmName = ManagementFactory.getRuntimeMXBean.getName @@ -20,16 +20,14 @@ class DispatchJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceE protected def traversedExec(exec: ExecEdge) = { lsfJobIndex += 1 - val props = new Properties - exec.args.foreach(x => props.put(x._1, x._2)) val job = new LocalLsfJob val jobName = jobNamePrefix + "-" + lsfJobIndex val outputFile = jobName + ".out" val errorFile = jobName + ".err" - val workingDir = props.getProperty("jobWorkingDir", ".") - val lsfProject = props.getProperty("jobProject", "Queue") - val queue = props.getProperty("jobQueue", "broad") - val memory = props.getProperty("jobMemory", "2") + val workingDir = lookup(exec, "jobWorkingDir", ".") + val lsfProject = lookup(exec, "jobProject", "Queue") + val queue = lookup(exec, "jobQueue", "broad") + val memory = lookup(exec, "jobMemory", "2") var extraArgs = List("-r", "-R", "rusage[mem=" + memory + "]") @@ -51,7 +49,7 @@ class DispatchJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceE logger.info(job.getBsubCommand.mkString(" ")) - if (!Pipeline.dryRun) + if (!qArgs.dryRun) job.start } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala index 3fda1db03..55d40ad4f 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala @@ -1,44 +1,17 @@ package org.broadinstitute.sting.queue.engine.scheduling -import org.jgrapht.DirectedGraph import org.apache.commons.lang.text.{StrLookup, StrSubstitutor} -import collection.JavaConversions._ import org.broadinstitute.sting.queue.engine.QCommand +import java.lang.String -class ExecEdge(val args: Map[String, String], private val command: QCommand) extends ResourceEdge { +class ExecEdge(private val command: QCommand) + extends ResourceEdge { private var convertedCommandString: String = _ def commandString = convertedCommandString - override def traverse(graph: DirectedGraph[ResourceNode, ResourceEdge]) = { + override def traverse(graph: JobScheduler) = { // Lookup any variable using the target node, or any of it's input nodes. - val sub = new StrSubstitutor(new NodeLookup(graph.getEdgeTarget(this), graph)) + val sub = new StrSubstitutor(new StrLookup { def lookup(key: String) = graph.lookup(ExecEdge.this, key, null) }) convertedCommandString = sub.replace(command.commandString) } - - class NodeLookup(private val targetNode: ResourceNode, private val graph: DirectedGraph[ResourceNode, ResourceEdge]) extends StrLookup { - - def lookup(key: String) = { - var value: String = null - if (args.contains(key)) - value = args(key) - else - value = lookup(key, targetNode) - value - } - - private def lookup(key: String, node: ResourceNode): String = { - var value: String = null - if (node.resources.contains(key)) { - value = node.resources(key) - } else { - for (edge <- graph.incomingEdgesOf(node)) { - lookup(key, graph.getEdgeSource(edge)) match { - case null => {} - case found => value = found - } - } - } - value - } - } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala index c6a1e30a9..0a876c761 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala @@ -1,10 +1,50 @@ package org.broadinstitute.sting.queue.engine.scheduling -import org.jgrapht.graph.SimpleDirectedGraph +import org.jgrapht.DirectedGraph import org.broadinstitute.sting.queue.util.Logging +import collection.JavaConversions._ +import org.broadinstitute.sting.queue.QArguments + +abstract class JobScheduler(protected val jobGraph: DirectedGraph[ResourceNode, ResourceEdge], + protected val qArgs: QArguments) extends Logging { -abstract class JobScheduler(protected val jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) - extends Logging { def runJobs def numJobs = jobGraph.edgeSet.size + + /** + * Emulates storing of properties per node by looking up values on + * the current edge/target-node or any preceding nodes in the graph. + */ + def lookup(edge: ResourceEdge, key: String, default: String) : String = { + lookupRecursive(edge, key) match { + case Some(value) => value + case None => qArgs.argMap.getOrElse(key, default) + } + } + + private def lookupRecursive(edge: ResourceEdge, key: String) : Option[String] = { + var value = edge.lookup(key) + if (value.isDefined) + return value + + value = this.jobGraph.getEdgeTarget(edge).lookup(key) + if (value.isDefined) + return value + + return lookupRecursive(this.jobGraph.getEdgeSource(edge), key) + } + + private def lookupRecursive(node: ResourceNode, key: String) : Option[String] = { + var value = node.lookup(key) + if (value.isDefined) + return value + + for (edge <- this.jobGraph.incomingEdgesOf(node)) { + value = lookupRecursive(edge, key) + if (value.isDefined) + return value + } + + return None + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala new file mode 100644 index 000000000..59155a610 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala @@ -0,0 +1,5 @@ +package org.broadinstitute.sting.queue.engine.scheduling + +class MapResourceNode (private val resources: Map[String,String]) extends ResourceNode { + override def lookup(key: String) : Option[String] = this.resources.get(key) +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala index a7d58e503..b376bc59c 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala @@ -1,7 +1,6 @@ package org.broadinstitute.sting.queue.engine.scheduling -import org.jgrapht.DirectedGraph - abstract class ResourceEdge { - def traverse(graph: DirectedGraph[ResourceNode, ResourceEdge]): Unit + def traverse(graph: JobScheduler): Unit + def lookup(key: String) : Option[String] = None } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala index 0a61a40f1..475224ef8 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala @@ -2,7 +2,8 @@ package org.broadinstitute.sting.queue.engine.scheduling import org.apache.commons.lang.builder.{HashCodeBuilder, EqualsBuilder} -class ResourceNode(val resources: Map[String,String]) { +class ResourceNode() { override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1) override def hashCode = HashCodeBuilder.reflectionHashCode(this) + def lookup(key: String) : Option[String] = None } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala index 939f6c336..f5029882a 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala @@ -1,19 +1,19 @@ package org.broadinstitute.sting.queue.engine.scheduling -import org.jgrapht.graph.SimpleDirectedGraph import org.broadinstitute.sting.queue.util.ProcessUtils -import org.broadinstitute.sting.queue.engine.Pipeline +import org.broadinstitute.sting.queue.QArguments +import org.jgrapht.DirectedGraph /** * Runs jobs one at a time locally */ -class SimpleJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends TopologicalJobScheduler(jobGraph) { +class SimpleJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments) + extends TopologicalJobScheduler(jobGraph, qArgs) { protected def traversedExec(exec: ExecEdge) = { var commandString = exec.commandString logger.info(commandString) - // TODO: Pre-print the commands? - if (!Pipeline.dryRun) - ProcessUtils.runCommandAndWait(commandString, Map.empty[String, String]) + if (!qArgs.dryRun) + ProcessUtils.runCommandAndWait(commandString) } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala index 65003052f..c7cb58425 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala @@ -1,21 +1,21 @@ package org.broadinstitute.sting.queue.engine.scheduling -import org.jgrapht.graph.SimpleDirectedGraph +import org.jgrapht.DirectedGraph import org.jgrapht.traverse.TopologicalOrderIterator import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter} import collection.JavaConversions._ +import org.broadinstitute.sting.queue.QArguments /** * Loops over the job graph running jobs as the edges are traversed */ -abstract class TopologicalJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends JobScheduler(jobGraph) { +abstract class TopologicalJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments) + extends JobScheduler(jobGraph, qArgs) { protected val iterator = new TopologicalOrderIterator(this.jobGraph) iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] { - override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = { - traversed(event.getEdge) - } + override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = traversed(event.getEdge) }) override def runJobs = { @@ -26,7 +26,7 @@ abstract class TopologicalJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNod } protected def traversed(edge: ResourceEdge) = { - edge.traverse(this.jobGraph) + edge.traverse(this) edge match { case exec: ExecEdge => traversedExec(exec) } diff --git a/scala/src/org/broadinstitute/sting/queue/util/ClasspathUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ClasspathUtils.scala index 8b79c9cf1..f3a0f43b5 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/ClasspathUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/ClasspathUtils.scala @@ -4,7 +4,6 @@ import collection.JavaConversions._ import org.reflections.util.ManifestAwareClasspathHelper import java.io.File import javax.print.URIException -import org.apache.commons.lang.StringUtils /** * Builds the correct class path by examining the manifests diff --git a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala index 48a989614..31b3a0c26 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala @@ -6,7 +6,7 @@ import org.apache.log4j._ * A mixin to add logging to a class */ trait Logging { - val className = this.getClass.getName + private val className = this.getClass.getName lazy val logger = configuredLogger def configuredLogger = { diff --git a/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala index 31cea807d..fdb90f733 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala @@ -15,14 +15,10 @@ object ProcessUtils extends Logging { val running = new ListBuffer[Process]() - def runCommandAndWait(command: String, environment: Map[String, String]) = { + def runCommandAndWait(command: String) = { logger.debug("Running command: " + command) var builder = new ProcessBuilder("sh", "-c", command) - for ((key, value) <- environment) { - logger.debug(String.format("adding env: %s = %s", key, value)) - builder.environment.put(key, value) - } var process = builder.start running += process