From 30cf78fdc016a0fd7965ccbeb3935ced718a58a1 Mon Sep 17 00:00:00 2001 From: kshakir Date: Tue, 22 Jun 2010 18:39:20 +0000 Subject: [PATCH] Refactoring for a first version of scatter gather api with basic shell script implementations. Modified build script so that queue is cleaned during "ant clean". git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3611 348d0f76-0448-11de-a6fe-93d51630548a --- build.xml | 101 ++++++------ ivy.xml | 4 +- .../sting/queue/util/Gather.java | 43 +++++ .../sting/queue/util/Input.java | 2 +- .../sting/queue/util/Internal.java | 4 +- .../sting/queue/util/Optional.java | 2 +- .../sting/queue/util/Output.java | 2 +- .../sting/queue/util/Scatter.java | 35 ++++ .../sting/queue/QArguments.scala | 5 +- .../broadinstitute/sting/queue/QScript.scala | 7 +- .../queue/engine/CommandLineRunner.scala | 7 +- .../queue/engine/DispatchJobRunner.scala | 24 +-- .../sting/queue/engine/LsfJobRunner.scala | 25 +-- .../sting/queue/engine/QGraph.scala | 151 ++++++++++++++---- .../sting/queue/engine/QNode.scala | 16 +- .../engine/TopologicalJobScheduler.scala | 14 +- .../queue/function/CommandLineFunction.scala | 60 ++----- .../queue/function/DispatchFunction.scala | 77 ++++++++- .../queue/function/InputOutputFunction.scala | 67 ++++++++ .../queue/function/IntervalFunction.scala | 10 +- .../queue/function/MappingFunction.scala | 2 +- .../function/MemoryLimitedFunction.scala | 9 -- .../sting/queue/function/QFunction.scala | 22 ++- .../queue/function/gatk/GatkFunction.scala | 18 ++- .../scattergather/BamGatherFunction.scala | 9 ++ .../CleanupTempDirsFunction.scala | 15 ++ .../CreateTempDirsFunction.scala | 15 ++ .../scattergather/GatherFunction.scala | 16 ++ .../IntervalScatterFunction.scala | 21 +++ .../scattergather/ScatterFunction.scala | 20 +++ .../ScatterGatherableFunction.scala | 135 ++++++++++++++++ .../SimpleTextGatherFunction.scala | 10 ++ .../sting/queue/util/CollectionUtils.scala | 45 ++++++ .../sting/queue/util/IOUtils.scala | 31 ++++ .../sting/queue/util/Logging.scala | 12 +- .../sting/queue/util/ReflectionUtils.scala | 44 +++-- shell/mergeText.sh | 36 +++++ shell/splitIntervals.sh | 55 +++++++ 38 files changed, 938 insertions(+), 233 deletions(-) create mode 100644 java/src/org/broadinstitute/sting/queue/util/Gather.java create mode 100644 java/src/org/broadinstitute/sting/queue/util/Scatter.java create mode 100644 scala/src/org/broadinstitute/sting/queue/function/InputOutputFunction.scala delete mode 100644 scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/scattergather/BamGatherFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/scattergather/IntervalScatterFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/util/CollectionUtils.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala create mode 100755 shell/mergeText.sh create mode 100755 shell/splitIntervals.sh diff --git a/build.xml b/build.xml index bd1c0c925..7a9ffe58b 100644 --- a/build.xml +++ b/build.xml @@ -3,9 +3,10 @@ - + + @@ -22,7 +23,7 @@ - + @@ -33,7 +34,7 @@ - + @@ -51,7 +52,7 @@ - + @@ -101,30 +102,31 @@ - + + - - + - - + additionalparam="-build-timestamp "${build.timestamp}" -version-suffix .${build.version} -out ${basedir}/${java.classes}/${resource.file}"> + - @@ -136,12 +138,12 @@ - + - + @@ -153,11 +155,11 @@ - + - + @@ -167,7 +169,7 @@ - + @@ -176,7 +178,7 @@ - + @@ -240,14 +242,14 @@ - + - - - + + + - + @@ -260,8 +262,8 @@ - - + + @@ -279,7 +281,7 @@ Building Scala... - + @@ -296,34 +298,29 @@ - - - - - - - + + + + - - - - - - - - - + + Building Queue... - + + + + + + @@ -340,9 +337,9 @@ - + - + @@ -357,14 +354,14 @@ - + - + - + @@ -376,19 +373,19 @@ - + - + - + @@ -401,7 +398,7 @@ - + @@ -454,7 +451,7 @@ - + diff --git a/ivy.xml b/ivy.xml index 433bd5ef5..a266b179d 100644 --- a/ivy.xml +++ b/ivy.xml @@ -42,8 +42,8 @@ - - + + diff --git a/java/src/org/broadinstitute/sting/queue/util/Gather.java b/java/src/org/broadinstitute/sting/queue/util/Gather.java new file mode 100644 index 000000000..9928b3ad3 --- /dev/null +++ b/java/src/org/broadinstitute/sting/queue/util/Gather.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.util; + +import java.lang.annotation.*; + +/** + * Specifies the class to gather an output of a QFunction. + * Not an input or output but should be copied with a function. + * Internals should have default values that should be handled, i.e. they are always @Optional + * A common use for @Internal is to specify WHERE a function runs: farm queue, directory, etc. + * or to name part of a function: farm job name + * Written in java because scala doesn't support RetentionPolicy.RUNTIME + */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface Gather { + Class value(); +} diff --git a/java/src/org/broadinstitute/sting/queue/util/Input.java b/java/src/org/broadinstitute/sting/queue/util/Input.java index 15fa28cb7..941945fc2 100644 --- a/java/src/org/broadinstitute/sting/queue/util/Input.java +++ b/java/src/org/broadinstitute/sting/queue/util/Input.java @@ -27,7 +27,7 @@ package org.broadinstitute.sting.queue.util; import java.lang.annotation.*; /** - * Specifies an input to a QueueFunction + * Specifies an input to a QFunction * Written in java because scala doesn't support RetentionPolicy.RUNTIME */ @Documented diff --git a/java/src/org/broadinstitute/sting/queue/util/Internal.java b/java/src/org/broadinstitute/sting/queue/util/Internal.java index 842cc9d1a..758f28b6d 100644 --- a/java/src/org/broadinstitute/sting/queue/util/Internal.java +++ b/java/src/org/broadinstitute/sting/queue/util/Internal.java @@ -27,9 +27,11 @@ package org.broadinstitute.sting.queue.util; import java.lang.annotation.*; /** - * Specifies an internal setting for a QueueFunction. + * Specifies an internal setting for a QFunction. * Not an input or output but should be copied with a function. * Internals should have default values that should be handled, i.e. they are always @Optional + * A common use for @Internal is to specify WHERE a function runs: farm queue, directory, etc. + * or to name part of a function: farm job name * Written in java because scala doesn't support RetentionPolicy.RUNTIME */ @Documented diff --git a/java/src/org/broadinstitute/sting/queue/util/Optional.java b/java/src/org/broadinstitute/sting/queue/util/Optional.java index e31df349b..07ee36031 100644 --- a/java/src/org/broadinstitute/sting/queue/util/Optional.java +++ b/java/src/org/broadinstitute/sting/queue/util/Optional.java @@ -27,7 +27,7 @@ package org.broadinstitute.sting.queue.util; import java.lang.annotation.*; /** - * Specifies an input or output to a QueueFunction is optional + * Specifies an input or output to a QFunction is optional * Written in java because scala doesn't support RetentionPolicy.RUNTIME */ @Documented diff --git a/java/src/org/broadinstitute/sting/queue/util/Output.java b/java/src/org/broadinstitute/sting/queue/util/Output.java index 5b8c5c46d..ca0ba8dac 100644 --- a/java/src/org/broadinstitute/sting/queue/util/Output.java +++ b/java/src/org/broadinstitute/sting/queue/util/Output.java @@ -27,7 +27,7 @@ package org.broadinstitute.sting.queue.util; import java.lang.annotation.*; /** - * Specifies an output to a QueueFunction + * Specifies an output to a QFunction * Written in java because scala doesn't support RetentionPolicy.RUNTIME */ @Documented diff --git a/java/src/org/broadinstitute/sting/queue/util/Scatter.java b/java/src/org/broadinstitute/sting/queue/util/Scatter.java new file mode 100644 index 000000000..8314bea9c --- /dev/null +++ b/java/src/org/broadinstitute/sting/queue/util/Scatter.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.util; + +import java.lang.annotation.*; + +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface Scatter { + Class value(); +} diff --git a/scala/src/org/broadinstitute/sting/queue/QArguments.scala b/scala/src/org/broadinstitute/sting/queue/QArguments.scala index b0f091b90..23dfac88d 100755 --- a/scala/src/org/broadinstitute/sting/queue/QArguments.scala +++ b/scala/src/org/broadinstitute/sting/queue/QArguments.scala @@ -21,8 +21,9 @@ class QArguments(args: Array[String]) { filtered.appendAll(args) if (isFlagged(filtered, "-debug")) - Logging.enableDebug - + Logging.setDebug + if (isFlagged(filtered, "-trace")) + Logging.setTrace if (isFlagged(filtered, "-dry")) dryRun = true if (isFlagged(filtered, "-bsub")) diff --git a/scala/src/org/broadinstitute/sting/queue/QScript.scala b/scala/src/org/broadinstitute/sting/queue/QScript.scala index d466c85f4..4151c7802 100755 --- a/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -15,6 +15,11 @@ object QScript { type ClassType = org.broadinstitute.sting.queue.util.ClassType type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction type GatkFunction = org.broadinstitute.sting.queue.function.gatk.GatkFunction + type ScatterGatherableFunction = org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction + type Scatter = org.broadinstitute.sting.queue.util.Scatter + type Gather = org.broadinstitute.sting.queue.util.Gather + type BamGatherFunction = org.broadinstitute.sting.queue.function.scattergather.BamGatherFunction + type SimpleTextGatherFunction = org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction // The arguments for executing pipelines private var qArgs: QArguments = _ @@ -85,7 +90,7 @@ object QScript { * Sets the @Input and @Output values for a single function */ def setParams(function: CommandLineFunction): Unit = - for ((name, value) <- qArgs.argMap) function.setValue(name, value) + for ((name, value) <- qArgs.argMap) function.addOrUpdateWithStringValue(name, value) /** * Executes functions that have been added to the pipeline. diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala index 318624d3c..da23d3766 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala @@ -8,8 +8,11 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction */ trait CommandLineRunner extends Logging { def run(function: CommandLineFunction, qGraph: QGraph) = { - var commandLine = function.commandLine - logger.info(commandLine) + if (logger.isDebugEnabled) { + logger.debug(function.commandDirectory + " > " + function.commandLine) + } else { + logger.info(function.commandLine) + } if (!qGraph.dryRun) ProcessUtils.runCommandAndWait(function.commandLine, function.commandDirectory) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala index 8966945b1..5df3d17b0 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala @@ -1,18 +1,12 @@ package org.broadinstitute.sting.queue.engine -import edu.mit.broad.core.lsf.LocalLsfJob import collection.JavaConversions._ -import management.ManagementFactory -import java.io.File -import java.util.ArrayList import org.broadinstitute.sting.queue.function.{DispatchFunction, QFunction} trait DispatchJobRunner { type DispatchJobType private var dispatchJobs = Map.empty[DispatchFunction, DispatchJobType] - protected def newJobName = DispatchJobRunner.nextJobName - def dispatch(function: DispatchFunction, qGraph: QGraph) protected def addJob(function: DispatchFunction, dispatchJob: DispatchJobType) = @@ -32,25 +26,9 @@ trait DispatchJobRunner { case dispatchFunction: DispatchFunction => previous :+= dispatchJobs(dispatchFunction) // For any other type of edge find the LSF jobs preceding the edge - case qFunction: QFunction => previous :::= previousJobs(qFunction, qGraph) + case qFunction: QFunction => previous = previousJobs(qFunction, qGraph) ::: previous } } previous } } - -object DispatchJobRunner { - private val jobNamePrefix = "Q-" + { - var prefix = ManagementFactory.getRuntimeMXBean.getName - val index = prefix.indexOf(".") - if (index >= 0) - prefix = prefix.substring(0, index) - prefix - } - private var jobIndex = 0 - - private def nextJobName = { - jobIndex += 1 - jobNamePrefix + "-" + jobIndex - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index edfcaf179..5f29f35f0 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -3,7 +3,6 @@ package org.broadinstitute.sting.queue.engine import collection.JavaConversions._ import edu.mit.broad.core.lsf.LocalLsfJob import org.broadinstitute.sting.queue.function.DispatchFunction -import java.io.File import java.util.ArrayList import org.broadinstitute.sting.queue.util.Logging @@ -11,22 +10,10 @@ trait LsfJobRunner extends DispatchJobRunner with Logging { type DispatchJobType = LocalLsfJob def dispatch(function: DispatchFunction, qGraph: QGraph) = { - var jobName = function.jobName - if (jobName == null) - jobName = newJobName - - var jobOutputFile = function.jobOutputFile - if (jobOutputFile == null) - jobOutputFile = new File(jobName + ".out") - - var jobErrorFile = function.jobErrorFile - if (jobErrorFile == null) - jobErrorFile = new File(jobName + ".err") - val job = new LocalLsfJob - job.setName(jobName) - job.setOutputFile(jobOutputFile) - job.setErrFile(jobErrorFile) + job.setName(function.jobName) + job.setOutputFile(function.jobOutputFile) + job.setErrFile(function.jobErrorFile) job.setWorkingDir(function.commandDirectory) job.setProject(function.jobProject) job.setQueue(function.jobQueue) @@ -45,7 +32,11 @@ trait LsfJobRunner extends DispatchJobRunner with Logging { addJob(function, job) - logger.info(job.getBsubCommand.mkString(" ")) + if (logger.isDebugEnabled) { + logger.debug(function.commandDirectory + " > " + job.getBsubCommand.mkString(" ")) + } else { + logger.info(job.getBsubCommand.mkString(" ")) + } if (!qGraph.dryRun) job.start diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index f54a8f0ab..3c27cfa58 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -2,14 +2,19 @@ package org.broadinstitute.sting.queue.engine import org.jgrapht.graph.SimpleDirectedGraph import scala.collection.JavaConversions +import scala.collection.JavaConversions._ import scala.collection.immutable.ListMap -import org.broadinstitute.sting.queue.util.Logging import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, QFunction} +import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction +import org.broadinstitute.sting.queue.util.{CollectionUtils, Logging} +import org.broadinstitute.sting.queue.QException +import org.jgrapht.alg.CycleDetector +import org.jgrapht.EdgeFactory class QGraph extends Logging { var dryRun = true var bsubAllJobs = false - val jobGraph = new SimpleDirectedGraph[QNode, QFunction](classOf[QFunction]) + val jobGraph = newGraph def numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size def add(command: CommandLineFunction) { @@ -25,47 +30,135 @@ class QGraph extends Logging { val inputs = function.inputs val outputs = function.outputs - if (inputs.size > 1) - for ((name, input) <- inputs) - addNullEdge(ListMap(name -> input), inputs) + for ((name, input) <- inputs) { + addCollectionInputs(name, input) + if (inputs.size > 1) + addMappingEdge(ListMap(name -> input), inputs) + } - if (outputs.size > 1) - for ((name, output) <- outputs) - addNullEdge(outputs, ListMap(name -> output)) + for ((name, output) <- outputs) { + addCollectionOutputs(name, output) + if (outputs.size > 1) + addMappingEdge(outputs, ListMap(name -> output)) + } } + + var pruning = true + while (pruning) { + pruning = false + val filler = jobGraph.edgeSet.filter(isFiller(_)) + if (filler.size > 0) { + jobGraph.removeAllEdges(filler) + pruning = true + } + } + + jobGraph.removeAllVertices(jobGraph.vertexSet.filter(isOrphan(_))) } def run = { var isReady = true for (function <- JavaConversions.asSet(jobGraph.edgeSet)) { - val missingValues = function.missingValues - if (missingValues.size > 0) { - isReady = false - logger.error(function match { - case cmd: CommandLineFunction => "Missing values for function: %s".format(cmd.commandLine) - case x => "Missing values:" - }) - for (missing <- missingValues) { - logger.error(" " + missing) - } + function match { + case cmd: CommandLineFunction => + val missingValues = cmd.missingValues + if (missingValues.size > 0) { + isReady = false + logger.error("Missing values for function: %s".format(cmd.commandLine)) + for (missing <- missingValues) + logger.error(" " + missing) + } + case _ => } } - + + val detector = new CycleDetector(jobGraph) + if (detector.detectCycles) { + logger.error("Cycles were detected in the graph:") + for (cycle <- detector.findCycles) + logger.error(" " + cycle) + isReady = false + } + if (isReady || this.dryRun) (new TopologicalJobScheduler(this) with LsfJobRunner).runJobs } - private def add(f: QFunction, replace: Boolean) { - val inputs = QNode(f.inputs.values.filter(_ != null).toSet) - val outputs = QNode(f.outputs.values.filter(_ != null).toSet) - jobGraph.addVertex(inputs) - jobGraph.addVertex(outputs) - if (replace) - jobGraph.removeAllEdges(inputs, outputs) - jobGraph.addEdge(inputs, outputs, f) + private def newGraph = new SimpleDirectedGraph[QNode, QFunction](new EdgeFactory[QNode, QFunction] { + def createEdge(input: QNode, output: QNode) = new MappingFunction(input.valueMap, output.valueMap)}) + + private def add(f: QFunction, replace: Boolean): Unit = { + try { + f.freeze + + f match { + case scatterGather: ScatterGatherableFunction if (bsubAllJobs && scatterGather.scatterGatherable) => + val functions = scatterGather.generateFunctions() + if (logger.isTraceEnabled) + logger.trace("Scattered into %d parts: %s".format(functions.size, functions)) + functions.foreach(add(_)) + case _ => + val inputs = QNode(f.inputs.values.filter(_ != null).toSet) + val outputs = QNode(f.outputs.values.filter(_ != null).toSet) + val newSource = jobGraph.addVertex(inputs) + val newTarget = jobGraph.addVertex(outputs) + val removedEdges = if (replace) jobGraph.removeAllEdges(inputs, outputs) else Nil + val added = jobGraph.addEdge(inputs, outputs, f) + if (logger.isTraceEnabled) { + logger.trace("Mapped from: " + inputs) + logger.trace("Mapped to: " + outputs) + logger.trace("Mapped via: " + f) + logger.trace("Removed edges: " + removedEdges) + logger.trace("New source?: " + newSource) + logger.trace("New target?: " + newTarget) + logger.trace("") + } + } + } catch { + case e: Exception => + throw new QException("Error adding function: " + f, e) + } } - private def addNullEdge(input: ListMap[String, Any], output: ListMap[String, Any]) = { - add(new MappingFunction(input, output), false) + private def addCollectionInputs(name: String, value: Any): Unit = { + CollectionUtils.foreach(value, (item, collection) => + addMappingEdge(ListMap(name -> item), ListMap(name -> collection))) } + + private def addCollectionOutputs(name: String, value: Any): Unit = { + CollectionUtils.foreach(value, (item, collection) => + addMappingEdge(ListMap(name -> collection), ListMap(name -> item))) + } + + private def addMappingEdge(input: ListMap[String, Any], output: ListMap[String, Any]) = + add(new MappingFunction(input, output), false) + + private def isMappingEdge(edge: QFunction) = + edge.isInstanceOf[MappingFunction] + + private def isFiller(edge: QFunction) = { + if (isMappingEdge(edge)) { + val source = jobGraph.getEdgeSource(edge) + val target = jobGraph.getEdgeTarget(edge) + if (jobGraph.outgoingEdgesOf(target).size == 0 || jobGraph.incomingEdgesOf(source).size == 0) + true + else if (isLoopback(source) || isLoopback(target)) + true + else false + } else false + } + + private def isLoopback(node: QNode) = { + var loopback = false + val incoming = jobGraph.incomingEdgesOf(node) + val outgoing = jobGraph.outgoingEdgesOf(node) + if (incoming.size == 1 && outgoing.size == 1) + if (isMappingEdge(incoming.head) && isMappingEdge(outgoing.head)) + if (jobGraph.getEdgeSource(incoming.head) == jobGraph.getEdgeTarget(outgoing.head)) + loopback = true + loopback + } + + private def isOrphan(node: QNode) = + (jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0 } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala index 15a405d82..48d74dae1 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala @@ -1,6 +1,20 @@ package org.broadinstitute.sting.queue.engine +import scala.collection.immutable.ListMap + /** * Represents a state between QFunctions the directed acyclic QGraph */ -case class QNode (private val items: Set[Any]) +case class QNode (private val items: Set[Any]) { + /** + * Used during QGraph error reporting. + * The EdgeFactory uses the valueMap to create new edges for the CycleDetector. + */ + def valueMap = { + var map = ListMap.empty[String, Any] + for (item <- items) + if (item != null) + map += item.toString -> item + map + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala index 882f6977c..3e2c5a46c 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala @@ -12,20 +12,28 @@ import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunc abstract class TopologicalJobScheduler(private val qGraph: QGraph) extends CommandLineRunner with DispatchJobRunner with Logging { - protected val iterator = new TopologicalOrderIterator(this.qGraph.jobGraph) + protected val iterator = new TopologicalOrderIterator(qGraph.jobGraph) iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] { override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match { - case f: DispatchFunction if (TopologicalJobScheduler.this.qGraph.bsubAllJobs) => dispatch(f, qGraph) + case f: DispatchFunction if (qGraph.bsubAllJobs) => dispatch(f, qGraph) case f: CommandLineFunction => run(f, qGraph) case f: MappingFunction => /* do nothing for mapping functions */ } }) def runJobs = { - logger.info("Number of jobs: %s".format(this.qGraph.numJobs)) + logger.info("Number of jobs: %s".format(qGraph.numJobs)) + if (logger.isTraceEnabled) + logger.trace("Number of nodes: %s".format(qGraph.jobGraph.vertexSet.size)) + var numNodes = 0 for (target <- iterator) { + if (logger.isTraceEnabled) + logger.trace("Visiting: " + target) + numNodes += 1 // Do nothing for now, let event handler respond } + if (logger.isTraceEnabled) + logger.trace("Done walking %s nodes.".format(numNodes)) } } diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 4288cf8b7..5b0700fc6 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -3,20 +3,9 @@ package org.broadinstitute.sting.queue.function import java.io.File import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.queue.engine.{CommandLineRunner, QGraph} +import java.lang.reflect.Field -trait CommandLineFunction extends QFunction with DispatchFunction { - - /** - * The command line to run locally or via grid computing. - */ - def commandLine: String - - /** - * The directory where the command should run. - */ - @Internal - var commandDirectory: File = new File(".") - +trait CommandLineFunction extends InputOutputFunction with DispatchFunction { /** * Repeats parameters with a prefix/suffix if they are set otherwise returns "". * Skips null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString. @@ -31,50 +20,25 @@ trait CommandLineFunction extends QFunction with DispatchFunction { protected def optional(prefix: String, param: Any, suffix: String = "") = if (hasValue(param)) prefix + toValue(param) + suffix else "" - /** - * Sets a field value using the name of the field. - * Field must be annotated with @Input, @Output, or @Internal - * @returns true if the value was found and set - */ - def setValue(name: String, value: String) = { - ReflectionUtils.getField(this, name) match { - case Some(field) => - val isInput = ReflectionUtils.hasAnnotation(field, classOf[Input]) - val isOutput = ReflectionUtils.hasAnnotation(field, classOf[Output]) - val isInternal = ReflectionUtils.hasAnnotation(field, classOf[Internal]) - if (isInput || isOutput || isInternal) { - ReflectionUtils.setValue(this, field, value) - } - true - case None => false - } - } - - private lazy val fields = ReflectionUtils.getAllFields(this.getClass) - private def internals = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Internal]) - def inputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Input]) - def outputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Output]) - - override def missingValues = { - val missingInputs = missingFields(inputs) - val missingOutputs = missingFields(outputs) + def missingValues = { + val missingInputs = missingFields(inputFields) + val missingOutputs = missingFields(outputFields) missingInputs | missingOutputs } - private def missingFields(fields: Map[String, Any]) = { + private def missingFields(fields: List[Field]) = { var missing = Set.empty[String] - for ((name, value) <- fields) { - val isOptional = ReflectionUtils.getField(this, name) match { - case Some(field) => ReflectionUtils.hasAnnotation(field, classOf[Optional]) - case None => false - } + for (field <- fields) { + val isOptional = ReflectionUtils.hasAnnotation(field, classOf[Optional]) if (!isOptional) - if (!hasValue(value)) - missing += name + if (!hasValue(ReflectionUtils.getValue(this, field))) + missing += field.getName } missing } + protected def hasFieldValue(field: Field) = hasValue(this.getFieldValue(field)) + private def hasValue(param: Any) = param match { case null => false case Nil => false diff --git a/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala index 024cee4f5..8d441ab10 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala @@ -1,14 +1,35 @@ package org.broadinstitute.sting.queue.function import java.io.File -import org.broadinstitute.sting.queue.util.Internal +import java.lang.management.ManagementFactory +import org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction +import org.broadinstitute.sting.queue.util._ -trait DispatchFunction extends QFunction with MemoryLimitedFunction { +trait DispatchFunction extends InputOutputFunction { def commandLine: String - var commandDirectory: File + @Input + @Optional + var memoryLimit: Option[Int] = None + + /** + * The directory where the command should run. + */ + @Internal + var commandDirectory: File = IOUtils.CURRENT_DIR + + @Internal + var jobNamePrefix: String = _ + + @Internal var jobName: String = _ + + @Output + @Gather(classOf[SimpleTextGatherFunction]) var jobOutputFile: File = _ + + @Output + @Gather(classOf[SimpleTextGatherFunction]) var jobErrorFile: File = _ @Internal @@ -16,4 +37,54 @@ trait DispatchFunction extends QFunction with MemoryLimitedFunction { @Internal var jobQueue = "broad" + + override def freeze = { + if (jobNamePrefix == null) + jobNamePrefix = DispatchFunction.processNamePrefix + + if (jobName == null) + jobName = DispatchFunction.nextJobName(jobNamePrefix) + + if (jobOutputFile == null) + jobOutputFile = new File(jobName + ".out") + + if (jobErrorFile == null) + jobErrorFile = new File(jobName + ".err") + + commandDirectory = IOUtils.absolute(IOUtils.CURRENT_DIR, commandDirectory) + + super.freeze + } + + /** + * Override the canon function to change any relative path to an absolute path. + */ + override protected def canon(value: Any) = { + value match { + case file: File => IOUtils.absolute(commandDirectory, file) + case x => super.canon(x) + } + } + + def absolute(file: File) = IOUtils.absolute(commandDirectory, file) + def temp(subDir: String) = IOUtils.sub(commandDirectory, jobName + "-" + subDir) + + override def toString = commandLine +} + +object DispatchFunction { + private val processNamePrefix = "Q-" + { + var prefix = ManagementFactory.getRuntimeMXBean.getName + val index = prefix.indexOf(".") + if (index >= 0) + prefix = prefix.substring(0, index) + prefix + } + + private var jobIndex = 0 + + private def nextJobName(prefix: String) = { + jobIndex += 1 + prefix + "-" + jobIndex + } } diff --git a/scala/src/org/broadinstitute/sting/queue/function/InputOutputFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/InputOutputFunction.scala new file mode 100644 index 000000000..61b298f16 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/InputOutputFunction.scala @@ -0,0 +1,67 @@ +package org.broadinstitute.sting.queue.function + +import java.lang.reflect.Field +import org.broadinstitute.sting.queue.util._ + +/** + * A function with @Inputs and @Outputs tagging fields that can be set by the user in a QScript + */ +trait InputOutputFunction extends QFunction with Cloneable { + def getFieldValue(field: Field) = ReflectionUtils.getValue(this, field) + def setFieldValue(field: Field, value: Any) = ReflectionUtils.setValue(this, field, value) + + def functionFields: List[Field] = inputFields ::: outputFields ::: internalFields + def inputFields = ReflectionUtils.filterFields(fields, classOf[Input]) + def outputFields = ReflectionUtils.filterFields(fields, classOf[Output]) + def internalFields = ReflectionUtils.filterFields(fields, classOf[Internal]) + + private lazy val fields = ReflectionUtils.getAllFields(this.getClass) + def inputs = ReflectionUtils.getFieldNamesValues(this, inputFields) + def outputs = ReflectionUtils.getFieldNamesValues(this, outputFields) + def internals = ReflectionUtils.getFieldNamesValues(this, internalFields) + + /** + * Sets a field value using the name of the field. + * Field must be annotated with @Input, @Output, or @Internal + * @returns true if the value was found and set + */ + def addOrUpdateWithStringValue(name: String, value: String) = { + fields.find(_.getName == name) match { + case Some(field) => + val isInput = ReflectionUtils.hasAnnotation(field, classOf[Input]) + val isOutput = ReflectionUtils.hasAnnotation(field, classOf[Output]) + val isInternal = ReflectionUtils.hasAnnotation(field, classOf[Internal]) + if (isInput || isOutput || isInternal) { + ReflectionUtils.addOrUpdateWithStringValue(this, field, value) + } + true + case None => false + } + } + + def cloneFunction() = clone.asInstanceOf[this.type] + // explicitly overriden so that trait function cloneFunction can use this.clone + override protected def clone = super.clone + + /** + * As the function is frozen, changes all fields to their canonical forms. + */ + override def freeze = { + for (field <- this.functionFields) + mapField(field, canon) + super.freeze + } + + def mapField(field: Field, f: Any => Any): Any = { + var fieldValue = this.getFieldValue(field) + fieldValue = CollectionUtils.updated(fieldValue, f).asInstanceOf[AnyRef] + this.setFieldValue(field, fieldValue) + fieldValue + } + + /** + * Set value to a uniform value across functions. + * The biggest example is file paths relative to the command directory in DispatchFunction + */ + protected def canon(value: Any): Any = value +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala index 06ba9f3e8..e525f58b6 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala @@ -1,6 +1,8 @@ package org.broadinstitute.sting.queue.function -trait IntervalFunction { - type Intervals = String - var intervals: Intervals -} \ No newline at end of file +import java.io.File + +trait IntervalFunction extends InputOutputFunction { + var referenceFile: File + var intervals: File +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala index 509c7cad0..affeb7cc4 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala @@ -10,5 +10,5 @@ import scala.collection.immutable.ListMap class MappingFunction(private val in: ListMap[String, Any], private val out: ListMap[String, Any]) extends QFunction { def inputs = in def outputs = out - def run(qGraph: QGraph) = null + override def toString = "" } diff --git a/scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala deleted file mode 100644 index c7fad1500..000000000 --- a/scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala +++ /dev/null @@ -1,9 +0,0 @@ -package org.broadinstitute.sting.queue.function - -import org.broadinstitute.sting.queue.util.{Input, Optional} - -trait MemoryLimitedFunction { - @Input - @Optional - var memoryLimit: Option[Int] = None -} diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index bef3257d9..4d639d09f 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -1,10 +1,28 @@ package org.broadinstitute.sting.queue.function -import org.broadinstitute.sting.queue.engine.QGraph import scala.collection.immutable.ListMap +/** + * The base interface for all functions in Queue. + * Inputs and outputs are specified as ListMaps of name -> value. + * The names are used for debugging. + * Inputs are matched to other outputs by using .equals() + */ trait QFunction { + /** + * After a function is frozen no more updates are allowed by the user. + * The function is allow to make necessary updates internally to make sure + * the inputs and outputs will be equal to other inputs and outputs. + */ + def freeze = {} + + /** + * ListMap of name -> value inputs for this function. + */ def inputs: ListMap[String, Any] + + /** + * ListMap of name -> value outputs for this function. + */ def outputs: ListMap[String, Any] - def missingValues = Set.empty[String] } diff --git a/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala index f7eeead32..84a81ec16 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala @@ -1,11 +1,12 @@ package org.broadinstitute.sting.queue.function.gatk import java.io.File -import org.broadinstitute.sting.queue.util.{Input, Optional} -import org.broadinstitute.sting.queue.function.{MemoryLimitedFunction, IntervalFunction, CommandLineFunction} +import org.broadinstitute.sting.queue.function.IntervalFunction +import org.broadinstitute.sting.queue.util.{Scatter, Internal, Input, Optional} +import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, IntervalScatterFunction} -trait GatkFunction extends CommandLineFunction with MemoryLimitedFunction with IntervalFunction { - @Input +trait GatkFunction extends ScatterGatherableFunction with IntervalFunction { + @Internal @Optional var javaTmpDir: String = _ @@ -13,7 +14,7 @@ trait GatkFunction extends CommandLineFunction with MemoryLimitedFunction with I var gatkJar: String = _ @Input - var referenceFile: String = _ + var referenceFile: File = _ @Input @Optional @@ -21,14 +22,15 @@ trait GatkFunction extends CommandLineFunction with MemoryLimitedFunction with I @Input @Optional - var dbsnp: File = _ + @Scatter(classOf[IntervalScatterFunction]) + var intervals: File = _ @Input @Optional - var intervals: Intervals = new Intervals("all") + var dbsnp: File = _ protected def gatkCommandLine(walker: String) = - "java%s%s -jar %s -T %s -R %s%s%s " + "java%s%s -jar %s -T %s -R %s%s%s%s " .format(optional(" -Xmx", memoryLimit, "g"), optional(" -Djava.io.tmpdir=", javaTmpDir), gatkJar, walker, referenceFile, repeat(" -I ", bamFiles), optional(" -D ", dbsnp), optional(" -L ", intervals)) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/BamGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/BamGatherFunction.scala new file mode 100644 index 000000000..259dd26f1 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/BamGatherFunction.scala @@ -0,0 +1,9 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import java.io.File + +class BamGatherFunction extends GatherFunction { + type GatherType = File + + def commandLine = "samtools merge %s%s".format(originalOutput, repeat(" ", gatherParts)) +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala new file mode 100644 index 000000000..5de2ed33c --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala @@ -0,0 +1,15 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.util.Input +import java.io.File + +class CleanupTempDirsFunction extends CommandLineFunction { + @Input + var originalOutputs: List[Any] = Nil + + @Input + var tempDirectories: List[File] = Nil + + def commandLine = "rm -rf%s".format(repeat(" '", tempDirectories, "'")) +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala new file mode 100644 index 000000000..b10090348 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala @@ -0,0 +1,15 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import java.io.File +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.util.{Output, Input} + +class CreateTempDirsFunction extends CommandLineFunction { + @Input + var originalInputs: List[Any] = Nil + + @Output + var tempDirectories: List[File] = Nil + + def commandLine = "mkdir%s".format(repeat(" '", tempDirectories, "'")) +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala new file mode 100644 index 000000000..8bdce674b --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -0,0 +1,16 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import org.broadinstitute.sting.queue.function.{CommandLineFunction} +import org.broadinstitute.sting.queue.util.{Input, Output} + +trait GatherFunction extends CommandLineFunction { + type GatherType + + @Input + var gatherParts: List[GatherType] = Nil + + @Output + var originalOutput: GatherType = _ + + def setOriginalFunction(originalFunction: ScatterGatherableFunction) = {} +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/IntervalScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/IntervalScatterFunction.scala new file mode 100644 index 000000000..979acebd8 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/IntervalScatterFunction.scala @@ -0,0 +1,21 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import java.io.File +import org.broadinstitute.sting.queue.util.Input +import org.broadinstitute.sting.queue.function.IntervalFunction + +class IntervalScatterFunction extends ScatterFunction { + type ScatterType = File + + @Input + var referenceFile: File = _ + + override def setOriginalFunction(originalFunction: ScatterGatherableFunction) = { + val command = originalFunction.asInstanceOf[IntervalFunction] + referenceFile = command.referenceFile + super.setOriginalFunction(originalFunction) + } + + // TODO: Use the reference file for "all" + def commandLine = "splitIntervals.sh %s%s".format(originalInput, repeat(" ", scatterParts)) +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala new file mode 100644 index 000000000..f93bc857a --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala @@ -0,0 +1,20 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.util.{Input, Output} +import java.io.File + +trait ScatterFunction extends CommandLineFunction { + type ScatterType + + @Input + var originalInput: ScatterType = _ + + @Input + var tempDirectories: List[File] = Nil + + @Output + var scatterParts: List[ScatterType] = Nil + + def setOriginalFunction(originalFunction: ScatterGatherableFunction) = {} +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala new file mode 100644 index 000000000..13da1b459 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -0,0 +1,135 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import java.lang.reflect.Field +import java.io.File +import org.broadinstitute.sting.queue.util._ + +trait ScatterGatherableFunction extends CommandLineFunction { + @Internal + var scatterCount: Int = 1 + + def scatterField = this.inputFields.find(field => ReflectionUtils.hasAnnotation(field, classOf[Scatter])).get + + def scatterGatherable = { + if (scatterCount < 2) + false + else if (!hasFieldValue(scatterField)) + false + else + true + } + + def generateFunctions() = ScatterGatherableFunction.generateFunctions(this) +} + +object ScatterGatherableFunction { + private def generateFunctions(originalFunction: ScatterGatherableFunction) = { + var functions = List.empty[CommandLineFunction] + var tempDirectories = List.empty[File] + + // Create a function that will remove any temporary items + var cleanupFunction = new CleanupTempDirsFunction + cleanupFunction.jobNamePrefix = originalFunction.jobNamePrefix + cleanupFunction.commandDirectory = originalFunction.commandDirectory + + // Find the field with @Scatter and its value + var scatterField = originalFunction.scatterField + val originalValue = originalFunction.getFieldValue(scatterField) + + // Create the scatter function based on @Scatter + val scatterFunction = getScatterFunction(scatterField) + scatterFunction.setOriginalFunction(originalFunction) + scatterFunction.jobNamePrefix = originalFunction.jobNamePrefix + scatterFunction.commandDirectory = originalFunction.temp("scatter-" + scatterField.getName) + scatterFunction.originalInput = originalValue.asInstanceOf[scatterFunction.ScatterType] + tempDirectories :+= scatterFunction.commandDirectory + functions :+= scatterFunction + + // Create the gather functions for each output field + var gatherFunctions = Map.empty[Field, GatherFunction] + for (outputField <- originalFunction.outputFields) { + + // Create the gather function based on @Gather + val gatherFunction = getGatherFunction(outputField) + gatherFunction.setOriginalFunction(originalFunction) + gatherFunction.jobNamePrefix = originalFunction.jobNamePrefix + gatherFunction.commandDirectory = originalFunction.temp("gather-" + outputField.getName) + + val gatheredValue = originalFunction.getFieldValue(outputField).asInstanceOf[gatherFunction.GatherType] + gatherFunction.originalOutput = gatheredValue + + tempDirectories :+= gatherFunction.commandDirectory + cleanupFunction.originalOutputs :+= gatheredValue + + functions :+= gatherFunction + + gatherFunctions += outputField -> gatherFunction + } + + // Create the clone functions for running the parallel jobs + var cloneFunctions = List.empty[CommandLineFunction] + for (i <- 1 to originalFunction.scatterCount) { + val cloneFunction = newFunctionClone(originalFunction) + cloneFunctions :+= cloneFunction + + val tempDir = originalFunction.temp("temp-"+i) + cloneFunction.commandDirectory = tempDir + tempDirectories :+= tempDir + + // Reset the input of the clone to the the temp dir and add it as an output of the scatter + var scatterPart = CollectionUtils.updated(originalValue, resetToTempDir(tempDir)) + scatterFunction.scatterParts :+= scatterPart.asInstanceOf[scatterFunction.ScatterType] + cloneFunction.setFieldValue(scatterField, scatterPart) + + // For each each output field, change value to the temp dir and feed it into the gatherer + for (outputField <- originalFunction.outputFields) { + val gatherFunction = gatherFunctions(outputField) + val gatherPart = cloneFunction.mapField(outputField, resetToTempDir(tempDir)) + gatherFunction.gatherParts :+= gatherPart.asInstanceOf[gatherFunction.GatherType] + } + } + functions = cloneFunctions ::: functions + + // Create a function to create all of the temp directories. + // All of its inputs are the inputs of the original function. + val initializeFunction = new CreateTempDirsFunction + initializeFunction.jobNamePrefix = originalFunction.jobNamePrefix + initializeFunction.commandDirectory = originalFunction.commandDirectory + + for (inputField <- originalFunction.inputFields) + initializeFunction.originalInputs :+= originalFunction.getFieldValue(inputField) + + initializeFunction.tempDirectories = tempDirectories + scatterFunction.tempDirectories = tempDirectories + cleanupFunction.tempDirectories = tempDirectories + + functions +:= initializeFunction + functions :+= cleanupFunction + + // Return all the various functions we created + functions + } + + private def resetToTempDir(tempDir: File): Any => Any = { + (any: Any) => { + any match { + case file: File => IOUtils.reset(tempDir, file) + case x => x + } + } + } + + private def getScatterFunction(inputField: Field) = + ReflectionUtils.getAnnotation(inputField, classOf[Scatter]).value.newInstance.asInstanceOf[ScatterFunction] + + private def getGatherFunction(outputField: Field) = + ReflectionUtils.getAnnotation(outputField, classOf[Gather]).value.newInstance.asInstanceOf[GatherFunction] + + private def newFunctionClone(originalFunction: ScatterGatherableFunction) = { + val cloneFunction = originalFunction.cloneFunction.asInstanceOf[ScatterGatherableFunction] + // Make sure clone doesn't get scattered + cloneFunction.scatterCount = 1 + cloneFunction + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala new file mode 100644 index 000000000..070c36115 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala @@ -0,0 +1,10 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import java.io.File + +class SimpleTextGatherFunction extends GatherFunction { + type GatherType = File + + // TODO: Write a text merging utility that takes into account headers. + def commandLine = "mergeText.sh %s%s".format(originalOutput, repeat(" ", gatherParts)) +} diff --git a/scala/src/org/broadinstitute/sting/queue/util/CollectionUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/CollectionUtils.scala new file mode 100644 index 000000000..169de8677 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/util/CollectionUtils.scala @@ -0,0 +1,45 @@ +package org.broadinstitute.sting.queue.util + +/** + * Utilities that try to deeply apply operations to collections + */ +object CollectionUtils { + + def test(value: Any, f: Any => Boolean): Boolean = { + var result = f(value) + foreach(value, (item, collection) => { + result |= f(item) + }) + result + } + + def updated(value: Any, f: (Any) => Any): Any = { + value match { + case seq: Seq[_] => seq.map(updated(_, f)) + case array: Array[_] => array.map(updated(_, f)) + case option: Option[_] => option.map(updated(_, f)) + case x => f(x) + } + } + + def foreach(value: Any, f: (Any, Any) => Unit): Unit = { + value match { + case seq: Seq[_] => + for (item <- seq) { + f(item, seq) + foreach(item, f) + } + case product: Product => + for (item <- product.productIterator) { + f(item, product) + foreach(item, f) + } + case array: Array[_] => + for (item <- array) { + f(item, array) + foreach(item, f) + } + case _ => + } + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala new file mode 100644 index 000000000..5fa902391 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala @@ -0,0 +1,31 @@ +package org.broadinstitute.sting.queue.util + +import java.io.{IOException, File} + +object IOUtils { + val CURRENT_DIR = new File(".") + + def sub(parent: File, subPath: String) = { + val file = new File(subPath) + if (parent == CURRENT_DIR && file == CURRENT_DIR) + CURRENT_DIR.getCanonicalFile + else if (parent == CURRENT_DIR || file.isAbsolute) + file + else if (file == CURRENT_DIR) + parent + else + new File(parent, subPath) + } + + def temp(prefix: String, suffix: String = "") = { + val tempDir = File.createTempFile(prefix + "-", suffix) + if(!tempDir.delete) + throw new IOException("Could not delete sub file: " + tempDir.getAbsolutePath()) + if(!tempDir.mkdir) + throw new IOException("Could not create sub directory: " + tempDir.getAbsolutePath()) + tempDir + } + + def reset(dir: File, file: File) = sub(dir, file.getName).getAbsoluteFile + def absolute(dir: File, file: File) = sub(dir, file.getPath).getAbsoluteFile +} diff --git a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala index 6861a31d6..c61a6267f 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala @@ -7,9 +7,7 @@ import org.apache.log4j._ */ trait Logging { private val className = this.getClass.getName - protected lazy val logger = configuredLogger - - def configuredLogger = { + protected lazy val logger = { Logging.configureLogging Logger.getLogger(className) } @@ -17,15 +15,17 @@ trait Logging { object Logging { private var configured = false - private var isDebug = false + private var level = Level.INFO def configureLogging = { if (!configured) { var root = Logger.getRootLogger root.addAppender(new ConsoleAppender(new PatternLayout("%-5p %d{HH:mm:ss,SSS} - %m %n"))) - root.setLevel(if(isDebug) Level.DEBUG else Level.INFO) + root.setLevel(level) configured = true } } - def enableDebug = {isDebug = true; Logger.getRootLogger.setLevel(Level.DEBUG)} + def setDebug = setLevel(Level.DEBUG) + def setTrace = setLevel(Level.TRACE) + private def setLevel(level: Level) = {this.level = level; Logger.getRootLogger.setLevel(level)} } diff --git a/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala index ccd3aceaf..75e4e0a89 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala @@ -8,13 +8,20 @@ import scala.collection.immutable.ListMap import java.lang.reflect.{ParameterizedType, Field} object ReflectionUtils { - def getField(obj: AnyRef, name: String) = getAllFields(obj.getClass).find(_.getName == name) - def hasAnnotation(field: Field, annotation: Class[_ <: Annotation]) = field.getAnnotation(annotation) != null + + def getAnnotation[T <: Annotation](field: Field, annotation: Class[T]): T = { + if (!hasAnnotation(field, annotation)) + throw new QException("Field %s is missing annotation %s".format(field, annotation)) + field.getAnnotation(annotation).asInstanceOf[T] + } - def getFieldsAnnotatedWith(obj: AnyRef, fields: List[Field], annotation: Class[_ <: Annotation]) = - ListMap(fields.filter(field => hasAnnotation(field, annotation)) - .map(field => (field.getName -> fieldGetter(field).invoke(obj))) :_*) + def getAllFields(clazz: Class[_]) = getAllTypes(clazz).map(_.getDeclaredFields).flatMap(_.toList) + + def filterFields(fields: List[Field], annotation: Class[_ <: Annotation]) = fields.filter(field => hasAnnotation(field, annotation)) + + def getFieldNamesValues(obj: AnyRef, fields: List[Field]) = + ListMap(fields.map(field => (field.getName -> fieldGetter(field).invoke(obj))) :_*) def getAllTypes(clazz: Class[_]) = { var types = List.empty[Class[_]] @@ -26,19 +33,13 @@ object ReflectionUtils { types } - def getAllFields(clazz: Class[_]) = getAllTypes(clazz).map(_.getDeclaredFields).flatMap(_.toList) - - def setValue(obj: AnyRef, field: Field, value: String) = { + def getValue(obj: AnyRef, field: Field) = fieldGetter(field).invoke(obj) + def setValue(obj: AnyRef, field: Field, value: Any) = fieldSetter(field).invoke(obj, value.asInstanceOf[AnyRef]) + def addOrUpdateWithStringValue(obj: AnyRef, field: Field, value: String) = { val getter = fieldGetter(field) val setter = fieldSetter(field) - if (getter == null) - throw new QException("Field may be private? Unable to find getter for field: " + field) - - if (getter == null) - throw new QException("Field may be a val instead of var? Unable to find setter for field: " + field) - if (classOf[Seq[_]].isAssignableFrom(field.getType)) { val fieldType = getCollectionType(field) @@ -87,8 +88,19 @@ object ReflectionUtils { else None } - private[util] def fieldGetter(field: Field) = field.getDeclaringClass.getMethod(field.getName) - private[util] def fieldSetter(field: Field) = field.getDeclaringClass.getMethod(field.getName+"_$eq", field.getType) + private def fieldGetter(field: Field) = + try { + field.getDeclaringClass.getMethod(field.getName) + } catch { + case e: NoSuchMethodException => throw new QException("Field may be private? Unable to find getter for field: " + field) + } + + private def fieldSetter(field: Field) = + try { + field.getDeclaringClass.getMethod(field.getName+"_$eq", field.getType) + } catch { + case e: NoSuchMethodException => throw new QException("Field may be a val instead of var? Unable to find setter for field: " + field) + } private def coerce(clazz: Class[_], value: String) = { if (classOf[String] == clazz) value diff --git a/shell/mergeText.sh b/shell/mergeText.sh new file mode 100755 index 000000000..b2ba5934d --- /dev/null +++ b/shell/mergeText.sh @@ -0,0 +1,36 @@ +#!/bin/sh + +# Merges a set of files, skipping over common headers. + +if [ $# -lt 2 ]; then + echo "Usage: $0 [ .. ]" + exit 1 +elif [ $# -eq 2 ]; then + cp $2 $1 +else + outputFile=$1 + shift + + test -e $outputFile && rm -f $outputFile + + exec 3< $1 + exec 4< $2 + + startLine=1 + while true; do + read -u 3 header1 + if [ $? -ne 0 ]; then break; fi + read -u 4 header2 + if [ $? -ne 0 ]; then break; fi + if [ $header1 != $header2 ]; then break; fi + echo "$header1" >> outputfile + ((startLine++)) + done + + exec 3<&- + exec 4<&- + + for inputFile in $@; do + tail -n +$startLine $inputFile >> $outputFile + done +fi diff --git a/shell/splitIntervals.sh b/shell/splitIntervals.sh new file mode 100755 index 000000000..ca4512673 --- /dev/null +++ b/shell/splitIntervals.sh @@ -0,0 +1,55 @@ +#!/bin/sh + +# Splits an interval list into multiple files + +if [ $# -lt 2 ]; then + echo "Usage: $0 [ .. ]" + exit 1 +else + inputFile=$1 + shift + + totalLines=$(wc -l < $inputFile) + + exec 3< $inputFile + + numHeaders=0 + while true; do + read -u 3 nextLine + if [ $? -ne 0 ]; then break; fi + if [[ $nextLine != @* ]]; then break; fi + ((numHeaders++)) + done + + numFiles=$# + ((numIntervals = totalLines - numHeaders)) + + if [ $numIntervals -lt $numFiles ]; then + echo "Error: Number of intervals $numIntervals is less than the number of files $numFiles." + exec 3<&- + exit 1 + fi + + ((linesPerFile = numIntervals / numFiles)) + ((remainder = numIntervals % numFiles)) + + ((linesPerFile++)) + + fileNumber=0 + for outputFile in $@; do + + # Earlier files with get the remainder until it's no longer needed. + if [ $fileNumber -eq $remainder ]; then ((linesPerFile--)); fi + ((fileNumber++)) + + head -n $numHeaders $inputFile > $outputFile + + for ((line=0; line<$linesPerFile; line++)); do + echo "$nextLine" >> $outputFile + read -u 3 nextLine + if [ $? -ne 0 ]; then break; fi + done + done + + exec 3<&- +fi