diff --git a/ivy.xml b/ivy.xml
index f0f0498ec..433bd5ef5 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -42,8 +42,8 @@
-
-
+
+
diff --git a/java/src/org/broadinstitute/sting/queue/util/ClassType.java b/java/src/org/broadinstitute/sting/queue/util/ClassType.java
new file mode 100644
index 000000000..ae8bcdaef
--- /dev/null
+++ b/java/src/org/broadinstitute/sting/queue/util/ClassType.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2010, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.util;
+
+import java.lang.annotation.*;
+
+/**
+ * Specifies the type of an input our output field.
+ * Retains it during runtime to work around type erasure.
+ * Written in java because scala doesn't support RetentionPolicy.RUNTIME
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface ClassType {
+ Class value();
+}
diff --git a/java/src/org/broadinstitute/sting/queue/util/Input.java b/java/src/org/broadinstitute/sting/queue/util/Input.java
new file mode 100644
index 000000000..15fa28cb7
--- /dev/null
+++ b/java/src/org/broadinstitute/sting/queue/util/Input.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2010, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.util;
+
+import java.lang.annotation.*;
+
+/**
+ * Specifies an input to a QueueFunction
+ * Written in java because scala doesn't support RetentionPolicy.RUNTIME
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface Input {
+}
diff --git a/java/src/org/broadinstitute/sting/queue/util/Internal.java b/java/src/org/broadinstitute/sting/queue/util/Internal.java
new file mode 100644
index 000000000..842cc9d1a
--- /dev/null
+++ b/java/src/org/broadinstitute/sting/queue/util/Internal.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2010, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.util;
+
+import java.lang.annotation.*;
+
+/**
+ * Specifies an internal setting for a QueueFunction.
+ * Not an input or output but should be copied with a function.
+ * Internals should have default values that should be handled, i.e. they are always @Optional
+ * Written in java because scala doesn't support RetentionPolicy.RUNTIME
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface Internal {
+}
diff --git a/java/src/org/broadinstitute/sting/queue/util/Optional.java b/java/src/org/broadinstitute/sting/queue/util/Optional.java
new file mode 100644
index 000000000..e31df349b
--- /dev/null
+++ b/java/src/org/broadinstitute/sting/queue/util/Optional.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2010, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.util;
+
+import java.lang.annotation.*;
+
+/**
+ * Specifies an input or output to a QueueFunction is optional
+ * Written in java because scala doesn't support RetentionPolicy.RUNTIME
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface Optional {
+}
diff --git a/java/src/org/broadinstitute/sting/queue/util/Output.java b/java/src/org/broadinstitute/sting/queue/util/Output.java
new file mode 100644
index 000000000..5b8c5c46d
--- /dev/null
+++ b/java/src/org/broadinstitute/sting/queue/util/Output.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2010, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.util;
+
+import java.lang.annotation.*;
+
+/**
+ * Specifies an output to a QueueFunction
+ * Written in java because scala doesn't support RetentionPolicy.RUNTIME
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface Output {
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/QArguments.scala b/scala/src/org/broadinstitute/sting/queue/QArguments.scala
index d17e7ab4a..b0f091b90 100755
--- a/scala/src/org/broadinstitute/sting/queue/QArguments.scala
+++ b/scala/src/org/broadinstitute/sting/queue/QArguments.scala
@@ -8,13 +8,13 @@ import java.io.{FileInputStream, File}
import java.util.Properties
class QArguments(args: Array[String]) {
- var useBsub = false
+ var bsubAllJobs = false
var dryRun = false
val scripts = new ListBuffer[String]
var inputPaths = List.empty[File]
var argMap = Map.empty[String, String]
- parseArgs(args)
+ val userArgs = parseArgs(args)
private def parseArgs(args: Array[String]) = {
var filtered = new ListBuffer[String]
@@ -26,13 +26,15 @@ class QArguments(args: Array[String]) {
if (isFlagged(filtered, "-dry"))
dryRun = true
if (isFlagged(filtered, "-bsub"))
- useBsub = true
+ bsubAllJobs = true
for (arg <- getArgs(filtered, "-P"))
addArg(arg)
for (arg <- getArgs(filtered, "-I"))
addFile(arg)
for (arg <- getArgs(filtered, "-S"))
scripts.append(arg)
+
+ List(filtered:_*)
}
private def isFlagged(filtered: ListBuffer[String], search: String) = {
@@ -65,22 +67,24 @@ class QArguments(args: Array[String]) {
var file = new File(arg)
if (arg.contains("=") && !file.exists) {
val tokens = arg.split("=", 2)
- argMap = argMap.updated(tokens(0), tokens(1))
- } else if (file.exists && arg.endsWith(".properties")) {
+ argMap += tokens(0) -> tokens(1)
+ } else if (arg.endsWith(".properties")) {
+ if (!file.exists)
+ throw new QException("File not found: " + file.getAbsolutePath)
var props = new Properties
props.load(new FileInputStream(file))
for ((name, value) <- props)
- argMap = argMap.updated(name, value)
+ argMap += name -> value
+ } else {
+ throw new QException("Invalid property: " + arg)
}
}
def addFile(arg: String): Unit = {
var file = new File(arg)
- if (arg.endsWith(".list")) {
+ inputPaths :+= file
+ if (arg.endsWith(".list"))
new XReadLines(file).iterator.foreach(addFile(_))
- } else {
- inputPaths = inputPaths ::: List(file)
- }
}
}
diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala
index c9e9ea694..79f030070 100755
--- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala
+++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala
@@ -13,6 +13,7 @@ object QCommandLine extends Application with Logging {
new QArguments(args)
} catch {
case exception => {
+ println(exception)
println(usage)
System.exit(-1)
}
@@ -34,20 +35,13 @@ object QCommandLine extends Application with Logging {
System.exit(-1)
}
- if (qArgs.inputPaths.size == 0) {
- println("Error: No inputs specified")
- println(usage)
- System.exit(-1)
- }
-
-
val newArgs = new ListBuffer[String]
newArgs.appendAll(args)
QArguments.strip(newArgs, "-S")
newArgs.prepend("-nocompdaemon", "-classpath", ClasspathUtils.manifestAwareClassPath, qArgs.scripts.head)
MainGenericRunner.main(newArgs.toArray)
- // NOTE: This line is not reached because something in MainGenericRunner is exiting the VM.
+ // NOTE: This line is not reached because the MainGenericRunner exits the VM.
logger.debug("exiting")
}
}
diff --git a/scala/src/org/broadinstitute/sting/queue/QScript.scala b/scala/src/org/broadinstitute/sting/queue/QScript.scala
new file mode 100755
index 000000000..d466c85f4
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/QScript.scala
@@ -0,0 +1,103 @@
+package org.broadinstitute.sting.queue
+
+import org.broadinstitute.sting.queue.function.CommandLineFunction
+import org.broadinstitute.sting.queue.engine.QGraph
+
+/**
+ * Syntactic sugar for filling in a pipeline using a Scala script.
+ */
+object QScript {
+ // Type aliases so users don't have to import
+ type File = java.io.File
+ type Input = org.broadinstitute.sting.queue.util.Input
+ type Output = org.broadinstitute.sting.queue.util.Output
+ type Optional = org.broadinstitute.sting.queue.util.Optional
+ type ClassType = org.broadinstitute.sting.queue.util.ClassType
+ type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction
+ type GatkFunction = org.broadinstitute.sting.queue.function.gatk.GatkFunction
+
+ // The arguments for executing pipelines
+ private var qArgs: QArguments = _
+
+ // A default pipeline. Can also use multiple 'new Pipeline()'
+ private val pipeline = new Pipeline
+
+ /**
+ * Initializes the QArguments and returns a list of the rest of the user args.
+ */
+ def setArgs(params: Array[String]) = {
+ qArgs = new QArguments(params)
+ qArgs.userArgs
+ }
+
+ /**
+ * Returns a list of files that were specified with "-I " on the command line
+ * or inside a .list file.
+ */
+ def inputs(extension: String) = qArgs.inputPaths.filter(_.getName.endsWith(extension))
+
+ /**
+ * Exchanges the extension on a file.
+ */
+ def swapExt(file: File, oldExtension: String, newExtension: String) =
+ new File(file.getName.stripSuffix(oldExtension) + newExtension)
+
+ /**
+ * Adds one or more command line functions for dispatch later during run()
+ */
+ def add(functions: CommandLineFunction*) = pipeline.add(functions:_*)
+
+ /**
+ * Sets the @Input and @Output values for all the functions
+ */
+ def setParams(): Unit = pipeline.setParams()
+
+ /**
+ * Sets the @Input and @Output values for a single function
+ */
+ def setParams(function: CommandLineFunction): Unit = pipeline.setParams(function)
+
+ /**
+ * Executes functions that have been added to the pipeline.
+ */
+ def run() = pipeline.run()
+
+
+ /**
+ * Encapsulates a set of functions to run together.
+ */
+ protected class Pipeline {
+ private var functions = List.empty[CommandLineFunction]
+
+ /**
+ * Adds one or more command line functions for dispatch later during run()
+ */
+ def add(functions: CommandLineFunction*) =
+ this.functions :::= List(functions:_*)
+
+ /**
+ * Sets the @Input and @Output values for all the functions
+ */
+ def setParams(): Unit =
+ for (function <- functions) setParams(function)
+
+ /**
+ * Sets the @Input and @Output values for a single function
+ */
+ def setParams(function: CommandLineFunction): Unit =
+ for ((name, value) <- qArgs.argMap) function.setValue(name, value)
+
+ /**
+ * Executes functions that have been added to the pipeline.
+ */
+ def run() = {
+ val qGraph = new QGraph
+ qGraph.dryRun = qArgs.dryRun
+ qGraph.bsubAllJobs = qArgs.bsubAllJobs
+ for (function <- functions)
+ qGraph.add(function)
+ qGraph.fillIn
+ qGraph.run
+ }
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala
new file mode 100755
index 000000000..318624d3c
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala
@@ -0,0 +1,17 @@
+package org.broadinstitute.sting.queue.engine
+
+import org.broadinstitute.sting.queue.util.{Logging, ProcessUtils}
+import org.broadinstitute.sting.queue.function.CommandLineFunction
+
+/**
+ * Runs jobs one at a time locally
+ */
+trait CommandLineRunner extends Logging {
+ def run(function: CommandLineFunction, qGraph: QGraph) = {
+ var commandLine = function.commandLine
+ logger.info(commandLine)
+
+ if (!qGraph.dryRun)
+ ProcessUtils.runCommandAndWait(function.commandLine, function.commandDirectory)
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala
new file mode 100755
index 000000000..8966945b1
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala
@@ -0,0 +1,56 @@
+package org.broadinstitute.sting.queue.engine
+
+import edu.mit.broad.core.lsf.LocalLsfJob
+import collection.JavaConversions._
+import management.ManagementFactory
+import java.io.File
+import java.util.ArrayList
+import org.broadinstitute.sting.queue.function.{DispatchFunction, QFunction}
+
+trait DispatchJobRunner {
+ type DispatchJobType
+ private var dispatchJobs = Map.empty[DispatchFunction, DispatchJobType]
+
+ protected def newJobName = DispatchJobRunner.nextJobName
+
+ def dispatch(function: DispatchFunction, qGraph: QGraph)
+
+ protected def addJob(function: DispatchFunction, dispatchJob: DispatchJobType) =
+ dispatchJobs += function -> dispatchJob
+
+ /**
+ * Walks up the graph looking for the previous LsfJobs
+ */
+ protected def previousJobs(function: QFunction, qGraph: QGraph) : List[DispatchJobType] = {
+ var previous = List.empty[DispatchJobType]
+
+ val source = qGraph.jobGraph.getEdgeSource(function)
+ for (incomingEdge <- qGraph.jobGraph.incomingEdgesOf(source)) {
+ incomingEdge match {
+
+ // Stop recursing when we find a job along the edge and return its job id
+ case dispatchFunction: DispatchFunction => previous :+= dispatchJobs(dispatchFunction)
+
+ // For any other type of edge find the LSF jobs preceding the edge
+ case qFunction: QFunction => previous :::= previousJobs(qFunction, qGraph)
+ }
+ }
+ previous
+ }
+}
+
+object DispatchJobRunner {
+ private val jobNamePrefix = "Q-" + {
+ var prefix = ManagementFactory.getRuntimeMXBean.getName
+ val index = prefix.indexOf(".")
+ if (index >= 0)
+ prefix = prefix.substring(0, index)
+ prefix
+ }
+ private var jobIndex = 0
+
+ private def nextJobName = {
+ jobIndex += 1
+ jobNamePrefix + "-" + jobIndex
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala
new file mode 100644
index 000000000..edfcaf179
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala
@@ -0,0 +1,57 @@
+package org.broadinstitute.sting.queue.engine
+
+import collection.JavaConversions._
+import edu.mit.broad.core.lsf.LocalLsfJob
+import org.broadinstitute.sting.queue.function.DispatchFunction
+import java.io.File
+import java.util.ArrayList
+import org.broadinstitute.sting.queue.util.Logging
+
+trait LsfJobRunner extends DispatchJobRunner with Logging {
+ type DispatchJobType = LocalLsfJob
+
+ def dispatch(function: DispatchFunction, qGraph: QGraph) = {
+ var jobName = function.jobName
+ if (jobName == null)
+ jobName = newJobName
+
+ var jobOutputFile = function.jobOutputFile
+ if (jobOutputFile == null)
+ jobOutputFile = new File(jobName + ".out")
+
+ var jobErrorFile = function.jobErrorFile
+ if (jobErrorFile == null)
+ jobErrorFile = new File(jobName + ".err")
+
+ val job = new LocalLsfJob
+ job.setName(jobName)
+ job.setOutputFile(jobOutputFile)
+ job.setErrFile(jobErrorFile)
+ job.setWorkingDir(function.commandDirectory)
+ job.setProject(function.jobProject)
+ job.setQueue(function.jobQueue)
+ job.setCommand(function.commandLine)
+
+ var extraArgs = List("-r")
+
+ if (function.memoryLimit.isDefined)
+ extraArgs :::= List("-R", "rusage[mem=" + function.memoryLimit.get + "]")
+
+ val previous = previousJobs(function, qGraph)
+ if (previous.size > 0)
+ extraArgs :::= List("-w", dependencyExpression(previous))
+
+ job.setExtraBsubArgs(new ArrayList(extraArgs))
+
+ addJob(function, job)
+
+ logger.info(job.getBsubCommand.mkString(" "))
+
+ if (!qGraph.dryRun)
+ job.start
+ }
+
+ private def dependencyExpression(jobs: List[LocalLsfJob]) = {
+ jobs.toSet[LocalLsfJob].map(_.getName).mkString("ended(\"", "\") && ended(\"", "\")")
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala b/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala
deleted file mode 100755
index fff54849b..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.broadinstitute.sting.queue.engine
-
-import graphing.{ExplicitJobGrapher, TreeJobGrapher}
-import scheduling.ResourceNode
-
-/**
- * Syntactic sugar for filling in a pipeline using a Scala script.
- */
-object Pipeline {
- def addRule(rule: (Any, Any), commandString: String): Unit = TreeJobGrapher.addRule(rule, commandString)
- def run(args: Array[String], sources: Any, targets: Any): Unit = TreeJobGrapher.run(args, sources, targets)
-
- def node() = ExplicitJobGrapher.node()
- def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = ExplicitJobGrapher.addEdge(rule, commandString)
- def run(): Unit = ExplicitJobGrapher.run()
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala b/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala
deleted file mode 100755
index 1126fea19..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.broadinstitute.sting.queue.engine
-
-/**
- * Defines a basic command to run
- * TODO: Allow overriding arguments per command such as the job queue
- */
-class QCommand(val commandString: String) extends QModelEdge {
- override def toString = commandString
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala b/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala
deleted file mode 100755
index 27c11b545..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.broadinstitute.sting.queue.engine
-
-import org.apache.commons.lang.builder.{EqualsBuilder, HashCodeBuilder}
-import java.io.File
-import org.apache.commons.lang.StringUtils
-import org.broadinstitute.sting.queue.QException
-
-/**
- * Represents a file extension along with several tags.
- * TODO: Use the tags to map rules between wildcards, ex: *.vcf -> *.eval
- */
-class QFile(val fileType: String, val parts: String*) {
- val extension = (List(parts:_*) ::: List(fileType)).mkString(".")
- override def toString = extension
- override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1)
- override def hashCode = HashCodeBuilder.reflectionHashCode(this)
-
- def matchesFile(path: String): Boolean = matchesFile(new File(path))
- def matchesFile(file: File): Boolean = file.getAbsolutePath.endsWith(extension)
- def baseName(path: String): String = baseName(new File(path))
- def baseName(file: File): String = StringUtils.removeEnd(file.getAbsolutePath, extension)
- def fullName(baseName: String) = baseName + extension
-}
-
-object QFile {
- def getFiles(files: Any) : List[QFile] = {
- files match {
- case null => List.empty[QFile]
- case Nil => List.empty[QFile]
- case path: String => List(new QFile(path))
- case file: QFile => List(file)
- // Any List or Tuple add the members to this list
- case product: Product => {
- var list = List.empty[QFile]
- for (fileList <- product.productIterator.toList.map(getFiles(_))) {
- list :::= fileList
- }
- list
- }
- case x => throw new QException("Unknown file type: " + x)
- }
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
new file mode 100755
index 000000000..f54a8f0ab
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
@@ -0,0 +1,71 @@
+package org.broadinstitute.sting.queue.engine
+
+import org.jgrapht.graph.SimpleDirectedGraph
+import scala.collection.JavaConversions
+import scala.collection.immutable.ListMap
+import org.broadinstitute.sting.queue.util.Logging
+import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, QFunction}
+
+class QGraph extends Logging {
+ var dryRun = true
+ var bsubAllJobs = false
+ val jobGraph = new SimpleDirectedGraph[QNode, QFunction](classOf[QFunction])
+ def numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size
+
+ def add(command: CommandLineFunction) {
+ add(command, true)
+ }
+
+ /**
+ * Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs.
+ */
+ def fillIn = {
+ // clone since edgeSet is backed by the graph
+ for (function <- JavaConversions.asSet(jobGraph.edgeSet).clone) {
+ val inputs = function.inputs
+ val outputs = function.outputs
+
+ if (inputs.size > 1)
+ for ((name, input) <- inputs)
+ addNullEdge(ListMap(name -> input), inputs)
+
+ if (outputs.size > 1)
+ for ((name, output) <- outputs)
+ addNullEdge(outputs, ListMap(name -> output))
+ }
+ }
+
+ def run = {
+ var isReady = true
+ for (function <- JavaConversions.asSet(jobGraph.edgeSet)) {
+ val missingValues = function.missingValues
+ if (missingValues.size > 0) {
+ isReady = false
+ logger.error(function match {
+ case cmd: CommandLineFunction => "Missing values for function: %s".format(cmd.commandLine)
+ case x => "Missing values:"
+ })
+ for (missing <- missingValues) {
+ logger.error(" " + missing)
+ }
+ }
+ }
+
+ if (isReady || this.dryRun)
+ (new TopologicalJobScheduler(this) with LsfJobRunner).runJobs
+ }
+
+ private def add(f: QFunction, replace: Boolean) {
+ val inputs = QNode(f.inputs.values.filter(_ != null).toSet)
+ val outputs = QNode(f.outputs.values.filter(_ != null).toSet)
+ jobGraph.addVertex(inputs)
+ jobGraph.addVertex(outputs)
+ if (replace)
+ jobGraph.removeAllEdges(inputs, outputs)
+ jobGraph.addEdge(inputs, outputs, f)
+ }
+
+ private def addNullEdge(input: ListMap[String, Any], output: ListMap[String, Any]) = {
+ add(new MappingFunction(input, output), false)
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala
deleted file mode 100644
index 580501ef1..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.broadinstitute.sting.queue.engine
-
-/**
- * Used for modeling before an edge gets
- * replicated into an actual ResourceEdge
- */
-class QModelEdge
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala
new file mode 100644
index 000000000..15a405d82
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala
@@ -0,0 +1,6 @@
+package org.broadinstitute.sting.queue.engine
+
+/**
+ * Represents a state between QFunctions the directed acyclic QGraph
+ */
+case class QNode (private val items: Set[Any])
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QRule.scala b/scala/src/org/broadinstitute/sting/queue/engine/QRule.scala
deleted file mode 100755
index a52e3d363..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/QRule.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package org.broadinstitute.sting.queue.engine
-
-class QRule (val inputs: List[QFile], val outputs: List[QFile], var command: QCommand)
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala
new file mode 100755
index 000000000..882f6977c
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala
@@ -0,0 +1,31 @@
+package org.broadinstitute.sting.queue.engine
+
+import org.jgrapht.traverse.TopologicalOrderIterator
+import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter}
+import collection.JavaConversions._
+import org.broadinstitute.sting.queue.util.Logging
+import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, DispatchFunction, QFunction}
+
+/**
+ * Loops over the job graph running jobs as the edges are traversed
+ */
+abstract class TopologicalJobScheduler(private val qGraph: QGraph)
+ extends CommandLineRunner with DispatchJobRunner with Logging {
+
+ protected val iterator = new TopologicalOrderIterator(this.qGraph.jobGraph)
+
+ iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] {
+ override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match {
+ case f: DispatchFunction if (TopologicalJobScheduler.this.qGraph.bsubAllJobs) => dispatch(f, qGraph)
+ case f: CommandLineFunction => run(f, qGraph)
+ case f: MappingFunction => /* do nothing for mapping functions */
+ }
+ })
+
+ def runJobs = {
+ logger.info("Number of jobs: %s".format(this.qGraph.numJobs))
+ for (target <- iterator) {
+ // Do nothing for now, let event handler respond
+ }
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala
deleted file mode 100644
index 4be5e17aa..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.broadinstitute.sting.queue.engine.graphing
-
-import org.broadinstitute.sting.queue.engine.scheduling.{ResourceNode, ExecEdge}
-import org.broadinstitute.sting.queue.engine.QCommand
-
-class ExplicitJobGrapher extends JobGrapher
-
-object ExplicitJobGrapher {
- private val grapher = new ExplicitJobGrapher
-
- def node() = new ResourceNode
-
- def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = {
- addEdge(rule._1, rule._2, new QCommand(commandString))
- }
-
- private def addEdge(source: ResourceNode, target: ResourceNode, command: QCommand) = {
- val resourceEdge = new ExecEdge(command.commandString)
- grapher.jobGraph.addVertex(source)
- grapher.jobGraph.addVertex(target)
- grapher.jobGraph.addEdge(source, target, resourceEdge)
- }
-
- def run() = {
- grapher.run()
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala
deleted file mode 100755
index 4fc1c0cc5..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.broadinstitute.sting.queue.engine.graphing
-
-import org.broadinstitute.sting.queue.QArguments
-import org.broadinstitute.sting.queue.engine.scheduling._
-import org.broadinstitute.sting.queue.util.Logging
-import org.jgrapht.graph.SimpleDirectedGraph
-
-abstract class JobGrapher() extends Logging {
- /**
- * Jobs to be run.
- * Can be populated adhoc or during createJobGraph()
- */
- protected val jobGraph = new SimpleDirectedGraph[ResourceNode, ResourceEdge](classOf[ResourceEdge])
-
- var qArgs: QArguments = _
-
- def run() = {
- createJobGraph()
- val scheduler = createScheduler()
- scheduler.runJobs
- }
-
- protected def createJobGraph() = {}
-
- private def createScheduler() : JobScheduler = {
- qArgs.useBsub match {
- case false => new SimpleJobScheduler(jobGraph, qArgs)
- case true => new DispatchJobScheduler(jobGraph, qArgs)
- }
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala
deleted file mode 100644
index 55b77b236..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-package org.broadinstitute.sting.queue.engine.graphing
-
-import org.jgrapht.graph.SimpleDirectedGraph
-import org.jgrapht.alg.BellmanFordShortestPath
-import org.broadinstitute.sting.queue.{QArguments, QException}
-import collection.mutable.ListBuffer
-import collection.JavaConversions._
-import org.broadinstitute.sting.queue.engine.scheduling.{ResourceEdge, ExecEdge, MapResourceNode}
-import org.broadinstitute.sting.queue.engine.{QModelEdge, QCommand, QFile, QRule}
-
-/**
- * Converts a set of rules provided by the user and a list of files into a graph of jobs to run
- */
-class TreeJobGrapher extends JobGrapher {
- private val modelGraph = new SimpleDirectedGraph[List[QFile], QModelEdge](classOf[QModelEdge])
-
- private val rules = new ListBuffer[QRule]
- private var sourceFiles = List.empty[QFile]
- private var targetFiles = List.empty[QFile]
-
- // Used to tag a model edge for Element <-> List
- private class QCollectionEdge extends QModelEdge
-
- override protected def createJobGraph() = {
- createModelGraph()
-
- var missingPaths = List.empty[Tuple2[QFile,QFile]]
- for (sourceFile <- sourceFiles) {
- for (targetFile <- targetFiles) {
- var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, List(sourceFile), List(targetFile))
- if (shortestPath == null)
- missingPaths = missingPaths ::: List((sourceFile, targetFile))
- else
- addPaths(shortestPath, qArgs)
- }
- }
-
- for ((sourceFile, targetFile) <- missingPaths) {
- logger.error(String.format("No command path found between %s --> %s", sourceFile, targetFile))
- }
-
- if (missingPaths.size > 0)
- throw new QException("Not all inputs and outputs found in the pipeline graph")
- }
-
- private def createModelGraph() = {
-
- // Look for rules with more than one input or output and add
- // internal dependencies between the elements and the list
- for (rule <- rules) {
- if (rule.inputs.size > 1) {
- for (input <- rule.inputs) {
- modelGraph.addVertex(List(input))
- modelGraph.addVertex(rule.inputs)
- modelGraph.addEdge(List(input), rule.inputs, new QCollectionEdge)
- }
- }
- if (rule.outputs.size > 1) {
- for (output <- rule.outputs) {
- modelGraph.addVertex(rule.outputs)
- modelGraph.addVertex(List(output))
- modelGraph.addEdge(rule.outputs, List(output), new QCollectionEdge)
- }
- }
- }
-
- // Add the explicit rules
- for (rule <- rules) {
- modelGraph.addVertex(rule.inputs)
- modelGraph.addVertex(rule.outputs)
- modelGraph.addEdge(rule.inputs, rule.outputs, rule.command)
- }
- }
-
- private def addPaths(shortestPath: java.util.List[QModelEdge], qArgs: QArguments) {
- for (inputFile <- qArgs.inputPaths.map(_.getAbsolutePath)) {
- val source = modelGraph.getEdgeSource(shortestPath.head).head
- if (source.matchesFile(inputFile)) {
- val baseName = source.baseName(inputFile)
- val target = modelGraph.getEdgeTarget(shortestPath.last).head
- addPathsToTarget(baseName, List(target))
- }
- }
- }
-
- private def addPathsToTarget(baseName: String, targets: List[QFile]) : Unit = {
- for (command <- modelGraph.incomingEdgesOf(targets)) {
- val sources = modelGraph.getEdgeSource(command)
- addJobGraphEdge(baseName, sources, targets, command)
- addPathsToTarget(baseName, sources)
- }
- }
-
- private def addJobGraphEdge(baseName: String, sources: List[QFile], targets: List[QFile], command: QModelEdge) {
- val resourceSource = new MapResourceNode(mapFiles(baseName, sources))
- val resourceTarget = new MapResourceNode(mapFiles(baseName, targets))
- val resourceEdge = command match {
- case qCommand: QCommand => new ExecEdge(qCommand.commandString)
- case qTransition: QCollectionEdge => new ResourceEdge
- }
- jobGraph.addVertex(resourceSource)
- jobGraph.addVertex(resourceTarget)
- jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge)
- }
-
- /**
- * Creates a mapping of the files based on the baseName.
- * key: file extension
- * value: full name
- * Used by the JobScheduler to lookup values as the commands are expanded at exec time.
- */
- private def mapFiles(baseName: String, files: List[QFile]) : Map[String, String] = {
- Map(files.map(file => (file.extension, file.fullName(baseName))):_*)
- }
-}
-
-/**
- * Syntactic sugar for filling in a pipeline using a Scala script.
- */
-object TreeJobGrapher {
- private val grapher = new TreeJobGrapher
-
- /**
- * Sugar that allows addRule( inputs -> outputs, command )
- */
- def addRule(rule: (Any, Any), commandString: String): Unit = {
- val inputs = QFile.getFiles(rule._1)
- val outputs = QFile.getFiles(rule._2)
- val command = new QCommand(commandString)
- addRule(inputs, outputs, command)
- }
-
- private def addRule(inputs: List[QFile], outputs: List[QFile], command: QCommand): Unit = {
- grapher.rules += new QRule(inputs, outputs, command)
- }
-
- def run(args: Array[String], sources: Any, targets: Any) = {
- grapher.qArgs = new QArguments(args)
- grapher.sourceFiles = QFile.getFiles(sources)
- grapher.targetFiles = QFile.getFiles(targets)
- grapher.run()
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala
deleted file mode 100755
index fb7450d9c..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.broadinstitute.sting.queue.engine.scheduling
-
-import org.jgrapht.DirectedGraph
-import edu.mit.broad.core.lsf.LocalLsfJob
-import collection.JavaConversions._
-import management.ManagementFactory
-import java.io.File
-import java.util.ArrayList
-import org.broadinstitute.sting.queue.QArguments
-
-/**
- * Dispatches jobs to LSF and then returns.
- */
-class DispatchJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
- extends TopologicalJobScheduler(jobGraph, qArgs) {
- private var lsfJobs = Map.empty[ExecEdge, LocalLsfJob]
- private var lsfJobIndex = 0
- private val jvmName = ManagementFactory.getRuntimeMXBean.getName
- private val jobNamePrefix = "Q-" + jvmName
-
- def processExec(exec: ExecEdge) = {
- lsfJobIndex += 1
- val job = new LocalLsfJob
- val jobName = jobNamePrefix + "-" + lsfJobIndex
- val outputFile = jobName + ".out"
- val errorFile = jobName + ".err"
- val workingDir = lookup(exec, "jobWorkingDir", ".")
- val lsfProject = lookup(exec, "jobProject", "Queue")
- val queue = lookup(exec, "jobQueue", "broad")
- val memory = lookup(exec, "jobMemory", "2")
-
- var extraArgs = List("-r", "-R", "rusage[mem=" + memory + "]")
-
- val sourceJobs = sourceLsfJobs(exec)
- if (sourceJobs.size > 0) {
- extraArgs :::= List("-w", dependencyExpression(sourceJobs))
- }
- job.setName(jobName)
- job.setExtraBsubArgs(new ArrayList(extraArgs))
- job.setProject(lsfProject)
- job.setWorkingDir(new File(workingDir))
- job.setProject(lsfProject)
- job.setCommand(exec.commandString)
- job.setOutputFile(new File(workingDir, outputFile))
- job.setErrFile(new File(workingDir, errorFile))
- job.setQueue(queue)
-
- lsfJobs = lsfJobs.updated(exec, job)
-
- logger.info(job.getBsubCommand.mkString(" "))
-
- if (!qArgs.dryRun)
- job.start
- }
-
- /**
- * Walks up the graph looking for the previous LsfJobs for this node
- */
- private def sourceLsfJobs(edge: ResourceEdge) : List[LocalLsfJob] = {
- var sourceJobs = List.empty[LocalLsfJob]
-
- val source = this.jobGraph.getEdgeSource(edge)
- for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) {
- incomingEdge match {
-
- // Stop recursing when we find a job along this edge and return it's job id
- case exec: ExecEdge => sourceJobs :::= List(lsfJobs(exec))
-
- // For any other type of edge find the LSF jobs preceeding the edge
- case resourceEdge: ResourceEdge => sourceJobs :::= sourceLsfJobs(resourceEdge)
- }
- }
- sourceJobs
- }
-
- private def dependencyExpression(jobs: List[LocalLsfJob]) = {
- jobs.toSet[LocalLsfJob].map(_.getName).mkString("ended(\"", "\") && ended(\"", "\")")
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala
deleted file mode 100755
index ac09bf51f..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.broadinstitute.sting.queue.engine.scheduling
-
-import org.apache.commons.lang.text.{StrLookup, StrSubstitutor}
-import java.lang.String
-
-class ExecEdge(private val templateCommandString: String)
- extends ResourceEdge {
- private var convertedCommandString: String = _
- def commandString = convertedCommandString
-
- override def traverse(graph: JobScheduler) = {
- // Lookup any variable using the target node, or any of it's input nodes.
- val sub = new StrSubstitutor(new StrLookup { def lookup(key: String) = graph.lookup(ExecEdge.this, key, null) })
- convertedCommandString = sub.replace(templateCommandString)
- graph.processExec(this)
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala
deleted file mode 100755
index c3a92b2f7..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-package org.broadinstitute.sting.queue.engine.scheduling
-
-import org.jgrapht.DirectedGraph
-import org.broadinstitute.sting.queue.util.Logging
-import collection.JavaConversions._
-import org.broadinstitute.sting.queue.QArguments
-
-abstract class JobScheduler(protected val jobGraph: DirectedGraph[ResourceNode, ResourceEdge],
- protected val qArgs: QArguments) extends Logging {
-
- private var missingKeys = Set.empty[String]
-
- def runJobs
- def numJobs = jobGraph.edgeSet.size
-
- def processExec(exec: ExecEdge) : Unit
-
- /**
- * Emulates storing of properties per node by looking up values on
- * the current edge/target-node or any preceding nodes in the graph.
- */
- def lookup(edge: ResourceEdge, key: String, default: String) : String = {
- val value = lookupRecursive(edge, key) match {
- case Some(value) => value
- case None => qArgs.argMap.getOrElse(key, default)
- }
- if (value == null)
- missingKeys = missingKeys ++ Set(key)
- value
- }
-
- protected def logMissingKeys = {
- if (qArgs.dryRun && !missingKeys.isEmpty) {
- logger.warn("Missing keys:")
- for (key <- missingKeys)
- logger.warn(" ${" + key + "}")
- }
- }
-
- private def lookupRecursive(edge: ResourceEdge, key: String) : Option[String] = {
- var value = edge.lookup(key)
- if (value.isDefined)
- return value
-
- value = this.jobGraph.getEdgeTarget(edge).lookup(key)
- if (value.isDefined)
- return value
-
- return lookupRecursive(this.jobGraph.getEdgeSource(edge), key)
- }
-
- private def lookupRecursive(node: ResourceNode, key: String) : Option[String] = {
- var value = node.lookup(key)
- if (value.isDefined)
- return value
-
- for (edge <- this.jobGraph.incomingEdgesOf(node)) {
- value = lookupRecursive(edge, key)
- if (value.isDefined)
- return value
- }
-
- return None
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala
deleted file mode 100644
index 59155a610..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.broadinstitute.sting.queue.engine.scheduling
-
-class MapResourceNode (private val resources: Map[String,String]) extends ResourceNode {
- override def lookup(key: String) : Option[String] = this.resources.get(key)
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala
deleted file mode 100755
index a06ca9871..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.broadinstitute.sting.queue.engine.scheduling
-
-class ResourceEdge {
- def traverse(graph: JobScheduler): Unit = {}
- def lookup(key: String) : Option[String] = None
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala
deleted file mode 100755
index 475224ef8..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.broadinstitute.sting.queue.engine.scheduling
-
-import org.apache.commons.lang.builder.{HashCodeBuilder, EqualsBuilder}
-
-class ResourceNode() {
- override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1)
- override def hashCode = HashCodeBuilder.reflectionHashCode(this)
- def lookup(key: String) : Option[String] = None
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala
deleted file mode 100755
index 6c1ea43ca..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.broadinstitute.sting.queue.engine.scheduling
-
-import org.broadinstitute.sting.queue.util.ProcessUtils
-import org.broadinstitute.sting.queue.QArguments
-import org.jgrapht.DirectedGraph
-
-/**
- * Runs jobs one at a time locally
- */
-class SimpleJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
- extends TopologicalJobScheduler(jobGraph, qArgs) {
- def processExec(exec: ExecEdge) = {
- var commandString = exec.commandString
- logger.info(commandString)
-
- if (!qArgs.dryRun)
- ProcessUtils.runCommandAndWait(commandString)
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala
deleted file mode 100755
index 3689ebd6e..000000000
--- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.broadinstitute.sting.queue.engine.scheduling
-
-import org.jgrapht.DirectedGraph
-import org.jgrapht.traverse.TopologicalOrderIterator
-import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter}
-import collection.JavaConversions._
-import org.broadinstitute.sting.queue.QArguments
-
-/**
- * Loops over the job graph running jobs as the edges are traversed
- */
-abstract class TopologicalJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
- extends JobScheduler(jobGraph, qArgs) {
-
- protected val iterator = new TopologicalOrderIterator(this.jobGraph)
-
- iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] {
- override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) =
- event.getEdge.traverse(TopologicalJobScheduler.this)
- })
-
- override def runJobs = {
- logger.info(String.format("Running %s jobs.", this.numJobs.toString))
- for (target <- iterator) {
- // Do nothing for now, let event handler respond
- }
- logMissingKeys
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala
new file mode 100644
index 000000000..4288cf8b7
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala
@@ -0,0 +1,92 @@
+package org.broadinstitute.sting.queue.function
+
+import java.io.File
+import org.broadinstitute.sting.queue.util._
+import org.broadinstitute.sting.queue.engine.{CommandLineRunner, QGraph}
+
+trait CommandLineFunction extends QFunction with DispatchFunction {
+
+ /**
+ * The command line to run locally or via grid computing.
+ */
+ def commandLine: String
+
+ /**
+ * The directory where the command should run.
+ */
+ @Internal
+ var commandDirectory: File = new File(".")
+
+ /**
+ * Repeats parameters with a prefix/suffix if they are set otherwise returns "".
+ * Skips null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString.
+ */
+ protected def repeat(prefix: String, params: Seq[_], suffix: String = "", separator: String = "") =
+ params.filter(param => hasValue(param)).map(param => prefix + toValue(param) + suffix).mkString(separator)
+
+ /**
+ * Returns parameter with a prefix/suffix if it is set otherwise returns "".
+ * Does not output null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString.
+ */
+ protected def optional(prefix: String, param: Any, suffix: String = "") =
+ if (hasValue(param)) prefix + toValue(param) + suffix else ""
+
+ /**
+ * Sets a field value using the name of the field.
+ * Field must be annotated with @Input, @Output, or @Internal
+ * @returns true if the value was found and set
+ */
+ def setValue(name: String, value: String) = {
+ ReflectionUtils.getField(this, name) match {
+ case Some(field) =>
+ val isInput = ReflectionUtils.hasAnnotation(field, classOf[Input])
+ val isOutput = ReflectionUtils.hasAnnotation(field, classOf[Output])
+ val isInternal = ReflectionUtils.hasAnnotation(field, classOf[Internal])
+ if (isInput || isOutput || isInternal) {
+ ReflectionUtils.setValue(this, field, value)
+ }
+ true
+ case None => false
+ }
+ }
+
+ private lazy val fields = ReflectionUtils.getAllFields(this.getClass)
+ private def internals = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Internal])
+ def inputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Input])
+ def outputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Output])
+
+ override def missingValues = {
+ val missingInputs = missingFields(inputs)
+ val missingOutputs = missingFields(outputs)
+ missingInputs | missingOutputs
+ }
+
+ private def missingFields(fields: Map[String, Any]) = {
+ var missing = Set.empty[String]
+ for ((name, value) <- fields) {
+ val isOptional = ReflectionUtils.getField(this, name) match {
+ case Some(field) => ReflectionUtils.hasAnnotation(field, classOf[Optional])
+ case None => false
+ }
+ if (!isOptional)
+ if (!hasValue(value))
+ missing += name
+ }
+ missing
+ }
+
+ private def hasValue(param: Any) = param match {
+ case null => false
+ case Nil => false
+ case None => false
+ case _ => true
+ }
+
+ private def toValue(param: Any): String = param match {
+ case null => ""
+ case Nil => ""
+ case None => ""
+ case Some(x) => x.toString
+ case x => x.toString
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala
new file mode 100644
index 000000000..024cee4f5
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala
@@ -0,0 +1,19 @@
+package org.broadinstitute.sting.queue.function
+
+import java.io.File
+import org.broadinstitute.sting.queue.util.Internal
+
+trait DispatchFunction extends QFunction with MemoryLimitedFunction {
+ def commandLine: String
+ var commandDirectory: File
+
+ var jobName: String = _
+ var jobOutputFile: File = _
+ var jobErrorFile: File = _
+
+ @Internal
+ var jobProject = "Queue"
+
+ @Internal
+ var jobQueue = "broad"
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala
new file mode 100644
index 000000000..06ba9f3e8
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala
@@ -0,0 +1,6 @@
+package org.broadinstitute.sting.queue.function
+
+trait IntervalFunction {
+ type Intervals = String
+ var intervals: Intervals
+}
\ No newline at end of file
diff --git a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala
new file mode 100644
index 000000000..509c7cad0
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala
@@ -0,0 +1,14 @@
+package org.broadinstitute.sting.queue.function
+
+import org.broadinstitute.sting.queue.engine.QGraph
+import scala.collection.immutable.ListMap
+
+/**
+ * Utility class to map a set of inputs to set of outputs.
+ * The QGraph uses this function internally to return
+ */
+class MappingFunction(private val in: ListMap[String, Any], private val out: ListMap[String, Any]) extends QFunction {
+ def inputs = in
+ def outputs = out
+ def run(qGraph: QGraph) = null
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala
new file mode 100644
index 000000000..7f9d98542
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala
@@ -0,0 +1,10 @@
+package org.broadinstitute.sting.queue.function
+
+import org.broadinstitute.sting.queue.util.{Input, Optional, ClassType}
+
+trait MemoryLimitedFunction {
+ @Input
+ @Optional
+ @ClassType(classOf[Int])
+ var memoryLimit: Option[Int] = None
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala
new file mode 100644
index 000000000..bef3257d9
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala
@@ -0,0 +1,10 @@
+package org.broadinstitute.sting.queue.function
+
+import org.broadinstitute.sting.queue.engine.QGraph
+import scala.collection.immutable.ListMap
+
+trait QFunction {
+ def inputs: ListMap[String, Any]
+ def outputs: ListMap[String, Any]
+ def missingValues = Set.empty[String]
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala
new file mode 100644
index 000000000..6d6fb7b8e
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala
@@ -0,0 +1,35 @@
+package org.broadinstitute.sting.queue.function.gatk
+
+import java.io.File
+import org.broadinstitute.sting.queue.util.{ClassType, Input, Optional}
+import org.broadinstitute.sting.queue.function.{MemoryLimitedFunction, IntervalFunction, CommandLineFunction}
+
+trait GatkFunction extends CommandLineFunction with MemoryLimitedFunction with IntervalFunction {
+ @Input
+ @Optional
+ var javaTmpDir: String = _
+
+ @Input
+ var gatkJar: String = _
+
+ @Input
+ var referenceFile: String = _
+
+ @Input
+ @Optional
+ @ClassType(classOf[File])
+ var bamFiles: List[File] = Nil
+
+ @Input
+ @Optional
+ var dbsnp: File = _
+
+ @Input
+ @Optional
+ var intervals: Intervals = new Intervals("all")
+
+ protected def gatkCommandLine(walker: String) =
+ "java%s%s -jar %s -T %s -R %s%s%s "
+ .format(optional(" -Xmx", memoryLimit, "g"), optional(" -Djava.io.tmpdir=", javaTmpDir),
+ gatkJar, walker, referenceFile, repeat(" -I ", bamFiles), optional(" -D ", dbsnp), optional(" -L ", intervals))
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala
index 7a6bf3f27..6861a31d6 100755
--- a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala
+++ b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala
@@ -7,7 +7,7 @@ import org.apache.log4j._
*/
trait Logging {
private val className = this.getClass.getName
- lazy val logger = configuredLogger
+ protected lazy val logger = configuredLogger
def configuredLogger = {
Logging.configureLogging
diff --git a/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala
index fdb90f733..f79a4f33d 100755
--- a/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala
+++ b/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala
@@ -3,6 +3,7 @@ package org.broadinstitute.sting.queue.util
import org.broadinstitute.sting.utils.text.XReadLines
import collection.mutable.ListBuffer
import collection.JavaConversions._
+import java.io.File
object ProcessUtils extends Logging {
@@ -15,10 +16,10 @@ object ProcessUtils extends Logging {
val running = new ListBuffer[Process]()
- def runCommandAndWait(command: String) = {
+ def runCommandAndWait(command: String, directory: File) = {
logger.debug("Running command: " + command)
- var builder = new ProcessBuilder("sh", "-c", command)
+ var builder = new ProcessBuilder("sh", "-c", command).directory(directory)
var process = builder.start
running += process
diff --git a/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala
new file mode 100644
index 000000000..cfc10f409
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala
@@ -0,0 +1,96 @@
+package org.broadinstitute.sting.queue.util
+
+import java.lang.reflect.Field
+import org.broadinstitute.sting.queue.QException
+import java.lang.annotation.Annotation
+import scala.concurrent.JavaConversions
+import scala.concurrent.JavaConversions._
+import scala.collection.immutable.ListMap
+
+object ReflectionUtils {
+ def getField(obj: AnyRef, name: String) = getAllFields(obj.getClass).find(_.getName == name)
+
+ def hasAnnotation(field: Field, annotation: Class[_ <: Annotation]) = field.getAnnotation(annotation) != null
+
+ def getFieldsAnnotatedWith(obj: AnyRef, fields: List[Field], annotation: Class[_ <: Annotation]) =
+ ListMap(fields.filter(field => hasAnnotation(field, annotation))
+ .map(field => (field.getName -> fieldGetter(field).invoke(obj))) :_*)
+
+ def getAllTypes(clazz: Class[_]) = {
+ var types = List.empty[Class[_]]
+ var c = clazz
+ while (c != null) {
+ types :+= c
+ c = c.getSuperclass
+ }
+ types
+ }
+
+ def getAllFields(clazz: Class[_]) = getAllTypes(clazz).map(_.getDeclaredFields).flatMap(_.toList)
+
+ def setValue(obj: AnyRef, field: Field, value: String) = {
+
+ val getter = fieldGetter(field)
+ val setter = fieldSetter(field)
+
+ if (getter == null)
+ throw new QException("Field may be private? Unable to find getter for field: " + field)
+
+ if (getter == null)
+ throw new QException("Field may be a val instead of var? Unable to find setter for field: " + field)
+
+ if (classOf[Seq[_]].isAssignableFrom(field.getType)) {
+
+ if (!field.isAnnotationPresent(classOf[ClassType]))
+ throw new QException("@ClassType must be specified due to type erasure for field: " + field)
+
+ val fieldType = field.getAnnotation(classOf[ClassType]).asInstanceOf[ClassType].value
+ val typeValue = coerce(fieldType, value)
+
+ var list = getter.invoke(obj).asInstanceOf[Seq[_]]
+ list :+= typeValue
+ setter.invoke(obj, list)
+
+ } else if (classOf[Option[_]].isAssignableFrom(field.getType)) {
+
+ if (!field.isAnnotationPresent(classOf[ClassType]))
+ throw new QException("@ClassType must be specified due to type erasure for field: " + field)
+
+ val fieldType = field.getAnnotation(classOf[ClassType]).asInstanceOf[ClassType].value
+ val typeValue = coerce(fieldType, value)
+
+ setter.invoke(obj, Some(typeValue))
+
+ } else {
+
+ val fieldType = field.getType
+ val typeValue = coerce(fieldType, value)
+
+ setter.invoke(obj, typeValue.asInstanceOf[AnyRef])
+ }
+ }
+
+ private[util] def fieldGetter(field: Field) = field.getDeclaringClass.getMethod(field.getName)
+ private[util] def fieldSetter(field: Field) = field.getDeclaringClass.getMethod(field.getName+"_$eq", field.getType)
+
+ private def coerce(clazz: Class[_], value: String) = {
+ if (classOf[String] == clazz) value
+ else if (classOf[Boolean] == clazz) value.toBoolean
+ else if (classOf[Byte] == clazz) value.toByte
+ else if (classOf[Short] == clazz) value.toShort
+ else if (classOf[Int] == clazz) value.toInt
+ else if (classOf[Long] == clazz) value.toLong
+ else if (classOf[Float] == clazz) value.toFloat
+ else if (classOf[Double] == clazz) value.toDouble
+ else if (hasStringConstructor(clazz))
+ clazz.getConstructor(classOf[String]).newInstance(value)
+ else throw new QException("Unable to coerce value '%s' to type '%s'.".format(value, clazz))
+ }
+
+ private def hasStringConstructor(clazz: Class[_]) = {
+ clazz.getConstructors.exists(constructor => {
+ val parameters = constructor.getParameterTypes
+ parameters.size == 1 && parameters.head == classOf[String]
+ })
+ }
+}