From 9dc2e931b641e5fc34996ec31001e2196cdecb8b Mon Sep 17 00:00:00 2001 From: kshakir Date: Fri, 15 Oct 2010 20:00:35 +0000 Subject: [PATCH] Saving the order in which functions are added to the QScript. Using that order during submission of ready jobs (but not currently during dryrun) and during -status. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4508 348d0f76-0448-11de-a6fe-93d51630548a --- .../broadinstitute/sting/queue/QScript.scala | 18 +++++-- .../sting/queue/engine/QGraph.scala | 54 ++++++++++++++----- .../sting/queue/function/QFunction.scala | 3 ++ .../ScatterGatherableFunction.scala | 5 ++ 4 files changed, 63 insertions(+), 17 deletions(-) diff --git a/scala/src/org/broadinstitute/sting/queue/QScript.scala b/scala/src/org/broadinstitute/sting/queue/QScript.scala index 1d3d828a6..363e56831 100755 --- a/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -1,6 +1,7 @@ package org.broadinstitute.sting.queue import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.function.QFunction /** * Defines a Queue pipeline as a collection of CommandLineFunctions. @@ -13,6 +14,7 @@ trait QScript extends Logging { type Argument = org.broadinstitute.sting.commandline.Argument type ArgumentCollection = org.broadinstitute.sting.commandline.ArgumentCollection type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction + type InProcessFunction = org.broadinstitute.sting.queue.function.InProcessFunction type ScatterGatherableFunction = org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction type Scatter = org.broadinstitute.sting.queue.function.scattergather.Scatter type Gather = org.broadinstitute.sting.queue.function.scattergather.Gather @@ -26,7 +28,7 @@ trait QScript extends Logging { /** * The command line functions that will be executed for this QScript. 
*/ - var functions = List.empty[CommandLineFunction] + var functions = List.empty[QFunction] /** * Exchanges the extension on a file. @@ -64,6 +66,16 @@ trait QScript extends Logging { * Adds one or more command line functions to be run. * @param functions Functions to add. */ - def add(functions: CommandLineFunction*) = this.functions ++= List(functions:_*) - + def add(functions: QFunction*) = { + functions.foreach(function => function.addOrder = QScript.nextAddOrder) + this.functions ++= functions + } +} + +object QScript { + private var addOrder = 0 + private def nextAddOrder = { + addOrder += 1 + List(addOrder) + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 30cbb9121..ac51fcf03 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -180,14 +180,14 @@ class QGraph extends Logging { case f: FunctionEdge => this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING case _ => false - }.map(_.asInstanceOf[FunctionEdge]) + }.map(_.asInstanceOf[FunctionEdge]).toList.sortWith(compare(_,_)) } private def getRunningJobs = { jobGraph.edgeSet.filter{ case f: FunctionEdge => f.status == RunnerStatus.RUNNING case _ => false - }.map(_.asInstanceOf[FunctionEdge]) + }.map(_.asInstanceOf[FunctionEdge]).toList.sortWith(compare(_,_)) } /** @@ -425,20 +425,23 @@ class QGraph extends Logging { * Gets job statuses by traversing the graph and looking for status-related files */ private def doStatus(statusFunc: String => Unit) = { - var statuses = Map.empty[String, AnalysisStatus] - foreachFunction(edgeCLF => { - if (edgeCLF.function.analysisName != null) { - updateStatus(statuses.get(edgeCLF.function.analysisName) match { + var statuses = List.empty[AnalysisStatus] + var maxWidth = 0 + foreachFunction(edge => { + val name = edge.function.analysisName + if 
(name != null) { + updateStatus(statuses.find(_.analysisName == name) match { case Some(status) => status case None => - val status = new AnalysisStatus(edgeCLF.function.analysisName) - statuses += edgeCLF.function.analysisName -> status + val status = new AnalysisStatus(name) + maxWidth = maxWidth max name.length + statuses :+= status status - }, edgeCLF) + }, edge) } }) - statuses.values.toList.sortBy(_.analysisName).foreach(status => { + statuses.foreach(status => { if (status.scatter.total + status.gather.total > 0) { var sgStatus = RunnerStatus.PENDING if (status.scatter.failed + status.gather.failed > 0) @@ -450,7 +453,7 @@ class QGraph extends Logging { status.status = sgStatus } - var info = status.analysisName + ": [" + status.status + "]" + var info = ("%-" + maxWidth + "s [%7s]").format(status.analysisName, status.status) if (status.scatter.total + status.gather.total > 1) { info += formatSGStatus(status.scatter, "s") info += formatSGStatus(status.gather, "g") @@ -582,10 +585,33 @@ class QGraph extends Logging { * @param edgeFunction Function to run for each FunctionEdge. 
*/ private def foreachFunction(f: (FunctionEdge) => Unit) = { - jobGraph.edgeSet.foreach{ - case functionEdge: FunctionEdge => f(functionEdge) - case map: MappingEdge => /* do nothing for mapping functions */ + jobGraph.edgeSet.toList + .filter(_.isInstanceOf[FunctionEdge]) + .map(_.asInstanceOf[FunctionEdge]) + .sortWith(compare(_,_)) + .foreach(f(_)) + } + + private def compare(f1: FunctionEdge, f2: FunctionEdge): Boolean = + compare(f1.function, f2.function) + + private def compare(f1: QFunction, f2: QFunction): Boolean = { + val len1 = f1.addOrder.size + val len2 = f2.addOrder.size + val len = len1 min len2 + + for (i <- 0 until len) { + val order1 = f1.addOrder(i) + val order2 = f2.addOrder(i) + if (order1 < order2) + return true + if (order1 > order2) + return false } + if (len1 < len2) + return true + else + return false } /** diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index b089d7aa0..a0cee7b16 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -34,6 +34,9 @@ trait QFunction { /** Temporary directory to write any files */ var jobTempDir: File = IOUtils.javaTempDir + /** Order the function was added to the graph. */ + var addOrder: List[Int] = Nil + /** File to redirect any output. 
Defaults to .out */ @Output(doc="File to redirect any output", required=false) @Gather(classOf[SimpleTextGatherFunction]) diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index fdf7c5e88..732c67314 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -74,6 +74,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { // Create the scatter function based on @Scatter val scatterFunction = this.newScatterFunction(this.scatterField) + scatterFunction.addOrder = this.addOrder :+ 1 scatterFunction.analysisName = this.analysisName scatterFunction.qSettings = this.qSettings scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName) @@ -86,9 +87,11 @@ trait ScatterGatherableFunction extends CommandLineFunction { // Create the gather functions for each output field var gatherFunctions = Map.empty[ArgumentSource, GatherFunction] var gatherOutputs = Map.empty[ArgumentSource, File] + var gatherAddOrder = this.scatterCount + 2 for (gatherField <- outputFieldsWithValues) { val gatherFunction = this.newGatherFunction(gatherField) val gatherOutput = getFieldFile(gatherField) + gatherFunction.addOrder = this.addOrder :+ gatherAddOrder gatherFunction.analysisName = this.analysisName gatherFunction.qSettings = this.qSettings gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) @@ -99,6 +102,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { functions :+= gatherFunction gatherFunctions += gatherField -> gatherFunction gatherOutputs += gatherField -> gatherOutput + gatherAddOrder += 1 } // Create the clone functions for running the parallel jobs @@ -108,6 +112,7 
@@ trait ScatterGatherableFunction extends CommandLineFunction { cloneFunction.originalFunction = this cloneFunction.index = i + cloneFunction.addOrder = this.addOrder :+ (i+1) // Setup the fields on the clone function, outputing each as a relative file in the sg directory. cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i)