First implementation of JobRunInfo

-- onExecutionDone(Map(QFunction, JobRunInfo)) is the new signature, so that you can walk over your jobs and inspect their success/failure and runtime characteristics
This commit is contained in:
Mark DePristo 2011-08-23 16:51:54 -04:00
parent 6d6feb5540
commit 31ec6e316c
6 changed files with 100 additions and 2 deletions

View File

@ -114,7 +114,7 @@ class QCommandLine extends CommandLineProgram with Logging {
// walk over each script, calling onExecutionDone
for (script <- allQScripts) {
script.onExecutionDone(script.functions, qGraph.success)
script.onExecutionDone(qGraph.getFunctionsAndStatus(script.functions), qGraph.success)
}
if (!qGraph.success) {

View File

@ -24,6 +24,7 @@
package org.broadinstitute.sting.queue
import engine.JobRunInfo
import org.broadinstitute.sting.queue.function.QFunction
import annotation.target.field
import io.Source
@ -61,8 +62,9 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon
* A default handler for the onExecutionDone() function. By default this doesn't do anything
* except print out a fine status message.
*/
def onExecutionDone(jobs: List[QFunction], success: Boolean) {
def onExecutionDone(jobs: Map[QFunction, JobRunInfo], success: Boolean) {
logger.info("Script %s with %d total jobs".format(if (success) "completed successfully" else "failed", jobs.size))
for ( (f, info) <- jobs ) logger.info(" %s %s".format(f.jobName, info))
}
/**

View File

@ -23,6 +23,8 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
*/
var depth = -1
var runInfo: JobRunInfo = JobRunInfo.default // todo: replace after testing with _
/**
* Initializes with the current status of the function.
*/
@ -88,6 +90,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
tailError()
} else if (currentStatus == RunnerStatus.DONE) {
try {
runInfo = runner.getRunInfo
runner.cleanup()
function.doneOutputs.foreach(_.createNewFile())
} catch {

View File

@ -0,0 +1,75 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.engine
import java.util.Date
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* Base class containing all of the information about a job run.
*/
class JobRunInfo(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: String, status: RunnerStatus.Value) {
def getStatus = status
def getStartTime = startTime
def getDoneTime = doneTime
def getMemoryUsedInGb = memUsedInGb
def getHostname = hostName
def getRuntimeInMs: Long = {
getDoneTime.getTime - getStartTime.getTime
}
override def toString: String =
"started %s ended %s runtime %s on host %s using %d Gb memory".format(getStartTime, getDoneTime, getRuntimeInMs, getHostname, getMemoryUsedInGb)
}
object JobRunInfo {
def default = new JobRunInfo(new Date(), new Date(), 1, "localhost", RunnerStatus.DONE)
def detailed(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: String) =
new JobRunInfo(startTime, doneTime, memUsedInGb, hostName, RunnerStatus.DONE)
}

View File

@ -69,6 +69,11 @@ trait JobRunner[TFunction <: QFunction] {
def cleanup() {
}
/**
* Must be overloaded
*/
def getRunInfo: JobRunInfo = JobRunInfo.default
/**
* Calls back to a hook that an expert user can setup to modify a job.
* @param value Value to modify.

View File

@ -38,6 +38,7 @@ import org.apache.commons.lang.StringUtils
import org.broadinstitute.sting.queue.util._
import collection.immutable.{TreeSet, TreeMap}
import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction, GatherFunction, ScatterGatherableFunction}
import java.util.Date
/**
* The internal dependency tracker between sets of function input and output files.
@ -939,6 +940,14 @@ class QGraph extends Logging {
edges.sorted(functionOrdering).foreach(edge => if (running) f(edge))
}
/**
* Utility function for running a method over all function edges.
* @param edgeFunction Function to run for each FunctionEdge.
*/
private def getFunctionEdges: List[FunctionEdge] = {
jobGraph.edgeSet.toList.filter(_.isInstanceOf[FunctionEdge]).asInstanceOf[List[FunctionEdge]]
}
/**
* Utility function for running a method over all functions, but traversing the nodes in order of dependency.
* @param edgeFunction Function to run for each FunctionEdge.
@ -1028,6 +1037,10 @@ class QGraph extends Logging {
*/
def isShutdown = !running
def getFunctionsAndStatus(functions: List[QFunction]): Map[QFunction, JobRunInfo] = {
getFunctionEdges.map(edge => (edge.function, edge.runInfo)).toMap
}
/**
* Kills any forked jobs still running.
*/