Refactoring and sugar to give lower level access to the job graph.

Will add more sugar / glue depending on how much of a graph the python generator outputs.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3435 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-05-25 22:52:29 +00:00
parent 062b316881
commit ed4d8ddd05
18 changed files with 324 additions and 267 deletions

View File

@ -1,20 +1,22 @@
package org.broadinstitute.sting.queue
import collection.mutable.ListBuffer
import engine.Pipeline
import collection.JavaConversions._
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.utils.text.XReadLines
import java.io.{FileInputStream, File}
import java.util.Properties
object QArguments {
class QArguments(args: Array[String]) {
var useBsub = false
var dryRun = false
val scripts = new ListBuffer[String]
var inputPaths = List.empty[File]
var argMap = Map.empty[String, String]
def parseArgs(args: Array[String]) {
filterArgs(args)
}
parseArgs(args)
/**
* Pull out any args that are meant for QCommandLine
*/
private def filterArgs(args: Array[String]) = {
private def parseArgs(args: Array[String]) = {
var filtered = new ListBuffer[String]
filtered.appendAll(args)
@ -22,17 +24,15 @@ object QArguments {
Logging.enableDebug
if (isFlagged(filtered, "-dry"))
Pipeline.dryRun = true
dryRun = true
if (isFlagged(filtered, "-bsub"))
Pipeline.useBsub = true
useBsub = true
for (arg <- getArgs(filtered, "-P"))
Pipeline.addArg(arg)
addArg(arg)
for (arg <- getArgs(filtered, "-I"))
Pipeline.addFile(arg)
addFile(arg)
for (arg <- getArgs(filtered, "-S"))
scripts.append(arg)
filtered
}
private def isFlagged(filtered: ListBuffer[String], search: String) = {
@ -61,7 +61,30 @@ object QArguments {
found
}
def addArg(arg: String) = {
var file = new File(arg)
if (arg.contains("=") && !file.exists) {
val tokens = arg.split("=", 2)
argMap = argMap.updated(tokens(0), tokens(1))
} else if (file.exists && arg.endsWith(".properties")) {
var props = new Properties
props.load(new FileInputStream(file))
for ((name, value) <- props)
argMap = argMap.updated(name, value)
}
}
def addFile(arg: String): Unit = {
var file = new File(arg)
if (arg.endsWith(".list")) {
new XReadLines(file).iterator.foreach(addFile(_))
} else {
inputPaths = inputPaths ::: List(file)
}
}
}
object QArguments {
def strip(filtered: ListBuffer[String], search: String) = {
var index = 0
while (0 <= index && index < filtered.size) {
@ -70,4 +93,5 @@ object QArguments {
filtered.remove(index, 2)
}
}
}}
}
}

View File

@ -1,9 +1,7 @@
package org.broadinstitute.sting.queue
import engine.Pipeline
import tools.nsc.MainGenericRunner
import org.broadinstitute.sting.queue.util.ClasspathUtils
import org.apache.log4j._
import collection.mutable.ListBuffer
import org.broadinstitute.sting.queue.util.Logging
@ -11,40 +9,45 @@ object QCommandLine extends Application with Logging {
var usage = """usage: java -jar Queue.jar [ -P name=value ] [ -P file.properties ] [ -I input.file ] [ -I input_files.list ] [ -bsub ] [ -dry ] [ -debug ] -S pipeline.scala"""
override def main(args: Array[String]) = {
try {
QArguments.parseArgs(args.clone)
val qArgs: QArguments = try {
new QArguments(args)
} catch {
case e: Exception => {
case exception => {
println(usage)
System.exit(-1)
}
null
}
var newArgs = new ListBuffer[String]
newArgs.appendAll(args)
QArguments.strip(newArgs, "-S")
logger.debug("starting")
if (QArguments.scripts.size == 0) {
if (qArgs.scripts.size == 0) {
println("Error: Missing script")
println(usage)
System.exit(-1)
}
if (Pipeline.inputPaths.size == 0) {
// NOTE: Something in MainGenericRunner is exiting the VM.
if (qArgs.scripts.size != 1) {
println("Error: Only one script can be run at a time")
println(usage)
System.exit(-1)
}
if (qArgs.inputPaths.size == 0) {
println("Error: No inputs specified")
println(usage)
System.exit(-1)
}
for (script <- QArguments.scripts) {
var clone = newArgs.clone
clone.prepend("-nocompdaemon", "-classpath", ClasspathUtils.manifestAwareClassPath, script)
MainGenericRunner.main(clone.toArray)
}
val newArgs = new ListBuffer[String]
newArgs.appendAll(args)
QArguments.strip(newArgs, "-S")
newArgs.prepend("-nocompdaemon", "-classpath", ClasspathUtils.manifestAwareClassPath, qArgs.scripts.head)
MainGenericRunner.main(newArgs.toArray)
// NOTE: This line is not reached because something in MainGenericRunner is exiting the VM.
logger.debug("exiting")
}
}

View File

@ -1,97 +1,16 @@
package org.broadinstitute.sting.queue.engine
import graphing.JobGrapher
import org.broadinstitute.sting.queue.util.Logging
import scheduling.{DispatchJobScheduler, SimpleJobScheduler}
import java.lang.String
import collection.immutable.List
import collection.JavaConversions._
import java.util.Properties
import java.io.{File, FileInputStream}
import org.broadinstitute.sting.utils.text.XReadLines
import org.broadinstitute.sting.queue.{QException, QArguments}
import graphing.{ExplicitJobGrapher, TreeJobGrapher}
import scheduling.ResourceNode
/**
* Syntactic sugar for filling in a pipeline using a Scala script.
*/
object Pipeline extends Logging
{
var inputPaths = List.empty[File]
// TODO: Stop using globals and wrap in an execution environment. Will need when overriding values per command.
var useBsub = false
var dryRun = false
private var argMap = Map.empty[String, String]
private var rules = List.empty[QRule]
object Pipeline {
def addRule(rule: (Any, Any), commandString: String): Unit = TreeJobGrapher.addRule(rule, commandString)
def run(args: Array[String], sources: Any, targets: Any): Unit = TreeJobGrapher.run(args, sources, targets)
/**
* Sugar that allows addRule( inputs -> outputs, command )
*/
def addRule(rule: (Any, Any), command: String): Unit = {
addRule(rule._1, rule._2, command)
}
private def addRule(inputs: Any, outputs: Any, command: String): Unit = {
rules :::= List(new QRule(getFiles(inputs), getFiles(outputs), new QCommand(command)))
}
def run(args: Array[String], inputs: Any, outputs: Any) = {
QArguments.parseArgs(args)
var inputFiles = getFiles(inputs)
var outputFiles = getFiles(outputs)
var grapher = new JobGrapher(inputPaths.map(_.getCanonicalPath), argMap, rules, inputFiles, outputFiles)
val scheduler = useBsub match {
case false => new SimpleJobScheduler(grapher.jobs)
case true => new DispatchJobScheduler(grapher.jobs)
}
scheduler.runJobs
}
/**
* Parses files passed in various sugar forms into a List[QFile]
*/
private def getFiles(files: Any) : List[QFile] = {
files match {
case null => List.empty[QFile]
case Nil => List.empty[QFile]
case path: String => List(new QFile(path))
case file: QFile => List(file)
// Any List or Tuple add the members to this list
case product: Product => {
var list = List.empty[QFile]
for (fileList <- product.productIterator.toList.map(getFiles(_))) {
list :::= fileList
}
list
}
case x => throw new QException("Unknown file type: " + x)
}
}
def addArg(arg: String) = {
var file = new File(arg)
if (arg.contains("=") && !file.exists) {
val tokens = arg.split("=", 2)
argMap = argMap.updated(tokens(0), tokens(1))
} else if (file.exists && arg.endsWith(".properties")) {
var props = new Properties
props.load(new FileInputStream(file))
for ((name, value) <- props)
argMap = argMap.updated(name, value)
}
}
def addFile(arg: String): Unit = {
var file = new File(arg)
if (arg.endsWith(".list")) {
for (line <- new XReadLines(file).iterator) {
addFile(line)
}
} else {
inputPaths = inputPaths ::: List(file)
}
}
def node() = ExplicitJobGrapher.node()
def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = ExplicitJobGrapher.addEdge(rule, commandString)
def run(): Unit = ExplicitJobGrapher.run()
}

View File

@ -3,6 +3,7 @@ package org.broadinstitute.sting.queue.engine
import org.apache.commons.lang.builder.{EqualsBuilder, HashCodeBuilder}
import java.io.File
import org.apache.commons.lang.StringUtils
import org.broadinstitute.sting.queue.QException
/**
* Represents a file extension along with several tags.
@ -20,3 +21,23 @@ class QFile(val fileType: String, val parts: String*) {
def baseName(file: File): String = StringUtils.removeEnd(file.getCanonicalPath, extension)
def fullName(baseName: String) = baseName + extension
}
object QFile {
def getFiles(files: Any) : List[QFile] = {
files match {
case null => List.empty[QFile]
case Nil => List.empty[QFile]
case path: String => List(new QFile(path))
case file: QFile => List(file)
// Any List or Tuple add the members to this list
case product: Product => {
var list = List.empty[QFile]
for (fileList <- product.productIterator.toList.map(getFiles(_))) {
list :::= fileList
}
list
}
case x => throw new QException("Unknown file type: " + x)
}
}
}

View File

@ -0,0 +1,27 @@
package org.broadinstitute.sting.queue.engine.graphing
import org.broadinstitute.sting.queue.engine.scheduling.{ResourceNode, ExecEdge}
import org.broadinstitute.sting.queue.engine.QCommand
class ExplicitJobGrapher extends JobGrapher
object ExplicitJobGrapher {
private val grapher = new ExplicitJobGrapher
def node() = new ResourceNode
def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = {
addEdge(rule._1, rule._2, new QCommand(commandString))
}
private def addEdge(source: ResourceNode, target: ResourceNode, command: QCommand) = {
val resourceEdge = new ExecEdge(command)
grapher.jobGraph.addVertex(source)
grapher.jobGraph.addVertex(target)
grapher.jobGraph.addEdge(source, target, resourceEdge)
}
def run() = {
grapher.run()
}
}

View File

@ -1,87 +1,31 @@
package org.broadinstitute.sting.queue.engine.graphing
import org.broadinstitute.sting.queue.QException
import org.jgrapht.graph.SimpleDirectedGraph
import org.jgrapht.alg.BellmanFordShortestPath
import collection.JavaConversions._
import org.broadinstitute.sting.queue.engine.{QCommand, QFile, QRule}
import org.broadinstitute.sting.queue.engine.scheduling.{ExecEdge, ResourceEdge, ResourceNode}
import org.broadinstitute.sting.queue.QArguments
import org.broadinstitute.sting.queue.engine.scheduling._
import org.broadinstitute.sting.queue.util.Logging
import org.jgrapht.graph.SimpleDirectedGraph
/**
* A basic job grapher.
* Limitiations:
* - Only walks along graphs with rules that have a single input and a single output.
*/
class JobGrapher(
private val inputFiles: List[String],
private val argMap: Map[String, String],
private val rules: List[QRule],
private val sourceFiles: List[QFile],
private val targetFiles: List[QFile]) extends Logging {
abstract class JobGrapher() extends Logging {
/**
* Jobs to be run.
* Can be populated adhoc or during createJobGraph()
*/
protected val jobGraph = new SimpleDirectedGraph[ResourceNode, ResourceEdge](classOf[ResourceEdge])
private val modelGraph = new SimpleDirectedGraph[QFile, QCommand](classOf[QCommand])
private val jobGraph = new SimpleDirectedGraph[ResourceNode, ResourceEdge](classOf[ResourceEdge])
var qArgs: QArguments = _
createModelGraph()
createJobGraph()
def name = this.getClass.getName
def jobs = jobGraph
private def createJobGraph() = {
var missingPaths = List.empty[Tuple2[QFile,QFile]]
for (sourceFile <- sourceFiles) {
for (targetFile <- targetFiles) {
var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, sourceFile, targetFile)
if (shortestPath == null)
missingPaths = missingPaths ::: List((sourceFile, targetFile))
else
addPaths(shortestPath)
}
}
for ((sourceFile, targetFile) <- missingPaths) {
logger.error(String.format("No command path found between %s --> %s", sourceFile, targetFile))
}
if (missingPaths.size > 0)
throw new QException("Not all inputs and outputs found in the pipeline graph")
def run() = {
createJobGraph()
val scheduler = createScheduler()
scheduler.runJobs
}
private def createModelGraph() = {
for (rule <- rules) {
if (rule.inputs.size != 1 || (rule.outputs.size != 1))
throw new QException(this.name + " can only process rules with a single input and a single output. " +
"inputs: " + rule.inputs + ", outputs: " + rule.outputs + ", command: " + rule.command)
var source = rule.inputs.head
var target = rule.outputs.head
modelGraph.addVertex(source)
modelGraph.addVertex(target)
modelGraph.addEdge(source, target, rule.command)
}
}
protected def createJobGraph() = {}
private def addPaths(shortestPath: java.util.List[QCommand]) {
for (inputFile <- inputFiles)
if (modelGraph.getEdgeSource(shortestPath.head).matchesFile(inputFile))
addPath(shortestPath, inputFile)
}
private def addPath(shortestPath: java.util.List[QCommand], inputFile: String) = {
var sourceFile = inputFile
for (command <- shortestPath) {
val source = modelGraph.getEdgeSource(command)
val target = modelGraph.getEdgeTarget(command)
val baseName = source.baseName(sourceFile)
val targetFile = target.fullName(baseName)
val resourceSource = new ResourceNode(Map(source.extension -> sourceFile))
val resourceTarget = new ResourceNode(Map(target.extension -> targetFile))
val resourceEdge = new ExecEdge(argMap, command)
jobGraph.addVertex(resourceSource)
jobGraph.addVertex(resourceTarget)
jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge)
sourceFile = targetFile
private def createScheduler() : JobScheduler = {
qArgs.useBsub match {
case false => new SimpleJobScheduler(jobGraph, qArgs)
case true => new DispatchJobScheduler(jobGraph, qArgs)
}
}
}

View File

@ -0,0 +1,108 @@
package org.broadinstitute.sting.queue.engine.graphing
import org.jgrapht.graph.SimpleDirectedGraph
import org.broadinstitute.sting.queue.engine.{QCommand, QFile, QRule}
import org.broadinstitute.sting.queue.engine.scheduling.{ExecEdge, MapResourceNode, ResourceEdge, ResourceNode}
import org.jgrapht.alg.BellmanFordShortestPath
import org.broadinstitute.sting.queue.{QArguments, QException}
import collection.mutable.ListBuffer
import collection.JavaConversions._
/**
* A basic job grapher.
* Limitiations:
* - Only walks along graphs with rules that have a single input and a single output.
*/
class TreeJobGrapher extends JobGrapher {
private val modelGraph = new SimpleDirectedGraph[QFile, QCommand](classOf[QCommand])
private val rules = new ListBuffer[QRule]
private var sourceFiles = List.empty[QFile]
private var targetFiles = List.empty[QFile]
override protected def createJobGraph() = {
createModelGraph()
var missingPaths = List.empty[Tuple2[QFile,QFile]]
for (sourceFile <- sourceFiles) {
for (targetFile <- targetFiles) {
var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, sourceFile, targetFile)
if (shortestPath == null)
missingPaths = missingPaths ::: List((sourceFile, targetFile))
else
addPaths(shortestPath, qArgs)
}
}
for ((sourceFile, targetFile) <- missingPaths) {
logger.error(String.format("No command path found between %s --> %s", sourceFile, targetFile))
}
if (missingPaths.size > 0)
throw new QException("Not all inputs and outputs found in the pipeline graph")
}
private def createModelGraph() = {
for (rule <- rules) {
if (rule.inputs.size != 1 || (rule.outputs.size != 1))
throw new QException(this.getClass.getName + " can only process rules with a single input and a single output. " +
"inputs: " + rule.inputs + ", outputs: " + rule.outputs + ", command: " + rule.command)
var source = rule.inputs.head
var target = rule.outputs.head
modelGraph.addVertex(source)
modelGraph.addVertex(target)
modelGraph.addEdge(source, target, rule.command)
}
}
private def addPaths(shortestPath: java.util.List[QCommand], qArgs: QArguments) {
for (inputFile <- qArgs.inputPaths.map(_.getCanonicalPath))
if (modelGraph.getEdgeSource(shortestPath.head).matchesFile(inputFile))
addPath(shortestPath, inputFile)
}
private def addPath(shortestPath: java.util.List[QCommand], inputFile: String) = {
var sourceFile = inputFile
for (command <- shortestPath) {
val source = modelGraph.getEdgeSource(command)
val target = modelGraph.getEdgeTarget(command)
val baseName = source.baseName(sourceFile)
val targetFile = target.fullName(baseName)
val resourceSource = new MapResourceNode(Map(source.extension -> sourceFile))
val resourceTarget = new MapResourceNode(Map(target.extension -> targetFile))
val resourceEdge = new ExecEdge(command)
jobGraph.addVertex(resourceSource)
jobGraph.addVertex(resourceTarget)
jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge)
sourceFile = targetFile
}
}
}
/**
* Syntactic sugar for filling in a pipeline using a Scala script.
*/
object TreeJobGrapher {
private val grapher = new TreeJobGrapher
/**
* Sugar that allows addRule( inputs -> outputs, command )
*/
def addRule(rule: (Any, Any), commandString: String): Unit = {
val inputs = QFile.getFiles(rule._1)
val outputs = QFile.getFiles(rule._2)
val command = new QCommand(commandString)
addRule(inputs, outputs, command)
}
private def addRule(inputs: List[QFile], outputs: List[QFile], command: QCommand): Unit = {
grapher.rules += new QRule(inputs, outputs, command)
}
def run(args: Array[String], sources: Any, targets: Any) = {
grapher.qArgs = new QArguments(args)
grapher.sourceFiles = QFile.getFiles(sources)
grapher.targetFiles = QFile.getFiles(targets)
grapher.run()
}
}

View File

@ -1,18 +1,18 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.graph.SimpleDirectedGraph
import org.jgrapht.DirectedGraph
import edu.mit.broad.core.lsf.LocalLsfJob
import collection.JavaConversions._
import management.ManagementFactory
import org.broadinstitute.sting.queue.QException
import java.io.File
import java.util.{ArrayList, Properties}
import org.broadinstitute.sting.queue.engine.Pipeline
import java.util.ArrayList
import org.broadinstitute.sting.queue.{QArguments, QException}
/**
* Dispatches jobs to LSF and then returns.
*/
class DispatchJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends TopologicalJobScheduler(jobGraph) {
class DispatchJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
extends TopologicalJobScheduler(jobGraph, qArgs) {
private var lsfJobs = Map.empty[ExecEdge, LocalLsfJob]
private var lsfJobIndex = 0
private val jvmName = ManagementFactory.getRuntimeMXBean.getName
@ -20,16 +20,14 @@ class DispatchJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceE
protected def traversedExec(exec: ExecEdge) = {
lsfJobIndex += 1
val props = new Properties
exec.args.foreach(x => props.put(x._1, x._2))
val job = new LocalLsfJob
val jobName = jobNamePrefix + "-" + lsfJobIndex
val outputFile = jobName + ".out"
val errorFile = jobName + ".err"
val workingDir = props.getProperty("jobWorkingDir", ".")
val lsfProject = props.getProperty("jobProject", "Queue")
val queue = props.getProperty("jobQueue", "broad")
val memory = props.getProperty("jobMemory", "2")
val workingDir = lookup(exec, "jobWorkingDir", ".")
val lsfProject = lookup(exec, "jobProject", "Queue")
val queue = lookup(exec, "jobQueue", "broad")
val memory = lookup(exec, "jobMemory", "2")
var extraArgs = List("-r", "-R", "rusage[mem=" + memory + "]")
@ -51,7 +49,7 @@ class DispatchJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceE
logger.info(job.getBsubCommand.mkString(" "))
if (!Pipeline.dryRun)
if (!qArgs.dryRun)
job.start
}

View File

@ -1,44 +1,17 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.DirectedGraph
import org.apache.commons.lang.text.{StrLookup, StrSubstitutor}
import collection.JavaConversions._
import org.broadinstitute.sting.queue.engine.QCommand
import java.lang.String
class ExecEdge(val args: Map[String, String], private val command: QCommand) extends ResourceEdge {
class ExecEdge(private val command: QCommand)
extends ResourceEdge {
private var convertedCommandString: String = _
def commandString = convertedCommandString
override def traverse(graph: DirectedGraph[ResourceNode, ResourceEdge]) = {
override def traverse(graph: JobScheduler) = {
// Lookup any variable using the target node, or any of it's input nodes.
val sub = new StrSubstitutor(new NodeLookup(graph.getEdgeTarget(this), graph))
val sub = new StrSubstitutor(new StrLookup { def lookup(key: String) = graph.lookup(ExecEdge.this, key, null) })
convertedCommandString = sub.replace(command.commandString)
}
class NodeLookup(private val targetNode: ResourceNode, private val graph: DirectedGraph[ResourceNode, ResourceEdge]) extends StrLookup {
def lookup(key: String) = {
var value: String = null
if (args.contains(key))
value = args(key)
else
value = lookup(key, targetNode)
value
}
private def lookup(key: String, node: ResourceNode): String = {
var value: String = null
if (node.resources.contains(key)) {
value = node.resources(key)
} else {
for (edge <- graph.incomingEdgesOf(node)) {
lookup(key, graph.getEdgeSource(edge)) match {
case null => {}
case found => value = found
}
}
}
value
}
}
}

View File

@ -1,10 +1,50 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.graph.SimpleDirectedGraph
import org.jgrapht.DirectedGraph
import org.broadinstitute.sting.queue.util.Logging
import collection.JavaConversions._
import org.broadinstitute.sting.queue.QArguments
abstract class JobScheduler(protected val jobGraph: DirectedGraph[ResourceNode, ResourceEdge],
protected val qArgs: QArguments) extends Logging {
abstract class JobScheduler(protected val jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge])
extends Logging {
def runJobs
def numJobs = jobGraph.edgeSet.size
/**
* Emulates storing of properties per node by looking up values on
* the current edge/target-node or any preceding nodes in the graph.
*/
def lookup(edge: ResourceEdge, key: String, default: String) : String = {
lookupRecursive(edge, key) match {
case Some(value) => value
case None => qArgs.argMap.getOrElse(key, default)
}
}
private def lookupRecursive(edge: ResourceEdge, key: String) : Option[String] = {
var value = edge.lookup(key)
if (value.isDefined)
return value
value = this.jobGraph.getEdgeTarget(edge).lookup(key)
if (value.isDefined)
return value
return lookupRecursive(this.jobGraph.getEdgeSource(edge), key)
}
private def lookupRecursive(node: ResourceNode, key: String) : Option[String] = {
var value = node.lookup(key)
if (value.isDefined)
return value
for (edge <- this.jobGraph.incomingEdgesOf(node)) {
value = lookupRecursive(edge, key)
if (value.isDefined)
return value
}
return None
}
}

View File

@ -0,0 +1,5 @@
package org.broadinstitute.sting.queue.engine.scheduling
class MapResourceNode (private val resources: Map[String,String]) extends ResourceNode {
override def lookup(key: String) : Option[String] = this.resources.get(key)
}

View File

@ -1,7 +1,6 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.DirectedGraph
abstract class ResourceEdge {
def traverse(graph: DirectedGraph[ResourceNode, ResourceEdge]): Unit
def traverse(graph: JobScheduler): Unit
def lookup(key: String) : Option[String] = None
}

View File

@ -2,7 +2,8 @@ package org.broadinstitute.sting.queue.engine.scheduling
import org.apache.commons.lang.builder.{HashCodeBuilder, EqualsBuilder}
class ResourceNode(val resources: Map[String,String]) {
class ResourceNode() {
override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1)
override def hashCode = HashCodeBuilder.reflectionHashCode(this)
def lookup(key: String) : Option[String] = None
}

View File

@ -1,19 +1,19 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.graph.SimpleDirectedGraph
import org.broadinstitute.sting.queue.util.ProcessUtils
import org.broadinstitute.sting.queue.engine.Pipeline
import org.broadinstitute.sting.queue.QArguments
import org.jgrapht.DirectedGraph
/**
* Runs jobs one at a time locally
*/
class SimpleJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends TopologicalJobScheduler(jobGraph) {
class SimpleJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
extends TopologicalJobScheduler(jobGraph, qArgs) {
protected def traversedExec(exec: ExecEdge) = {
var commandString = exec.commandString
logger.info(commandString)
// TODO: Pre-print the commands?
if (!Pipeline.dryRun)
ProcessUtils.runCommandAndWait(commandString, Map.empty[String, String])
if (!qArgs.dryRun)
ProcessUtils.runCommandAndWait(commandString)
}
}

View File

@ -1,21 +1,21 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.graph.SimpleDirectedGraph
import org.jgrapht.DirectedGraph
import org.jgrapht.traverse.TopologicalOrderIterator
import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter}
import collection.JavaConversions._
import org.broadinstitute.sting.queue.QArguments
/**
* Loops over the job graph running jobs as the edges are traversed
*/
abstract class TopologicalJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends JobScheduler(jobGraph) {
abstract class TopologicalJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
extends JobScheduler(jobGraph, qArgs) {
protected val iterator = new TopologicalOrderIterator(this.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] {
override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = {
traversed(event.getEdge)
}
override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = traversed(event.getEdge)
})
override def runJobs = {
@ -26,7 +26,7 @@ abstract class TopologicalJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNod
}
protected def traversed(edge: ResourceEdge) = {
edge.traverse(this.jobGraph)
edge.traverse(this)
edge match {
case exec: ExecEdge => traversedExec(exec)
}

View File

@ -4,7 +4,6 @@ import collection.JavaConversions._
import org.reflections.util.ManifestAwareClasspathHelper
import java.io.File
import javax.print.URIException
import org.apache.commons.lang.StringUtils
/**
* Builds the correct class path by examining the manifests

View File

@ -6,7 +6,7 @@ import org.apache.log4j._
* A mixin to add logging to a class
*/
trait Logging {
val className = this.getClass.getName
private val className = this.getClass.getName
lazy val logger = configuredLogger
def configuredLogger = {

View File

@ -15,14 +15,10 @@ object ProcessUtils extends Logging {
val running = new ListBuffer[Process]()
def runCommandAndWait(command: String, environment: Map[String, String]) = {
def runCommandAndWait(command: String) = {
logger.debug("Running command: " + command)
var builder = new ProcessBuilder("sh", "-c", command)
for ((key, value) <- environment) {
logger.debug(String.format("adding env: %s = %s", key, value))
builder.environment.put(key, value)
}
var process = builder.start
running += process