diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala index 85c3db699..25baaca4f 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -1,8 +1,8 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.InProcessFunction -import org.broadinstitute.sting.queue.util.IOUtils import java.util.Date +import org.broadinstitute.sting.queue.util.{Logging, IOUtils} /** * Runs a function that executes in process and does not fork out an external process. @@ -11,12 +11,12 @@ class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProce private var runStatus: RunnerStatus.Value = _ def start() = { - runInfo.startTime = new Date() + getRunInfo.startTime = new Date() runStatus = RunnerStatus.RUNNING function.run() - runInfo.doneTime = new Date() + getRunInfo.doneTime = new Date() val content = "%s%nDone.".format(function.description) IOUtils.writeContents(function.jobOutputFile, content) runStatus = RunnerStatus.DONE diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala index 2316f3968..563a32486 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala @@ -25,6 +25,7 @@ package org.broadinstitute.sting.queue.engine import java.util.Date +import java.text.SimpleDateFormat /* * Copyright (c) 2011, The Broad Institute @@ -57,14 +58,20 @@ class JobRunInfo { var startTime: Date = _ var doneTime: Date = _ var memUsedInGb: Int = -1 - var hostName: String = "localhost" var status: RunnerStatus.Value = RunnerStatus.DONE def getStatus = status + def getStartTime = startTime def getDoneTime = doneTime + def getFormattedStartTime = formatTime(getStartTime) + def getFormattedDoneTime = formatTime(getDoneTime) + + val formatter = new SimpleDateFormat("dd.MM.yy/H:mm:ss:SSS"); + private def formatTime(d: Date) = if ( d != null ) formatter.format(d) else "null" + def getMemoryUsedInGb = memUsedInGb - def getHostname = hostName + def isFilledIn = startTime != null def getRuntimeInMs: Long = { if ( getDoneTime != null && getStartTime != null ) @@ -74,7 +81,7 @@ class JobRunInfo { } override def toString: String = - "started %s ended %s runtime %s on host %s using %d Gb memory".format(getStartTime, getDoneTime, getRuntimeInMs, getHostname, getMemoryUsedInGb) + "started %s ended %s runtime %s using %d Gb memory".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs, getMemoryUsedInGb) } object JobRunInfo { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index 13c841778..166596316 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -70,7 +70,7 @@ class QGraphSettings { var expandedDotFile: File = _ @Argument(fullName="jobReport", shortName="jobReport", doc="File where we will write the Queue job report", required=false) - var jobReportFile: File = new File("queue_job_report.gatkreport.txt") + var jobReportFile: File = new File("/dev/stdout") @ArgumentCollection val qSettings = new QSettings diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index bb711344c..47f0d2c18 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -31,11 +31,12 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.jna.clibrary.LibC import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} -import com.sun.jna.ptr.IntByReference import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} -import com.sun.jna.{Structure, StringArray, NativeLong} import java.util.regex.Pattern import java.lang.StringBuffer +import java.util.Date +import com.sun.jna.{Pointer, Structure, StringArray, NativeLong} +import com.sun.jna.ptr.{PointerByReference, IntByReference} /** * Runs jobs on an LSF compute cluster. @@ -271,12 +272,21 @@ object Lsf706JobRunner extends Logging { logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo)) + def updateRunInfo() { + runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue) + runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue) + runner.getRunInfo.hostName = "unavailable" // TODO : exHosts + runner.getRunInfo.memUsedInGb = jobInfo.runRusage.mem + } + runner.updateStatus( if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { // Done successfully. + updateRunInfo() RunnerStatus.DONE } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { // Exited function that (probably) won't be retried. + updateRunInfo() RunnerStatus.FAILED } else { // Note that we still saw the job in the system. diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala index 03f9d3315..4124f65a0 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala @@ -27,6 +27,7 @@ package org.broadinstitute.sting.queue.engine.shell import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util.ShellJob import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} +import java.util.Date /** * Runs jobs one at a time locally @@ -50,8 +51,10 @@ class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRu // Allow advanced users to update the job. updateJobRun(job) + getRunInfo.startTime = new Date() updateStatus(RunnerStatus.RUNNING) job.run() + getRunInfo.doneTime = new Date() updateStatus(RunnerStatus.DONE) } diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala index 882831f6b..be0c2a5fc 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala @@ -49,15 +49,15 @@ trait QJobReport extends QFunction { def getName: String = features.get("jobName").get private def addRunInfo(info: JobRunInfo) { + logger.info("info " + info) features = features ++ Map( "analysisName" -> this.analysisName, "jobName" -> this.jobName, "intermediate" -> this.isIntermediate, - "startTime" -> info.getStartTime, - "doneTime" -> info.getDoneTime, + "startTime" -> info.getFormattedStartTime, + "doneTime" -> info.getFormattedDoneTime, "memUsedInGb" -> info.getMemoryUsedInGb, - "runtime" -> info.getRuntimeInMs, - "hostName" -> info.getHostname).mapValues(_.toString) + "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") } def setJobLogging(group: String) { @@ -71,7 +71,8 @@ trait QJobReport extends QFunction { } object QJobReport { - def printReport(jobs: Map[QFunction, JobRunInfo], dest: File) { + def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) { + val jobs = jobsRaw.filter(_._2.isFilledIn) val jobLogs: List[QJobReport] = jobLoggingSublist(jobs.keys.toList) jobLogs.foreach((job: QJobReport) => job.addRunInfo(jobs.get(job).get)) val stream = new PrintStream(new FileOutputStream(dest))