diff --git a/build.xml b/build.xml
index c98b49056..a7ff196a0 100644
--- a/build.xml
+++ b/build.xml
@@ -294,14 +294,12 @@
-
-
-
-
-
-
+
+
+
+
@@ -314,16 +312,61 @@
Building Scala...
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Building Queue...
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/ivy.xml b/ivy.xml
index a4af79a80..0502fabaf 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -4,6 +4,7 @@
+
@@ -39,7 +40,12 @@
-
-
+
+
+
+
+
+
+
diff --git a/scala/src/org/broadinstitute/sting/queue/QArguments.scala b/scala/src/org/broadinstitute/sting/queue/QArguments.scala
new file mode 100755
index 000000000..2fef2a840
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/QArguments.scala
@@ -0,0 +1,73 @@
+package org.broadinstitute.sting.queue
+
+import collection.mutable.ListBuffer
+import engine.Pipeline
+import org.broadinstitute.sting.queue.util.Logging
+
+object QArguments {
+ val scripts = new ListBuffer[String]
+
+ def parseArgs(args: Array[String]) {
+ filterArgs(args)
+ }
+
+ /**
+ * Pull out any args that are meant for QCommandLine
+ */
+ private def filterArgs(args: Array[String]) = {
+ var filtered = new ListBuffer[String]
+ filtered.appendAll(args)
+
+ if (isFlagged(filtered, "-debug"))
+ Logging.enableDebug
+
+ if (isFlagged(filtered, "-dry"))
+ Pipeline.dryRun = true
+ if (isFlagged(filtered, "-bsub"))
+ Pipeline.useBsub = true
+ for (arg <- getArgs(filtered, "-P"))
+ Pipeline.addArg(arg)
+ for (arg <- getArgs(filtered, "-I"))
+ Pipeline.addFile(arg)
+ for (arg <- getArgs(filtered, "-S"))
+ scripts.append(arg)
+
+ filtered
+ }
+
+ private def isFlagged(filtered: ListBuffer[String], search: String) = {
+ var found = false
+ var index = 0
+ while (0 <= index && index < filtered.size) {
+ index = filtered.indexOf(search)
+ if (index >= 0) {
+ found = true
+ filtered.remove(index)
+ }
+ }
+ found
+ }
+
+ private def getArgs(filtered: ListBuffer[String], search: String) = {
+ var found = new ListBuffer[String]
+ var index = 0
+ while (0 <= index && index < filtered.size) {
+ index = filtered.indexOf(search)
+ if (index >= 0) {
+ found.append(filtered(index+1))
+ filtered.remove(index, 2)
+ }
+ }
+ found
+ }
+
+
+ def strip(filtered: ListBuffer[String], search: String) = {
+ var index = 0
+ while (0 <= index && index < filtered.size) {
+ index = filtered.indexOf(search)
+ if (index >= 0) {
+ filtered.remove(index, 2)
+ }
+ }
+ }}
diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala
new file mode 100755
index 000000000..b0c0bb224
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala
@@ -0,0 +1,50 @@
+package org.broadinstitute.sting.queue
+
+import engine.Pipeline
+import tools.nsc.MainGenericRunner
+import org.broadinstitute.sting.queue.util.ClasspathUtils
+import org.apache.log4j._
+import collection.mutable.ListBuffer
+import org.broadinstitute.sting.queue.util.Logging
+
+object QCommandLine extends Application with Logging {
+ var usage = """usage: java -jar Queue.jar [ -P name=value ] [ -P file.properties ] [ -I input.file ] [ -I input_files.list ] [ -bsub ] [ -dry ] [ -debug ] -S pipeline.scala"""
+
+ override def main(args: Array[String]) = {
+ try {
+ QArguments.parseArgs(args.clone)
+ } catch {
+ case e: Exception => {
+ println(usage)
+ System.exit(-1)
+ }
+ }
+
+ var newArgs = new ListBuffer[String]
+ newArgs.appendAll(args)
+
+ QArguments.strip(newArgs, "-S")
+
+ logger.debug("starting")
+
+ if (QArguments.scripts.size == 0) {
+ println("Error: Missing script")
+ println(usage)
+ System.exit(-1)
+ }
+
+ if (Pipeline.inputPaths.size == 0) {
+ println("Error: No inputs specified")
+ println(usage)
+ System.exit(-1)
+ }
+
+ for (script <- QArguments.scripts) {
+ var clone = newArgs.clone
+ clone.prepend("-nocompdaemon", "-classpath", ClasspathUtils.manifestAwareClassPath, script)
+ MainGenericRunner.main(clone.toArray)
+ }
+
+ logger.debug("exiting")
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/QException.scala b/scala/src/org/broadinstitute/sting/queue/QException.scala
new file mode 100755
index 000000000..5c439edeb
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/QException.scala
@@ -0,0 +1,4 @@
+package org.broadinstitute.sting.queue
+
+class QException(private val message: String, private val throwable: Throwable = null)
+ extends RuntimeException(message, throwable)
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala b/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala
new file mode 100755
index 000000000..421f0e2ec
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala
@@ -0,0 +1,97 @@
+package org.broadinstitute.sting.queue.engine
+
+import graphing.JobGrapher
+import org.broadinstitute.sting.queue.util.Logging
+import scheduling.{DispatchJobScheduler, SimpleJobScheduler}
+import java.lang.String
+import collection.immutable.List
+import collection.JavaConversions._
+import java.util.Properties
+import java.io.{File, FileInputStream}
+import org.broadinstitute.sting.utils.text.XReadLines
+import org.broadinstitute.sting.queue.{QException, QArguments}
+
+/**
+ * Syntactic sugar for filling in a pipeline using a Scala script.
+ */
+object Pipeline extends Logging
+{
+ var inputPaths = List.empty[File]
+ // TODO: Stop using globals and wrap in an execution environment. Will need when overriding values per command.
+ var useBsub = false
+ var dryRun = false
+ private var argMap = Map.empty[String, String]
+ private var rules = List.empty[QRule]
+
+ /**
+ * Sugar that allows addRule( inputs -> outputs, command )
+ */
+ def addRule(rule: (Any, Any), command: String): Unit = {
+ addRule(rule._1, rule._2, command)
+ }
+
+ private def addRule(inputs: Any, outputs: Any, command: String): Unit = {
+ rules :::= List(new QRule(getFiles(inputs), getFiles(outputs), new QCommand(command)))
+ }
+
+ def run(args: Array[String], inputs: Any, outputs: Any) = {
+ QArguments.parseArgs(args)
+
+ var inputFiles = getFiles(inputs)
+ var outputFiles = getFiles(outputs)
+
+ var grapher = new JobGrapher(inputPaths.map(_.getCanonicalPath), argMap, rules, inputFiles, outputFiles)
+
+ val scheduler = useBsub match {
+ case false => new SimpleJobScheduler(grapher.jobs)
+ case true => new DispatchJobScheduler(grapher.jobs)
+ }
+
+ scheduler.runJobs
+ }
+
+ /**
+ * Parses files passed in various sugar forms into a List[QFile]
+ */
+ private def getFiles(files: Any) : List[QFile] = {
+ files match {
+ case null => List.empty[QFile]
+ case Nil => List.empty[QFile]
+ case path: String => List(new QFile(path))
+ case file: QFile => List(file)
+ // Any List or Tuple add the members to this list
+ case product: Product => {
+ var list = List.empty[QFile]
+ for (fileList <- product.productIterator.toList.map(getFiles(_))) {
+ list :::= fileList
+ }
+ list
+ }
+ case x => throw new QException("Unknown file type: " + x)
+ }
+ }
+
+ def addArg(arg: String) = {
+ var file = new File(arg)
+ if (arg.contains("=") && !file.exists) {
+ val tokens = arg.split("=", 2)
+ argMap = argMap.updated(tokens(0), tokens(1))
+ } else if (file.exists && arg.endsWith(".properties")) {
+ var props = new Properties
+ props.load(new FileInputStream(file))
+ for ((name, value) <- props)
+ argMap = argMap.updated(name, value)
+ }
+ }
+
+ def addFile(arg: String): Unit = {
+ var file = new File(arg)
+ if (arg.endsWith(".list")) {
+ for (line <- new XReadLines(file).iterator) {
+ addFile(line)
+ }
+ } else {
+ inputPaths = inputPaths ::: List(file)
+ }
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala b/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala
new file mode 100755
index 000000000..ce5a0b37a
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala
@@ -0,0 +1,9 @@
+package org.broadinstitute.sting.queue.engine
+
+/**
+ * Defines a basic command to run
+ * TODO: Allow overriding arguments per command such as the job queue
+ */
+class QCommand(val commandString: String) {
+ override def toString = commandString
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala b/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala
new file mode 100755
index 000000000..89d0eb4bc
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala
@@ -0,0 +1,22 @@
+package org.broadinstitute.sting.queue.engine
+
+import org.apache.commons.lang.builder.{EqualsBuilder, HashCodeBuilder}
+import java.io.File
+import org.apache.commons.lang.StringUtils
+
+/**
+ * Represents a file extension along with several tags.
+ * TODO: Use the tags to map rules between wildcards, ex: *.vcf -> *.eval
+ */
+class QFile(val fileType: String, val parts: String*) {
+ val extension = (List(parts:_*) ::: List(fileType)).mkString(".")
+ override def toString = extension
+ override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1)
+ override def hashCode = HashCodeBuilder.reflectionHashCode(this)
+
+ def matchesFile(path: String): Boolean = matchesFile(new File(path))
+ def matchesFile(file: File): Boolean = file.getCanonicalPath.endsWith(extension)
+ def baseName(path: String): String = baseName(new File(path))
+ def baseName(file: File): String = StringUtils.removeEnd(file.getCanonicalPath, extension)
+ def fullName(baseName: String) = baseName + extension
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QRule.scala b/scala/src/org/broadinstitute/sting/queue/engine/QRule.scala
new file mode 100755
index 000000000..a52e3d363
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QRule.scala
@@ -0,0 +1,3 @@
+package org.broadinstitute.sting.queue.engine
+
+class QRule (val inputs: List[QFile], val outputs: List[QFile], var command: QCommand)
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala
new file mode 100755
index 000000000..23dedd7a5
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala
@@ -0,0 +1,87 @@
+package org.broadinstitute.sting.queue.engine.graphing
+
+import org.broadinstitute.sting.queue.QException
+import org.jgrapht.graph.SimpleDirectedGraph
+import org.jgrapht.alg.BellmanFordShortestPath
+import collection.JavaConversions._
+import org.broadinstitute.sting.queue.engine.{QCommand, QFile, QRule}
+import org.broadinstitute.sting.queue.engine.scheduling.{ExecEdge, ResourceEdge, ResourceNode}
+import org.broadinstitute.sting.queue.util.Logging
+
+/**
+ * A basic job grapher.
+ * Limitiations:
+ * - Only walks along graphs with rules that have a single input and a single output.
+ */
+class JobGrapher(
+ private val inputFiles: List[String],
+ private val argMap: Map[String, String],
+ private val rules: List[QRule],
+ private val sourceFiles: List[QFile],
+ private val targetFiles: List[QFile]) extends Logging {
+
+ private val modelGraph = new SimpleDirectedGraph[QFile, QCommand](classOf[QCommand])
+ private val jobGraph = new SimpleDirectedGraph[ResourceNode, ResourceEdge](classOf[ResourceEdge])
+
+ createModelGraph()
+ createJobGraph()
+
+ def name = this.getClass.getName
+ def jobs = jobGraph
+
+ private def createJobGraph() = {
+ var missingPaths = List.empty[Tuple2[QFile,QFile]]
+ for (sourceFile <- sourceFiles) {
+ for (targetFile <- targetFiles) {
+ var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, sourceFile, targetFile)
+ if (shortestPath == null)
+ missingPaths = missingPaths ::: List((sourceFile, targetFile))
+ else
+ addPaths(shortestPath)
+ }
+ }
+
+ for ((sourceFile, targetFile) <- missingPaths) {
+ logger.error(String.format("No command path found between %s --> %s", sourceFile, targetFile))
+ }
+
+ if (missingPaths.size > 0)
+ throw new QException("Not all inputs and outputs found in the pipeline graph")
+ }
+
+ private def createModelGraph() = {
+ for (rule <- rules) {
+ if (rule.inputs.size != 1 || (rule.outputs.size != 1))
+ throw new QException(this.name + " can only process rules with a single input and a single output. " +
+ "inputs: " + rule.inputs + ", outputs: " + rule.outputs + ", command: " + rule.command)
+ var source = rule.inputs.head
+ var target = rule.outputs.head
+ modelGraph.addVertex(source)
+ modelGraph.addVertex(target)
+ modelGraph.addEdge(source, target, rule.command)
+ }
+ }
+
+ private def addPaths(shortestPath: java.util.List[QCommand]) {
+ for (inputFile <- inputFiles)
+ if (modelGraph.getEdgeSource(shortestPath.head).matchesFile(inputFile))
+ addPath(shortestPath, inputFile)
+ }
+
+ private def addPath(shortestPath: java.util.List[QCommand], inputFile: String) = {
+ var sourceFile = inputFile
+ for (command <- shortestPath) {
+ val source = modelGraph.getEdgeSource(command)
+ val target = modelGraph.getEdgeTarget(command)
+ val baseName = source.baseName(sourceFile)
+ val targetFile = target.fullName(baseName)
+ val resourceSource = new ResourceNode(Map(source.extension -> sourceFile))
+ val resourceTarget = new ResourceNode(Map(target.extension -> targetFile))
+ val resourceEdge = new ExecEdge(argMap, command)
+ jobGraph.addVertex(resourceSource)
+ jobGraph.addVertex(resourceTarget)
+ jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge)
+ sourceFile = targetFile
+ }
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala
new file mode 100755
index 000000000..99236657f
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala
@@ -0,0 +1,81 @@
+package org.broadinstitute.sting.queue.engine.scheduling
+
+import org.jgrapht.graph.SimpleDirectedGraph
+import edu.mit.broad.core.lsf.LocalLsfJob
+import collection.JavaConversions._
+import management.ManagementFactory
+import org.broadinstitute.sting.queue.QException
+import java.io.File
+import java.util.{ArrayList, Properties}
+import org.broadinstitute.sting.queue.engine.Pipeline
+
+/**
+ * Dispatches jobs to LSF and then returns.
+ */
+class DispatchJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends TopologicalJobScheduler(jobGraph) {
+ private var lsfJobs = Map.empty[ExecEdge, LocalLsfJob]
+ private var lsfJobIndex = 0
+ private val jvmName = ManagementFactory.getRuntimeMXBean.getName
+ private val jobNamePrefix = "Q-" + jvmName
+
+ protected def traversedExec(exec: ExecEdge) = {
+ lsfJobIndex += 1
+ val props = new Properties
+ exec.args.foreach(x => props.put(x._1, x._2))
+ val job = new LocalLsfJob
+ val jobName = jobNamePrefix + "-" + lsfJobIndex
+ val outputFile = jobName + ".out"
+ val errorFile = jobName + ".err"
+ val workingDir = props.getProperty("jobWorkingDir", ".")
+ val lsfProject = props.getProperty("jobProject", "Queue")
+ val queue = props.getProperty("jobQueue", "broad")
+ val memory = props.getProperty("jobMemory", "2")
+
+ var extraArgs = List("-r", "-R", "rusage[mem=" + memory + "]")
+
+ val sourceJobs = sourceLsfJobs(exec)
+ if (sourceJobs.size > 0) {
+ extraArgs :::= List("-w", dependencyExpression(sourceJobs))
+ }
+ job.setName(jobName)
+ job.setExtraBsubArgs(new ArrayList(extraArgs))
+ job.setProject(lsfProject)
+ job.setWorkingDir(new File(workingDir))
+ job.setProject(lsfProject)
+ job.setCommand(exec.commandString)
+ job.setOutputFile(new File(workingDir, outputFile))
+ job.setErrFile(new File(workingDir, errorFile))
+ job.setQueue(queue)
+
+ lsfJobs = lsfJobs.updated(exec, job)
+
+ logger.info(job.getBsubCommand.mkString(" "))
+
+ if (!Pipeline.dryRun)
+ job.start
+ }
+
+ /**
+ * Walks up the graph looking for the previous LsfJobs for this node
+ */
+ private def sourceLsfJobs(edge: ResourceEdge) : List[LocalLsfJob] = {
+ var sourceJobs = List.empty[LocalLsfJob]
+
+ val source = this.jobGraph.getEdgeSource(edge)
+ for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) {
+ incomingEdge match {
+
+ // Stop recursing when we find a job along this edge and return it's job id
+ case exec: ExecEdge => sourceJobs :::= List(lsfJobs(exec))
+
+ // Throw error for a new edge type that we don't know how to handle
+ case default => throw new QException("Unknown edge type: " + default)
+ }
+ }
+ sourceJobs
+ }
+
+ private def dependencyExpression(jobs: List[LocalLsfJob]) = {
+ jobs.toSet[LocalLsfJob].map(_.getName).mkString("done(\"", "\") && done(\"", "\")")
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala
new file mode 100755
index 000000000..3fda1db03
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala
@@ -0,0 +1,44 @@
+package org.broadinstitute.sting.queue.engine.scheduling
+
+import org.jgrapht.DirectedGraph
+import org.apache.commons.lang.text.{StrLookup, StrSubstitutor}
+import collection.JavaConversions._
+import org.broadinstitute.sting.queue.engine.QCommand
+
+class ExecEdge(val args: Map[String, String], private val command: QCommand) extends ResourceEdge {
+ private var convertedCommandString: String = _
+ def commandString = convertedCommandString
+
+ override def traverse(graph: DirectedGraph[ResourceNode, ResourceEdge]) = {
+ // Lookup any variable using the target node, or any of it's input nodes.
+ val sub = new StrSubstitutor(new NodeLookup(graph.getEdgeTarget(this), graph))
+ convertedCommandString = sub.replace(command.commandString)
+ }
+
+ class NodeLookup(private val targetNode: ResourceNode, private val graph: DirectedGraph[ResourceNode, ResourceEdge]) extends StrLookup {
+
+ def lookup(key: String) = {
+ var value: String = null
+ if (args.contains(key))
+ value = args(key)
+ else
+ value = lookup(key, targetNode)
+ value
+ }
+
+ private def lookup(key: String, node: ResourceNode): String = {
+ var value: String = null
+ if (node.resources.contains(key)) {
+ value = node.resources(key)
+ } else {
+ for (edge <- graph.incomingEdgesOf(node)) {
+ lookup(key, graph.getEdgeSource(edge)) match {
+ case null => {}
+ case found => value = found
+ }
+ }
+ }
+ value
+ }
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala
new file mode 100755
index 000000000..c6a1e30a9
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala
@@ -0,0 +1,10 @@
+package org.broadinstitute.sting.queue.engine.scheduling
+
+import org.jgrapht.graph.SimpleDirectedGraph
+import org.broadinstitute.sting.queue.util.Logging
+
+abstract class JobScheduler(protected val jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge])
+ extends Logging {
+ def runJobs
+ def numJobs = jobGraph.edgeSet.size
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala
new file mode 100755
index 000000000..a7d58e503
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala
@@ -0,0 +1,7 @@
+package org.broadinstitute.sting.queue.engine.scheduling
+
+import org.jgrapht.DirectedGraph
+
+abstract class ResourceEdge {
+ def traverse(graph: DirectedGraph[ResourceNode, ResourceEdge]): Unit
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala
new file mode 100755
index 000000000..0a61a40f1
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala
@@ -0,0 +1,8 @@
+package org.broadinstitute.sting.queue.engine.scheduling
+
+import org.apache.commons.lang.builder.{HashCodeBuilder, EqualsBuilder}
+
+class ResourceNode(val resources: Map[String,String]) {
+ override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1)
+ override def hashCode = HashCodeBuilder.reflectionHashCode(this)
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala
new file mode 100755
index 000000000..939f6c336
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala
@@ -0,0 +1,19 @@
+package org.broadinstitute.sting.queue.engine.scheduling
+
+import org.jgrapht.graph.SimpleDirectedGraph
+import org.broadinstitute.sting.queue.util.ProcessUtils
+import org.broadinstitute.sting.queue.engine.Pipeline
+
+/**
+ * Runs jobs one at a time locally
+ */
+class SimpleJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends TopologicalJobScheduler(jobGraph) {
+ protected def traversedExec(exec: ExecEdge) = {
+ var commandString = exec.commandString
+ logger.info(commandString)
+
+ // TODO: Pre-print the commands?
+ if (!Pipeline.dryRun)
+ ProcessUtils.runCommandAndWait(commandString, Map.empty[String, String])
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala
new file mode 100755
index 000000000..65003052f
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala
@@ -0,0 +1,36 @@
+package org.broadinstitute.sting.queue.engine.scheduling
+
+import org.jgrapht.graph.SimpleDirectedGraph
+import org.jgrapht.traverse.TopologicalOrderIterator
+import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter}
+import collection.JavaConversions._
+
+/**
+ * Loops over the job graph running jobs as the edges are traversed
+ */
+abstract class TopologicalJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends JobScheduler(jobGraph) {
+
+ protected val iterator = new TopologicalOrderIterator(this.jobGraph)
+
+ iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] {
+ override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = {
+ traversed(event.getEdge)
+ }
+ })
+
+ override def runJobs = {
+ logger.info(String.format("Running %s jobs.", this.numJobs.toString))
+ for (target <- iterator) {
+ // Do nothing for now, let event handler respond
+ }
+ }
+
+ protected def traversed(edge: ResourceEdge) = {
+ edge.traverse(this.jobGraph)
+ edge match {
+ case exec: ExecEdge => traversedExec(exec)
+ }
+ }
+
+ protected def traversedExec(exec: ExecEdge)
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/util/ClasspathUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ClasspathUtils.scala
new file mode 100755
index 000000000..8b79c9cf1
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/util/ClasspathUtils.scala
@@ -0,0 +1,18 @@
+package org.broadinstitute.sting.queue.util
+
+import collection.JavaConversions._
+import org.reflections.util.ManifestAwareClasspathHelper
+import java.io.File
+import javax.print.URIException
+import org.apache.commons.lang.StringUtils
+
+/**
+ * Builds the correct class path by examining the manifests
+ */
+object ClasspathUtils {
+ def manifestAwareClassPath = {
+ var urls = ManifestAwareClasspathHelper.getUrlsForManifestCurrentClasspath
+ var files = urls.map(url => try {new File(url.toURI)} catch {case urie: URIException => new File(url.getPath)})
+ files.mkString(File.pathSeparator)
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala
new file mode 100755
index 000000000..48a989614
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala
@@ -0,0 +1,31 @@
+package org.broadinstitute.sting.queue.util
+
+import org.apache.log4j._
+
+/**
+ * A mixin to add logging to a class
+ */
+trait Logging {
+ val className = this.getClass.getName
+ lazy val logger = configuredLogger
+
+ def configuredLogger = {
+ Logging.configureLogging
+ Logger.getLogger(className)
+ }
+}
+
+object Logging {
+ private var configured = false
+ private var isDebug = false
+ def configureLogging() {
+ if (!configured) {
+ var root = Logger.getRootLogger
+ root.addAppender(new ConsoleAppender(new PatternLayout("%-5p %d{HH:mm:ss,SSS} %C{1} - %m %n")))
+ root.setLevel(if(isDebug) Level.DEBUG else Level.INFO)
+ configured = true
+ }
+ }
+
+ def enableDebug = {isDebug = true; Logger.getRootLogger.setLevel(Level.DEBUG)}
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala
new file mode 100755
index 000000000..31cea807d
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala
@@ -0,0 +1,46 @@
+package org.broadinstitute.sting.queue.util
+
+import org.broadinstitute.sting.utils.text.XReadLines
+import collection.mutable.ListBuffer
+import collection.JavaConversions._
+
+object ProcessUtils extends Logging {
+
+ Runtime.getRuntime.addShutdownHook(new Thread {
+ override def run = for (process <- running.clone) {
+ logger.warn("Killing: " + process)
+ process.destroy
+ }
+ })
+
+ val running = new ListBuffer[Process]()
+
+ def runCommandAndWait(command: String, environment: Map[String, String]) = {
+ logger.debug("Running command: " + command)
+
+ var builder = new ProcessBuilder("sh", "-c", command)
+ for ((key, value) <- environment) {
+ logger.debug(String.format("adding env: %s = %s", key, value))
+ builder.environment.put(key, value)
+ }
+
+ var process = builder.start
+ running += process
+ var result = process.waitFor
+ running -= process
+
+ if (logger.isDebugEnabled) {
+ for (line <- new XReadLines(process.getInputStream).iterator) {
+ logger.debug("command: " + line)
+ }
+
+ for (line <- new XReadLines(process.getErrorStream).iterator) {
+ logger.error("command: " + line)
+ }
+ }
+
+ logger.debug("Command exited with result: " + result)
+
+ result
+ }
+}
diff --git a/settings/repository/edu.mit.broad/broad-core-all-2.8.jar b/settings/repository/edu.mit.broad/broad-core-all-2.8.jar
new file mode 100644
index 000000000..715288886
Binary files /dev/null and b/settings/repository/edu.mit.broad/broad-core-all-2.8.jar differ
diff --git a/settings/repository/edu.mit.broad/broad-core-all-2.8.xml b/settings/repository/edu.mit.broad/broad-core-all-2.8.xml
new file mode 100644
index 000000000..7e7b31e80
--- /dev/null
+++ b/settings/repository/edu.mit.broad/broad-core-all-2.8.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+