Saving the order in which functions are added to the QScript. Using that order during submission of ready jobs (but not currently during dry runs) and during -status.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4508 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-10-15 20:00:35 +00:00
parent 8b2d387643
commit 9dc2e931b6
4 changed files with 63 additions and 17 deletions

View File

@ -1,6 +1,7 @@
package org.broadinstitute.sting.queue
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.function.QFunction
/**
* Defines a Queue pipeline as a collection of CommandLineFunctions.
@ -13,6 +14,7 @@ trait QScript extends Logging {
type Argument = org.broadinstitute.sting.commandline.Argument
type ArgumentCollection = org.broadinstitute.sting.commandline.ArgumentCollection
type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction
type InProcessFunction = org.broadinstitute.sting.queue.function.InProcessFunction
type ScatterGatherableFunction = org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction
type Scatter = org.broadinstitute.sting.queue.function.scattergather.Scatter
type Gather = org.broadinstitute.sting.queue.function.scattergather.Gather
@ -26,7 +28,7 @@ trait QScript extends Logging {
/**
* The command line functions that will be executed for this QScript.
*/
var functions = List.empty[CommandLineFunction]
var functions = List.empty[QFunction]
/**
* Exchanges the extension on a file.
@ -64,6 +66,16 @@ trait QScript extends Logging {
* Adds one or more command line functions to be run.
* @param functions Functions to add.
*/
def add(functions: CommandLineFunction*) = this.functions ++= List(functions:_*)
def add(functions: QFunction*) = {
  // Stamp every incoming function with the next value of the shared counter
  // before storing it, so the order of addition can be recovered later.
  for (function <- functions)
    function.addOrder = QScript.nextAddOrder
  // The parameter shadows the field of the same name, hence the explicit `this`.
  this.functions = this.functions ++ functions
}
}
/**
 * Companion state for QScript: a single counter used to number functions
 * in the order they are added.
 */
object QScript {
  /** Last order value handed out; starts at 0, so the first value issued is 1. */
  private var addOrder: Int = 0

  /**
   * Advances the counter and returns the new value as a one-element list.
   * @return A single-element list holding the freshly incremented counter.
   */
  private def nextAddOrder: List[Int] = {
    addOrder = addOrder + 1
    addOrder :: Nil
  }
}

View File

@ -180,14 +180,14 @@ class QGraph extends Logging {
case f: FunctionEdge =>
this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING
case _ => false
}.map(_.asInstanceOf[FunctionEdge])
}.map(_.asInstanceOf[FunctionEdge]).toList.sortWith(compare(_,_))
}
private def getRunningJobs = {
jobGraph.edgeSet.filter{
case f: FunctionEdge => f.status == RunnerStatus.RUNNING
case _ => false
}.map(_.asInstanceOf[FunctionEdge])
}.map(_.asInstanceOf[FunctionEdge]).toList.sortWith(compare(_,_))
}
/**
@ -425,20 +425,23 @@ class QGraph extends Logging {
* Gets job statuses by traversing the graph and looking for status-related files
*/
private def doStatus(statusFunc: String => Unit) = {
var statuses = Map.empty[String, AnalysisStatus]
foreachFunction(edgeCLF => {
if (edgeCLF.function.analysisName != null) {
updateStatus(statuses.get(edgeCLF.function.analysisName) match {
var statuses = List.empty[AnalysisStatus]
var maxWidth = 0
foreachFunction(edge => {
val name = edge.function.analysisName
if (name != null) {
updateStatus(statuses.find(_.analysisName == name) match {
case Some(status) => status
case None =>
val status = new AnalysisStatus(edgeCLF.function.analysisName)
statuses += edgeCLF.function.analysisName -> status
val status = new AnalysisStatus(name)
maxWidth = maxWidth max name.length
statuses :+= status
status
}, edgeCLF)
}, edge)
}
})
statuses.values.toList.sortBy(_.analysisName).foreach(status => {
statuses.foreach(status => {
if (status.scatter.total + status.gather.total > 0) {
var sgStatus = RunnerStatus.PENDING
if (status.scatter.failed + status.gather.failed > 0)
@ -450,7 +453,7 @@ class QGraph extends Logging {
status.status = sgStatus
}
var info = status.analysisName + ": [" + status.status + "]"
var info = ("%-" + maxWidth + "s [%#7s]").format(status.analysisName, status.status)
if (status.scatter.total + status.gather.total > 1) {
info += formatSGStatus(status.scatter, "s")
info += formatSGStatus(status.gather, "g")
@ -582,10 +585,33 @@ class QGraph extends Logging {
* @param f Function to run for each FunctionEdge.
*/
private def foreachFunction(f: (FunctionEdge) => Unit) = {
jobGraph.edgeSet.foreach{
case functionEdge: FunctionEdge => f(functionEdge)
case map: MappingEdge => /* do nothing for mapping functions */
jobGraph.edgeSet.toList
.filter(_.isInstanceOf[FunctionEdge])
.map(_.asInstanceOf[FunctionEdge])
.sortWith(compare(_,_))
.foreach(f(_))
}
/**
 * Orders two edges by the add order of the functions they carry.
 * @param f1 First edge.
 * @param f2 Second edge.
 * @return true if f1's function sorts strictly before f2's function.
 */
private def compare(f1: FunctionEdge, f2: FunctionEdge): Boolean = {
  val first = f1.function
  val second = f2.function
  compare(first, second)
}
/**
 * Lexicographically compares the add orders of two functions.
 *
 * The add orders are walked position by position; the first position where
 * they differ decides the result. If one add order is a prefix of the other,
 * the shorter one sorts first. Equal add orders are not "less than" each other.
 *
 * @param f1 First function.
 * @param f2 Second function.
 * @return true if f1 sorts strictly before f2.
 */
private def compare(f1: QFunction, f2: QFunction): Boolean = {
  val order1 = f1.addOrder
  val order2 = f2.addOrder
  // zip stops at the shorter list, so only the common prefix is inspected.
  // This avoids both positional indexing into a List and a `return` from
  // inside a loop closure.
  order1.zip(order2).find { case (a, b) => a != b } match {
    case Some((a, b)) => a < b
    // Common prefix is identical: the shorter add order comes first.
    case None => order1.size < order2.size
  }
}
/**

View File

@ -34,6 +34,9 @@ trait QFunction {
/** Temporary directory to write any files */
var jobTempDir: File = IOUtils.javaTempDir
/** Order the function was added to the graph. */
var addOrder: List[Int] = Nil
/** File to redirect any output. Defaults to <jobName>.out */
@Output(doc="File to redirect any output", required=false)
@Gather(classOf[SimpleTextGatherFunction])

View File

@ -74,6 +74,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
// Create the scatter function based on @Scatter
val scatterFunction = this.newScatterFunction(this.scatterField)
scatterFunction.addOrder = this.addOrder :+ 1
scatterFunction.analysisName = this.analysisName
scatterFunction.qSettings = this.qSettings
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName)
@ -86,9 +87,11 @@ trait ScatterGatherableFunction extends CommandLineFunction {
// Create the gather functions for each output field
var gatherFunctions = Map.empty[ArgumentSource, GatherFunction]
var gatherOutputs = Map.empty[ArgumentSource, File]
var gatherAddOrder = this.scatterCount + 2
for (gatherField <- outputFieldsWithValues) {
val gatherFunction = this.newGatherFunction(gatherField)
val gatherOutput = getFieldFile(gatherField)
gatherFunction.addOrder = this.addOrder :+ gatherAddOrder
gatherFunction.analysisName = this.analysisName
gatherFunction.qSettings = this.qSettings
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
@ -99,6 +102,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
functions :+= gatherFunction
gatherFunctions += gatherField -> gatherFunction
gatherOutputs += gatherField -> gatherOutput
gatherAddOrder += 1
}
// Create the clone functions for running the parallel jobs
@ -108,6 +112,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
cloneFunction.originalFunction = this
cloneFunction.index = i
cloneFunction.addOrder = this.addOrder :+ (i+1)
// Set up the fields on the clone function, outputting each as a relative file in the sg directory.
cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i)