From 32fc221ffe631fcd040fefd5df663b6fddd2d46c Mon Sep 17 00:00:00 2001 From: kshakir Date: Tue, 15 Jun 2010 04:43:46 +0000 Subject: [PATCH] Replaced pattern matched pipeline spec with annotated objects. Old version is no longer available. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3558 348d0f76-0448-11de-a6fe-93d51630548a --- ivy.xml | 4 +- .../sting/queue/util/ClassType.java | 40 +++++ .../sting/queue/util/Input.java | 38 +++++ .../sting/queue/util/Internal.java | 40 +++++ .../sting/queue/util/Optional.java | 38 +++++ .../sting/queue/util/Output.java | 38 +++++ .../sting/queue/QArguments.scala | 24 +-- .../sting/queue/QCommandLine.scala | 10 +- .../broadinstitute/sting/queue/QScript.scala | 103 +++++++++++++ .../queue/engine/CommandLineRunner.scala | 17 +++ .../queue/engine/DispatchJobRunner.scala | 56 +++++++ .../sting/queue/engine/LsfJobRunner.scala | 57 +++++++ .../sting/queue/engine/Pipeline.scala | 16 -- .../sting/queue/engine/QCommand.scala | 9 -- .../sting/queue/engine/QFile.scala | 43 ------ .../sting/queue/engine/QGraph.scala | 71 +++++++++ .../sting/queue/engine/QModelEdge.scala | 7 - .../sting/queue/engine/QNode.scala | 6 + .../sting/queue/engine/QRule.scala | 3 - .../engine/TopologicalJobScheduler.scala | 31 ++++ .../engine/graphing/ExplicitJobGrapher.scala | 27 ---- .../queue/engine/graphing/JobGrapher.scala | 31 ---- .../engine/graphing/TreeJobGrapher.scala | 143 ------------------ .../scheduling/DispatchJobScheduler.scala | 79 ---------- .../queue/engine/scheduling/ExecEdge.scala | 17 --- .../engine/scheduling/JobScheduler.scala | 65 -------- .../engine/scheduling/MapResourceNode.scala | 5 - .../engine/scheduling/ResourceEdge.scala | 6 - .../engine/scheduling/ResourceNode.scala | 9 -- .../scheduling/SimpleJobScheduler.scala | 19 --- .../scheduling/TopologicalJobScheduler.scala | 29 ---- .../queue/function/CommandLineFunction.scala | 92 +++++++++++ .../queue/function/DispatchFunction.scala | 19 +++ .../queue/function/IntervalFunction.scala | 6 + .../queue/function/MappingFunction.scala | 14 ++ .../function/MemoryLimitedFunction.scala | 10 ++ .../sting/queue/function/QFunction.scala | 10 ++ .../queue/function/gatk/GatkFunction.scala | 35 +++++ .../sting/queue/util/Logging.scala | 2 +- .../sting/queue/util/ProcessUtils.scala | 5 +- .../sting/queue/util/ReflectionUtils.scala | 96 ++++++++++++ 41 files changed, 839 insertions(+), 531 deletions(-) create mode 100644 java/src/org/broadinstitute/sting/queue/util/ClassType.java create mode 100644 java/src/org/broadinstitute/sting/queue/util/Input.java create mode 100644 java/src/org/broadinstitute/sting/queue/util/Internal.java create mode 100644 java/src/org/broadinstitute/sting/queue/util/Optional.java create mode 100644 java/src/org/broadinstitute/sting/queue/util/Output.java create mode 100755 scala/src/org/broadinstitute/sting/queue/QScript.scala create mode 100755 scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala create mode 100755 scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/QFile.scala create mode 100755 scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala delete mode 100644 scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/QNode.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/QRule.scala create mode 100755 scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala delete mode 100644 scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala delete mode 100644 scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala delete mode 100644 scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala delete mode 100755 scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/QFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala diff --git a/ivy.xml b/ivy.xml index f0f0498ec..433bd5ef5 100644 --- a/ivy.xml +++ b/ivy.xml @@ -42,8 +42,8 @@ - - + + diff --git a/java/src/org/broadinstitute/sting/queue/util/ClassType.java b/java/src/org/broadinstitute/sting/queue/util/ClassType.java new file mode 100644 index 000000000..ae8bcdaef --- /dev/null +++ b/java/src/org/broadinstitute/sting/queue/util/ClassType.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.util; + +import java.lang.annotation.*; + +/** + * Specifies the type of an input our output field. + * Retains it during runtime to work around type erasure. + * Written in java because scala doesn't support RetentionPolicy.RUNTIME + */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface ClassType { + Class value(); +} diff --git a/java/src/org/broadinstitute/sting/queue/util/Input.java b/java/src/org/broadinstitute/sting/queue/util/Input.java new file mode 100644 index 000000000..15fa28cb7 --- /dev/null +++ b/java/src/org/broadinstitute/sting/queue/util/Input.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.util; + +import java.lang.annotation.*; + +/** + * Specifies an input to a QueueFunction + * Written in java because scala doesn't support RetentionPolicy.RUNTIME + */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface Input { +} diff --git a/java/src/org/broadinstitute/sting/queue/util/Internal.java b/java/src/org/broadinstitute/sting/queue/util/Internal.java new file mode 100644 index 000000000..842cc9d1a --- /dev/null +++ b/java/src/org/broadinstitute/sting/queue/util/Internal.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.util; + +import java.lang.annotation.*; + +/** + * Specifies an internal setting for a QueueFunction. + * Not an input or output but should be copied with a function. + * Internals should have default values that should be handled, i.e. they are always @Optional + * Written in java because scala doesn't support RetentionPolicy.RUNTIME + */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface Internal { +} diff --git a/java/src/org/broadinstitute/sting/queue/util/Optional.java b/java/src/org/broadinstitute/sting/queue/util/Optional.java new file mode 100644 index 000000000..e31df349b --- /dev/null +++ b/java/src/org/broadinstitute/sting/queue/util/Optional.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.util; + +import java.lang.annotation.*; + +/** + * Specifies an input or output to a QueueFunction is optional + * Written in java because scala doesn't support RetentionPolicy.RUNTIME + */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface Optional { +} diff --git a/java/src/org/broadinstitute/sting/queue/util/Output.java b/java/src/org/broadinstitute/sting/queue/util/Output.java new file mode 100644 index 000000000..5b8c5c46d --- /dev/null +++ b/java/src/org/broadinstitute/sting/queue/util/Output.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.util; + +import java.lang.annotation.*; + +/** + * Specifies an output to a QueueFunction + * Written in java because scala doesn't support RetentionPolicy.RUNTIME + */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface Output { +} diff --git a/scala/src/org/broadinstitute/sting/queue/QArguments.scala b/scala/src/org/broadinstitute/sting/queue/QArguments.scala index d17e7ab4a..b0f091b90 100755 --- a/scala/src/org/broadinstitute/sting/queue/QArguments.scala +++ b/scala/src/org/broadinstitute/sting/queue/QArguments.scala @@ -8,13 +8,13 @@ import java.io.{FileInputStream, File} import java.util.Properties class QArguments(args: Array[String]) { - var useBsub = false + var bsubAllJobs = false var dryRun = false val scripts = new ListBuffer[String] var inputPaths = List.empty[File] var argMap = Map.empty[String, String] - parseArgs(args) + val userArgs = parseArgs(args) private def parseArgs(args: Array[String]) = { var filtered = new ListBuffer[String] @@ -26,13 +26,15 @@ class QArguments(args: Array[String]) { if (isFlagged(filtered, "-dry")) dryRun = true if (isFlagged(filtered, "-bsub")) - useBsub = true + bsubAllJobs = true for (arg <- getArgs(filtered, "-P")) addArg(arg) for (arg <- getArgs(filtered, "-I")) addFile(arg) for (arg <- getArgs(filtered, "-S")) scripts.append(arg) + + List(filtered:_*) } private def isFlagged(filtered: ListBuffer[String], search: String) = { @@ -65,22 +67,24 @@ class QArguments(args: Array[String]) { var file = new File(arg) if (arg.contains("=") && !file.exists) { val tokens = arg.split("=", 2) - argMap = argMap.updated(tokens(0), tokens(1)) - } else if (file.exists && arg.endsWith(".properties")) { + argMap += tokens(0) -> tokens(1) + } else if (arg.endsWith(".properties")) { + if (!file.exists) + throw new QException("File not found: " + file.getAbsolutePath) var props = new Properties props.load(new FileInputStream(file)) for ((name, value) <- props) - argMap = argMap.updated(name, value) + argMap += name -> value + } else { + throw new QException("Invalid property: " + arg) } } def addFile(arg: String): Unit = { var file = new File(arg) - if (arg.endsWith(".list")) { + inputPaths :+= file + if (arg.endsWith(".list")) new XReadLines(file).iterator.foreach(addFile(_)) - } else { - inputPaths = inputPaths ::: List(file) - } } } diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index c9e9ea694..79f030070 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -13,6 +13,7 @@ object QCommandLine extends Application with Logging { new QArguments(args) } catch { case exception => { + println(exception) println(usage) System.exit(-1) } @@ -34,20 +35,13 @@ object QCommandLine extends Application with Logging { System.exit(-1) } - if (qArgs.inputPaths.size == 0) { - println("Error: No inputs specified") - println(usage) - System.exit(-1) - } - - val newArgs = new ListBuffer[String] newArgs.appendAll(args) QArguments.strip(newArgs, "-S") newArgs.prepend("-nocompdaemon", "-classpath", ClasspathUtils.manifestAwareClassPath, qArgs.scripts.head) MainGenericRunner.main(newArgs.toArray) - // NOTE: This line is not reached because something in MainGenericRunner is exiting the VM. + // NOTE: This line is not reached because the MainGenericRunner exits the VM. logger.debug("exiting") } } diff --git a/scala/src/org/broadinstitute/sting/queue/QScript.scala b/scala/src/org/broadinstitute/sting/queue/QScript.scala new file mode 100755 index 000000000..d466c85f4 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -0,0 +1,103 @@ +package org.broadinstitute.sting.queue + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.engine.QGraph + +/** + * Syntactic sugar for filling in a pipeline using a Scala script. + */ +object QScript { + // Type aliases so users don't have to import + type File = java.io.File + type Input = org.broadinstitute.sting.queue.util.Input + type Output = org.broadinstitute.sting.queue.util.Output + type Optional = org.broadinstitute.sting.queue.util.Optional + type ClassType = org.broadinstitute.sting.queue.util.ClassType + type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction + type GatkFunction = org.broadinstitute.sting.queue.function.gatk.GatkFunction + + // The arguments for executing pipelines + private var qArgs: QArguments = _ + + // A default pipeline. Can also use multiple 'new Pipeline()' + private val pipeline = new Pipeline + + /** + * Initializes the QArguments and returns a list of the rest of the user args. + */ + def setArgs(params: Array[String]) = { + qArgs = new QArguments(params) + qArgs.userArgs + } + + /** + * Returns a list of files that were specified with "-I " on the command line + * or inside a .list file. + */ + def inputs(extension: String) = qArgs.inputPaths.filter(_.getName.endsWith(extension)) + + /** + * Exchanges the extension on a file. + */ + def swapExt(file: File, oldExtension: String, newExtension: String) = + new File(file.getName.stripSuffix(oldExtension) + newExtension) + + /** + * Adds one or more command line functions for dispatch later during run() + */ + def add(functions: CommandLineFunction*) = pipeline.add(functions:_*) + + /** + * Sets the @Input and @Output values for all the functions + */ + def setParams(): Unit = pipeline.setParams() + + /** + * Sets the @Input and @Output values for a single function + */ + def setParams(function: CommandLineFunction): Unit = pipeline.setParams(function) + + /** + * Executes functions that have been added to the pipeline. + */ + def run() = pipeline.run() + + + /** + * Encapsulates a set of functions to run together. + */ + protected class Pipeline { + private var functions = List.empty[CommandLineFunction] + + /** + * Adds one or more command line functions for dispatch later during run() + */ + def add(functions: CommandLineFunction*) = + this.functions :::= List(functions:_*) + + /** + * Sets the @Input and @Output values for all the functions + */ + def setParams(): Unit = + for (function <- functions) setParams(function) + + /** + * Sets the @Input and @Output values for a single function + */ + def setParams(function: CommandLineFunction): Unit = + for ((name, value) <- qArgs.argMap) function.setValue(name, value) + + /** + * Executes functions that have been added to the pipeline. + */ + def run() = { + val qGraph = new QGraph + qGraph.dryRun = qArgs.dryRun + qGraph.bsubAllJobs = qArgs.bsubAllJobs + for (function <- functions) + qGraph.add(function) + qGraph.fillIn + qGraph.run + } + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala new file mode 100755 index 000000000..318624d3c --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineRunner.scala @@ -0,0 +1,17 @@ +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.queue.util.{Logging, ProcessUtils} +import org.broadinstitute.sting.queue.function.CommandLineFunction + +/** + * Runs jobs one at a time locally + */ +trait CommandLineRunner extends Logging { + def run(function: CommandLineFunction, qGraph: QGraph) = { + var commandLine = function.commandLine + logger.info(commandLine) + + if (!qGraph.dryRun) + ProcessUtils.runCommandAndWait(function.commandLine, function.commandDirectory) + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala new file mode 100755 index 000000000..8966945b1 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala @@ -0,0 +1,56 @@ +package org.broadinstitute.sting.queue.engine + +import edu.mit.broad.core.lsf.LocalLsfJob +import collection.JavaConversions._ +import management.ManagementFactory +import java.io.File +import java.util.ArrayList +import org.broadinstitute.sting.queue.function.{DispatchFunction, QFunction} + +trait DispatchJobRunner { + type DispatchJobType + private var dispatchJobs = Map.empty[DispatchFunction, DispatchJobType] + + protected def newJobName = DispatchJobRunner.nextJobName + + def dispatch(function: DispatchFunction, qGraph: QGraph) + + protected def addJob(function: DispatchFunction, dispatchJob: DispatchJobType) = + dispatchJobs += function -> dispatchJob + + /** + * Walks up the graph looking for the previous LsfJobs + */ + protected def previousJobs(function: QFunction, qGraph: QGraph) : List[DispatchJobType] = { + var previous = List.empty[DispatchJobType] + + val source = qGraph.jobGraph.getEdgeSource(function) + for (incomingEdge <- qGraph.jobGraph.incomingEdgesOf(source)) { + incomingEdge match { + + // Stop recursing when we find a job along the edge and return its job id + case dispatchFunction: DispatchFunction => previous :+= dispatchJobs(dispatchFunction) + + // For any other type of edge find the LSF jobs preceding the edge + case qFunction: QFunction => previous :::= previousJobs(qFunction, qGraph) + } + } + previous + } +} + +object DispatchJobRunner { + private val jobNamePrefix = "Q-" + { + var prefix = ManagementFactory.getRuntimeMXBean.getName + val index = prefix.indexOf(".") + if (index >= 0) + prefix = prefix.substring(0, index) + prefix + } + private var jobIndex = 0 + + private def nextJobName = { + jobIndex += 1 + jobNamePrefix + "-" + jobIndex + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala new file mode 100644 index 000000000..edfcaf179 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -0,0 +1,57 @@ +package org.broadinstitute.sting.queue.engine + +import collection.JavaConversions._ +import edu.mit.broad.core.lsf.LocalLsfJob +import org.broadinstitute.sting.queue.function.DispatchFunction +import java.io.File +import java.util.ArrayList +import org.broadinstitute.sting.queue.util.Logging + +trait LsfJobRunner extends DispatchJobRunner with Logging { + type DispatchJobType = LocalLsfJob + + def dispatch(function: DispatchFunction, qGraph: QGraph) = { + var jobName = function.jobName + if (jobName == null) + jobName = newJobName + + var jobOutputFile = function.jobOutputFile + if (jobOutputFile == null) + jobOutputFile = new File(jobName + ".out") + + var jobErrorFile = function.jobErrorFile + if (jobErrorFile == null) + jobErrorFile = new File(jobName + ".err") + + val job = new LocalLsfJob + job.setName(jobName) + job.setOutputFile(jobOutputFile) + job.setErrFile(jobErrorFile) + job.setWorkingDir(function.commandDirectory) + job.setProject(function.jobProject) + job.setQueue(function.jobQueue) + job.setCommand(function.commandLine) + + var extraArgs = List("-r") + + if (function.memoryLimit.isDefined) + extraArgs :::= List("-R", "rusage[mem=" + function.memoryLimit.get + "]") + + val previous = previousJobs(function, qGraph) + if (previous.size > 0) + extraArgs :::= List("-w", dependencyExpression(previous)) + + job.setExtraBsubArgs(new ArrayList(extraArgs)) + + addJob(function, job) + + logger.info(job.getBsubCommand.mkString(" ")) + + if (!qGraph.dryRun) + job.start + } + + private def dependencyExpression(jobs: List[LocalLsfJob]) = { + jobs.toSet[LocalLsfJob].map(_.getName).mkString("ended(\"", "\") && ended(\"", "\")") + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala b/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala deleted file mode 100755 index fff54849b..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/Pipeline.scala +++ /dev/null @@ -1,16 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -import graphing.{ExplicitJobGrapher, TreeJobGrapher} -import scheduling.ResourceNode - -/** - * Syntactic sugar for filling in a pipeline using a Scala script. - */ -object Pipeline { - def addRule(rule: (Any, Any), commandString: String): Unit = TreeJobGrapher.addRule(rule, commandString) - def run(args: Array[String], sources: Any, targets: Any): Unit = TreeJobGrapher.run(args, sources, targets) - - def node() = ExplicitJobGrapher.node() - def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = ExplicitJobGrapher.addEdge(rule, commandString) - def run(): Unit = ExplicitJobGrapher.run() -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala b/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala deleted file mode 100755 index 1126fea19..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/QCommand.scala +++ /dev/null @@ -1,9 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -/** - * Defines a basic command to run - * TODO: Allow overriding arguments per command such as the job queue - */ -class QCommand(val commandString: String) extends QModelEdge { - override def toString = commandString -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala b/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala deleted file mode 100755 index 27c11b545..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/QFile.scala +++ /dev/null @@ -1,43 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -import org.apache.commons.lang.builder.{EqualsBuilder, HashCodeBuilder} -import java.io.File -import org.apache.commons.lang.StringUtils -import org.broadinstitute.sting.queue.QException - -/** - * Represents a file extension along with several tags. - * TODO: Use the tags to map rules between wildcards, ex: *.vcf -> *.eval - */ -class QFile(val fileType: String, val parts: String*) { - val extension = (List(parts:_*) ::: List(fileType)).mkString(".") - override def toString = extension - override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1) - override def hashCode = HashCodeBuilder.reflectionHashCode(this) - - def matchesFile(path: String): Boolean = matchesFile(new File(path)) - def matchesFile(file: File): Boolean = file.getAbsolutePath.endsWith(extension) - def baseName(path: String): String = baseName(new File(path)) - def baseName(file: File): String = StringUtils.removeEnd(file.getAbsolutePath, extension) - def fullName(baseName: String) = baseName + extension -} - -object QFile { - def getFiles(files: Any) : List[QFile] = { - files match { - case null => List.empty[QFile] - case Nil => List.empty[QFile] - case path: String => List(new QFile(path)) - case file: QFile => List(file) - // Any List or Tuple add the members to this list - case product: Product => { - var list = List.empty[QFile] - for (fileList <- product.productIterator.toList.map(getFiles(_))) { - list :::= fileList - } - list - } - case x => throw new QException("Unknown file type: " + x) - } - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala new file mode 100755 index 000000000..f54a8f0ab --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -0,0 +1,71 @@ +package org.broadinstitute.sting.queue.engine + +import org.jgrapht.graph.SimpleDirectedGraph +import scala.collection.JavaConversions +import scala.collection.immutable.ListMap +import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, QFunction} + +class QGraph extends Logging { + var dryRun = true + var bsubAllJobs = false + val jobGraph = new SimpleDirectedGraph[QNode, QFunction](classOf[QFunction]) + def numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size + + def add(command: CommandLineFunction) { + add(command, true) + } + + /** + * Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs. + */ + def fillIn = { + // clone since edgeSet is backed by the graph + for (function <- JavaConversions.asSet(jobGraph.edgeSet).clone) { + val inputs = function.inputs + val outputs = function.outputs + + if (inputs.size > 1) + for ((name, input) <- inputs) + addNullEdge(ListMap(name -> input), inputs) + + if (outputs.size > 1) + for ((name, output) <- outputs) + addNullEdge(outputs, ListMap(name -> output)) + } + } + + def run = { + var isReady = true + for (function <- JavaConversions.asSet(jobGraph.edgeSet)) { + val missingValues = function.missingValues + if (missingValues.size > 0) { + isReady = false + logger.error(function match { + case cmd: CommandLineFunction => "Missing values for function: %s".format(cmd.commandLine) + case x => "Missing values:" + }) + for (missing <- missingValues) { + logger.error(" " + missing) + } + } + } + + if (isReady || this.dryRun) + (new TopologicalJobScheduler(this) with LsfJobRunner).runJobs + } + + private def add(f: QFunction, replace: Boolean) { + val inputs = QNode(f.inputs.values.filter(_ != null).toSet) + val outputs = QNode(f.outputs.values.filter(_ != null).toSet) + jobGraph.addVertex(inputs) + jobGraph.addVertex(outputs) + if (replace) + jobGraph.removeAllEdges(inputs, outputs) + jobGraph.addEdge(inputs, outputs, f) + } + + private def addNullEdge(input: ListMap[String, Any], output: ListMap[String, Any]) = { + add(new MappingFunction(input, output), false) + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala deleted file mode 100644 index 580501ef1..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/QModelEdge.scala +++ /dev/null @@ -1,7 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -/** - * Used for modeling before an edge gets - * replicated into an actual ResourceEdge - */ -class QModelEdge diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala new file mode 100644 index 000000000..15a405d82 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala @@ -0,0 +1,6 @@ +package org.broadinstitute.sting.queue.engine + +/** + * Represents a state between QFunctions the directed acyclic QGraph + */ +case class QNode (private val items: Set[Any]) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QRule.scala b/scala/src/org/broadinstitute/sting/queue/engine/QRule.scala deleted file mode 100755 index a52e3d363..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/QRule.scala +++ /dev/null @@ -1,3 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -class QRule (val inputs: List[QFile], val outputs: List[QFile], var command: QCommand) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala new file mode 100755 index 000000000..882f6977c --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala @@ -0,0 +1,31 @@ +package org.broadinstitute.sting.queue.engine + +import org.jgrapht.traverse.TopologicalOrderIterator +import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter} +import collection.JavaConversions._ +import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, DispatchFunction, QFunction} + +/** + * Loops over the job graph running jobs as the edges are traversed + */ +abstract class TopologicalJobScheduler(private val qGraph: QGraph) + extends CommandLineRunner with DispatchJobRunner with Logging { + + protected val iterator = new TopologicalOrderIterator(this.qGraph.jobGraph) + + iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] { + override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match { + case f: DispatchFunction if (TopologicalJobScheduler.this.qGraph.bsubAllJobs) => dispatch(f, qGraph) + case f: CommandLineFunction => run(f, qGraph) + case f: MappingFunction => /* do nothing for mapping functions */ + } + }) + + def runJobs = { + logger.info("Number of jobs: %s".format(this.qGraph.numJobs)) + for (target <- iterator) { + // Do nothing for now, let event handler respond + } + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala deleted file mode 100644 index 4be5e17aa..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/graphing/ExplicitJobGrapher.scala +++ /dev/null @@ -1,27 +0,0 @@ -package org.broadinstitute.sting.queue.engine.graphing - -import org.broadinstitute.sting.queue.engine.scheduling.{ResourceNode, ExecEdge} -import org.broadinstitute.sting.queue.engine.QCommand - -class ExplicitJobGrapher extends JobGrapher - -object ExplicitJobGrapher { - private val grapher = new ExplicitJobGrapher - - def node() = new ResourceNode - - def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = { - addEdge(rule._1, rule._2, new QCommand(commandString)) - } - - private def addEdge(source: ResourceNode, target: ResourceNode, command: QCommand) = { - val resourceEdge = new ExecEdge(command.commandString) - grapher.jobGraph.addVertex(source) - grapher.jobGraph.addVertex(target) - grapher.jobGraph.addEdge(source, target, resourceEdge) - } - - def run() = { - grapher.run() - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala deleted file mode 100755 index 4fc1c0cc5..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/graphing/JobGrapher.scala +++ /dev/null @@ -1,31 +0,0 @@ -package org.broadinstitute.sting.queue.engine.graphing - -import org.broadinstitute.sting.queue.QArguments -import org.broadinstitute.sting.queue.engine.scheduling._ -import org.broadinstitute.sting.queue.util.Logging -import org.jgrapht.graph.SimpleDirectedGraph - -abstract class JobGrapher() extends Logging { - /** - * Jobs to be run. - * Can be populated adhoc or during createJobGraph() - */ - protected val jobGraph = new SimpleDirectedGraph[ResourceNode, ResourceEdge](classOf[ResourceEdge]) - - var qArgs: QArguments = _ - - def run() = { - createJobGraph() - val scheduler = createScheduler() - scheduler.runJobs - } - - protected def createJobGraph() = {} - - private def createScheduler() : JobScheduler = { - qArgs.useBsub match { - case false => new SimpleJobScheduler(jobGraph, qArgs) - case true => new DispatchJobScheduler(jobGraph, qArgs) - } - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala b/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala deleted file mode 100644 index 55b77b236..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/graphing/TreeJobGrapher.scala +++ /dev/null @@ -1,143 +0,0 @@ -package org.broadinstitute.sting.queue.engine.graphing - -import org.jgrapht.graph.SimpleDirectedGraph -import org.jgrapht.alg.BellmanFordShortestPath -import org.broadinstitute.sting.queue.{QArguments, QException} -import collection.mutable.ListBuffer -import collection.JavaConversions._ -import org.broadinstitute.sting.queue.engine.scheduling.{ResourceEdge, ExecEdge, MapResourceNode} -import org.broadinstitute.sting.queue.engine.{QModelEdge, QCommand, QFile, QRule} - -/** - * Converts a set of rules provided by the user and a list of files into a graph of jobs to run - */ -class TreeJobGrapher extends JobGrapher { - private val modelGraph = new SimpleDirectedGraph[List[QFile], QModelEdge](classOf[QModelEdge]) - - private val rules = new ListBuffer[QRule] - private var sourceFiles = List.empty[QFile] - private var targetFiles = List.empty[QFile] - - // Used to tag a model edge for Element <-> List - private class QCollectionEdge extends QModelEdge - - override protected def createJobGraph() = { - createModelGraph() - - var missingPaths = List.empty[Tuple2[QFile,QFile]] - for (sourceFile <- sourceFiles) { - for (targetFile <- targetFiles) { - var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, List(sourceFile), List(targetFile)) - if (shortestPath == null) - missingPaths = missingPaths ::: List((sourceFile, targetFile)) - else - addPaths(shortestPath, qArgs) - } - } - - for ((sourceFile, targetFile) <- missingPaths) { - logger.error(String.format("No command path found between %s --> %s", sourceFile, targetFile)) - } - - if (missingPaths.size > 0) - throw new QException("Not all inputs and outputs found in the pipeline graph") - } - - private def createModelGraph() = { - - // Look for rules with more than one input or output and add - // internal dependencies between the elements and the list - for (rule <- rules) { - if (rule.inputs.size > 1) { - for (input <- rule.inputs) { - modelGraph.addVertex(List(input)) - modelGraph.addVertex(rule.inputs) - modelGraph.addEdge(List(input), rule.inputs, new QCollectionEdge) - } - } - if (rule.outputs.size > 1) { - for (output <- rule.outputs) { - modelGraph.addVertex(rule.outputs) - modelGraph.addVertex(List(output)) - modelGraph.addEdge(rule.outputs, List(output), new QCollectionEdge) - } - } - } - - // Add the explicit rules - for (rule <- rules) { - modelGraph.addVertex(rule.inputs) - modelGraph.addVertex(rule.outputs) - modelGraph.addEdge(rule.inputs, rule.outputs, rule.command) - } - } - - private def addPaths(shortestPath: java.util.List[QModelEdge], qArgs: QArguments) { - for (inputFile <- qArgs.inputPaths.map(_.getAbsolutePath)) { - val source = modelGraph.getEdgeSource(shortestPath.head).head - if (source.matchesFile(inputFile)) { - val baseName = source.baseName(inputFile) - val target = modelGraph.getEdgeTarget(shortestPath.last).head - addPathsToTarget(baseName, List(target)) - } - } - } - - private def addPathsToTarget(baseName: String, targets: List[QFile]) : Unit = { - for (command <- modelGraph.incomingEdgesOf(targets)) { - val sources = modelGraph.getEdgeSource(command) - addJobGraphEdge(baseName, sources, targets, command) - addPathsToTarget(baseName, sources) - } - } - - private def addJobGraphEdge(baseName: String, sources: List[QFile], targets: List[QFile], command: QModelEdge) { - val resourceSource = new MapResourceNode(mapFiles(baseName, sources)) - val resourceTarget = new MapResourceNode(mapFiles(baseName, targets)) - val resourceEdge = command match { - case qCommand: QCommand => new ExecEdge(qCommand.commandString) - case qTransition: QCollectionEdge => new ResourceEdge - } - jobGraph.addVertex(resourceSource) - jobGraph.addVertex(resourceTarget) - jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge) - } - - /** - * Creates a mapping of the files based on the baseName. - * key: file extension - * value: full name - * Used by the JobScheduler to lookup values as the commands are expanded at exec time. - */ - private def mapFiles(baseName: String, files: List[QFile]) : Map[String, String] = { - Map(files.map(file => (file.extension, file.fullName(baseName))):_*) - } -} - -/** - * Syntactic sugar for filling in a pipeline using a Scala script. - */ -object TreeJobGrapher { - private val grapher = new TreeJobGrapher - - /** - * Sugar that allows addRule( inputs -> outputs, command ) - */ - def addRule(rule: (Any, Any), commandString: String): Unit = { - val inputs = QFile.getFiles(rule._1) - val outputs = QFile.getFiles(rule._2) - val command = new QCommand(commandString) - addRule(inputs, outputs, command) - } - - private def addRule(inputs: List[QFile], outputs: List[QFile], command: QCommand): Unit = { - grapher.rules += new QRule(inputs, outputs, command) - } - - def run(args: Array[String], sources: Any, targets: Any) = { - grapher.qArgs = new QArguments(args) - grapher.sourceFiles = QFile.getFiles(sources) - grapher.targetFiles = QFile.getFiles(targets) - grapher.run() - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala deleted file mode 100755 index fb7450d9c..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/DispatchJobScheduler.scala +++ /dev/null @@ -1,79 +0,0 @@ -package org.broadinstitute.sting.queue.engine.scheduling - -import org.jgrapht.DirectedGraph -import edu.mit.broad.core.lsf.LocalLsfJob -import collection.JavaConversions._ -import management.ManagementFactory -import java.io.File -import java.util.ArrayList -import org.broadinstitute.sting.queue.QArguments - -/** - * Dispatches jobs to LSF and then returns. - */ -class DispatchJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments) - extends TopologicalJobScheduler(jobGraph, qArgs) { - private var lsfJobs = Map.empty[ExecEdge, LocalLsfJob] - private var lsfJobIndex = 0 - private val jvmName = ManagementFactory.getRuntimeMXBean.getName - private val jobNamePrefix = "Q-" + jvmName - - def processExec(exec: ExecEdge) = { - lsfJobIndex += 1 - val job = new LocalLsfJob - val jobName = jobNamePrefix + "-" + lsfJobIndex - val outputFile = jobName + ".out" - val errorFile = jobName + ".err" - val workingDir = lookup(exec, "jobWorkingDir", ".") - val lsfProject = lookup(exec, "jobProject", "Queue") - val queue = lookup(exec, "jobQueue", "broad") - val memory = lookup(exec, "jobMemory", "2") - - var extraArgs = List("-r", "-R", "rusage[mem=" + memory + "]") - - val sourceJobs = sourceLsfJobs(exec) - if (sourceJobs.size > 0) { - extraArgs :::= List("-w", dependencyExpression(sourceJobs)) - } - job.setName(jobName) - job.setExtraBsubArgs(new ArrayList(extraArgs)) - job.setProject(lsfProject) - job.setWorkingDir(new File(workingDir)) - job.setProject(lsfProject) - job.setCommand(exec.commandString) - job.setOutputFile(new File(workingDir, outputFile)) - job.setErrFile(new File(workingDir, errorFile)) - job.setQueue(queue) - - lsfJobs = lsfJobs.updated(exec, job) - - logger.info(job.getBsubCommand.mkString(" ")) - - if (!qArgs.dryRun) - job.start - } - - /** - * Walks up the graph looking for the previous LsfJobs for this node - */ - private def sourceLsfJobs(edge: ResourceEdge) : List[LocalLsfJob] = { - var sourceJobs = List.empty[LocalLsfJob] - - val source = this.jobGraph.getEdgeSource(edge) - for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) { - incomingEdge match { - - // Stop recursing when we find a job along this edge and return it's job id - case exec: ExecEdge => sourceJobs :::= List(lsfJobs(exec)) - - // For any other type of edge find the LSF jobs preceeding the edge - case resourceEdge: ResourceEdge => sourceJobs :::= sourceLsfJobs(resourceEdge) - } - } - sourceJobs - } - - private def dependencyExpression(jobs: List[LocalLsfJob]) = { - jobs.toSet[LocalLsfJob].map(_.getName).mkString("ended(\"", "\") && ended(\"", "\")") - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala deleted file mode 100755 index ac09bf51f..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ExecEdge.scala +++ /dev/null @@ -1,17 +0,0 @@ -package org.broadinstitute.sting.queue.engine.scheduling - -import org.apache.commons.lang.text.{StrLookup, StrSubstitutor} -import java.lang.String - -class ExecEdge(private val templateCommandString: String) - extends ResourceEdge { - private var convertedCommandString: String = _ - def commandString = convertedCommandString - - override def traverse(graph: JobScheduler) = { - // Lookup any variable using the target node, or any of it's input nodes. - val sub = new StrSubstitutor(new StrLookup { def lookup(key: String) = graph.lookup(ExecEdge.this, key, null) }) - convertedCommandString = sub.replace(templateCommandString) - graph.processExec(this) - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala deleted file mode 100755 index c3a92b2f7..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/JobScheduler.scala +++ /dev/null @@ -1,65 +0,0 @@ -package org.broadinstitute.sting.queue.engine.scheduling - -import org.jgrapht.DirectedGraph -import org.broadinstitute.sting.queue.util.Logging -import collection.JavaConversions._ -import org.broadinstitute.sting.queue.QArguments - -abstract class JobScheduler(protected val jobGraph: DirectedGraph[ResourceNode, ResourceEdge], - protected val qArgs: QArguments) extends Logging { - - private var missingKeys = Set.empty[String] - - def runJobs - def numJobs = jobGraph.edgeSet.size - - def processExec(exec: ExecEdge) : Unit - - /** - * Emulates storing of properties per node by looking up values on - * the current edge/target-node or any preceding nodes in the graph. - */ - def lookup(edge: ResourceEdge, key: String, default: String) : String = { - val value = lookupRecursive(edge, key) match { - case Some(value) => value - case None => qArgs.argMap.getOrElse(key, default) - } - if (value == null) - missingKeys = missingKeys ++ Set(key) - value - } - - protected def logMissingKeys = { - if (qArgs.dryRun && !missingKeys.isEmpty) { - logger.warn("Missing keys:") - for (key <- missingKeys) - logger.warn(" ${" + key + "}") - } - } - - private def lookupRecursive(edge: ResourceEdge, key: String) : Option[String] = { - var value = edge.lookup(key) - if (value.isDefined) - return value - - value = this.jobGraph.getEdgeTarget(edge).lookup(key) - if (value.isDefined) - return value - - return lookupRecursive(this.jobGraph.getEdgeSource(edge), key) - } - - private def lookupRecursive(node: ResourceNode, key: String) : Option[String] = { - var value = node.lookup(key) - if (value.isDefined) - return value - - for (edge <- this.jobGraph.incomingEdgesOf(node)) { - value = lookupRecursive(edge, key) - if (value.isDefined) - return value - } - - return None - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala deleted file mode 100644 index 59155a610..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/MapResourceNode.scala +++ /dev/null @@ -1,5 +0,0 @@ -package org.broadinstitute.sting.queue.engine.scheduling - -class MapResourceNode (private val resources: Map[String,String]) extends ResourceNode { - override def lookup(key: String) : Option[String] = this.resources.get(key) -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala deleted file mode 100755 index a06ca9871..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceEdge.scala +++ /dev/null @@ -1,6 +0,0 @@ -package org.broadinstitute.sting.queue.engine.scheduling - -class ResourceEdge { - def traverse(graph: JobScheduler): Unit = {} - def lookup(key: String) : Option[String] = None -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala deleted file mode 100755 index 475224ef8..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/ResourceNode.scala +++ /dev/null @@ -1,9 +0,0 @@ -package org.broadinstitute.sting.queue.engine.scheduling - -import org.apache.commons.lang.builder.{HashCodeBuilder, EqualsBuilder} - -class ResourceNode() { - override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1) - override def hashCode = HashCodeBuilder.reflectionHashCode(this) - def lookup(key: String) : Option[String] = None -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala deleted file mode 100755 index 6c1ea43ca..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/SimpleJobScheduler.scala +++ /dev/null @@ -1,19 +0,0 @@ -package org.broadinstitute.sting.queue.engine.scheduling - -import org.broadinstitute.sting.queue.util.ProcessUtils -import org.broadinstitute.sting.queue.QArguments -import org.jgrapht.DirectedGraph - -/** - * Runs jobs one at a time locally - */ -class SimpleJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments) - extends TopologicalJobScheduler(jobGraph, qArgs) { - def processExec(exec: ExecEdge) = { - var commandString = exec.commandString - logger.info(commandString) - - if (!qArgs.dryRun) - ProcessUtils.runCommandAndWait(commandString) - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala deleted file mode 100755 index 3689ebd6e..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/scheduling/TopologicalJobScheduler.scala +++ /dev/null @@ -1,29 +0,0 @@ -package org.broadinstitute.sting.queue.engine.scheduling - -import org.jgrapht.DirectedGraph -import org.jgrapht.traverse.TopologicalOrderIterator -import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter} -import collection.JavaConversions._ -import org.broadinstitute.sting.queue.QArguments - -/** - * Loops over the job graph running jobs as the edges are traversed - */ -abstract class TopologicalJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments) - extends JobScheduler(jobGraph, qArgs) { - - protected val iterator = new TopologicalOrderIterator(this.jobGraph) - - iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] { - override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = - event.getEdge.traverse(TopologicalJobScheduler.this) - }) - - override def runJobs = { - logger.info(String.format("Running %s jobs.", this.numJobs.toString)) - for (target <- iterator) { - // Do nothing for now, let event handler respond - } - logMissingKeys - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala new file mode 100644 index 000000000..4288cf8b7 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -0,0 +1,92 @@ +package org.broadinstitute.sting.queue.function + +import java.io.File +import org.broadinstitute.sting.queue.util._ +import org.broadinstitute.sting.queue.engine.{CommandLineRunner, QGraph} + +trait CommandLineFunction extends QFunction with DispatchFunction { + + /** + * The command line to run locally or via grid computing. + */ + def commandLine: String + + /** + * The directory where the command should run. + */ + @Internal + var commandDirectory: File = new File(".") + + /** + * Repeats parameters with a prefix/suffix if they are set otherwise returns "". + * Skips null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString. + */ + protected def repeat(prefix: String, params: Seq[_], suffix: String = "", separator: String = "") = + params.filter(param => hasValue(param)).map(param => prefix + toValue(param) + suffix).mkString(separator) + + /** + * Returns parameter with a prefix/suffix if it is set otherwise returns "". + * Does not output null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString. + */ + protected def optional(prefix: String, param: Any, suffix: String = "") = + if (hasValue(param)) prefix + toValue(param) + suffix else "" + + /** + * Sets a field value using the name of the field. + * Field must be annotated with @Input, @Output, or @Internal + * @returns true if the value was found and set + */ + def setValue(name: String, value: String) = { + ReflectionUtils.getField(this, name) match { + case Some(field) => + val isInput = ReflectionUtils.hasAnnotation(field, classOf[Input]) + val isOutput = ReflectionUtils.hasAnnotation(field, classOf[Output]) + val isInternal = ReflectionUtils.hasAnnotation(field, classOf[Internal]) + if (isInput || isOutput || isInternal) { + ReflectionUtils.setValue(this, field, value) + } + true + case None => false + } + } + + private lazy val fields = ReflectionUtils.getAllFields(this.getClass) + private def internals = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Internal]) + def inputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Input]) + def outputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Output]) + + override def missingValues = { + val missingInputs = missingFields(inputs) + val missingOutputs = missingFields(outputs) + missingInputs | missingOutputs + } + + private def missingFields(fields: Map[String, Any]) = { + var missing = Set.empty[String] + for ((name, value) <- fields) { + val isOptional = ReflectionUtils.getField(this, name) match { + case Some(field) => ReflectionUtils.hasAnnotation(field, classOf[Optional]) + case None => false + } + if (!isOptional) + if (!hasValue(value)) + missing += name + } + missing + } + + private def hasValue(param: Any) = param match { + case null => false + case Nil => false + case None => false + case _ => true + } + + private def toValue(param: Any): String = param match { + case null => "" + case Nil => "" + case None => "" + case Some(x) => x.toString + case x => x.toString + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala new file mode 100644 index 000000000..024cee4f5 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/DispatchFunction.scala @@ -0,0 +1,19 @@ +package org.broadinstitute.sting.queue.function + +import java.io.File +import org.broadinstitute.sting.queue.util.Internal + +trait DispatchFunction extends QFunction with MemoryLimitedFunction { + def commandLine: String + var commandDirectory: File + + var jobName: String = _ + var jobOutputFile: File = _ + var jobErrorFile: File = _ + + @Internal + var jobProject = "Queue" + + @Internal + var jobQueue = "broad" +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala new file mode 100644 index 000000000..06ba9f3e8 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/IntervalFunction.scala @@ -0,0 +1,6 @@ +package org.broadinstitute.sting.queue.function + +trait IntervalFunction { + type Intervals = String + var intervals: Intervals +} \ No newline at end of file diff --git a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala new file mode 100644 index 000000000..509c7cad0 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala @@ -0,0 +1,14 @@ +package org.broadinstitute.sting.queue.function + +import org.broadinstitute.sting.queue.engine.QGraph +import scala.collection.immutable.ListMap + +/** + * Utility class to map a set of inputs to set of outputs. + * The QGraph uses this function internally to return + */ +class MappingFunction(private val in: ListMap[String, Any], private val out: ListMap[String, Any]) extends QFunction { + def inputs = in + def outputs = out + def run(qGraph: QGraph) = null +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala new file mode 100644 index 000000000..7f9d98542 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/MemoryLimitedFunction.scala @@ -0,0 +1,10 @@ +package org.broadinstitute.sting.queue.function + +import org.broadinstitute.sting.queue.util.{Input, Optional, ClassType} + +trait MemoryLimitedFunction { + @Input + @Optional + @ClassType(classOf[Int]) + var memoryLimit: Option[Int] = None +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala new file mode 100644 index 000000000..bef3257d9 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -0,0 +1,10 @@ +package org.broadinstitute.sting.queue.function + +import org.broadinstitute.sting.queue.engine.QGraph +import scala.collection.immutable.ListMap + +trait QFunction { + def inputs: ListMap[String, Any] + def outputs: ListMap[String, Any] + def missingValues = Set.empty[String] +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala new file mode 100644 index 000000000..6d6fb7b8e --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/gatk/GatkFunction.scala @@ -0,0 +1,35 @@ +package org.broadinstitute.sting.queue.function.gatk + +import java.io.File +import org.broadinstitute.sting.queue.util.{ClassType, Input, Optional} +import org.broadinstitute.sting.queue.function.{MemoryLimitedFunction, IntervalFunction, CommandLineFunction} + +trait GatkFunction extends CommandLineFunction with MemoryLimitedFunction with IntervalFunction { + @Input + @Optional + var javaTmpDir: String = _ + + @Input + var gatkJar: String = _ + + @Input + var referenceFile: String = _ + + @Input + @Optional + @ClassType(classOf[File]) + var bamFiles: List[File] = Nil + + @Input + @Optional + var dbsnp: File = _ + + @Input + @Optional + var intervals: Intervals = new Intervals("all") + + protected def gatkCommandLine(walker: String) = + "java%s%s -jar %s -T %s -R %s%s%s " + .format(optional(" -Xmx", memoryLimit, "g"), optional(" -Djava.io.tmpdir=", javaTmpDir), + gatkJar, walker, referenceFile, repeat(" -I ", bamFiles), optional(" -D ", dbsnp), optional(" -L ", intervals)) +} diff --git a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala index 7a6bf3f27..6861a31d6 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/Logging.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/Logging.scala @@ -7,7 +7,7 @@ import org.apache.log4j._ */ trait Logging { private val className = this.getClass.getName - lazy val logger = configuredLogger + protected lazy val logger = configuredLogger def configuredLogger = { Logging.configureLogging diff --git a/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala index fdb90f733..f79a4f33d 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/ProcessUtils.scala @@ -3,6 +3,7 @@ package org.broadinstitute.sting.queue.util import org.broadinstitute.sting.utils.text.XReadLines import collection.mutable.ListBuffer import collection.JavaConversions._ +import java.io.File object ProcessUtils extends Logging { @@ -15,10 +16,10 @@ object ProcessUtils extends Logging { val running = new ListBuffer[Process]() - def runCommandAndWait(command: String) = { + def runCommandAndWait(command: String, directory: File) = { logger.debug("Running command: " + command) - var builder = new ProcessBuilder("sh", "-c", command) + var builder = new ProcessBuilder("sh", "-c", command).directory(directory) var process = builder.start running += process diff --git a/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala new file mode 100644 index 000000000..cfc10f409 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala @@ -0,0 +1,96 @@ +package org.broadinstitute.sting.queue.util + +import java.lang.reflect.Field +import org.broadinstitute.sting.queue.QException +import java.lang.annotation.Annotation +import scala.concurrent.JavaConversions +import scala.concurrent.JavaConversions._ +import scala.collection.immutable.ListMap + +object ReflectionUtils { + def getField(obj: AnyRef, name: String) = getAllFields(obj.getClass).find(_.getName == name) + + def hasAnnotation(field: Field, annotation: Class[_ <: Annotation]) = field.getAnnotation(annotation) != null + + def getFieldsAnnotatedWith(obj: AnyRef, fields: List[Field], annotation: Class[_ <: Annotation]) = + ListMap(fields.filter(field => hasAnnotation(field, annotation)) + .map(field => (field.getName -> fieldGetter(field).invoke(obj))) :_*) + + def getAllTypes(clazz: Class[_]) = { + var types = List.empty[Class[_]] + var c = clazz + while (c != null) { + types :+= c + c = c.getSuperclass + } + types + } + + def getAllFields(clazz: Class[_]) = getAllTypes(clazz).map(_.getDeclaredFields).flatMap(_.toList) + + def setValue(obj: AnyRef, field: Field, value: String) = { + + val getter = fieldGetter(field) + val setter = fieldSetter(field) + + if (getter == null) + throw new QException("Field may be private? Unable to find getter for field: " + field) + + if (getter == null) + throw new QException("Field may be a val instead of var? Unable to find setter for field: " + field) + + if (classOf[Seq[_]].isAssignableFrom(field.getType)) { + + if (!field.isAnnotationPresent(classOf[ClassType])) + throw new QException("@ClassType must be specified due to type erasure for field: " + field) + + val fieldType = field.getAnnotation(classOf[ClassType]).asInstanceOf[ClassType].value + val typeValue = coerce(fieldType, value) + + var list = getter.invoke(obj).asInstanceOf[Seq[_]] + list :+= typeValue + setter.invoke(obj, list) + + } else if (classOf[Option[_]].isAssignableFrom(field.getType)) { + + if (!field.isAnnotationPresent(classOf[ClassType])) + throw new QException("@ClassType must be specified due to type erasure for field: " + field) + + val fieldType = field.getAnnotation(classOf[ClassType]).asInstanceOf[ClassType].value + val typeValue = coerce(fieldType, value) + + setter.invoke(obj, Some(typeValue)) + + } else { + + val fieldType = field.getType + val typeValue = coerce(fieldType, value) + + setter.invoke(obj, typeValue.asInstanceOf[AnyRef]) + } + } + + private[util] def fieldGetter(field: Field) = field.getDeclaringClass.getMethod(field.getName) + private[util] def fieldSetter(field: Field) = field.getDeclaringClass.getMethod(field.getName+"_$eq", field.getType) + + private def coerce(clazz: Class[_], value: String) = { + if (classOf[String] == clazz) value + else if (classOf[Boolean] == clazz) value.toBoolean + else if (classOf[Byte] == clazz) value.toByte + else if (classOf[Short] == clazz) value.toShort + else if (classOf[Int] == clazz) value.toInt + else if (classOf[Long] == clazz) value.toLong + else if (classOf[Float] == clazz) value.toFloat + else if (classOf[Double] == clazz) value.toDouble + else if (hasStringConstructor(clazz)) + clazz.getConstructor(classOf[String]).newInstance(value) + else throw new QException("Unable to coerce value '%s' to type '%s'.".format(value, clazz)) + } + + private def hasStringConstructor(clazz: Class[_]) = { + clazz.getConstructors.exists(constructor => { + val parameters = constructor.getParameterTypes + parameters.size == 1 && parameters.head == classOf[String] + }) + } +}