diff --git a/build.xml b/build.xml
index bd1c0c925..7a9ffe58b 100644
--- a/build.xml
+++ b/build.xml
@@ -3,9 +3,10 @@
-
+
+
@@ -22,7 +23,7 @@
-
+
@@ -33,7 +34,7 @@
-
+
@@ -51,7 +52,7 @@
-
+
@@ -101,30 +102,31 @@
-
+
+
-
-
+
-
-
+ additionalparam="-build-timestamp "${build.timestamp}" -version-suffix .${build.version} -out ${basedir}/${java.classes}/${resource.file}">
+
-
@@ -136,12 +138,12 @@
-
+
-
+
@@ -153,11 +155,11 @@
-
+
-
+
@@ -167,7 +169,7 @@
-
+
@@ -176,7 +178,7 @@
-
+
@@ -240,14 +242,14 @@
-
+
-
-
-
+
+
+
-
+
@@ -260,8 +262,8 @@
-
-
+
+
@@ -279,7 +281,7 @@
Building Scala...
-
+
@@ -296,34 +298,29 @@
-
-
-
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
-
-
+
+
Building Queue...
-
+
+
+
+
+
+
@@ -340,9 +337,9 @@
-
+
-
+
@@ -357,14 +354,14 @@
-
+
-
+
-
+
@@ -376,19 +373,19 @@
-
+
-
+
-
+
@@ -401,7 +398,7 @@
-
+
@@ -454,7 +451,7 @@
-
+
diff --git a/ivy.xml b/ivy.xml
index 433bd5ef5..a266b179d 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -42,8 +42,8 @@
-
-
+
+
diff --git a/java/src/org/broadinstitute/sting/queue/util/Gather.java b/java/src/org/broadinstitute/sting/queue/util/Gather.java
new file mode 100644
index 000000000..9928b3ad3
--- /dev/null
+++ b/java/src/org/broadinstitute/sting/queue/util/Gather.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2010, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.util;
+
+import java.lang.annotation.*;
+
+/**
+ * Specifies the class to gather an output of a QFunction.
+ * Usable on fields only and retained at runtime so it can be read via reflection.
+ * {@code value()} names the gathering class; in this change DispatchFunction tags its
+ * job output and error files with {@code @Gather(classOf[SimpleTextGatherFunction])}.
+ * NOTE(review): how the gather class is instantiated and invoked is not visible here; confirm.
+ * Written in java because scala doesn't support RetentionPolicy.RUNTIME
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface Gather {
+ Class value();
+}
diff --git a/java/src/org/broadinstitute/sting/queue/util/Input.java b/java/src/org/broadinstitute/sting/queue/util/Input.java
index 15fa28cb7..941945fc2 100644
--- a/java/src/org/broadinstitute/sting/queue/util/Input.java
+++ b/java/src/org/broadinstitute/sting/queue/util/Input.java
@@ -27,7 +27,7 @@ package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
- * Specifies an input to a QueueFunction
+ * Specifies an input to a QFunction
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
diff --git a/java/src/org/broadinstitute/sting/queue/util/Internal.java b/java/src/org/broadinstitute/sting/queue/util/Internal.java
index 842cc9d1a..758f28b6d 100644
--- a/java/src/org/broadinstitute/sting/queue/util/Internal.java
+++ b/java/src/org/broadinstitute/sting/queue/util/Internal.java
@@ -27,9 +27,11 @@ package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
- * Specifies an internal setting for a QueueFunction.
+ * Specifies an internal setting for a QFunction.
* Not an input or output but should be copied with a function.
* Internals should have default values that should be handled, i.e. they are always @Optional
+ * A common use for @Internal is to specify WHERE a function runs: farm queue, directory, etc.
+ * or to name part of a function: farm job name
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
diff --git a/java/src/org/broadinstitute/sting/queue/util/Optional.java b/java/src/org/broadinstitute/sting/queue/util/Optional.java
index e31df349b..07ee36031 100644
--- a/java/src/org/broadinstitute/sting/queue/util/Optional.java
+++ b/java/src/org/broadinstitute/sting/queue/util/Optional.java
@@ -27,7 +27,7 @@ package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
- * Specifies an input or output to a QueueFunction is optional
+ * Specifies an input or output to a QFunction is optional
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
diff --git a/java/src/org/broadinstitute/sting/queue/util/Output.java b/java/src/org/broadinstitute/sting/queue/util/Output.java
index 5b8c5c46d..ca0ba8dac 100644
--- a/java/src/org/broadinstitute/sting/queue/util/Output.java
+++ b/java/src/org/broadinstitute/sting/queue/util/Output.java
@@ -27,7 +27,7 @@ package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
- * Specifies an output to a QueueFunction
+ * Specifies an output to a QFunction
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
diff --git a/java/src/org/broadinstitute/sting/queue/util/Scatter.java b/java/src/org/broadinstitute/sting/queue/util/Scatter.java
new file mode 100644
index 000000000..8314bea9c
--- /dev/null
+++ b/java/src/org/broadinstitute/sting/queue/util/Scatter.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2010, The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.queue.util;
+
+import java.lang.annotation.*;
+/** Runtime-retained field annotation whose {@code value()} is a class. NOTE(review): presumably the class used to scatter the field, mirroring Gather; confirm. */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface Scatter {
+ Class value();
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/QArguments.scala b/scala/src/org/broadinstitute/sting/queue/QArguments.scala
index b0f091b90..23dfac88d 100755
--- a/scala/src/org/broadinstitute/sting/queue/QArguments.scala
+++ b/scala/src/org/broadinstitute/sting/queue/QArguments.scala
@@ -21,8 +21,9 @@ class QArguments(args: Array[String]) {
filtered.appendAll(args)
if (isFlagged(filtered, "-debug"))
- Logging.enableDebug
-
+ Logging.setDebug
+ if (isFlagged(filtered, "-trace"))
+ Logging.setTrace
if (isFlagged(filtered, "-dry"))
dryRun = true
if (isFlagged(filtered, "-bsub"))
diff --git a/scala/src/org/broadinstitute/sting/queue/QScript.scala b/scala/src/org/broadinstitute/sting/queue/QScript.scala
index d466c85f4..4151c7802 100755
--- a/scala/src/org/broadinstitute/sting/queue/QScript.scala
+++ b/scala/src/org/broadinstitute/sting/queue/QScript.scala
@@ -15,6 +15,11 @@ object QScript {
type ClassType = org.broadinstitute.sting.queue.util.ClassType
type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction
type GatkFunction = org.broadinstitute.sting.queue.function.gatk.GatkFunction
+ type ScatterGatherableFunction = org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction
+ type Scatter = org.broadinstitute.sting.queue.util.Scatter
+ type Gather = org.broadinstitute.sting.queue.util.Gather
+ type BamGatherFunction = org.broadinstitute.sting.queue.function.scattergather.BamGatherFunction
+ type SimpleTextGatherFunction = org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction
// The arguments for executing pipelines
private var qArgs: QArguments = _
@@ -85,7 +90,7 @@ object QScript {
* Sets the @Input and @Output values for a single function
*/
def setParams(function: CommandLineFunction): Unit =
- for ((name, value) <- qArgs.argMap) function.setValue(name, value)
+ for ((name, value) <- qArgs.argMap) function.addOrUpdateWithStringValue(name, value)
/**
* Executes functions that have been added to the pipeline.
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala
index 318624d3c..da23d3766 100755
--- a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala
@@ -8,8 +8,11 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction
*/
trait CommandLineRunner extends Logging {
def run(function: CommandLineFunction, qGraph: QGraph) = {
- var commandLine = function.commandLine
- logger.info(commandLine)
+ if (logger.isDebugEnabled) {
+ logger.debug(function.commandDirectory + " > " + function.commandLine)
+ } else {
+ logger.info(function.commandLine)
+ }
if (!qGraph.dryRun)
ProcessUtils.runCommandAndWait(function.commandLine, function.commandDirectory)
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala
index 8966945b1..5df3d17b0 100755
--- a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala
@@ -1,18 +1,12 @@
package org.broadinstitute.sting.queue.engine
-import edu.mit.broad.core.lsf.LocalLsfJob
import collection.JavaConversions._
-import management.ManagementFactory
-import java.io.File
-import java.util.ArrayList
import org.broadinstitute.sting.queue.function.{DispatchFunction, QFunction}
trait DispatchJobRunner {
type DispatchJobType
private var dispatchJobs = Map.empty[DispatchFunction, DispatchJobType]
- protected def newJobName = DispatchJobRunner.nextJobName
-
def dispatch(function: DispatchFunction, qGraph: QGraph)
protected def addJob(function: DispatchFunction, dispatchJob: DispatchJobType) =
@@ -32,25 +26,9 @@ trait DispatchJobRunner {
case dispatchFunction: DispatchFunction => previous :+= dispatchJobs(dispatchFunction)
// For any other type of edge find the LSF jobs preceding the edge
- case qFunction: QFunction => previous :::= previousJobs(qFunction, qGraph)
+ case qFunction: QFunction => previous = previousJobs(qFunction, qGraph) ::: previous
}
}
previous
}
}
-
-object DispatchJobRunner {
- private val jobNamePrefix = "Q-" + {
- var prefix = ManagementFactory.getRuntimeMXBean.getName
- val index = prefix.indexOf(".")
- if (index >= 0)
- prefix = prefix.substring(0, index)
- prefix
- }
- private var jobIndex = 0
-
- private def nextJobName = {
- jobIndex += 1
- jobNamePrefix + "-" + jobIndex
- }
-}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala
index edfcaf179..5f29f35f0 100644
--- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala
@@ -3,7 +3,6 @@ package org.broadinstitute.sting.queue.engine
import collection.JavaConversions._
import edu.mit.broad.core.lsf.LocalLsfJob
import org.broadinstitute.sting.queue.function.DispatchFunction
-import java.io.File
import java.util.ArrayList
import org.broadinstitute.sting.queue.util.Logging
@@ -11,22 +10,10 @@ trait LsfJobRunner extends DispatchJobRunner with Logging {
type DispatchJobType = LocalLsfJob
def dispatch(function: DispatchFunction, qGraph: QGraph) = {
- var jobName = function.jobName
- if (jobName == null)
- jobName = newJobName
-
- var jobOutputFile = function.jobOutputFile
- if (jobOutputFile == null)
- jobOutputFile = new File(jobName + ".out")
-
- var jobErrorFile = function.jobErrorFile
- if (jobErrorFile == null)
- jobErrorFile = new File(jobName + ".err")
-
val job = new LocalLsfJob
- job.setName(jobName)
- job.setOutputFile(jobOutputFile)
- job.setErrFile(jobErrorFile)
+ job.setName(function.jobName)
+ job.setOutputFile(function.jobOutputFile)
+ job.setErrFile(function.jobErrorFile)
job.setWorkingDir(function.commandDirectory)
job.setProject(function.jobProject)
job.setQueue(function.jobQueue)
@@ -45,7 +32,11 @@ trait LsfJobRunner extends DispatchJobRunner with Logging {
addJob(function, job)
- logger.info(job.getBsubCommand.mkString(" "))
+ if (logger.isDebugEnabled) {
+ logger.debug(function.commandDirectory + " > " + job.getBsubCommand.mkString(" "))
+ } else {
+ logger.info(job.getBsubCommand.mkString(" "))
+ }
if (!qGraph.dryRun)
job.start
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
index f54a8f0ab..3c27cfa58 100755
--- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala
@@ -2,14 +2,19 @@ package org.broadinstitute.sting.queue.engine
import org.jgrapht.graph.SimpleDirectedGraph
import scala.collection.JavaConversions
+import scala.collection.JavaConversions._
import scala.collection.immutable.ListMap
-import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, QFunction}
+import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction
+import org.broadinstitute.sting.queue.util.{CollectionUtils, Logging}
+import org.broadinstitute.sting.queue.QException
+import org.jgrapht.alg.CycleDetector
+import org.jgrapht.EdgeFactory
class QGraph extends Logging {
var dryRun = true
var bsubAllJobs = false
- val jobGraph = new SimpleDirectedGraph[QNode, QFunction](classOf[QFunction])
+ val jobGraph = newGraph
def numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size
def add(command: CommandLineFunction) {
@@ -25,47 +30,135 @@ class QGraph extends Logging {
val inputs = function.inputs
val outputs = function.outputs
- if (inputs.size > 1)
- for ((name, input) <- inputs)
- addNullEdge(ListMap(name -> input), inputs)
+ for ((name, input) <- inputs) {
+ addCollectionInputs(name, input)
+ if (inputs.size > 1)
+ addMappingEdge(ListMap(name -> input), inputs)
+ }
- if (outputs.size > 1)
- for ((name, output) <- outputs)
- addNullEdge(outputs, ListMap(name -> output))
+ for ((name, output) <- outputs) {
+ addCollectionOutputs(name, output)
+ if (outputs.size > 1)
+ addMappingEdge(outputs, ListMap(name -> output))
+ }
}
+
+ var pruning = true
+ while (pruning) {
+ pruning = false
+ val filler = jobGraph.edgeSet.filter(isFiller(_))
+ if (filler.size > 0) {
+ jobGraph.removeAllEdges(filler)
+ pruning = true
+ }
+ }
+
+ jobGraph.removeAllVertices(jobGraph.vertexSet.filter(isOrphan(_)))
}
def run = {
var isReady = true
for (function <- JavaConversions.asSet(jobGraph.edgeSet)) {
- val missingValues = function.missingValues
- if (missingValues.size > 0) {
- isReady = false
- logger.error(function match {
- case cmd: CommandLineFunction => "Missing values for function: %s".format(cmd.commandLine)
- case x => "Missing values:"
- })
- for (missing <- missingValues) {
- logger.error(" " + missing)
- }
+ function match {
+ case cmd: CommandLineFunction =>
+ val missingValues = cmd.missingValues
+ if (missingValues.size > 0) {
+ isReady = false
+ logger.error("Missing values for function: %s".format(cmd.commandLine))
+ for (missing <- missingValues)
+ logger.error(" " + missing)
+ }
+ case _ =>
}
}
-
+
+ val detector = new CycleDetector(jobGraph)
+ if (detector.detectCycles) {
+ logger.error("Cycles were detected in the graph:")
+ for (cycle <- detector.findCycles)
+ logger.error(" " + cycle)
+ isReady = false
+ }
+
if (isReady || this.dryRun)
(new TopologicalJobScheduler(this) with LsfJobRunner).runJobs
}
- private def add(f: QFunction, replace: Boolean) {
- val inputs = QNode(f.inputs.values.filter(_ != null).toSet)
- val outputs = QNode(f.outputs.values.filter(_ != null).toSet)
- jobGraph.addVertex(inputs)
- jobGraph.addVertex(outputs)
- if (replace)
- jobGraph.removeAllEdges(inputs, outputs)
- jobGraph.addEdge(inputs, outputs, f)
+ private def newGraph = new SimpleDirectedGraph[QNode, QFunction](new EdgeFactory[QNode, QFunction] {
+ def createEdge(input: QNode, output: QNode) = new MappingFunction(input.valueMap, output.valueMap)})
+
+ private def add(f: QFunction, replace: Boolean): Unit = {
+ try {
+ f.freeze
+
+ f match {
+ case scatterGather: ScatterGatherableFunction if (bsubAllJobs && scatterGather.scatterGatherable) =>
+ val functions = scatterGather.generateFunctions()
+ if (logger.isTraceEnabled)
+ logger.trace("Scattered into %d parts: %s".format(functions.size, functions))
+ functions.foreach(add(_))
+ case _ =>
+ val inputs = QNode(f.inputs.values.filter(_ != null).toSet)
+ val outputs = QNode(f.outputs.values.filter(_ != null).toSet)
+ val newSource = jobGraph.addVertex(inputs)
+ val newTarget = jobGraph.addVertex(outputs)
+ val removedEdges = if (replace) jobGraph.removeAllEdges(inputs, outputs) else Nil
+ val added = jobGraph.addEdge(inputs, outputs, f)
+ if (logger.isTraceEnabled) {
+ logger.trace("Mapped from: " + inputs)
+ logger.trace("Mapped to: " + outputs)
+ logger.trace("Mapped via: " + f)
+ logger.trace("Removed edges: " + removedEdges)
+ logger.trace("New source?: " + newSource)
+ logger.trace("New target?: " + newTarget)
+ logger.trace("")
+ }
+ }
+ } catch {
+ case e: Exception =>
+ throw new QException("Error adding function: " + f, e)
+ }
}
- private def addNullEdge(input: ListMap[String, Any], output: ListMap[String, Any]) = {
- add(new MappingFunction(input, output), false)
+ private def addCollectionInputs(name: String, value: Any): Unit = {
+ CollectionUtils.foreach(value, (item, collection) =>
+ addMappingEdge(ListMap(name -> item), ListMap(name -> collection)))
}
+
+ private def addCollectionOutputs(name: String, value: Any): Unit = {
+ CollectionUtils.foreach(value, (item, collection) =>
+ addMappingEdge(ListMap(name -> collection), ListMap(name -> item)))
+ }
+
+ private def addMappingEdge(input: ListMap[String, Any], output: ListMap[String, Any]) =
+ add(new MappingFunction(input, output), false)
+
+ private def isMappingEdge(edge: QFunction) =
+ edge.isInstanceOf[MappingFunction]
+
+ private def isFiller(edge: QFunction) = { // true for mapping edges that the pruning loop may remove; non-mapping edges are never filler
+ if (isMappingEdge(edge)) {
+ val source = jobGraph.getEdgeSource(edge)
+ val target = jobGraph.getEdgeTarget(edge)
+ if (jobGraph.outgoingEdgesOf(target).size == 0 || jobGraph.incomingEdgesOf(source).size == 0) // dead end: no edge leaves the target, or no edge enters the source
+ true
+ else if (isLoopback(source) || isLoopback(target)) // an endpoint only bounces straight back to the node it came from
+ true
+ else false
+ } else false
+ }
+
+ private def isLoopback(node: QNode) = { // true when node has exactly one incoming and one outgoing edge, both mapping edges, joining it to the same neighbour
+ var loopback = false
+ val incoming = jobGraph.incomingEdgesOf(node)
+ val outgoing = jobGraph.outgoingEdgesOf(node)
+ if (incoming.size == 1 && outgoing.size == 1)
+ if (isMappingEdge(incoming.head) && isMappingEdge(outgoing.head))
+ if (jobGraph.getEdgeSource(incoming.head) == jobGraph.getEdgeTarget(outgoing.head)) // shape is A -> node -> A
+ loopback = true
+ loopback
+ }
+
+ private def isOrphan(node: QNode) =
+ (jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0
}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala
index 15a405d82..48d74dae1 100644
--- a/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala
@@ -1,6 +1,20 @@
package org.broadinstitute.sting.queue.engine
+import scala.collection.immutable.ListMap
+
/**
* Represents a state between QFunctions the directed acyclic QGraph
*/
-case class QNode (private val items: Set[Any])
+case class QNode (private val items: Set[Any]) {
+ /**
+ * Used during QGraph error reporting: its EdgeFactory builds MappingFunction edges for the CycleDetector from these maps.
+ * Maps each non-null item's toString to the item itself; items sharing a toString collapse into one entry.
+ */
+ def valueMap = {
+ var map = ListMap.empty[String, Any]
+ for (item <- items)
+ if (item != null)
+ map += item.toString -> item
+ map
+ }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala
index 882f6977c..3e2c5a46c 100755
--- a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala
@@ -12,20 +12,28 @@ import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunc
abstract class TopologicalJobScheduler(private val qGraph: QGraph)
extends CommandLineRunner with DispatchJobRunner with Logging {
- protected val iterator = new TopologicalOrderIterator(this.qGraph.jobGraph)
+ protected val iterator = new TopologicalOrderIterator(qGraph.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] {
override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match {
- case f: DispatchFunction if (TopologicalJobScheduler.this.qGraph.bsubAllJobs) => dispatch(f, qGraph)
+ case f: DispatchFunction if (qGraph.bsubAllJobs) => dispatch(f, qGraph)
case f: CommandLineFunction => run(f, qGraph)
case f: MappingFunction => /* do nothing for mapping functions */
}
})
def runJobs = {
- logger.info("Number of jobs: %s".format(this.qGraph.numJobs))
+ logger.info("Number of jobs: %s".format(qGraph.numJobs))
+ if (logger.isTraceEnabled)
+ logger.trace("Number of nodes: %s".format(qGraph.jobGraph.vertexSet.size))
+ var numNodes = 0
for (target <- iterator) {
+ if (logger.isTraceEnabled)
+ logger.trace("Visiting: " + target)
+ numNodes += 1
// Do nothing for now, let event handler respond
}
+ if (logger.isTraceEnabled)
+ logger.trace("Done walking %s nodes.".format(numNodes))
}
}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala
index 4288cf8b7..5b0700fc6 100644
--- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala
+++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala
@@ -3,20 +3,9 @@ package org.broadinstitute.sting.queue.function
import java.io.File
import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.queue.engine.{CommandLineRunner, QGraph}
+import java.lang.reflect.Field
-trait CommandLineFunction extends QFunction with DispatchFunction {
-
- /**
- * The command line to run locally or via grid computing.
- */
- def commandLine: String
-
- /**
- * The directory where the command should run.
- */
- @Internal
- var commandDirectory: File = new File(".")
-
+trait CommandLineFunction extends InputOutputFunction with DispatchFunction {
/**
* Repeats parameters with a prefix/suffix if they are set otherwise returns "".
* Skips null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString.
@@ -31,50 +20,25 @@ trait CommandLineFunction extends QFunction with DispatchFunction {
protected def optional(prefix: String, param: Any, suffix: String = "") =
if (hasValue(param)) prefix + toValue(param) + suffix else ""
- /**
- * Sets a field value using the name of the field.
- * Field must be annotated with @Input, @Output, or @Internal
- * @returns true if the value was found and set
- */
- def setValue(name: String, value: String) = {
- ReflectionUtils.getField(this, name) match {
- case Some(field) =>
- val isInput = ReflectionUtils.hasAnnotation(field, classOf[Input])
- val isOutput = ReflectionUtils.hasAnnotation(field, classOf[Output])
- val isInternal = ReflectionUtils.hasAnnotation(field, classOf[Internal])
- if (isInput || isOutput || isInternal) {
- ReflectionUtils.setValue(this, field, value)
- }
- true
- case None => false
- }
- }
-
- private lazy val fields = ReflectionUtils.getAllFields(this.getClass)
- private def internals = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Internal])
- def inputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Input])
- def outputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Output])
-
- override def missingValues = {
- val missingInputs = missingFields(inputs)
- val missingOutputs = missingFields(outputs)
+ def missingValues = {
+ val missingInputs = missingFields(inputFields)
+ val missingOutputs = missingFields(outputFields)
missingInputs | missingOutputs
}
- private def missingFields(fields: Map[String, Any]) = {
+ private def missingFields(fields: List[Field]) = {
var missing = Set.empty[String]
- for ((name, value) <- fields) {
- val isOptional = ReflectionUtils.getField(this, name) match {
- case Some(field) => ReflectionUtils.hasAnnotation(field, classOf[Optional])
- case None => false
- }
+ for (field <- fields) {
+ val isOptional = ReflectionUtils.hasAnnotation(field, classOf[Optional])
if (!isOptional)
- if (!hasValue(value))
- missing += name
+ if (!hasValue(ReflectionUtils.getValue(this, field)))
+ missing += field.getName
}
missing
}
+ protected def hasFieldValue(field: Field) = hasValue(this.getFieldValue(field))
+
private def hasValue(param: Any) = param match {
case null => false
case Nil => false
diff --git a/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala
index 024cee4f5..8d441ab10 100644
--- a/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala
+++ b/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala
@@ -1,14 +1,35 @@
package org.broadinstitute.sting.queue.function
import java.io.File
-import org.broadinstitute.sting.queue.util.Internal
+import java.lang.management.ManagementFactory
+import org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction
+import org.broadinstitute.sting.queue.util._
-trait DispatchFunction extends QFunction with MemoryLimitedFunction {
+trait DispatchFunction extends InputOutputFunction {
def commandLine: String
- var commandDirectory: File
+ @Input
+ @Optional
+ var memoryLimit: Option[Int] = None
+
+ /**
+ * The directory where the command should run.
+ */
+ @Internal
+ var commandDirectory: File = IOUtils.CURRENT_DIR
+
+ @Internal
+ var jobNamePrefix: String = _
+
+ @Internal
var jobName: String = _
+
+ @Output
+ @Gather(classOf[SimpleTextGatherFunction])
var jobOutputFile: File = _
+
+ @Output
+ @Gather(classOf[SimpleTextGatherFunction])
var jobErrorFile: File = _
@Internal
@@ -16,4 +37,54 @@ trait DispatchFunction extends QFunction with MemoryLimitedFunction {
@Internal
var jobQueue = "broad"
+
+ override def freeze = {
+ if (jobNamePrefix == null)
+ jobNamePrefix = DispatchFunction.processNamePrefix
+
+ if (jobName == null)
+ jobName = DispatchFunction.nextJobName(jobNamePrefix)
+
+ if (jobOutputFile == null)
+ jobOutputFile = new File(jobName + ".out")
+
+ if (jobErrorFile == null)
+ jobErrorFile = new File(jobName + ".err")
+
+ commandDirectory = IOUtils.absolute(IOUtils.CURRENT_DIR, commandDirectory)
+
+ super.freeze
+ }
+
+ /**
+ * Override the canon function to change any relative path to an absolute path.
+ */
+ override protected def canon(value: Any) = {
+ value match {
+ case file: File => IOUtils.absolute(commandDirectory, file)
+ case x => super.canon(x)
+ }
+ }
+
+ def absolute(file: File) = IOUtils.absolute(commandDirectory, file)
+ def temp(subDir: String) = IOUtils.sub(commandDirectory, jobName + "-" + subDir)
+
+ override def toString = commandLine
+}
+
+object DispatchFunction {
+ private val processNamePrefix = "Q-" + { // "Q-" followed by the runtime MXBean name, cut at its first "."
+ var prefix = ManagementFactory.getRuntimeMXBean.getName // NOTE(review): typically "pid@hostname" per the JDK docs; confirm on the target JVMs
+ val index = prefix.indexOf(".")
+ if (index >= 0)
+ prefix = prefix.substring(0, index)
+ prefix
+ }
+
+ private var jobIndex = 0 // counter shared by every DispatchFunction through this companion object; not synchronized
+
+ private def nextJobName(prefix: String) = { // bumps the counter and returns prefix + "-" + counter, so the first generated name ends in "-1"
+ jobIndex += 1
+ prefix + "-" + jobIndex
+ }
}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/InputOutputFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/InputOutputFunction.scala
new file mode 100644
index 000000000..61b298f16
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/function/InputOutputFunction.scala
@@ -0,0 +1,67 @@
+package org.broadinstitute.sting.queue.function
+
+import java.lang.reflect.Field
+import org.broadinstitute.sting.queue.util._
+
+/**
+ * A function whose fields are tagged with @Input, @Output or @Internal and can be set by the user in a QScript
+ */
+trait InputOutputFunction extends QFunction with Cloneable {
+ def getFieldValue(field: Field) = ReflectionUtils.getValue(this, field) // reads the field's value on this instance
+ def setFieldValue(field: Field, value: Any) = ReflectionUtils.setValue(this, field, value) // writes the field's value on this instance
+
+ def functionFields: List[Field] = inputFields ::: outputFields ::: internalFields // inputs, then outputs, then internals
+ def inputFields = ReflectionUtils.filterFields(fields, classOf[Input]) // fields filtered on @Input
+ def outputFields = ReflectionUtils.filterFields(fields, classOf[Output]) // fields filtered on @Output
+ def internalFields = ReflectionUtils.filterFields(fields, classOf[Internal]) // fields filtered on @Internal
+
+ private lazy val fields = ReflectionUtils.getAllFields(this.getClass) // evaluated lazily, on first use
+ def inputs = ReflectionUtils.getFieldNamesValues(this, inputFields) // QGraph iterates these as (name, value) pairs
+ def outputs = ReflectionUtils.getFieldNamesValues(this, outputFields) // same shape as inputs, for the @Output fields
+ def internals = ReflectionUtils.getFieldNamesValues(this, internalFields) // same shape as inputs, for the @Internal fields
+
+ /**
+ * Sets a field value using the name of the field.
+ * The value is only applied when the field is annotated with @Input, @Output, or @Internal
+ * @return true if a field with the given name exists, even when it lacks those annotations and is left unset; false otherwise
+ */
+ def addOrUpdateWithStringValue(name: String, value: String) = {
+ fields.find(_.getName == name) match {
+ case Some(field) =>
+ val isInput = ReflectionUtils.hasAnnotation(field, classOf[Input])
+ val isOutput = ReflectionUtils.hasAnnotation(field, classOf[Output])
+ val isInternal = ReflectionUtils.hasAnnotation(field, classOf[Internal])
+ if (isInput || isOutput || isInternal) {
+ ReflectionUtils.addOrUpdateWithStringValue(this, field, value)
+ }
+ true // the field exists, whether or not it was set above
+ case None => false
+ }
+ }
+
+ def cloneFunction() = clone.asInstanceOf[this.type]
+ // explicitly overridden so that trait function cloneFunction can use this.clone
+ override protected def clone = super.clone
+
+ /**
+ * As the function is frozen, replaces the value of every input, output and internal field with its canonical form (see canon).
+ */
+ override def freeze = {
+ for (field <- this.functionFields)
+ mapField(field, canon)
+ super.freeze
+ }
+
+ def mapField(field: Field, f: Any => Any): Any = { // passes the field's value and f to CollectionUtils.updated, stores the result back into the field and returns it
+ var fieldValue = this.getFieldValue(field)
+ fieldValue = CollectionUtils.updated(fieldValue, f).asInstanceOf[AnyRef]
+ this.setFieldValue(field, fieldValue)
+ fieldValue
+ }
+
+ /**
+ * Returns the canonical form of a value; this default returns the value unchanged.
+ * DispatchFunction overrides it so that file paths are resolved against the command directory.
+ */
+ protected def canon(value: Any): Any = value
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala
index 06ba9f3e8..e525f58b6 100644
--- a/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala
+++ b/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala
@@ -1,6 +1,8 @@
package org.broadinstitute.sting.queue.function
-trait IntervalFunction {
- type Intervals = String
- var intervals: Intervals
-}
\ No newline at end of file
+import java.io.File
+
+trait IntervalFunction extends InputOutputFunction {
+ var referenceFile: File
+ var intervals: File
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala
index 509c7cad0..affeb7cc4 100644
--- a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala
+++ b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala
@@ -10,5 +10,5 @@ import scala.collection.immutable.ListMap
class MappingFunction(private val in: ListMap[String, Any], private val out: ListMap[String, Any]) extends QFunction {
def inputs = in
def outputs = out
- def run(qGraph: QGraph) = null
+ override def toString = "