diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala b/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala index ce5a0b37a..1126fea19 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala @@ -4,6 +4,6 @@ package org.broadinstitute.sting.queue.engine * Defines a basic command to run * TODO: Allow overriding arguments per command such as the job queue */ -class QCommand(val commandString: String) { +class QCommand(val commandString: String) extends QModelEdge { override def toString = commandString } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala new file mode 100644 index 000000000..580501ef1 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala @@ -0,0 +1,7 @@ +package org.broadinstitute.sting.queue.engine + +/** + * Used for modeling before an edge gets + * replicated into an actual ResourceEdge + */ +class QModelEdge diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala index 8cbf94ca3..4be5e17aa 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala @@ -15,7 +15,7 @@ object ExplicitJobGrapher { } private def addEdge(source: ResourceNode, target: ResourceNode, command: QCommand) = { - val resourceEdge = new ExecEdge(command) + val resourceEdge = new ExecEdge(command.commandString) grapher.jobGraph.addVertex(source) grapher.jobGraph.addVertex(target) grapher.jobGraph.addEdge(source, target, resourceEdge) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala 
b/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala index 149cef2ed..55b77b236 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala @@ -1,32 +1,33 @@ package org.broadinstitute.sting.queue.engine.graphing import org.jgrapht.graph.SimpleDirectedGraph -import org.broadinstitute.sting.queue.engine.{QCommand, QFile, QRule} -import org.broadinstitute.sting.queue.engine.scheduling.{ExecEdge, MapResourceNode, ResourceEdge, ResourceNode} import org.jgrapht.alg.BellmanFordShortestPath import org.broadinstitute.sting.queue.{QArguments, QException} import collection.mutable.ListBuffer import collection.JavaConversions._ +import org.broadinstitute.sting.queue.engine.scheduling.{ResourceEdge, ExecEdge, MapResourceNode} +import org.broadinstitute.sting.queue.engine.{QModelEdge, QCommand, QFile, QRule} /** - * A basic job grapher. - * Limitiations: - * - Only walks along graphs with rules that have a single input and a single output. 
+ * Converts a set of rules provided by the user and a list of files into a graph of jobs to run */ class TreeJobGrapher extends JobGrapher { - private val modelGraph = new SimpleDirectedGraph[QFile, QCommand](classOf[QCommand]) + private val modelGraph = new SimpleDirectedGraph[List[QFile], QModelEdge](classOf[QModelEdge]) private val rules = new ListBuffer[QRule] private var sourceFiles = List.empty[QFile] private var targetFiles = List.empty[QFile] + // Used to tag a model edge for Element <-> List + private class QCollectionEdge extends QModelEdge + override protected def createJobGraph() = { createModelGraph() var missingPaths = List.empty[Tuple2[QFile,QFile]] for (sourceFile <- sourceFiles) { for (targetFile <- targetFiles) { - var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, sourceFile, targetFile) + var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, List(sourceFile), List(targetFile)) if (shortestPath == null) missingPaths = missingPaths ::: List((sourceFile, targetFile)) else @@ -43,40 +44,74 @@ class TreeJobGrapher extends JobGrapher { } private def createModelGraph() = { + + // Look for rules with more than one input or output and add + // internal dependencies between the elements and the list for (rule <- rules) { - if (rule.inputs.size != 1 || (rule.outputs.size != 1)) - throw new QException(this.getClass.getName + " can only process rules with a single input and a single output. 
" + - "inputs: " + rule.inputs + ", outputs: " + rule.outputs + ", command: " + rule.command) - var source = rule.inputs.head - var target = rule.outputs.head - modelGraph.addVertex(source) - modelGraph.addVertex(target) - modelGraph.addEdge(source, target, rule.command) + if (rule.inputs.size > 1) { + for (input <- rule.inputs) { + modelGraph.addVertex(List(input)) + modelGraph.addVertex(rule.inputs) + modelGraph.addEdge(List(input), rule.inputs, new QCollectionEdge) + } + } + if (rule.outputs.size > 1) { + for (output <- rule.outputs) { + modelGraph.addVertex(rule.outputs) + modelGraph.addVertex(List(output)) + modelGraph.addEdge(rule.outputs, List(output), new QCollectionEdge) + } + } + } + + // Add the explicit rules + for (rule <- rules) { + modelGraph.addVertex(rule.inputs) + modelGraph.addVertex(rule.outputs) + modelGraph.addEdge(rule.inputs, rule.outputs, rule.command) } } - private def addPaths(shortestPath: java.util.List[QCommand], qArgs: QArguments) { - for (inputFile <- qArgs.inputPaths.map(_.getAbsolutePath)) - if (modelGraph.getEdgeSource(shortestPath.head).matchesFile(inputFile)) - addPath(shortestPath, inputFile) + private def addPaths(shortestPath: java.util.List[QModelEdge], qArgs: QArguments) { + for (inputFile <- qArgs.inputPaths.map(_.getAbsolutePath)) { + val source = modelGraph.getEdgeSource(shortestPath.head).head + if (source.matchesFile(inputFile)) { + val baseName = source.baseName(inputFile) + val target = modelGraph.getEdgeTarget(shortestPath.last).head + addPathsToTarget(baseName, List(target)) + } + } } - private def addPath(shortestPath: java.util.List[QCommand], inputFile: String) = { - var sourceFile = inputFile - for (command <- shortestPath) { - val source = modelGraph.getEdgeSource(command) - val target = modelGraph.getEdgeTarget(command) - val baseName = source.baseName(sourceFile) - val targetFile = target.fullName(baseName) - val resourceSource = new MapResourceNode(Map(source.extension -> sourceFile)) - val resourceTarget = 
new MapResourceNode(Map(target.extension -> targetFile)) - val resourceEdge = new ExecEdge(command) - jobGraph.addVertex(resourceSource) - jobGraph.addVertex(resourceTarget) - jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge) - sourceFile = targetFile + private def addPathsToTarget(baseName: String, targets: List[QFile]) : Unit = { + for (command <- modelGraph.incomingEdgesOf(targets)) { + val sources = modelGraph.getEdgeSource(command) + addJobGraphEdge(baseName, sources, targets, command) + addPathsToTarget(baseName, sources) } } + + private def addJobGraphEdge(baseName: String, sources: List[QFile], targets: List[QFile], command: QModelEdge) { + val resourceSource = new MapResourceNode(mapFiles(baseName, sources)) + val resourceTarget = new MapResourceNode(mapFiles(baseName, targets)) + val resourceEdge = command match { + case qCommand: QCommand => new ExecEdge(qCommand.commandString) + case qTransition: QCollectionEdge => new ResourceEdge + } + jobGraph.addVertex(resourceSource) + jobGraph.addVertex(resourceTarget) + jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge) + } + + /** + * Creates a mapping of the files based on the baseName. + * key: file extension + * value: full name + * Used by the JobScheduler to lookup values as the commands are expanded at exec time. 
+ */ + private def mapFiles(baseName: String, files: List[QFile]) : Map[String, String] = { + Map(files.map(file => (file.extension, file.fullName(baseName))):_*) + } } /** diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala index a4c5340de..fb7450d9c 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala @@ -6,7 +6,7 @@ import collection.JavaConversions._ import management.ManagementFactory import java.io.File import java.util.ArrayList -import org.broadinstitute.sting.queue.{QArguments, QException} +import org.broadinstitute.sting.queue.QArguments /** * Dispatches jobs to LSF and then returns. @@ -18,7 +18,7 @@ class DispatchJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], private val jvmName = ManagementFactory.getRuntimeMXBean.getName private val jobNamePrefix = "Q-" + jvmName - protected def traversedExec(exec: ExecEdge) = { + def processExec(exec: ExecEdge) = { lsfJobIndex += 1 val job = new LocalLsfJob val jobName = jobNamePrefix + "-" + lsfJobIndex @@ -66,8 +66,8 @@ class DispatchJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], // Stop recursing when we find a job along this edge and return it's job id case exec: ExecEdge => sourceJobs :::= List(lsfJobs(exec)) - // Throw error for a new edge type that we don't know how to handle - case default => throw new QException("Unknown edge type: " + default) + // For any other type of edge find the LSF jobs preceding the edge + case resourceEdge: ResourceEdge => sourceJobs :::= sourceLsfJobs(resourceEdge) } } sourceJobs diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala index 55d40ad4f..ac09bf51f 100755 
--- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala @@ -1,10 +1,9 @@ package org.broadinstitute.sting.queue.engine.scheduling import org.apache.commons.lang.text.{StrLookup, StrSubstitutor} -import org.broadinstitute.sting.queue.engine.QCommand import java.lang.String -class ExecEdge(private val command: QCommand) +class ExecEdge(private val templateCommandString: String) extends ResourceEdge { private var convertedCommandString: String = _ def commandString = convertedCommandString @@ -12,6 +11,7 @@ class ExecEdge(private val command: QCommand) override def traverse(graph: JobScheduler) = { // Lookup any variable using the target node, or any of it's input nodes. val sub = new StrSubstitutor(new StrLookup { def lookup(key: String) = graph.lookup(ExecEdge.this, key, null) }) - convertedCommandString = sub.replace(command.commandString) + convertedCommandString = sub.replace(templateCommandString) + graph.processExec(this) } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala index 0a876c761..c3a92b2f7 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala @@ -8,18 +8,33 @@ import org.broadinstitute.sting.queue.QArguments abstract class JobScheduler(protected val jobGraph: DirectedGraph[ResourceNode, ResourceEdge], protected val qArgs: QArguments) extends Logging { + private var missingKeys = Set.empty[String] + def runJobs def numJobs = jobGraph.edgeSet.size + def processExec(exec: ExecEdge) : Unit + /** * Emulates storing of properties per node by looking up values on * the current edge/target-node or any preceding nodes in the graph. 
*/ def lookup(edge: ResourceEdge, key: String, default: String) : String = { - lookupRecursive(edge, key) match { + val value = lookupRecursive(edge, key) match { case Some(value) => value case None => qArgs.argMap.getOrElse(key, default) } + if (value == null) + missingKeys = missingKeys ++ Set(key) + value + } + + protected def logMissingKeys = { + if (qArgs.dryRun && !missingKeys.isEmpty) { + logger.warn("Missing keys:") + for (key <- missingKeys) + logger.warn(" ${" + key + "}") + } } private def lookupRecursive(edge: ResourceEdge, key: String) : Option[String] = { diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala index b376bc59c..a06ca9871 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala @@ -1,6 +1,6 @@ package org.broadinstitute.sting.queue.engine.scheduling -abstract class ResourceEdge { - def traverse(graph: JobScheduler): Unit +class ResourceEdge { + def traverse(graph: JobScheduler): Unit = {} def lookup(key: String) : Option[String] = None } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala index f5029882a..6c1ea43ca 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala @@ -9,7 +9,7 @@ import org.jgrapht.DirectedGraph */ class SimpleJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments) extends TopologicalJobScheduler(jobGraph, qArgs) { - protected def traversedExec(exec: ExecEdge) = { + def processExec(exec: ExecEdge) = { var commandString = exec.commandString logger.info(commandString) diff --git 
a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala index c7cb58425..3689ebd6e 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala @@ -15,7 +15,8 @@ abstract class TopologicalJobScheduler(jobGraph: DirectedGraph[ResourceNode, Res protected val iterator = new TopologicalOrderIterator(this.jobGraph) iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] { - override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = traversed(event.getEdge) + override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = + event.getEdge.traverse(TopologicalJobScheduler.this) }) override def runJobs = { @@ -23,14 +24,6 @@ abstract class TopologicalJobScheduler(jobGraph: DirectedGraph[ResourceNode, Res for (target <- iterator) { // Do nothing for now, let event handler respond } + logMissingKeys } - - protected def traversed(edge: ResourceEdge) = { - edge.traverse(this) - edge match { - case exec: ExecEdge => traversedExec(exec) - } - } - - protected def traversedExec(exec: ExecEdge) } diff --git a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala index 31b3a0c26..7a6bf3f27 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala @@ -18,10 +18,10 @@ trait Logging { object Logging { private var configured = false private var isDebug = false - def configureLogging() { + def configureLogging = { if (!configured) { var root = Logger.getRootLogger - root.addAppender(new ConsoleAppender(new PatternLayout("%-5p %d{HH:mm:ss,SSS} %C{1} - %m %n"))) + root.addAppender(new ConsoleAppender(new 
PatternLayout("%-5p %d{HH:mm:ss,SSS} - %m %n"))) root.setLevel(if(isDebug) Level.DEBUG else Level.INFO) configured = true }