Multiple inputs / outputs for rules.

Cleanup.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3464 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-05-30 08:28:16 +00:00
parent e96bcf2128
commit beb8a83ca4
11 changed files with 108 additions and 58 deletions

View File

@ -4,6 +4,6 @@ package org.broadinstitute.sting.queue.engine
* Defines a basic command to run * Defines a basic command to run
* TODO: Allow overriding arguments per command such as the job queue * TODO: Allow overriding arguments per command such as the job queue
*/ */
class QCommand(val commandString: String) { class QCommand(val commandString: String) extends QModelEdge {
override def toString = commandString override def toString = commandString
} }

View File

@ -0,0 +1,7 @@
package org.broadinstitute.sting.queue.engine
/**
* Used for modeling before an edge gets
* replicated into an actual ResourceEdge
*/
class QModelEdge

View File

@ -15,7 +15,7 @@ object ExplicitJobGrapher {
} }
private def addEdge(source: ResourceNode, target: ResourceNode, command: QCommand) = { private def addEdge(source: ResourceNode, target: ResourceNode, command: QCommand) = {
val resourceEdge = new ExecEdge(command) val resourceEdge = new ExecEdge(command.commandString)
grapher.jobGraph.addVertex(source) grapher.jobGraph.addVertex(source)
grapher.jobGraph.addVertex(target) grapher.jobGraph.addVertex(target)
grapher.jobGraph.addEdge(source, target, resourceEdge) grapher.jobGraph.addEdge(source, target, resourceEdge)

View File

@ -1,32 +1,33 @@
package org.broadinstitute.sting.queue.engine.graphing package org.broadinstitute.sting.queue.engine.graphing
import org.jgrapht.graph.SimpleDirectedGraph import org.jgrapht.graph.SimpleDirectedGraph
import org.broadinstitute.sting.queue.engine.{QCommand, QFile, QRule}
import org.broadinstitute.sting.queue.engine.scheduling.{ExecEdge, MapResourceNode, ResourceEdge, ResourceNode}
import org.jgrapht.alg.BellmanFordShortestPath import org.jgrapht.alg.BellmanFordShortestPath
import org.broadinstitute.sting.queue.{QArguments, QException} import org.broadinstitute.sting.queue.{QArguments, QException}
import collection.mutable.ListBuffer import collection.mutable.ListBuffer
import collection.JavaConversions._ import collection.JavaConversions._
import org.broadinstitute.sting.queue.engine.scheduling.{ResourceEdge, ExecEdge, MapResourceNode}
import org.broadinstitute.sting.queue.engine.{QModelEdge, QCommand, QFile, QRule}
/** /**
* A basic job grapher. * Converts a set of rules provided by the user and a list of files into a graph of jobs to run
* Limitiations:
* - Only walks along graphs with rules that have a single input and a single output.
*/ */
class TreeJobGrapher extends JobGrapher { class TreeJobGrapher extends JobGrapher {
private val modelGraph = new SimpleDirectedGraph[QFile, QCommand](classOf[QCommand]) private val modelGraph = new SimpleDirectedGraph[List[QFile], QModelEdge](classOf[QModelEdge])
private val rules = new ListBuffer[QRule] private val rules = new ListBuffer[QRule]
private var sourceFiles = List.empty[QFile] private var sourceFiles = List.empty[QFile]
private var targetFiles = List.empty[QFile] private var targetFiles = List.empty[QFile]
// Used to tag a model edge for Element <-> List
private class QCollectionEdge extends QModelEdge
override protected def createJobGraph() = { override protected def createJobGraph() = {
createModelGraph() createModelGraph()
var missingPaths = List.empty[Tuple2[QFile,QFile]] var missingPaths = List.empty[Tuple2[QFile,QFile]]
for (sourceFile <- sourceFiles) { for (sourceFile <- sourceFiles) {
for (targetFile <- targetFiles) { for (targetFile <- targetFiles) {
var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, sourceFile, targetFile) var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, List(sourceFile), List(targetFile))
if (shortestPath == null) if (shortestPath == null)
missingPaths = missingPaths ::: List((sourceFile, targetFile)) missingPaths = missingPaths ::: List((sourceFile, targetFile))
else else
@ -43,40 +44,74 @@ class TreeJobGrapher extends JobGrapher {
} }
private def createModelGraph() = { private def createModelGraph() = {
// Look for rules with more than one input or output and add
// internal dependencies between the elements and the list
for (rule <- rules) { for (rule <- rules) {
if (rule.inputs.size != 1 || (rule.outputs.size != 1)) if (rule.inputs.size > 1) {
throw new QException(this.getClass.getName + " can only process rules with a single input and a single output. " + for (input <- rule.inputs) {
"inputs: " + rule.inputs + ", outputs: " + rule.outputs + ", command: " + rule.command) modelGraph.addVertex(List(input))
var source = rule.inputs.head modelGraph.addVertex(rule.inputs)
var target = rule.outputs.head modelGraph.addEdge(List(input), rule.inputs, new QCollectionEdge)
modelGraph.addVertex(source) }
modelGraph.addVertex(target) }
modelGraph.addEdge(source, target, rule.command) if (rule.outputs.size > 1) {
for (output <- rule.outputs) {
modelGraph.addVertex(rule.outputs)
modelGraph.addVertex(List(output))
modelGraph.addEdge(rule.outputs, List(output), new QCollectionEdge)
}
}
}
// Add the explicit rules
for (rule <- rules) {
modelGraph.addVertex(rule.inputs)
modelGraph.addVertex(rule.outputs)
modelGraph.addEdge(rule.inputs, rule.outputs, rule.command)
} }
} }
private def addPaths(shortestPath: java.util.List[QCommand], qArgs: QArguments) { private def addPaths(shortestPath: java.util.List[QModelEdge], qArgs: QArguments) {
for (inputFile <- qArgs.inputPaths.map(_.getAbsolutePath)) for (inputFile <- qArgs.inputPaths.map(_.getAbsolutePath)) {
if (modelGraph.getEdgeSource(shortestPath.head).matchesFile(inputFile)) val source = modelGraph.getEdgeSource(shortestPath.head).head
addPath(shortestPath, inputFile) if (source.matchesFile(inputFile)) {
val baseName = source.baseName(inputFile)
val target = modelGraph.getEdgeTarget(shortestPath.last).head
addPathsToTarget(baseName, List(target))
}
}
} }
private def addPath(shortestPath: java.util.List[QCommand], inputFile: String) = { private def addPathsToTarget(baseName: String, targets: List[QFile]) : Unit = {
var sourceFile = inputFile for (command <- modelGraph.incomingEdgesOf(targets)) {
for (command <- shortestPath) { val sources = modelGraph.getEdgeSource(command)
val source = modelGraph.getEdgeSource(command) addJobGraphEdge(baseName, sources, targets, command)
val target = modelGraph.getEdgeTarget(command) addPathsToTarget(baseName, sources)
val baseName = source.baseName(sourceFile)
val targetFile = target.fullName(baseName)
val resourceSource = new MapResourceNode(Map(source.extension -> sourceFile))
val resourceTarget = new MapResourceNode(Map(target.extension -> targetFile))
val resourceEdge = new ExecEdge(command)
jobGraph.addVertex(resourceSource)
jobGraph.addVertex(resourceTarget)
jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge)
sourceFile = targetFile
} }
} }
private def addJobGraphEdge(baseName: String, sources: List[QFile], targets: List[QFile], command: QModelEdge) {
val resourceSource = new MapResourceNode(mapFiles(baseName, sources))
val resourceTarget = new MapResourceNode(mapFiles(baseName, targets))
val resourceEdge = command match {
case qCommand: QCommand => new ExecEdge(qCommand.commandString)
case qTransition: QCollectionEdge => new ResourceEdge
}
jobGraph.addVertex(resourceSource)
jobGraph.addVertex(resourceTarget)
jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge)
}
/**
* Creates a mapping of the files based on the baseName.
* key: file extension
* value: full name
* Used by the JobScheduler to lookup values as the commands are expanded at exec time.
*/
private def mapFiles(baseName: String, files: List[QFile]) : Map[String, String] = {
Map(files.map(file => (file.extension, file.fullName(baseName))):_*)
}
} }
/** /**

View File

@ -6,7 +6,7 @@ import collection.JavaConversions._
import management.ManagementFactory import management.ManagementFactory
import java.io.File import java.io.File
import java.util.ArrayList import java.util.ArrayList
import org.broadinstitute.sting.queue.{QArguments, QException} import org.broadinstitute.sting.queue.QArguments
/** /**
* Dispatches jobs to LSF and then returns. * Dispatches jobs to LSF and then returns.
@ -18,7 +18,7 @@ class DispatchJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge],
private val jvmName = ManagementFactory.getRuntimeMXBean.getName private val jvmName = ManagementFactory.getRuntimeMXBean.getName
private val jobNamePrefix = "Q-" + jvmName private val jobNamePrefix = "Q-" + jvmName
protected def traversedExec(exec: ExecEdge) = { def processExec(exec: ExecEdge) = {
lsfJobIndex += 1 lsfJobIndex += 1
val job = new LocalLsfJob val job = new LocalLsfJob
val jobName = jobNamePrefix + "-" + lsfJobIndex val jobName = jobNamePrefix + "-" + lsfJobIndex
@ -66,8 +66,8 @@ class DispatchJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge],
// Stop recursing when we find a job along this edge and return it's job id // Stop recursing when we find a job along this edge and return it's job id
case exec: ExecEdge => sourceJobs :::= List(lsfJobs(exec)) case exec: ExecEdge => sourceJobs :::= List(lsfJobs(exec))
// Throw error for a new edge type that we don't know how to handle // For any other type of edge find the LSF jobs preceeding the edge
case default => throw new QException("Unknown edge type: " + default) case resourceEdge: ResourceEdge => sourceJobs :::= sourceLsfJobs(resourceEdge)
} }
} }
sourceJobs sourceJobs

View File

@ -1,10 +1,9 @@
package org.broadinstitute.sting.queue.engine.scheduling package org.broadinstitute.sting.queue.engine.scheduling
import org.apache.commons.lang.text.{StrLookup, StrSubstitutor} import org.apache.commons.lang.text.{StrLookup, StrSubstitutor}
import org.broadinstitute.sting.queue.engine.QCommand
import java.lang.String import java.lang.String
class ExecEdge(private val command: QCommand) class ExecEdge(private val templateCommandString: String)
extends ResourceEdge { extends ResourceEdge {
private var convertedCommandString: String = _ private var convertedCommandString: String = _
def commandString = convertedCommandString def commandString = convertedCommandString
@ -12,6 +11,7 @@ class ExecEdge(private val command: QCommand)
override def traverse(graph: JobScheduler) = { override def traverse(graph: JobScheduler) = {
// Lookup any variable using the target node, or any of it's input nodes. // Lookup any variable using the target node, or any of it's input nodes.
val sub = new StrSubstitutor(new StrLookup { def lookup(key: String) = graph.lookup(ExecEdge.this, key, null) }) val sub = new StrSubstitutor(new StrLookup { def lookup(key: String) = graph.lookup(ExecEdge.this, key, null) })
convertedCommandString = sub.replace(command.commandString) convertedCommandString = sub.replace(templateCommandString)
graph.processExec(this)
} }
} }

View File

@ -8,18 +8,33 @@ import org.broadinstitute.sting.queue.QArguments
abstract class JobScheduler(protected val jobGraph: DirectedGraph[ResourceNode, ResourceEdge], abstract class JobScheduler(protected val jobGraph: DirectedGraph[ResourceNode, ResourceEdge],
protected val qArgs: QArguments) extends Logging { protected val qArgs: QArguments) extends Logging {
private var missingKeys = Set.empty[String]
def runJobs def runJobs
def numJobs = jobGraph.edgeSet.size def numJobs = jobGraph.edgeSet.size
def processExec(exec: ExecEdge) : Unit
/** /**
* Emulates storing of properties per node by looking up values on * Emulates storing of properties per node by looking up values on
* the current edge/target-node or any preceding nodes in the graph. * the current edge/target-node or any preceding nodes in the graph.
*/ */
def lookup(edge: ResourceEdge, key: String, default: String) : String = { def lookup(edge: ResourceEdge, key: String, default: String) : String = {
lookupRecursive(edge, key) match { val value = lookupRecursive(edge, key) match {
case Some(value) => value case Some(value) => value
case None => qArgs.argMap.getOrElse(key, default) case None => qArgs.argMap.getOrElse(key, default)
} }
if (value == null)
missingKeys = missingKeys ++ Set(key)
value
}
protected def logMissingKeys = {
if (qArgs.dryRun && !missingKeys.isEmpty) {
logger.warn("Missing keys:")
for (key <- missingKeys)
logger.warn(" ${" + key + "}")
}
} }
private def lookupRecursive(edge: ResourceEdge, key: String) : Option[String] = { private def lookupRecursive(edge: ResourceEdge, key: String) : Option[String] = {

View File

@ -1,6 +1,6 @@
package org.broadinstitute.sting.queue.engine.scheduling package org.broadinstitute.sting.queue.engine.scheduling
abstract class ResourceEdge { class ResourceEdge {
def traverse(graph: JobScheduler): Unit def traverse(graph: JobScheduler): Unit = {}
def lookup(key: String) : Option[String] = None def lookup(key: String) : Option[String] = None
} }

View File

@ -9,7 +9,7 @@ import org.jgrapht.DirectedGraph
*/ */
class SimpleJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments) class SimpleJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
extends TopologicalJobScheduler(jobGraph, qArgs) { extends TopologicalJobScheduler(jobGraph, qArgs) {
protected def traversedExec(exec: ExecEdge) = { def processExec(exec: ExecEdge) = {
var commandString = exec.commandString var commandString = exec.commandString
logger.info(commandString) logger.info(commandString)

View File

@ -15,7 +15,8 @@ abstract class TopologicalJobScheduler(jobGraph: DirectedGraph[ResourceNode, Res
protected val iterator = new TopologicalOrderIterator(this.jobGraph) protected val iterator = new TopologicalOrderIterator(this.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] { iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] {
override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = traversed(event.getEdge) override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) =
event.getEdge.traverse(TopologicalJobScheduler.this)
}) })
override def runJobs = { override def runJobs = {
@ -23,14 +24,6 @@ abstract class TopologicalJobScheduler(jobGraph: DirectedGraph[ResourceNode, Res
for (target <- iterator) { for (target <- iterator) {
// Do nothing for now, let event handler respond // Do nothing for now, let event handler respond
} }
logMissingKeys
} }
protected def traversed(edge: ResourceEdge) = {
edge.traverse(this)
edge match {
case exec: ExecEdge => traversedExec(exec)
}
}
protected def traversedExec(exec: ExecEdge)
} }

View File

@ -18,10 +18,10 @@ trait Logging {
object Logging { object Logging {
private var configured = false private var configured = false
private var isDebug = false private var isDebug = false
def configureLogging() { def configureLogging = {
if (!configured) { if (!configured) {
var root = Logger.getRootLogger var root = Logger.getRootLogger
root.addAppender(new ConsoleAppender(new PatternLayout("%-5p %d{HH:mm:ss,SSS} %C{1} - %m %n"))) root.addAppender(new ConsoleAppender(new PatternLayout("%-5p %d{HH:mm:ss,SSS} - %m %n")))
root.setLevel(if(isDebug) Level.DEBUG else Level.INFO) root.setLevel(if(isDebug) Level.DEBUG else Level.INFO)
configured = true configured = true
} }