When generating the QGraph, don't regenerate if there aren't scatter/gather jobs.
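Fixed a display issue in the error message reporting how long Queue has been unable to read a job's status from LSF: the elapsed time is tracked in milliseconds but was being printed as minutes without the proper conversion.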
This commit is contained in:
parent
b327fa3779
commit
e93052a51e
|
|
@ -138,30 +138,32 @@ class QGraph extends Logging {
|
||||||
validate()
|
validate()
|
||||||
|
|
||||||
if (running && numMissingValues == 0) {
|
if (running && numMissingValues == 0) {
|
||||||
logger.info("Generating scatter gather jobs.")
|
|
||||||
val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge))
|
val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge))
|
||||||
|
if (!scatterGathers.isEmpty) {
|
||||||
|
logger.info("Generating scatter gather jobs.")
|
||||||
|
|
||||||
var addedFunctions = List.empty[QFunction]
|
var addedFunctions = List.empty[QFunction]
|
||||||
for (scatterGather <- scatterGathers) {
|
for (scatterGather <- scatterGathers) {
|
||||||
val functions = scatterGather.asInstanceOf[FunctionEdge]
|
val functions = scatterGather.asInstanceOf[FunctionEdge]
|
||||||
.function.asInstanceOf[ScatterGatherableFunction]
|
.function.asInstanceOf[ScatterGatherableFunction]
|
||||||
.generateFunctions()
|
.generateFunctions()
|
||||||
addedFunctions ++= functions
|
addedFunctions ++= functions
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("Removing original jobs.")
|
||||||
|
this.jobGraph.removeAllEdges(scatterGathers)
|
||||||
|
prune()
|
||||||
|
|
||||||
|
logger.info("Adding scatter gather jobs.")
|
||||||
|
addedFunctions.foreach(function => if (running) this.add(function))
|
||||||
|
|
||||||
|
logger.info("Regenerating graph.")
|
||||||
|
fill
|
||||||
|
val scatterGatherDotFile = if (settings.expandedDotFile != null) settings.expandedDotFile else settings.dotFile
|
||||||
|
if (scatterGatherDotFile != null)
|
||||||
|
renderToDot(scatterGatherDotFile)
|
||||||
|
validate()
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("Removing original jobs.")
|
|
||||||
this.jobGraph.removeAllEdges(scatterGathers)
|
|
||||||
prune()
|
|
||||||
|
|
||||||
logger.info("Adding scatter gather jobs.")
|
|
||||||
addedFunctions.foreach(function => if (running) this.add(function))
|
|
||||||
|
|
||||||
logger.info("Regenerating graph.")
|
|
||||||
fill
|
|
||||||
val scatterGatherDotFile = if (settings.expandedDotFile != null) settings.expandedDotFile else settings.dotFile
|
|
||||||
if (scatterGatherDotFile != null)
|
|
||||||
renderToDot(scatterGatherDotFile)
|
|
||||||
validate()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -286,11 +286,11 @@ object Lsf706JobRunner extends Logging {
|
||||||
// LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct)
|
// LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct)
|
||||||
// LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist)
|
// LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist)
|
||||||
logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(runner.jobId))
|
logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(runner.jobId))
|
||||||
val unknownStatusSeconds = (System.currentTimeMillis - runner.lastStatusUpdate)
|
val unknownStatusMillis = (System.currentTimeMillis - runner.lastStatusUpdate)
|
||||||
if (unknownStatusSeconds > (unknownStatusMaxSeconds * 1000L)) {
|
if (unknownStatusMillis > (unknownStatusMaxSeconds * 1000L)) {
|
||||||
// Unknown status has been returned for a while now.
|
// Unknown status has been returned for a while now.
|
||||||
runner.updateStatus(RunnerStatus.FAILED)
|
runner.updateStatus(RunnerStatus.FAILED)
|
||||||
logger.error("Unable to read LSF status for %d minutes: job id %d: %s".format(unknownStatusSeconds/60, runner.jobId, runner.function.description))
|
// Report the elapsed time in minutes (milliseconds / 60000). The specifier is "%.2f": a '0' flag without an explicit width is rejected by java.util.Formatter at runtime.
logger.error("Unable to read LSF status for %.2f minutes: job id %d: %s".format(unknownStatusMillis/(60 * 1000D), runner.jobId, runner.function.description))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue