Complete GSA-497: Let Queue write out runInfo on the fly, after each job group finishes running

-- Queue will incrementally now write out its jobReport.txt file whenever jobs finish running (FAIL or DONE)
-- This makes it far easier to track what's going on, or to analyze incrementally performance results coming out of Queue
-- Generally cleaned up the QJobsReporting code, creating a new clean class QJobsReporter that holds all of the information on what to do log and where to put into, which was previously scattered in QCommandLine and QJobReport
This commit is contained in:
Mark DePristo 2012-08-20 10:33:42 -04:00
parent 94e7f677ad
commit 9eec33ec3b
4 changed files with 189 additions and 91 deletions

View File

@ -105,8 +105,7 @@ class QCommandLine extends CommandLineProgram with Logging {
def execute = {
if (settings.qSettings.runName == null)
settings.qSettings.runName = FilenameUtils.removeExtension(scripts.head.getName)
qGraph.settings = settings
qGraph.initializeWithSettings(settings)
val allQScripts = pluginManager.createAllTypes();
for (script <- allQScripts) {
@ -137,26 +136,9 @@ class QCommandLine extends CommandLineProgram with Logging {
logger.info("Script %s with %d total jobs".format(if (success) "completed successfully" else "failed", functionsAndStatus.size))
if (!settings.disableJobReport) {
val jobStringName = {
if (settings.jobReportFile != null)
settings.jobReportFile
else
settings.qSettings.runName + ".jobreport.txt"
}
if (!shuttingDown) {
val reportFile = IOUtils.absolute(settings.qSettings.runDirectory, jobStringName)
logger.info("Writing JobLogging GATKReport to file " + reportFile)
QJobReport.printReport(functionsAndStatus, reportFile)
if (settings.run) {
val pdfFile = IOUtils.absolute(settings.qSettings.runDirectory, FilenameUtils.removeExtension(jobStringName) + ".pdf")
logger.info("Plotting JobLogging GATKReport to file " + pdfFile)
QJobReport.plotReport(reportFile, pdfFile)
}
}
}
// write the final complete job report
logger.info("Writing final jobs report...")
qGraph.writeJobsReport()
if (!qGraph.success) {
logger.info("Done with errors")

View File

@ -39,7 +39,7 @@ import collection.immutable.{TreeSet, TreeMap}
import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction, GatherFunction, ScatterGatherableFunction}
import java.util.Date
import org.broadinstitute.sting.utils.Utils
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.commons.io.{FilenameUtils, FileUtils, IOUtils}
import java.io.{OutputStreamWriter, File}
/**
@ -71,6 +71,16 @@ class QGraph extends Logging {
private val inProcessManager = new InProcessJobManager
private def managers = Seq[Any](inProcessManager, commandLineManager)
/**
* If true, we will write out incremental job reports
*/
private val INCREMENTAL_JOBS_REPORT = true
/**
* Holds the optional jobInfoReporter structure
*/
private var jobInfoReporter: QJobsReporter = null
private class StatusCounts {
var pending = 0
var running = 0
@ -79,6 +89,19 @@ class QGraph extends Logging {
}
private val statusCounts = new StatusCounts
/**
* Final initialization step of this QGraph -- tell it runtime setting information
*
* The settings aren't necessarily available until after this QGraph object has been constructed, so
* this function must be called once the QGraphSettings have been filled in.
*
* @param settings
*/
def initializeWithSettings(settings: QGraphSettings) {
this.settings = settings
this.jobInfoReporter = createJobsReporter()
}
/**
* Adds a QScript created CommandLineFunction to the graph.
* @param command Function to add to the graph.
@ -467,6 +490,12 @@ class QGraph extends Logging {
checkRetryJobs(failedJobs)
}
// incremental
if ( logNextStatusCounts && INCREMENTAL_JOBS_REPORT ) {
logger.info("Writing incremental jobs reports...")
writeJobsReport(false)
}
readyJobs ++= getReadyJobs
}
@ -1084,6 +1113,39 @@ class QGraph extends Logging {
}
}
/**
* Create the jobsReporter for this QGraph, based on the settings data.
*
* Must be called after settings has been initialized properly
*
* @return
*/
private def createJobsReporter(): QJobsReporter = {
val jobStringName = if (settings.jobReportFile != null)
settings.jobReportFile
else
settings.qSettings.runName + ".jobreport.txt"
val reportFile = org.broadinstitute.sting.utils.io.IOUtils.absolute(settings.qSettings.runDirectory, jobStringName)
val pdfFile = if ( settings.run )
Some(org.broadinstitute.sting.utils.io.IOUtils.absolute(settings.qSettings.runDirectory, FilenameUtils.removeExtension(jobStringName) + ".pdf"))
else
None
new QJobsReporter(settings.disableJobReport, reportFile, pdfFile)
}
/**
* Write, if possible, the jobs report
*/
def writeJobsReport(plot: Boolean = true) {
// note: the previous logic didn't write the job report if the system was shutting down, but I don't
// see any reason not to write the job report
if ( jobInfoReporter != null )
jobInfoReporter.write(this, plot)
}
/**
* Returns true if the graph was shutdown instead of exiting on its own.
*/

View File

@ -25,13 +25,8 @@
package org.broadinstitute.sting.queue.util
import org.broadinstitute.sting.queue.function.QFunction
import org.broadinstitute.sting.gatk.report.{GATKReportTable, GATKReport}
import org.broadinstitute.sting.utils.exceptions.UserException
import org.broadinstitute.sting.gatk.report.GATKReportTable
import org.broadinstitute.sting.queue.engine.JobRunInfo
import java.io.{PrintStream, File}
import org.broadinstitute.sting.utils.R.{RScriptLibrary, RScriptExecutor}
import org.broadinstitute.sting.utils.io.Resource
import org.apache.commons.io.{IOUtils, FileUtils}
/**
* A mixin to add Job info to the class
@ -98,31 +93,10 @@ trait QJobReport extends Logging {
}
object QJobReport {
val JOB_REPORT_QUEUE_SCRIPT = "queueJobReport.R"
// todo -- fixme to have a unique name for Scatter/gather jobs as well
var seenCounter = 1
var seenNames = Set[String]()
def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) {
val jobs = jobsRaw.filter(_._2.isFilledIn).filter(_._1.includeInReport)
jobs foreach {case (qf, info) => qf.setRunInfo(info)}
val stream = new PrintStream(FileUtils.openOutputStream(dest))
try {
printJobLogging(jobs.keys.toSeq, stream)
} finally {
IOUtils.closeQuietly(stream)
}
}
def plotReport(reportFile: File, pdfFile: File) {
val executor = new RScriptExecutor
executor.addLibrary(RScriptLibrary.GSALIB)
executor.addScript(new Resource(JOB_REPORT_QUEUE_SCRIPT, classOf[QJobReport]))
executor.addArgs(reportFile.getAbsolutePath, pdfFile.getAbsolutePath)
executor.exec()
}
def workAroundSameJobNames(func: QFunction):String = {
if ( seenNames.apply(func.jobName) ) {
seenCounter += 1
@ -132,45 +106,4 @@ object QJobReport {
func.jobName
}
}
/**
* Prints the JobLogging logs to a GATKReport. First splits up the
* logs by group, and for each group generates a GATKReportTable
*/
private def printJobLogging(logs: Seq[QFunction], stream: PrintStream) {
// create the report
val report: GATKReport = new GATKReport
// create a table for each group of logs
for ( (group, groupLogs) <- groupLogs(logs) ) {
val keys = logKeys(groupLogs)
report.addTable(group, "Job logs for " + group, keys.size)
val table: GATKReportTable = report.getTable(group)
// add the columns
keys.foreach(table.addColumn(_))
for (log <- groupLogs) {
for ( key <- keys )
table.set(log.getReportName, key, log.getReportFeature(key))
}
}
report.print(stream)
}
private def groupLogs(logs: Seq[QFunction]): Map[String, Seq[QFunction]] = {
logs.groupBy(_.getReportGroup)
}
private def logKeys(logs: Seq[QFunction]): Set[String] = {
// the keys should be the same for each log, but we will check that
val keys = Set[String](logs(0).getReportFeatureNames : _*)
for ( log <- logs )
if ( keys.sameElements(Set(log.getReportFeatureNames)) )
throw new UserException(("All JobLogging jobs in the same group must have the same set of features. " +
"We found one with %s and another with %s").format(keys, log.getReportFeatureNames))
keys
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright (c) 2012, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.util
import java.io.{PrintStream, File}
import org.broadinstitute.sting.utils.io.{Resource}
import org.broadinstitute.sting.queue.engine.{JobRunInfo, QGraph}
import org.broadinstitute.sting.queue.function.QFunction
import org.broadinstitute.sting.utils.R.{RScriptLibrary, RScriptExecutor}
import org.broadinstitute.sting.gatk.report.{GATKReportTable, GATKReport}
import org.broadinstitute.sting.utils.exceptions.UserException
import org.apache.commons.io.{FileUtils, IOUtils}
/**
* Writes out RunInfo to a GATKReport
*/
class QJobsReporter(val disabled: Boolean, val reportFile: File, val pdfFile: Option[File]) extends Logging {
private val JOB_REPORT_QUEUE_SCRIPT = "queueJobReport.R"
/**
* Write out a job report based on the finished jobs graph
* @param jobGraph
* @param enabledPlotting if true, we will plot the report as well with the JOB_REPORT_QUEUE_SCRIPT
*/
def write(jobGraph: QGraph, enabledPlotting: Boolean) {
if ( ! disabled ) {
logger.info("Writing JobLogging GATKReport to file " + reportFile)
printReport(jobGraph.getFunctionsAndStatus, reportFile)
if ( enabledPlotting )
pdfFile match {
case Some(file) =>
logger.info("Plotting JobLogging GATKReport to file " + file)
plotReport(reportFile, file)
case None =>
}
}
}
private def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) {
val jobs = jobsRaw.filter(_._2.isFilledIn).filter(_._1.includeInReport)
jobs foreach {case (qf, info) => qf.setRunInfo(info)}
val stream = new PrintStream(FileUtils.openOutputStream(dest))
try {
printJobLogging(jobs.keys.toSeq, stream)
} finally {
IOUtils.closeQuietly(stream)
}
}
private def plotReport(reportFile: File, pdfFile: File) {
val executor = new RScriptExecutor
executor.addLibrary(RScriptLibrary.GSALIB)
executor.addScript(new Resource(JOB_REPORT_QUEUE_SCRIPT, classOf[QJobReport]))
executor.addArgs(reportFile.getAbsolutePath, pdfFile.getAbsolutePath)
executor.exec()
}
/**
* Prints the JobLogging logs to a GATKReport. First splits up the
* logs by group, and for each group generates a GATKReportTable
*/
private def printJobLogging(logs: Seq[QFunction], stream: PrintStream) {
// create the report
val report: GATKReport = new GATKReport
// create a table for each group of logs
for ( (group, groupLogs) <- groupLogs(logs) ) {
val keys = logKeys(groupLogs)
report.addTable(group, "Job logs for " + group, keys.size)
val table: GATKReportTable = report.getTable(group)
// add the columns
keys.foreach(table.addColumn(_))
for (log <- groupLogs) {
for ( key <- keys )
table.set(log.getReportName, key, log.getReportFeature(key))
}
}
report.print(stream)
}
private def groupLogs(logs: Seq[QFunction]): Map[String, Seq[QFunction]] = {
logs.groupBy(_.getReportGroup)
}
private def logKeys(logs: Seq[QFunction]): Set[String] = {
// the keys should be the same for each log, but we will check that
val keys = Set[String](logs(0).getReportFeatureNames : _*)
for ( log <- logs )
if ( keys.sameElements(Set(log.getReportFeatureNames)) )
throw new UserException(("All JobLogging jobs in the same group must have the same set of features. " +
"We found one with %s and another with %s").format(keys, log.getReportFeatureNames))
keys
}
}