From ca5db821ce969c049cbebee338a1a99ac60eb8e0 Mon Sep 17 00:00:00 2001 From: kshakir Date: Wed, 6 Oct 2010 18:29:56 +0000 Subject: [PATCH] Added the ability to Queue to run scala functions inside the JVM. NOTE: Extend from InProcessFunction instead of CommandLineFunction to use this functionality. Queue now submits new LSF jobs only after previous functions have completed successfully. When the Queue process is shutdown (ex: via Control-C) sends a bkill command for any running jobs. Ported commands like creating directories and scatter/gather interval list to scala functions. Updates to LSF status tracking by porting the python to internally generated bash scripts. Temporarily disabled job name submission to LSF. Plus side is that the full command is now available in "bjobs -w". TODO: Put back jobName passing to LSF based on an option? Changed BaseTest to allow scala to access paths to references. Changed the extension generator to default the analysis name to the walker "name". git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4442 348d0f76-0448-11de-a6fe-93d51630548a --- .../gatk/GATKExtensionsGenerator.java | 5 +- .../org/broadinstitute/sting/BaseTest.java | 18 +- scala/qscript/kshakir/CleanBamFile.scala | 6 +- .../sting/queue/QCommandLine.scala | 24 +- .../sting/queue/QSettings.scala | 3 - .../queue/engine/DispatchJobRunner.scala | 52 +- .../sting/queue/engine/DryRunner.scala | 38 ++ .../sting/queue/engine/FunctionEdge.scala | 41 ++ .../sting/queue/engine/InProcessRunner.scala | 41 ++ .../sting/queue/engine/JobRunner.scala | 19 +- .../sting/queue/engine/LsfJobRunner.scala | 169 +++++-- .../MappingEdge.scala} | 5 +- .../sting/queue/engine/QEdge.scala | 23 + .../sting/queue/engine/QGraph.scala | 450 ++++++++++-------- .../sting/queue/engine/RunnerStatus.scala | 8 + .../sting/queue/engine/ShellJobRunner.scala | 36 +- .../gatk/ContigScatterFunction.scala | 2 +- .../gatk/IntervalScatterFunction.scala | 84 +++- 
.../queue/function/CommandLineFunction.scala | 347 +------------- .../queue/function/DispatchWaitFunction.scala | 15 - .../queue/function/InProcessFunction.scala | 12 + .../sting/queue/function/QFunction.scala | 303 +++++++++++- .../CleanupTempDirsFunction.scala | 14 +- .../CreateTempDirsFunction.scala | 18 +- .../scattergather/GatherFunction.scala | 4 +- .../scattergather/ScatterFunction.scala | 4 +- .../ScatterGatherableFunction.scala | 11 +- .../SimpleTextGatherFunction.scala | 50 +- .../sting/queue/util/CommandLineJob.scala | 2 +- .../sting/queue/util/IOUtils.scala | 38 +- .../sting/queue/util/JobExitException.scala | 11 + .../sting/queue/util/LsfJob.scala | 27 +- .../sting/queue/util/LsfKillJob.scala | 26 + .../sting/queue/util/ProcessController.scala | 10 +- .../sting/queue/util/ShellJob.scala | 7 +- .../sting/queue/util/IOUtilsUnitTest.scala | 14 + 36 files changed, 1162 insertions(+), 775 deletions(-) create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala rename scala/src/org/broadinstitute/sting/queue/{function/MappingFunction.scala => engine/MappingEdge.scala} (63%) create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala delete mode 100644 scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala diff --git a/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java 
b/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java index b4d96a124..a6893636e 100644 --- a/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java +++ b/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java @@ -116,8 +116,9 @@ public class GATKExtensionsGenerator extends CommandLineProgram { argumentFields.addAll(RodBindField.getRodArguments(walkerType, trackBuilder)); argumentFields.addAll(ReadFilterField.getFilterArguments(walkerType)); - writeClass(COMMANDLINE_PACKAGE_NAME + "." + clpClassName, WALKER_PACKAGE_NAME, - walkerName, String.format("analysis_type = \"%s\"%n", walkerName), argumentFields); + writeClass(COMMANDLINE_PACKAGE_NAME + "." + clpClassName, WALKER_PACKAGE_NAME, walkerName, + String.format("analysisName = \"%1$s\"%nanalysis_type = \"%1$s\"%n", walkerName), + argumentFields); } } } diff --git a/java/test/org/broadinstitute/sting/BaseTest.java b/java/test/org/broadinstitute/sting/BaseTest.java index 1da296e42..374ac0157 100755 --- a/java/test/org/broadinstitute/sting/BaseTest.java +++ b/java/test/org/broadinstitute/sting/BaseTest.java @@ -42,16 +42,16 @@ public abstract class BaseTest { /** our log, which we want to capture anything from org.broadinstitute.sting */ public static Logger logger = Logger.getRootLogger(); - protected static String hg18Reference = "/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta"; - protected static String hg19Reference = "/seq/references/Homo_sapiens_assembly19/v0/Homo_sapiens_assembly19.fasta"; - protected static String b36KGReference = "/humgen/1kg/reference/human_b36_both.fasta"; - protected static String b37KGReference = "/humgen/1kg/reference/human_g1k_v37.fasta"; - protected static String GATKDataLocation = "/humgen/gsa-hpprojects/GATK/data/"; - protected static String validationDataLocation = GATKDataLocation + "Validation_Data/"; - protected static String evaluationDataLocation = 
GATKDataLocation + "Evaluation_Data/"; - protected static String comparisonDataLocation = GATKDataLocation + "Comparisons/"; + public static String hg18Reference = "/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta"; + public static String hg19Reference = "/seq/references/Homo_sapiens_assembly19/v0/Homo_sapiens_assembly19.fasta"; + public static String b36KGReference = "/humgen/1kg/reference/human_b36_both.fasta"; + public static String b37KGReference = "/humgen/1kg/reference/human_g1k_v37.fasta"; + public static String GATKDataLocation = "/humgen/gsa-hpprojects/GATK/data/"; + public static String validationDataLocation = GATKDataLocation + "Validation_Data/"; + public static String evaluationDataLocation = GATKDataLocation + "Evaluation_Data/"; + public static String comparisonDataLocation = GATKDataLocation + "Comparisons/"; - protected static String testDir = "testdata/"; + public String testDir = "testdata/"; protected static boolean alreadySetup = false; diff --git a/scala/qscript/kshakir/CleanBamFile.scala b/scala/qscript/kshakir/CleanBamFile.scala index cf2f4c32e..77da34e1a 100644 --- a/scala/qscript/kshakir/CleanBamFile.scala +++ b/scala/qscript/kshakir/CleanBamFile.scala @@ -175,7 +175,11 @@ class CleanBamFile extends QScript { bamIndex.bamFile = fixedBam bamIndex.bamFileIndex = swapExt(fixedBam, "bam", "bam.bai") - val importer = new ImportSingleValueFunction + val importer = new ImportSingleValueFunction { + /** Files that this job should wait on before running. 
*/ + @Input(doc="Explicit job dependencies", required=false) + var jobDependencies: List[File] = Nil + } importer.jobQueue = shortJobQueue importer.jarFile = firehoseImportJar importer.host = firehoseHost diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index a18a8ef69..12e24af96 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -3,8 +3,8 @@ package org.broadinstitute.sting.queue import java.io.File import java.util.Arrays import org.broadinstitute.sting.queue.engine.QGraph -import org.broadinstitute.sting.queue.util.{Logging, ScalaCompoundArgumentTypeDescriptor} import org.broadinstitute.sting.commandline._ +import org.broadinstitute.sting.queue.util.{ProcessController, Logging, ScalaCompoundArgumentTypeDescriptor} /** * Entry point of Queue. Compiles and runs QScripts passed in to the command line. @@ -17,9 +17,6 @@ class QCommandLine extends CommandLineProgram with Logging { @Argument(fullName="bsub_all_jobs", shortName="bsub", doc="Use bsub to submit jobs", required=false) private var bsubAllJobs = false - @Argument(fullName="bsub_wait_jobs", shortName="bsubWait", doc="Wait for bsub submitted jobs before exiting", required=false) - private var bsubWaitJobs = false - @Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. Without this flag set only performs a dry run.", required=false) private var run = false @@ -46,10 +43,10 @@ class QCommandLine extends CommandLineProgram with Logging { * functions, and then builds and runs a QGraph based on the dependencies. 
*/ def execute = { + val qGraph = new QGraph qGraph.dryRun = !(run || runScripts) qGraph.bsubAllJobs = bsubAllJobs - qGraph.bsubWaitJobs = bsubWaitJobs qGraph.skipUpToDateJobs = skipUpToDate qGraph.dotFile = dotFile qGraph.expandedDotFile = expandedDotFile @@ -65,6 +62,14 @@ class QCommandLine extends CommandLineProgram with Logging { logger.info("Added " + script.functions.size + " functions") } + Runtime.getRuntime.addShutdownHook(new Thread { + /** Kills running processes as the JVM shuts down. */ + override def run = { + qGraph.shutdown() + ProcessController.shutdown() + } + }) + if ( ! getStatus ) { logger.info("Running generated graph") qGraph.run @@ -73,8 +78,13 @@ class QCommandLine extends CommandLineProgram with Logging { qGraph.checkStatus } - logger.info("Done") - 0 + if (qGraph.hasFailed) { + logger.info("Done with errors") + 1 + } else { + logger.info("Done") + 0 + } } /** diff --git a/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/scala/src/org/broadinstitute/sting/queue/QSettings.scala index b97bf176f..808a5ea81 100644 --- a/scala/src/org/broadinstitute/sting/queue/QSettings.scala +++ b/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -22,9 +22,6 @@ class QSettings { @Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false) var memoryLimit: Option[Int] = None - - @Argument(fullName="runJobsIfPrecedingFail", shortName="runIfFail", doc="If this flag is set then ALL jobs will run even if the previous jobs fail.", required=false) - var runJobsIfPrecedingFail = false } /** diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala index ec9d0d819..c07ff2ff2 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala @@ -1,7 +1,6 @@ package 
org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.function.{CommandLineFunction, QFunction} -import scala.collection.immutable.ListSet +import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util.IOUtils import java.io.File @@ -9,62 +8,17 @@ import java.io.File * Dispatches jobs to a compute cluster. */ trait DispatchJobRunner extends JobRunner { - /** Type of the job. */ - type DispatchJobType - /** An internal cache of all the jobs that have run by command line function. */ - private var dispatchJobs = Map.empty[CommandLineFunction, DispatchJobType] - /** An internal list of functions that have no other dependencies. */ - private var waitJobsByGraph = Map.empty[QGraph, ListSet[DispatchJobType]] - - /** - * Adds the job to the internal cache of previous jobs and removes the previous jobs that - * the job was dependent on from the list of function that have no dependencies. - * @param function CommandLineFunction to add to the list. - * @param qGraph Current qGraph being iterated over. - * @param dispatchJob The job that is being added to the cache. - * @param previousJobs The previous jobs that the job was dependent one. - */ - protected def addJob(function: CommandLineFunction, qGraph: QGraph, - dispatchJob: DispatchJobType, previousJobs: Iterable[DispatchJobType]) = { - dispatchJobs += function -> dispatchJob - var waitJobs = getWaitJobs(qGraph) - for (previousJob <- previousJobs) - waitJobs -= previousJob - waitJobs += dispatchJob - waitJobsByGraph += qGraph -> waitJobs - } - - /** - * Walks up the graph looking for the previous LsfJobs. - * @param function Function to examine for a previous command line job. - * @param qGraph The graph that contains the jobs. - * @return A list of prior jobs. 
- */ - protected def previousJobs(function: CommandLineFunction, qGraph: QGraph) : List[DispatchJobType] = - qGraph.previousJobs(function).map(dispatchJobs(_)) - - /** - * Returns a set of jobs that have no following jobs in the graph. - * @param qGraph The graph that contains the jobs. - * @return ListSet[DispatchJobType] of previous jobs that have no dependent jobs. - */ - protected def getWaitJobs(qGraph: QGraph) = { - if (!waitJobsByGraph.contains(qGraph)) - waitJobsByGraph += qGraph -> ListSet.empty[DispatchJobType] - waitJobsByGraph(qGraph) - } - /** * Builds a command line that can be run to force an automount of the directories. * @param function Function to look jobDirectories. - * @return A "cd [&& cd ]" command. + * @return A "cd '' [&& cd '']" command. */ protected def mountCommand(function: CommandLineFunction) = { var dirs = Set.empty[File] for (dir <- function.jobDirectories) dirs += IOUtils.dirLevel(dir, 2) if (dirs.size > 0) - Some(dirs.mkString("cd ", " && cd ", "")) + Some(dirs.mkString("cd '", "' && cd '", "'")) else None } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala new file mode 100644 index 000000000..28d73c7fb --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala @@ -0,0 +1,38 @@ +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} + +/** + * Only logs the command to run. Doesn't actually run it. + */ +class DryRunner(function: QFunction) extends JobRunner with Logging { + /** + * Dry runs the function logging the command lines. + * @param function Command to run. + */ + // TODO: Why do we need the dry runner? Can we just use a dryRun() method to log per JobRunner? 
+ def start() = { + function match { + case clf: CommandLineFunction => { + if (logger.isDebugEnabled) { + logger.debug(clf.commandDirectory + " > " + clf.commandLine) + } else { + logger.info(clf.commandLine) + } + logger.info("Output written to " + clf.jobOutputFile) + if (clf.jobErrorFile != null) { + logger.info("Errors written to " + clf.jobErrorFile) + } else { + if (logger.isDebugEnabled) + logger.info("Errors also written to " + clf.jobOutputFile) + } + } + case qFunction => { + logger.info(qFunction.description) + } + } + } + + def status = RunnerStatus.DONE +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala new file mode 100644 index 000000000..ce300b7a2 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -0,0 +1,41 @@ +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.queue.function.QFunction + +/** + * An edge in the QGraph that runs a QFunction. + * The edge is created first to determine inter-node dependencies, + * and then the runner is specified later when the time comes to + * execute the function in the edge. 
+ */ +class FunctionEdge(var function: QFunction) extends QEdge { + var runner: JobRunner =_ + + private var currentStatus = { + val doneOutputs = function.doneOutputs + val failOutputs = function.failOutputs + if (failOutputs.exists(_.exists)) + RunnerStatus.FAILED + else if (doneOutputs.forall(_.exists)) + RunnerStatus.DONE + else + RunnerStatus.PENDING + } + + def status = { + if (currentStatus == RunnerStatus.PENDING || currentStatus == RunnerStatus.RUNNING) + if (runner != null) + currentStatus = runner.status + currentStatus + } + + def resetPending() = { + currentStatus = RunnerStatus.PENDING + function.doneOutputs.foreach(_.delete) + function.doneOutputs.foreach(_.delete) + } + + def inputs = function.inputs + def outputs = function.outputs + override def dotString = function.dotString +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala new file mode 100644 index 000000000..22c65bf2e --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -0,0 +1,41 @@ +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.queue.function.InProcessFunction +import org.broadinstitute.sting.queue.util.Logging + +/** + * Runs a function that executes in process and does not fork out an external process. 
+ */ +class InProcessRunner(function: InProcessFunction) extends JobRunner with Logging { + private var runStatus: RunnerStatus.Value = _ + + def start() = { + if (logger.isDebugEnabled) { + logger.debug("Starting: " + function.commandDirectory + " > " + function.description) + } else { + logger.info("Starting: " + function.description) + } + + function.doneOutputs.foreach(_.delete) + function.failOutputs.foreach(_.delete) + runStatus = RunnerStatus.RUNNING + try { + function.run() + function.doneOutputs.foreach(_.createNewFile) + runStatus = RunnerStatus.DONE + logger.info("Done: " + function.description) + } catch { + case e => { + runStatus = RunnerStatus.FAILED + try { + function.failOutputs.foreach(_.createNewFile) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error: " + function.description, e) + } + } + } + + def status = runStatus +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index 93ff4cc1c..74625992f 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -1,16 +1,21 @@ package org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.function.CommandLineFunction - /** * Base interface for job runners. */ trait JobRunner { /** - * Dispatches a function to the queue and returns immediately, unless the function is a DispatchWaitFunction - * in which case it waits for all other terminal functions to complete. + * Runs the function. + * After the function returns the status of the function should + * be RUNNING, FAILED, or DONE. * @param function Command to run. - * @param qGraph graph that holds the job, and if this is a dry run. */ - def run(function: CommandLineFunction, qGraph: QGraph) -} \ No newline at end of file + def start() + + /** + * Returns the current run status. 
+ * Must only be called AFTER start(). + * @return RUNNING, DONE, or FAILED. + */ + def status: RunnerStatus.Value +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index 61761e82c..f8262bd46 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -1,22 +1,39 @@ package org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.function.{CommandLineFunction, DispatchWaitFunction} +import java.io.File import org.broadinstitute.sting.queue.util.{IOUtils, LsfJob, Logging} +import org.broadinstitute.sting.queue.function.CommandLineFunction /** * Runs jobs on an LSF compute cluster. */ -class LsfJobRunner extends DispatchJobRunner with Logging { - type DispatchJobType = LsfJob +class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with Logging { + private var runStatus: RunnerStatus.Value = _ + + var job: LsfJob = _ + + /** A file to look for to validate that the function ran to completion. */ + private var jobStatusPath: String = _ + + /** A temporary job done file to let Queue know that the process ran successfully. */ + private lazy val jobDoneFile = new File(jobStatusPath + ".done") + + /** A temporary job done file to let Queue know that the process exited with an error. */ + private lazy val jobFailFile = new File(jobStatusPath + ".fail") + + /** A generated pre-exec shell script. */ + private var preExec: File = _ + + /** A generated post-exec shell script. */ + private var postExec: File = _ /** * Dispatches the function on the LSF cluster. * @param function Command to run. - * @param qGraph graph that holds the job, and if this is a dry run. 
*/ - def run(function: CommandLineFunction, qGraph: QGraph) = { - val job = new LsfJob - job.name = function.jobName + def start() = { + job = new LsfJob + // job.name = function.jobName TODO: Make setting the job name optional. job.outputFile = function.jobOutputFile job.errorFile = function.jobErrorFile job.project = function.jobProject @@ -32,56 +49,114 @@ class LsfJobRunner extends DispatchJobRunner with Logging { if (function.memoryLimit.isDefined) job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]") - if ( ! function.commandLine.contains("mkdir")) // wild hack -- ignore mkdirs ?? - job.postExecCommand = function.doneOutputs.foldLeft("python /humgen/gsa-scr1/chartl/sting/python/lsf_post_touch.py ")((b,a) => b + a.getAbsolutePath+" ") - // hacky trailing space, so sue me -- CH + preExec = writePreExec(function) + job.preExecCommand = "sh " + preExec - val previous: Iterable[LsfJob] = - if (function.isInstanceOf[DispatchWaitFunction]) { - job.waitForCompletion = true - getWaitJobs(qGraph) - } else { - previousJobs(function, qGraph) - } - - mountCommand(function) match { - case Some(command) => job.preExecCommand = command - case None => /* ignore */ - } - - if (previous.size > 0) - job.extraBsubArgs ++= List("-w", dependencyExpression(previous, - function.jobRunOnlyIfPreviousSucceed, qGraph.dryRun)) - - addJob(function, qGraph, job, previous) + postExec = writePostExec(function) + job.postExecCommand = "sh " + postExec if (logger.isDebugEnabled) { - logger.debug(function.commandDirectory + " > " + job.bsubCommand.mkString(" ")) + logger.debug("Starting: " + function.commandDirectory + " > " + job.bsubCommand.mkString(" ")) } else { - logger.info(job.bsubCommand.mkString(" ")) + logger.info("Starting: " + job.bsubCommand.mkString(" ")) } - if (!qGraph.dryRun) - job.run + runStatus = RunnerStatus.RUNNING + job.run() + jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." 
+ job.bsubJobId)).toString + logger.info("Submitted LSF job id: " + job.bsubJobId) } /** - * Returns the dependency expression for the prior jobs. - * @param jobs Previous jobs this job is dependent on. - * @param runOnSuccess Run the job only if the previous jobs succeed. - * @param dryRun If the current run is a dry run. - * @return The dependency expression for the prior jobs. + * Updates and returns the status by looking for job status files. + * After the job status files are detected they are cleaned up from + * the file system and the status is cached. + * + * Note, these temporary job status files are currently different from the + * .done files used to determine if a file has been created successfully. */ - private def dependencyExpression(jobs: Iterable[LsfJob], - runOnSuccess: Boolean, dryRun: Boolean) = { - val jobIds = if (dryRun) - jobs.toSet[LsfJob].map("\"" + _.name + "\"") - else - jobs.toSet[LsfJob].map(_.bsubJobId) + def status = { + if (logger.isDebugEnabled) { + logger.debug("Done %s exists = %s".format(jobDoneFile, jobDoneFile.exists)) + logger.debug("Fail %s exists = %s".format(jobFailFile, jobFailFile.exists)) + } - if (runOnSuccess) - jobIds.mkString("done(", ") && done(", ")") - else - jobIds.mkString("ended(", ") && ended(", ")") + if (jobFailFile.exists) { + removeTemporaryFiles() + runStatus = RunnerStatus.FAILED + logger.info("Error: " + job.bsubCommand.mkString(" ")) + tailError() + } else if (jobDoneFile.exists) { + removeTemporaryFiles() + runStatus = RunnerStatus.DONE + logger.info("Done: " + job.bsubCommand.mkString(" ")) + } + + runStatus + } + + /** + * Removes all temporary files used for this LSF job. + */ + private def removeTemporaryFiles() = { + preExec.delete() + postExec.delete() + jobDoneFile.delete() + jobFailFile.delete() + } + + /** + * Outputs the last lines of the error logs. 
+ */ + private def tailError() = { + val errorFile = if (job.errorFile != null) job.errorFile else job.outputFile + val tailLines = IOUtils.tail(errorFile, 100) + val nl = "%n".format() + logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) + } + + /** + * Writes a pre-exec file to cleanup any status files and + * optionally mount any automount directories on the node. + * @return the file path to the pre-exec. + */ + private def writePreExec(function: CommandLineFunction): File = { + val preExec = new StringBuilder + + preExec.append("rm -f '%s/'.$LSB_JOBID.done%n".format(function.commandDirectory)) + function.doneOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file))) + preExec.append("rm -f '%s/'.$LSB_JOBID.fail%n".format(function.commandDirectory)) + function.failOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file))) + + mountCommand(function).foreach(command => + preExec.append("%s%n".format(command))) + + IOUtils.writeTempFile(preExec.toString, ".preExec", "", function.commandDirectory) + } + + /** + * Writes a post-exec file to create the status files. + * @return the file path to the post-exec. 
+ */ + private def writePostExec(function: CommandLineFunction): File = { + val postExec = new StringBuilder + + val touchDone = function.doneOutputs.map("touch '%s'%n".format(_)).mkString + val touchFail = function.failOutputs.map("touch '%s'%n".format(_)).mkString + + postExec.append("""| + |if [ "${LSB_JOBPEND:-unset}" != "unset" ]; then + | exit 0 + |fi + | + |JOB_STAT_ROOT='%s/'.$LSB_JOBID + |if [ "$LSB_JOBEXIT_STAT" == "0" ]; then + |%stouch "$JOB_STAT_ROOT".done + |else + |%stouch "$JOB_STAT_ROOT".fail + |fi + |""".stripMargin.format(function.commandDirectory, touchDone, touchFail)) + + IOUtils.writeTempFile(postExec.toString, ".postExec", "", function.commandDirectory) } } diff --git a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala b/scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala similarity index 63% rename from scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala rename to scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala index c43567809..6f42a705b 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.queue.function +package org.broadinstitute.sting.queue.engine import java.io.File @@ -6,10 +6,11 @@ import java.io.File * Utility class to map a set of inputs to set of outputs. * The QGraph uses this function internally to map between user defined functions. */ -class MappingFunction(val inputs: Set[File], val outputs: Set[File]) extends QFunction { +class MappingEdge(val inputs: Set[File], val outputs: Set[File]) extends QEdge { /** * For debugging purposes returns . 
* @return */ override def toString = "" + override def dotString = "" } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala new file mode 100644 index 000000000..265d60a74 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala @@ -0,0 +1,23 @@ +package org.broadinstitute.sting.queue.engine + +import java.io.File + +/** + * An edge in the QGraph + */ +trait QEdge { + /** + * Set of inputs for this function. + */ + def inputs: Set[File] + + /** + * Set of outputs for this function. + */ + def outputs: Set[File] + + /** + * The function description in .dot files + */ + def dotString = "" +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 1756ef2c8..43923746b 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -4,25 +4,22 @@ import org.jgrapht.traverse.TopologicalOrderIterator import org.jgrapht.graph.SimpleDirectedGraph import scala.collection.JavaConversions import scala.collection.JavaConversions._ -import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction -import org.broadinstitute.sting.queue.util.Logging import org.jgrapht.alg.CycleDetector import org.jgrapht.EdgeFactory import org.jgrapht.ext.DOTExporter import java.io.File import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} import org.broadinstitute.sting.queue.{QSettings, QException} -import org.broadinstitute.sting.queue.function.{DispatchWaitFunction, MappingFunction, CommandLineFunction, QFunction} -import collection.mutable.HashMap +import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, ScatterGatherableFunction} +import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction} +import 
org.broadinstitute.sting.queue.util.{JobExitException, LsfKillJob, Logging} /** * The internal dependency tracker between sets of function input and output files. */ class QGraph extends Logging { - var status = false var dryRun = true var bsubAllJobs = false - var bsubWaitJobs = false var skipUpToDateJobs = false var dotFile: File = _ var expandedDotFile: File = _ @@ -34,14 +31,48 @@ class QGraph extends Logging { * Adds a QScript created CommandLineFunction to the graph. * @param command Function to add to the graph. */ - def add(command: CommandLineFunction) { - addFunction(command) + def add(command: QFunction) { + try { + command.qSettings = this.qSettings + command.freeze + addEdge(new FunctionEdge(command)) + } catch { + case e: Exception => + throw new QException("Error adding function: " + command, e) + } } + + private def scatterGatherable(edge: FunctionEdge) = { + edge.function match { + case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) => true + case _ => false + } + } + + /** * Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph. 
*/ def run = { + val numMissingValues = fillGraph + val isReady = numMissingValues == 0 + + if (isReady || this.dryRun) { + runJobs() + } + + if (numMissingValues > 0) { + logger.error("Total missing values: " + numMissingValues) + } + + if (isReady && this.dryRun) { + logger.info("Dry run completed successfully!") + logger.info("Re-run with \"-run\" to execute the functions.") + } + } + + private def fillGraph = { fill if (dotFile != null) renderToDot(dotFile) @@ -49,15 +80,15 @@ class QGraph extends Logging { if (numMissingValues == 0 && bsubAllJobs) { logger.debug("Scatter gathering jobs.") - var scatterGathers = List.empty[ScatterGatherableFunction] + var scatterGathers = List.empty[FunctionEdge] loop({ - case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) => - scatterGathers :+= scatterGather + case edge: FunctionEdge if (scatterGatherable(edge)) => + scatterGathers :+= edge }) - var addedFunctions = List.empty[CommandLineFunction] + var addedFunctions = List.empty[QFunction] for (scatterGather <- scatterGathers) { - val functions = scatterGather.generateFunctions() + val functions = scatterGather.function.asInstanceOf[ScatterGatherableFunction].generateFunctions() if (this.debugMode) logger.debug("Scattered into %d parts: %n%s".format(functions.size, functions.mkString("%n".format()))) addedFunctions ++= functions @@ -65,7 +96,7 @@ class QGraph extends Logging { this.jobGraph.removeAllEdges(scatterGathers) prune - addedFunctions.foreach(this.addFunction(_)) + addedFunctions.foreach(this.add(_)) fill val scatterGatherDotFile = if (expandedDotFile != null) expandedDotFile else dotFile @@ -74,46 +105,33 @@ class QGraph extends Logging { numMissingValues = validate } - val isReady = numMissingValues == 0 - - if ( (isReady || this.dryRun) && ! this.status ) - runJobs - - if (numMissingValues > 0) { - logger.error("Total missing values: " + numMissingValues) - } - - if (isReady && this.dryRun && ! 
this.status) { - logger.info("Dry run completed successfully!") - logger.info("Re-run with \"-run\" to execute the functions.") - } + numMissingValues } def checkStatus = { // build up the full DAG with scatter-gather jobs - this.status = true - run - runStatus + fillGraph + logStatus } /** - * Walks up the graph looking for the previous LsfJobs. + * Walks up the graph looking for the previous command line edges. * @param function Function to examine for a previous command line job. * @param qGraph The graph that contains the jobs. * @return A list of prior jobs. */ - def previousJobs(function: QFunction) : List[CommandLineFunction] = { - var previous = List.empty[CommandLineFunction] + def previousFunctions(edge: QEdge) : List[FunctionEdge] = { + var previous = List.empty[FunctionEdge] - val source = this.jobGraph.getEdgeSource(function) + val source = this.jobGraph.getEdgeSource(edge) for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) { incomingEdge match { // Stop recursing when we find a job along the edge and return its job id - case commandLineFunction: CommandLineFunction => previous :+= commandLineFunction + case functionEdge: FunctionEdge => previous :+= functionEdge - // For any other type of edge find the LSF jobs preceding the edge - case qFunction: QFunction => previous ++= previousJobs(qFunction) + // For any other type of edge find the jobs preceding the edge + case edge: QEdge => previous ++= previousFunctions(edge) } } previous @@ -125,8 +143,6 @@ class QGraph extends Logging { */ private def fill = { fillIn - if (skipUpToDateJobs) - removeUpToDate prune } @@ -136,34 +152,34 @@ class QGraph extends Logging { private def fillIn = { // clone since edgeSet is backed by the graph JavaConversions.asSet(jobGraph.edgeSet).clone.foreach { - case cmd: CommandLineFunction => { + case cmd: FunctionEdge => { addCollectionOutputs(cmd.outputs) addCollectionInputs(cmd.inputs) } - case map: MappingFunction => /* do nothing for mapping functions */ + case 
map: MappingEdge => /* do nothing for mapping edges */ } } - /** - * Removes functions that are up to date. - */ - private def removeUpToDate = { - var upToDateJobs = Set.empty[CommandLineFunction] + private def getReadyJobs = { + var readyJobs = List.empty[FunctionEdge] loop({ - case f if (upToDate(f, upToDateJobs)) => { - logger.info("Skipping command because it is up to date: %n%s".format(f.commandLine)) - upToDateJobs += f + case f: FunctionEdge => { + if (this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING) + readyJobs :+= f } }) - for (upToDateJob <- upToDateJobs) - jobGraph.removeEdge(upToDateJob) + readyJobs } - /** - * Returns true if the all previous functions in the graph are up to date, and the function is up to date. - */ - private def upToDate(commandLineFunction: CommandLineFunction, upToDateJobs: Set[CommandLineFunction]) = { - this.previousJobs(commandLineFunction).forall(upToDateJobs.contains(_)) && commandLineFunction.upToDate + private def getRunningJobs = { + var runningJobs = List.empty[FunctionEdge] + loop({ + case f: FunctionEdge => { + if (f.status == RunnerStatus.RUNNING) + runningJobs :+= f + } + }) + runningJobs } /** @@ -190,15 +206,15 @@ class QGraph extends Logging { private def validate = { var numMissingValues = 0 JavaConversions.asSet(jobGraph.edgeSet).foreach { - case cmd: CommandLineFunction => - val missingFieldValues = cmd.missingFields + case cmd: FunctionEdge => + val missingFieldValues = cmd.function.missingFields if (missingFieldValues.size > 0) { numMissingValues += missingFieldValues.size - logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.commandLine)) + logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.function.description)) for (missing <- missingFieldValues) logger.error(" " + missing) } - case map: MappingFunction => /* do nothing for mapping functions */ + case map: MappingEdge => /* do nothing for 
mapping edges */ } val detector = new CycleDetector(jobGraph) @@ -215,157 +231,170 @@ class QGraph extends Logging { /** * Runs the jobs by traversing the graph. */ - private def runJobs = { - val runner = if (bsubAllJobs) new LsfJobRunner else new ShellJobRunner + private def runJobs() = { + loop({ case f: FunctionEdge => { + val isDone = this.skipUpToDateJobs && + f.status == RunnerStatus.DONE && + this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) + if (!isDone) + f.resetPending() + }}) - val numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size + var readyJobs = getReadyJobs + var runningJobs = Set.empty[FunctionEdge] + while (readyJobs.size + runningJobs.size > 0) { + var exitedJobs = List.empty[FunctionEdge] + runningJobs.foreach(runner => { + if (runner.status != RunnerStatus.RUNNING) + exitedJobs :+= runner + }) + exitedJobs.foreach(runner => runningJobs -= runner) - logger.info("Number of jobs: %s".format(numJobs)) - if (this.debugMode) { - val numNodes = jobGraph.vertexSet.size - logger.debug("Number of nodes: %s".format(numNodes)) - } - var numNodes = 0 - - loop( - edgeFunction = { case f => runner.run(f, this) }, - nodeFunction = { - case node => { - if (this.debugMode) - logger.debug("Visiting: " + node) - numNodes += 1 + readyJobs.foreach(f => { + f.runner = newRunner(f.function) + f.runner.start() + if (f.status == RunnerStatus.RUNNING) { + runningJobs += f } }) - if (this.debugMode) - logger.debug("Done walking %s nodes.".format(numNodes)) - - if (bsubAllJobs && bsubWaitJobs) { - logger.info("Waiting for jobs to complete.") - val wait = new DispatchWaitFunction - wait.qSettings = this.qSettings - wait.freeze - runner.run(wait, this) + if (readyJobs.size == 0 && runningJobs.size > 0) + Thread.sleep(30000L) + readyJobs = getReadyJobs } } + private def newRunner(f: QFunction) = { + if (this.dryRun) + new DryRunner(f) + else { + f match { + case cmd: CommandLineFunction => + if (this.bsubAllJobs) 
+ new LsfJobRunner(cmd) + else + new ShellJobRunner(cmd) + case inProc: InProcessFunction => + new InProcessRunner(inProc) + case _ => + throw new QException("Unexpected function: " + f) + } + } + } + + /** + * Tracks analysis status. + */ + private class AnalysisStatus(val analysisName: String) { + var status = RunnerStatus.PENDING + var scatter = new ScatterGatherStatus + var gather = new ScatterGatherStatus + } + + /** + * Tracks scatter gather status. + */ + private class ScatterGatherStatus { + var total = 0 + var done = 0 + var failed = 0 + } + /** * Gets job statuses by traversing the graph and looking for status-related files */ - private def runStatus = { - var statuses: HashMap[String,HashMap[String,Int]] = new HashMap[String,HashMap[String,Int]] - loop( - edgeFunction = { case edgeCLF => { - if ( edgeCLF.analysisName != null && ! edgeCLF.outputs.forall(file => file.getName.endsWith(".out") || file.getName.endsWith(".err") )) { - if ( ! statuses.keySet.contains(edgeCLF.analysisName) ) { - statuses.put(edgeCLF.analysisName,emptyStatusMap) - } - updateMap(statuses(edgeCLF.analysisName),edgeCLF) - } - } - }) - formatStatus(statuses) - } + private def logStatus = { + var statuses = Map.empty[String, AnalysisStatus] + loop({ + case edgeCLF: FunctionEdge if (edgeCLF.function.analysisName != null) => + updateStatus(statuses.get(edgeCLF.function.analysisName) match { + case Some(status) => status + case None => + val status = new AnalysisStatus(edgeCLF.function.analysisName) + statuses += edgeCLF.function.analysisName -> status + status + }, edgeCLF) + }) - /** - * Creates an empty map with keys for status updates, todo -- make this nicer somehow - */ - private def emptyStatusMap = { - var sMap = new HashMap[String,Int] - sMap.put("status",-1) - sMap.put("sgTotal",0) - sMap.put("sgDone",0) - sMap.put("sgRunning",0) - sMap.put("sgFailed",0) - // note -- pending = total - done - run - failed - sMap + statuses.values.toList.sortBy(_.analysisName).foreach(status => { 
+ if (status.scatter.total + status.gather.total > 0) { + var sgStatus = RunnerStatus.PENDING + if (status.scatter.failed + status.gather.failed > 0) + sgStatus = RunnerStatus.FAILED + else if (status.scatter.done + status.gather.done == status.scatter.total + status.gather.total) + sgStatus = RunnerStatus.DONE + else if (status.scatter.done + status.gather.done > 0) + sgStatus = RunnerStatus.RUNNING + status.status = sgStatus + } + + var info = status.analysisName + ": [" + status.status + "]" + if (status.scatter.total + status.gather.total > 1) { + info += formatSGStatus(status.scatter, "s") + info += formatSGStatus(status.gather, "g") + } + logger.info(info) + }) } /** * Updates a status map with scatter/gather status information (e.g. counts) */ - private def updateMap(stats: HashMap[String,Int], clf: CommandLineFunction) = { - if ( clf.isGather ) { - logger.debug(clf.analysisName+": "+clf.doneOutputs.mkString(", ")) - if ( clf.isDone ) { - stats("status") = 1 - } else { - stats("status") = 0 - } + private def updateStatus(stats: AnalysisStatus, edge: FunctionEdge) = { + if (edge.function.isInstanceOf[GatherFunction]) { + updateSGStatus(stats.gather, edge) + } else if (edge.function.isInstanceOf[ScatterGatherableFunction]) { + updateSGStatus(stats.scatter, edge) } else { - stats("sgTotal") = (stats("sgTotal") + 1) - if ( clf.isDone ) { - stats("sgDone") = (stats("sgDone") + 1) + stats.status = edge.status + } + } + + private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) = { + stats.total += 1 + edge.status match { + case RunnerStatus.DONE => { + stats.done += 1 } + case RunnerStatus.FAILED => { + stats.failed += 1 + } + /* can't tell the difference between pending and running right now! 
*/ + case RunnerStatus.PENDING => + case RunnerStatus.RUNNING => } } /** - * Formats a complete status map (analysis name --> map {string, int}) into nice strings + * Formats a status into nice strings */ - private def formatStatus(stats: HashMap[String,HashMap[String,Int]]) = { - stats.foreach{ case(analysisName, status) => { - var infoStr = analysisName - val doneInt = status("status") - if ( doneInt == 1 ) { - infoStr += " [DONE]" - } else if ( doneInt == 0 ) { - infoStr += " [NOT DONE]" - } else { - infoStr += " [UNKNOWN]" - } - - if ( status("sgTotal") > 0 ) { - val sgTot = status("sgTotal") - val sgDone = status("sgDone") - val sgRun = status("sgRunning") - val sgFailed = status("sgFailed") - val sgPend = (sgTot - sgDone - sgRun - sgFailed) - infoStr += " %dt/%dd/%dr/%dp/%df".format(sgTot,sgDone,sgRun,sgPend,sgFailed) - } - - logger.info(infoStr) - } - - } + private def formatSGStatus(stats: ScatterGatherStatus, prefix: String) = { + " %s:%dt/%dd/%df".format( + prefix, stats.total, stats.done, stats.failed) } /** * Creates a new graph where if new edges are needed (for cyclic dependency checking) they can be automatically created using a generic MappingFunction. * @return A new graph */ - private def newGraph = new SimpleDirectedGraph[QNode, QFunction](new EdgeFactory[QNode, QFunction] { - def createEdge(input: QNode, output: QNode) = new MappingFunction(input.files, output.files)}) + private def newGraph = new SimpleDirectedGraph[QNode, QEdge](new EdgeFactory[QNode, QEdge] { + def createEdge(input: QNode, output: QNode) = new MappingEdge(input.files, output.files)}) - /** - * Adds a generic QFunction to the graph. - * @param f Generic QFunction to add to the graph. 
- */ - private def addFunction(f: QFunction): Unit = { - try { - f match { - case cmd: CommandLineFunction => cmd.qSettings = this.qSettings - case map: MappingFunction => /* do nothing for mapping functions */ - } - f.freeze - val inputs = QNode(f.inputs) - val outputs = QNode(f.outputs) - val newSource = jobGraph.addVertex(inputs) - val newTarget = jobGraph.addVertex(outputs) - val removedEdges = jobGraph.removeAllEdges(inputs, outputs) - val added = jobGraph.addEdge(inputs, outputs, f) - if (this.debugMode) { - logger.debug("Mapped from: " + inputs) - logger.debug("Mapped to: " + outputs) - logger.debug("Mapped via: " + f) - logger.debug("Removed edges: " + removedEdges) - logger.debug("New source?: " + newSource) - logger.debug("New target?: " + newTarget) - logger.debug("") - } - } catch { - case e: Exception => - throw new QException("Error adding function: " + f, e) + private def addEdge(edge: QEdge) = { + val inputs = QNode(edge.inputs) + val outputs = QNode(edge.outputs) + val newSource = jobGraph.addVertex(inputs) + val newTarget = jobGraph.addVertex(outputs) + val removedEdges = jobGraph.removeAllEdges(inputs, outputs) + val added = jobGraph.addEdge(inputs, outputs, edge) + if (this.debugMode) { + logger.debug("Mapped from: " + inputs) + logger.debug("Mapped to: " + outputs) + logger.debug("Mapped via: " + edge) + logger.debug("Removed edges: " + removedEdges) + logger.debug("New source?: " + newSource) + logger.debug("New target?: " + newTarget) + logger.debug("") } } @@ -399,25 +428,17 @@ class QGraph extends Logging { jobGraph.getEdge(QNode(input), QNode(output)) != null || jobGraph.getEdge(QNode(output), QNode(input)) != null if (!hasEdge) - addFunction(new MappingFunction(input, output)) + addEdge(new MappingEdge(input, output)) } - /** - * Returns true if the edge is an internal mapping edge. - * @param edge Edge to check. - * @return true if the edge is an internal mapping edge. 
- */ - private def isMappingEdge(edge: QFunction) = - edge.isInstanceOf[MappingFunction] - /** * Returns true if the edge is mapping edge that is not needed because it does * not direct input or output from a user generated CommandLineFunction. * @param edge Edge to check. * @return true if the edge is not needed in the graph. */ - private def isFiller(edge: QFunction) = { - if (isMappingEdge(edge)) { + private def isFiller(edge: QEdge) = { + if (edge.isInstanceOf[MappingEdge]) { if (jobGraph.outgoingEdgesOf(jobGraph.getEdgeTarget(edge)).size == 0) true else if (jobGraph.incomingEdgesOf(jobGraph.getEdgeSource(edge)).size == 0) @@ -428,7 +449,7 @@ class QGraph extends Logging { /** * Returns true if the node is not connected to any edges. - * @param node Node (set of files) to check + * @param node Node (set of files) to check. * @return true if this set of files is not needed in the graph. */ private def isOrphan(node: QNode) = @@ -439,12 +460,12 @@ class QGraph extends Logging { * @param edgeFunction Optional function to run for each edge visited. * @param nodeFunction Optional function to run for each node visited. 
*/ - private def loop(edgeFunction: PartialFunction[CommandLineFunction, Unit] = null, nodeFunction: PartialFunction[QNode, Unit] = null) = { + private def loop(edgeFunction: PartialFunction[QEdge, Unit] = null, nodeFunction: PartialFunction[QNode, Unit] = null) = { val iterator = new TopologicalOrderIterator(this.jobGraph) - iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] { - override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match { - case cmd: CommandLineFunction => if (edgeFunction != null && edgeFunction.isDefinedAt(cmd)) edgeFunction(cmd) - case map: MappingFunction => /* do nothing for mapping functions */ + iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] { + override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = event.getEdge match { + case cmd: FunctionEdge => if (edgeFunction != null && edgeFunction.isDefinedAt(cmd)) edgeFunction(cmd) + case map: MappingEdge => /* do nothing for mapping functions */ } }) iterator.foreach(node => if (nodeFunction != null && nodeFunction.isDefinedAt(node)) nodeFunction(node)) @@ -460,8 +481,8 @@ class QGraph extends Logging { // todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a // todo -- visualizeString() command, or something that shows inputs / outputs - val ve = new org.jgrapht.ext.EdgeNameProvider[QFunction] { - def getEdgeName( function: QFunction ) = if (function.dotString == null) "" else function.dotString.replace("\"", "\\\"") + val ve = new org.jgrapht.ext.EdgeNameProvider[QEdge] { + def getEdgeName(function: QEdge) = if (function.dotString == null) "" else function.dotString.replace("\"", "\\\"") } //val iterator = new TopologicalOrderIterator(qGraph.jobGraph) @@ -469,4 +490,35 @@ class QGraph extends Logging { out.close } + + /** + * Returns true if any of the jobs in the graph have a status of failed. 
+ * @return true if any of the jobs in the graph have a status of failed. + */ + def hasFailed = { + this.jobGraph.edgeSet.exists(edge => { + edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED + }) + } + + /** + * Kills any forked jobs still running. + */ + def shutdown() { + val lsfJobs = getRunningJobs.filter(_.runner.isInstanceOf[LsfJobRunner]).map(_.runner.asInstanceOf[LsfJobRunner].job) + if (lsfJobs.size > 0) { + for (jobs <- lsfJobs.grouped(10)) { + try { + val bkill = new LsfKillJob(jobs) + logger.info(bkill.command) + bkill.run() + } catch { + case jee: JobExitException => + logger.error("Unable to kill all jobs:%n%s".format(jee.getMessage)) + case e => + logger.error("Unable to kill jobs.", e) + } + } + } + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala b/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala new file mode 100644 index 000000000..78c834616 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala @@ -0,0 +1,8 @@ +package org.broadinstitute.sting.queue.engine + +object RunnerStatus extends Enumeration { + val PENDING = Value("pending") + val RUNNING = Value("running") + val FAILED = Value("failed") + val DONE = Value("done") +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala index e2088c6b4..88bc24a63 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala @@ -1,18 +1,19 @@ package org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.util.{Logging, ShellJob} +import org.broadinstitute.sting.queue.util.{JobExitException, Logging, ShellJob} import org.broadinstitute.sting.queue.function.CommandLineFunction /** * Runs jobs one at a time locally */ -class ShellJobRunner extends JobRunner with 
Logging { +class ShellJobRunner(function: CommandLineFunction) extends JobRunner with Logging { + private var runStatus: RunnerStatus.Value = _ + /** * Runs the function on the local shell. * @param function Command to run. - * @param qGraph graph that holds the job, and if this is a dry run. */ - def run(function: CommandLineFunction, qGraph: QGraph) = { + def start() = { val job = new ShellJob job.command = function.commandLine job.workingDir = function.commandDirectory @@ -20,10 +21,11 @@ class ShellJobRunner extends JobRunner with Logging { job.errorFile = function.jobErrorFile if (logger.isDebugEnabled) { - logger.debug(function.commandDirectory + " > " + function.commandLine) + logger.debug("Starting: " + function.commandDirectory + " > " + function.commandLine) } else { - logger.info(function.commandLine) + logger.info("Starting: " + function.commandLine) } + logger.info("Output written to " + function.jobOutputFile) if (function.jobErrorFile != null) { logger.info("Errors written to " + function.jobErrorFile) @@ -32,7 +34,25 @@ class ShellJobRunner extends JobRunner with Logging { logger.info("Errors also written to " + function.jobOutputFile) } - if (!qGraph.dryRun) - job.run + function.doneOutputs.foreach(_.delete) + function.failOutputs.foreach(_.delete) + runStatus = RunnerStatus.RUNNING + try { + job.run() + function.doneOutputs.foreach(_.createNewFile) + runStatus = RunnerStatus.DONE + logger.info("Done: " + function.commandLine) + } catch { + case e: JobExitException => + runStatus = RunnerStatus.FAILED + try { + function.failOutputs.foreach(_.createNewFile) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error: " + function.commandLine, e) + } } + + def status = runStatus } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala index 01de9c8f9..40beec1fb 100755 --- 
a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala @@ -4,5 +4,5 @@ package org.broadinstitute.sting.queue.extensions.gatk * Splits intervals by contig instead of evenly. */ class ContigScatterFunction extends IntervalScatterFunction { - splitIntervalsScript = "splitIntervalsByContig.py" + splitByContig = true } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala index dfb94d48f..ff8cc4c7c 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala @@ -1,16 +1,82 @@ package org.broadinstitute.sting.queue.extensions.gatk -import org.broadinstitute.sting.commandline.Argument -import org.broadinstitute.sting.queue.function.scattergather.ScatterFunction +import org.broadinstitute.sting.queue.function.InProcessFunction +import org.broadinstitute.sting.commandline.ArgumentSource +import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, ScatterFunction} +import org.broadinstitute.sting.utils.interval.IntervalUtils +import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceDataSource +import java.io.File +import net.sf.picard.util.IntervalList +import net.sf.samtools.SAMFileHeader +import collection.JavaConversions._ +import org.broadinstitute.sting.utils.{GenomeLoc, GenomeLocSortedSet, GenomeLocParser} /** - * An interval scatter function that allows the script to be swapped out. - * The syntax of the script must be: - * [.. ] + * An interval scatter function. 
*/ -class IntervalScatterFunction extends ScatterFunction { - @Argument(doc="Interval split script") - var splitIntervalsScript: String = "splitIntervals.sh" +class IntervalScatterFunction extends ScatterFunction with InProcessFunction { + var splitByContig = false - def commandLine = "%s %s%s".format(splitIntervalsScript, originalInput, repeat(" ", scatterParts)) + private var referenceSequence: File = _ + private var intervals: List[String] = Nil + + override def setOriginalFunction(originalFunction: ScatterGatherableFunction, scatterField: ArgumentSource) = { + val gatk = originalFunction.asInstanceOf[CommandLineGATK] + referenceSequence = gatk.reference_sequence + intervals = gatk.intervalsString + if (gatk.intervals != null) + intervals ::= gatk.intervals.toString + } + + def run() = { + val referenceSource = new ReferenceDataSource(referenceSequence) + GenomeLocParser.setupRefContigOrdering(referenceSource.getReference); + val locs = { + // TODO: Abstract genome analysis engine has richer logic for parsing. We need to use it! 
+ if (intervals.size == 0) { + GenomeLocSortedSet.createSetFromSequenceDictionary(referenceSource.getReference.getSequenceDictionary).toList + } else { + IntervalUtils.parseIntervalArguments(intervals, false) + } + } + + val fileHeader = new SAMFileHeader + fileHeader.setSequenceDictionary(referenceSource.getReference.getSequenceDictionary) + + var intervalList: IntervalList = null + var fileIndex = -1 + var locIndex = 0 + + if (splitByContig) { + var contig: String = null + for (loc <- locs) { + if (contig != loc.getContig && (fileIndex + 1) < scatterParts.size) { + if (fileIndex >= 0) + intervalList.write(scatterParts(fileIndex)) + fileIndex += 1 + intervalList = new IntervalList(fileHeader) + } + locIndex += 1 + intervalList.add(toInterval(loc, locIndex)) + } + intervalList.write(scatterParts(fileIndex)) + } else { + var locsPerFile = locs.size / this.scatterParts.size + if (locs.size % this.scatterParts.size != 0) locsPerFile += 1 + for (loc <- locs) { + if (locIndex % locsPerFile == 0) { + if (fileIndex >= 0) + intervalList.write(scatterParts(fileIndex)) + fileIndex += 1 + intervalList = new IntervalList(fileHeader) + } + locIndex += 1 + intervalList.add(toInterval(loc, locIndex)) + } + intervalList.write(scatterParts(fileIndex)) + } + } + + private def toInterval(loc: GenomeLoc, locIndex: Int) = + new net.sf.picard.util.Interval(loc.getContig, loc.getStart.toInt, loc.getStop.toInt, true, "interval_" + locIndex) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 31d0d5541..07d35c1e1 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -1,12 +1,10 @@ package org.broadinstitute.sting.queue.function import org.broadinstitute.sting.queue.util._ -import java.lang.annotation.Annotation import 
org.broadinstitute.sting.commandline._ import java.io.File import collection.JavaConversions._ import org.broadinstitute.sting.queue.function.scattergather.{SimpleTextGatherFunction, Gather} -import org.broadinstitute.sting.queue.{QSettings, QException} /** * A command line that will be run in a pipeline. @@ -14,24 +12,12 @@ import org.broadinstitute.sting.queue.{QSettings, QException} trait CommandLineFunction extends QFunction with Logging { def commandLine: String - /** Analysis function name */ - var analysisName: String = _ - - /** Set to true if this is a standalone or gather function */ - var isGather: Boolean = true - - /** Default settings */ - var qSettings: QSettings = _ - /** Upper memory limit */ var memoryLimit: Option[Int] = None /** Whether a job is restartable */ var jobRestartable = true - /** Directory to run the command in. */ - var commandDirectory: File = IOUtils.CURRENT_DIR - /** Prefix for automatic job name creation */ var jobNamePrefix: String = _ @@ -45,14 +31,7 @@ trait CommandLineFunction extends QFunction with Logging { var jobQueue: String = _ /** Temporary directory to write any files */ - var jobTempDir: File = new File(System.getProperty("java.io.tmpdir")) - - /** If true this function will run only if the jobs it is dependent on succeed. */ - var jobRunOnlyIfPreviousSucceed = true - - /** Files that this job should wait on before running. */ - @Input(doc="Explicit job dependencies", required=false) - var jobDependencies: List[File] = Nil + var jobTempDir: File = IOUtils.javaTempDir /** File to redirect any output. Defaults to .out */ @Output(doc="File to redirect any output", required=false) @@ -64,15 +43,6 @@ trait CommandLineFunction extends QFunction with Logging { @Gather(classOf[SimpleTextGatherFunction]) var jobErrorFile: File = _ - /** The complete list of fields on this CommandLineFunction. 
*/ - lazy val functionFields: List[ArgumentSource] = ParsingEngine.extractArgumentSources(this.getClass).toList - /** The @Input fields on this CommandLineFunction. */ - lazy val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input])) - /** The @Output fields on this CommandLineFunction. */ - lazy val outputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Output])) - /** The @Argument fields on this CommandLineFunction. */ - lazy val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument])) - /** * Returns set of directories required to run the command. * @return Set of directories required to run the command. @@ -87,179 +57,20 @@ trait CommandLineFunction extends QFunction with Logging { dirs } - /** - * Returns the input files for this function. - * @return Set[File] inputs for this function. - */ - def inputs = getFieldFiles(inputFields) + override protected def useStatusOutput(file: File) = + file != jobOutputFile && file != jobErrorFile - /** - * Returns the output files for this function. - * @return Set[File] outputs for this function. - */ - def outputs = getFieldFiles(outputFields) - - def doneOutputs = getDoneFiles(outputFields) - - /** - * Gets the files from the fields. The fields must be a File, a FileExtension, or a List or Set of either. - * @param fields Fields to get files. - * @return Set[File] for the fields. - */ - private def getFieldFiles(fields: List[ArgumentSource]): Set[File] = { - var files = Set.empty[File] - for (field <- fields) - files ++= getFieldFiles(field) - files - } - - /** - * Returns true if all outputs already exist and are older that the inputs. - * If there are no outputs then returns false. - * @return true if all outputs already exist and are older that the inputs. 
- */ - def upToDate = { - val inputFiles = inputs - if ( doneOutputs.size > 0 && doneOutputs.forall(_.exists) ) { - val maxInput = inputFiles.foldLeft(Long.MinValue)((date,file) => date.max(file.lastModified)) - val minDone = doneOutputs.foldLeft(Long.MaxValue)((date,file) => date.min(file.lastModified)) - maxInput < minDone - } else false - //val inputFiles = inputs - //val outputFiles = outputs.filterNot(file => (file == jobOutputFile || file == jobErrorFile)) - //if (outputFiles.size > 0 && outputFiles.forall(_.exists)) { - // val maxInput = inputFiles.foldLeft(Long.MinValue)((date, file) => date.max(file.lastModified)) - // val minOutput = outputFiles.foldLeft(Long.MaxValue)((date, file) => date.min(file.lastModified)) - // maxInput < minOutput - //} else false - } - - /** - * Gets the files from the field. The field must be a File, a FileExtension, or a List or Set of either. - * @param fields Field to get files. - * @return Set[File] for the field. - */ - def getFieldFiles(field: ArgumentSource): Set[File] = { - var files = Set.empty[File] - CollectionUtils.foreach(getFieldValue(field), (fieldValue) => { - val file = fieldValueToFile(field, fieldValue) - if (file != null) - files += file - }) - files - } - - /** - * Gets the done files from the field. The field must be a File, a FileExtension, or a List or Set of either. - * @param fields Field to get files. - * @return Set[File] set of done files for the field. - */ - private def getDoneFiles(fields: List[ArgumentSource]): Set[File] = { - var doneFiles = Set.empty[File] - for ( field <- fields ) { - CollectionUtils.foreach(getFieldValue(field), (fieldValue) => { - val outFile = fieldValueToFile(field,fieldValue) - if ( outFile != null && filesAreDifferent(outFile,jobOutputFile) && filesAreDifferent(outFile,jobErrorFile) && ! outFile.isDirectory && ! outFile.getName.endsWith(".out")) { - doneFiles += new File(outFile.getParent + "/." 
+ outFile.getName + ".done") - } - }) - } - doneFiles - - //for ( outFile <- outFiles ) { - // if ( outFile != null && filesAreDifferent(outFile,jobOutputFile) && filesAreDifferent(outFile,jobErrorFile) && ! outFile.isDirectory ) - // doneFiles += new File(outFile.getParent + "." + outFile.getName + ".done") - //} - //doneFiles - } - - def isDone = { - doneOutputs.size == 0 || doneOutputs.forall(_.exists) - } - - /** - * Silly utility function which compresses if statement in getDoneFiles; returns true if two files are different - * @return boolean -- if files are different - */ - private def filesAreDifferent(a: File, b: File): Boolean = { - if ( b == null ) - if ( a == null ) - return false - else - return true - else - if ( a == null ) - return true - else - return ! b.getAbsolutePath.equals(a.getAbsolutePath) - } - - /** - * Gets the file from the field. The field must be a File or a FileExtension and not a List or Set. - * @param field Field to get the file. - * @return File for the field. - */ - def getFieldFile(field: ArgumentSource): File = - fieldValueToFile(field, getFieldValue(field)) - - /** - * Converts the field value to a file. The field must be a File or a FileExtension. - * @param field Field to get the file. - * @param value Value of the File or FileExtension or null. - * @return Null if value is null, otherwise the File. - * @throws QException if the value is not a File or FileExtension. - */ - private def fieldValueToFile(field: ArgumentSource, value: Any): File = value match { - case file: File => file - case null => null - case unknown => throw new QException("Non-file found. Try removing the annotation, change the annotation to @Argument, or extend File with FileExtension: %s: %s".format(field.field, unknown)) - } - - /** - * Resets the field to the temporary directory. - * @param field Field to get and set the file. - * @param tempDir new root for the file. 
- */ - def resetFieldFile(field: ArgumentSource, tempDir: File): File = { - getFieldValue(field) match { - case fileExtension: FileExtension => { - val newFile = IOUtils.resetParent(tempDir, fileExtension) - val newFileExtension = fileExtension.withPath(newFile.getPath) - setFieldValue(field, newFileExtension) - newFileExtension - } - case file: File => { - if (file.getClass != classOf[File]) - throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified."); - val newFile = IOUtils.resetParent(tempDir, file) - setFieldValue(field, newFile) - newFile - } - case null => null - case unknown => - throw new QException("Unable to set file from %s: %s".format(field, unknown)) - } - } + override def description = commandLine /** * The function description in .dot files */ override def dotString = jobName + " => " + commandLine - /** - * Sets all field values and makes them canonical so that the graph can - * match the inputs of one function to the output of another using equals(). - */ - final override def freeze = { - freezeFieldValues - canonFieldValues - super.freeze - } - /** * Sets all field values. */ - def freezeFieldValues = { + override def freezeFieldValues = { if (jobNamePrefix == null) jobNamePrefix = qSettings.jobNamePrefix @@ -272,57 +83,15 @@ trait CommandLineFunction extends QFunction with Logging { if (memoryLimit.isEmpty && qSettings.memoryLimit.isDefined) memoryLimit = qSettings.memoryLimit - if (qSettings.runJobsIfPrecedingFail) - jobRunOnlyIfPreviousSucceed = false - if (jobName == null) jobName = CommandLineFunction.nextJobName(jobNamePrefix) if (jobOutputFile == null) jobOutputFile = new File(jobName + ".out") - commandDirectory = IOUtils.subDir(IOUtils.CURRENT_DIR, commandDirectory) + super.freezeFieldValues } - /** - * Makes all field values canonical so that the graph can match the - * inputs of one function to the output of another using equals(). 
- */ - def canonFieldValues = { - for (field <- this.functionFields) { - var fieldValue = this.getFieldValue(field) - fieldValue = CollectionUtils.updated(fieldValue, canon).asInstanceOf[AnyRef] - this.setFieldValue(field, fieldValue) - } - } - - /** - * Set value to a uniform value across functions. - * Base implementation changes any relative path to an absolute path. - * @param value to be updated - * @return the modified value, or a copy if the value is immutable - */ - protected def canon(value: Any) = { - value match { - case fileExtension: FileExtension => - val newFile = absolute(fileExtension); - val newFileExtension = fileExtension.withPath(newFile.getPath) - newFileExtension - case file: File => - if (file.getClass != classOf[File]) - throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified."); - absolute(file) - case x => x - } - } - - /** - * Returns the absolute path to the file relative to the job command directory. - * @param file File to root relative to the command directory if it is not already absolute. - * @return The absolute path to file. - */ - private def absolute(file: File) = IOUtils.subDir(commandDirectory, file) - /** * Repeats parameters with a prefix/suffix if they are set otherwise returns "". * Skips null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString. @@ -350,89 +119,6 @@ trait CommandLineFunction extends QFunction with Logging { format: (String, Any, String) => String = formatValue("%s")) = if (hasValue(param)) format(prefix, param, suffix) else "" - /** - * Returns fields that do not have values which are required. - * @return List[String] names of fields missing values. 
- */ - def missingFields: List[String] = { - val missingInputs = missingFields(inputFields, classOf[Input]) - val missingOutputs = missingFields(outputFields, classOf[Output]) - val missingArguments = missingFields(argumentFields, classOf[Argument]) - (missingInputs | missingOutputs | missingArguments).toList.sorted - } - - /** - * Returns fields that do not have values which are required. - * @param sources Fields to check. - * @param annotation Annotation. - * @return Set[String] names of fields missing values. - */ - private def missingFields(sources: List[ArgumentSource], annotation: Class[_ <: Annotation]): Set[String] = { - var missing = Set.empty[String] - for (source <- sources) { - if (isRequired(source, annotation)) - if (!hasFieldValue(source)) - if (!exclusiveOf(source, annotation).exists(otherSource => hasFieldValue(otherSource))) - missing += "@%s: %s - %s".format(annotation.getSimpleName, source.field.getName, doc(source, annotation)) - } - missing - } - - /** - * Scala sugar type for checking annotation required and exclusiveOf. - */ - private type ArgumentAnnotation = { - def required(): Boolean - def exclusiveOf(): String - def doc(): String - } - - /** - * Returns the isRequired value from the field. - * @param field Field to check. - * @param annotation Annotation. - * @return the isRequired value from the field annotation. - */ - private def isRequired(field: ArgumentSource, annotation: Class[_ <: Annotation]) = - ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].required - - /** - * Returns an array of ArgumentSources from functionFields listed in the exclusiveOf of the original field - * @param field Field to check. - * @param annotation Annotation. - * @return the Array[ArgumentSource] that may be set instead of the field. 
- */ - private def exclusiveOf(field: ArgumentSource, annotation: Class[_ <: Annotation]) = - ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].exclusiveOf - .split(",").map(_.trim).filter(_.length > 0) - .map(fieldName => functionFields.find(fieldName == _.field.getName) match { - case Some(x) => x - case None => throw new QException("Unable to find exclusion field %s on %s".format(fieldName, this.getClass.getSimpleName)) - }) - - /** - * Returns the doc value from the field. - * @param field Field to check. - * @param annotation Annotation. - * @return the doc value from the field annotation. - */ - private def doc(field: ArgumentSource, annotation: Class[_ <: Annotation]) = - ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].doc - - /** - * Returns true if the field has a value. - * @param source Field to check for a value. - * @return true if the field has a value. - */ - protected def hasFieldValue(source: ArgumentSource) = this.hasValue(this.getFieldValue(source)) - - /** - * Returns false if the value is null or an empty collection. - * @param value Value to test for null, or a collection to test if it is empty. - * @return false if the value is null, or false if the collection is empty, otherwise true. - */ - private def hasValue(param: Any) = CollectionUtils.isNotNullOrNotEmpty(param) - /** * Returns "" if the value is null or an empty collection, otherwise return the value.toString. * @param format Format string if the value has a value @@ -450,27 +136,6 @@ trait CommandLineFunction extends QFunction with Logging { case x => format.format(x) }) + suffix - /** - * Gets the value of a field. - * @param source Field to get the value for. - * @return value of the field. - */ - def getFieldValue(source: ArgumentSource) = ReflectionUtils.getValue(invokeObj(source), source.field) - - /** - * Gets the value of a field. - * @param source Field to set the value for. 
- * @return value of the field. - */ - def setFieldValue(source: ArgumentSource, value: Any) = ReflectionUtils.setValue(invokeObj(source), source.field, value) - - /** - * Walks gets the fields in this object or any collections in that object - * recursively to find the object holding the field to be retrieved or set. - * @param source Field find the invoke object for. - * @return Object to invoke the field on. - */ - private def invokeObj(source: ArgumentSource) = source.parentFields.foldLeft[AnyRef](this)(ReflectionUtils.getValue(_, _)) } /** diff --git a/scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala deleted file mode 100644 index 893345ad7..000000000 --- a/scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala +++ /dev/null @@ -1,15 +0,0 @@ -package org.broadinstitute.sting.queue.function - -import java.io.File - -/** An internal class that is used by bsub to wait on all other jobs before exiting. */ -class DispatchWaitFunction extends CommandLineFunction { - /** - * Returns the command line "echo". - * @return echo - */ - def commandLine = "echo" - - jobQueue = "hour" - jobOutputFile = File.createTempFile("Q-wait", ".out") -} diff --git a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala new file mode 100644 index 000000000..42da4774d --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala @@ -0,0 +1,12 @@ +package org.broadinstitute.sting.queue.function + +import java.io.File + +/** + * Runs a function in process. 
+ */ +trait InProcessFunction extends QFunction { + def run() + protected def useStatusOutput(file: File) = true + def description = this.getClass.getSimpleName +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 68a4bf4bc..4bc120d39 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -1,6 +1,11 @@ package org.broadinstitute.sting.queue.function import java.io.File +import java.lang.annotation.Annotation +import org.broadinstitute.sting.commandline._ +import org.broadinstitute.sting.queue.util.{CollectionUtils, IOUtils, ReflectionUtils} +import org.broadinstitute.sting.queue.{QException, QSettings} +import collection.JavaConversions._ /** * The base interface for all functions in Queue. @@ -9,24 +14,300 @@ import java.io.File */ trait QFunction { /** - * After a function is frozen no more updates are allowed by the user. - * The function is allow to make necessary updates internally to make sure - * the inputs and outputs will be equal to other inputs and outputs. + * Analysis function name */ - def freeze = {} + var analysisName: String = _ + + /** Default settings */ + var qSettings: QSettings = _ + + /** Directory to run the command in. */ + var commandDirectory: File = IOUtils.CURRENT_DIR /** - * Set of inputs for this function. + * Description of this command line function. */ - def inputs: Set[File] - - /** - * Set of outputs for this function. - */ - def outputs: Set[File] + def description: String /** * The function description in .dot files */ def dotString = "" + + protected def useStatusOutput(file: File): Boolean + + /** + * Returns the output files for this function. + * @return Set[File] outputs for this function. + */ + private def statusPaths = outputs + .filter(file => useStatusOutput(file)) + .map(file => file.getParentFile + "/." 
+ file.getName) + + /** + * Returns the output files for this function. + * @return Set[File] outputs for this function. + */ + def doneOutputs = statusPaths.map(path => new File(path + ".done")) + + /** + * Returns the output files for this function. + * @return Set[File] outputs for this function. + */ + def failOutputs = statusPaths.map(path => new File(path + ".fail")) + + /** The complete list of fields on this CommandLineFunction. */ + lazy val functionFields: List[ArgumentSource] = ParsingEngine.extractArgumentSources(this.getClass).toList + /** The @Input fields on this CommandLineFunction. */ + lazy val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input])) + /** The @Output fields on this CommandLineFunction. */ + lazy val outputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Output])) + /** The @Argument fields on this CommandLineFunction. */ + lazy val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument])) + + /** + * Returns the input files for this function. + * @return Set[File] inputs for this function. + */ + def inputs = getFieldFiles(inputFields) + + /** + * Returns the output files for this function. + * @return Set[File] outputs for this function. + */ + def outputs = getFieldFiles(outputFields) + + /** + * Returns fields that do not have values which are required. + * @return List[String] names of fields missing values. + */ + def missingFields: List[String] = { + val missingInputs = missingFields(inputFields, classOf[Input]) + val missingOutputs = missingFields(outputFields, classOf[Output]) + val missingArguments = missingFields(argumentFields, classOf[Argument]) + (missingInputs | missingOutputs | missingArguments).toList.sorted + } + + /** + * Returns fields that do not have values which are required. + * @param sources Fields to check. + * @param annotation Annotation. 
+ * @return Set[String] names of fields missing values. + */ + private def missingFields(sources: List[ArgumentSource], annotation: Class[_ <: Annotation]): Set[String] = { + var missing = Set.empty[String] + for (source <- sources) { + if (isRequired(source, annotation)) + if (!hasFieldValue(source)) + if (!exclusiveOf(source, annotation).exists(otherSource => hasFieldValue(otherSource))) + missing += "@%s: %s - %s".format(annotation.getSimpleName, source.field.getName, doc(source, annotation)) + } + missing + } + + /** + * Gets the files from the fields. The fields must be a File, a FileExtension, or a List or Set of either. + * @param fields Fields to get files. + * @return Set[File] for the fields. + */ + private def getFieldFiles(fields: List[ArgumentSource]): Set[File] = { + var files = Set.empty[File] + for (field <- fields) + files ++= getFieldFiles(field) + files + } + + /** + * Gets the files from the field. The field must be a File, a FileExtension, or a List or Set of either. + * @param fields Field to get files. + * @return Set[File] for the field. + */ + def getFieldFiles(field: ArgumentSource): Set[File] = { + var files = Set.empty[File] + CollectionUtils.foreach(getFieldValue(field), (fieldValue) => { + val file = fieldValueToFile(field, fieldValue) + if (file != null) + files += file + }) + files + } + + /** + * Gets the file from the field. The field must be a File or a FileExtension and not a List or Set. + * @param field Field to get the file. + * @return File for the field. + */ + def getFieldFile(field: ArgumentSource): File = + fieldValueToFile(field, getFieldValue(field)) + + /** + * Converts the field value to a file. The field must be a File or a FileExtension. + * @param field Field to get the file. + * @param value Value of the File or FileExtension or null. + * @return Null if value is null, otherwise the File. + * @throws QException if the value is not a File or FileExtension. 
+ */ + private def fieldValueToFile(field: ArgumentSource, value: Any): File = value match { + case file: File => file + case null => null + case unknown => throw new QException("Non-file found. Try removing the annotation, change the annotation to @Argument, or extend File with FileExtension: %s: %s".format(field.field, unknown)) + } + + /** + * Resets the field to the temporary directory. + * @param field Field to get and set the file. + * @param tempDir new root for the file. + */ + def resetFieldFile(field: ArgumentSource, tempDir: File): File = { + getFieldValue(field) match { + case fileExtension: FileExtension => { + val newFile = IOUtils.resetParent(tempDir, fileExtension) + val newFileExtension = fileExtension.withPath(newFile.getPath) + setFieldValue(field, newFileExtension) + newFileExtension + } + case file: File => { + if (file.getClass != classOf[File]) + throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified."); + val newFile = IOUtils.resetParent(tempDir, file) + setFieldValue(field, newFile) + newFile + } + case null => null + case unknown => + throw new QException("Unable to set file from %s: %s".format(field, unknown)) + } + } + + + /** + * After a function is frozen no more updates are allowed by the user. + * The function is allow to make necessary updates internally to make sure + * the inputs and outputs will be equal to other inputs and outputs. + */ + final def freeze = { + freezeFieldValues + canonFieldValues + } + + def freezeFieldValues = { + commandDirectory = IOUtils.subDir(IOUtils.CURRENT_DIR, commandDirectory) + } + + /** + * Makes all field values canonical so that the graph can match the + * inputs of one function to the output of another using equals(). 
+ */ + def canonFieldValues = { + for (field <- this.functionFields) { + var fieldValue = this.getFieldValue(field) + fieldValue = CollectionUtils.updated(fieldValue, canon).asInstanceOf[AnyRef] + this.setFieldValue(field, fieldValue) + } + } + + /** + * Set value to a uniform value across functions. + * Base implementation changes any relative path to an absolute path. + * @param value to be updated + * @return the modified value, or a copy if the value is immutable + */ + protected def canon(value: Any) = { + value match { + case fileExtension: FileExtension => + val newFile = absolute(fileExtension); + val newFileExtension = fileExtension.withPath(newFile.getPath) + newFileExtension + case file: File => + if (file.getClass != classOf[File]) + throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified."); + absolute(file) + case x => x + } + } + + /** + * Returns the absolute path to the file relative to the job command directory. + * @param file File to root relative to the command directory if it is not already absolute. + * @return The absolute path to file. + */ + private def absolute(file: File) = IOUtils.subDir(commandDirectory, file) + + + /** + * Scala sugar type for checking annotation required and exclusiveOf. + */ + private type ArgumentAnnotation = { + def required(): Boolean + def exclusiveOf(): String + def doc(): String + } + + /** + * Returns the isRequired value from the field. + * @param field Field to check. + * @param annotation Annotation. + * @return the isRequired value from the field annotation. + */ + private def isRequired(field: ArgumentSource, annotation: Class[_ <: Annotation]) = + ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].required + + /** + * Returns an array of ArgumentSources from functionFields listed in the exclusiveOf of the original field + * @param field Field to check. + * @param annotation Annotation. 
+ * @return the Array[ArgumentSource] that may be set instead of the field. + */ + private def exclusiveOf(field: ArgumentSource, annotation: Class[_ <: Annotation]) = + ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].exclusiveOf + .split(",").map(_.trim).filter(_.length > 0) + .map(fieldName => functionFields.find(fieldName == _.field.getName) match { + case Some(x) => x + case None => throw new QException("Unable to find exclusion field %s on %s".format(fieldName, this.getClass.getSimpleName)) + }) + + /** + * Returns the doc value from the field. + * @param field Field to check. + * @param annotation Annotation. + * @return the doc value from the field annotation. + */ + private def doc(field: ArgumentSource, annotation: Class[_ <: Annotation]) = + ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].doc + + /** + * Returns true if the field has a value. + * @param source Field to check for a value. + * @return true if the field has a value. + */ + protected def hasFieldValue(source: ArgumentSource) = this.hasValue(this.getFieldValue(source)) + + /** + * Returns false if the value is null or an empty collection. + * @param value Value to test for null, or a collection to test if it is empty. + * @return false if the value is null, or false if the collection is empty, otherwise true. + */ + protected def hasValue(param: Any) = CollectionUtils.isNotNullOrNotEmpty(param) + + /** + * Gets the value of a field. + * @param source Field to get the value for. + * @return value of the field. + */ + def getFieldValue(source: ArgumentSource) = ReflectionUtils.getValue(invokeObj(source), source.field) + + /** + * Gets the value of a field. + * @param source Field to set the value for. + * @return value of the field. 
+ */ + def setFieldValue(source: ArgumentSource, value: Any) = ReflectionUtils.setValue(invokeObj(source), source.field, value) + + /** + * Walks gets the fields in this object or any collections in that object + * recursively to find the object holding the field to be retrieved or set. + * @param source Field find the invoke object for. + * @return Object to invoke the field on. + */ + private def invokeObj(source: ArgumentSource) = source.parentFields.foldLeft[AnyRef](this)(ReflectionUtils.getValue(_, _)) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala index 7b8e49501..0e0123b16 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala @@ -1,8 +1,9 @@ package org.broadinstitute.sting.queue.function.scattergather -import org.broadinstitute.sting.queue.function.CommandLineFunction import java.io.File -import org.broadinstitute.sting.commandline.{Argument, Input} +import org.broadinstitute.sting.commandline.Input +import org.broadinstitute.sting.queue.function.InProcessFunction +import org.apache.commons.io.FileUtils /** * Removes the temporary directories for scatter / gather. @@ -10,17 +11,12 @@ import org.broadinstitute.sting.commandline.{Argument, Input} * By default uses rm -rf. * The format of the call is [.. 
] */ -class CleanupTempDirsFunction extends CommandLineFunction { +class CleanupTempDirsFunction extends InProcessFunction { @Input(doc="Original outputs of the gather functions") var originalOutputs: Set[File] = Set.empty[File] @Input(doc="Temporary directories to be deleted") var tempDirectories: List[File] = Nil - @Argument(doc="rmdir script or command") - var rmdirScript = "rm -rf" - - override def upToDate = tempDirectories.forall(!_.exists) - - def commandLine = "%s%s".format(rmdirScript, repeat(" '", tempDirectories, "'")) + def run() = tempDirectories.foreach(FileUtils.deleteDirectory(_)) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala index 2b2ae420f..b257616d1 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala @@ -1,8 +1,8 @@ package org.broadinstitute.sting.queue.function.scattergather import java.io.File -import org.broadinstitute.sting.queue.function.CommandLineFunction -import org.broadinstitute.sting.commandline.{Argument, Output, Input} +import org.broadinstitute.sting.commandline.{Output, Input} +import org.broadinstitute.sting.queue.function.InProcessFunction /** * Creates the temporary directories for scatter / gather. @@ -10,22 +10,14 @@ import org.broadinstitute.sting.commandline.{Argument, Output, Input} * By default uses mkdir -pv * The format of the call is [.. 
] */ -class CreateTempDirsFunction extends CommandLineFunction { +class CreateTempDirsFunction extends InProcessFunction { @Input(doc="Original inputs to the scattered function") var originalInputs: Set[File] = Set.empty[File] @Output(doc="Temporary directories to create") var tempDirectories: List[File] = Nil - @Argument(doc="mkdir script or command") - var mkdirScript = "mkdir -pv" + override protected def useStatusOutput(file: File) = false - override def upToDate = tempDirectories.forall(_.exists) - - def commandLine = "%s%s".format(mkdirScript, repeat(" '", tempDirectories, "'")) - - /** - * This function is creating the directories, so returns just this command directory. - */ - override def jobDirectories = Set(commandDirectory) + def run() = tempDirectories.foreach(_.mkdirs) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index f5886865a..96030e99f 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -1,13 +1,13 @@ package org.broadinstitute.sting.queue.function.scattergather -import org.broadinstitute.sting.queue.function.{CommandLineFunction} import java.io.File import org.broadinstitute.sting.commandline.{ArgumentSource, Input, Output} +import org.broadinstitute.sting.queue.function.QFunction /** * Base class for Gather command line functions. 
*/ -trait GatherFunction extends CommandLineFunction { +trait GatherFunction extends QFunction { @Input(doc="Parts to gather back into the original output") var gatherParts: List[File] = Nil diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala index b0a8ab794..d1cfd4916 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala @@ -1,13 +1,13 @@ package org.broadinstitute.sting.queue.function.scattergather -import org.broadinstitute.sting.queue.function.CommandLineFunction import java.io.File import org.broadinstitute.sting.commandline.{ArgumentSource, Input, Output} +import org.broadinstitute.sting.queue.function.QFunction /** * Base class for Scatter command line functions. */ -trait ScatterFunction extends CommandLineFunction { +trait ScatterFunction extends QFunction { @Input(doc="Original input to scatter") var originalInput: File = _ diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 883e6f95e..a003b2737 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -3,8 +3,8 @@ package org.broadinstitute.sting.queue.function.scattergather import java.io.File import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.commandline.ArgumentSource -import org.broadinstitute.sting.queue.function.CommandLineFunction import com.rits.cloning.Cloner +import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} /** * A function that can be run faster by splitting 
it up into pieces and then joining together the results. @@ -84,10 +84,10 @@ trait ScatterGatherableFunction extends CommandLineFunction { * Returns a list of scatter / gather and clones of this function * that can be run in parallel to produce the same output as this * command line function. - * @return List[CommandLineFunction] to run instead of this function. + * @return List[QFunction] to run instead of this function. */ def generateFunctions() = { - var functions = List.empty[CommandLineFunction] + var functions = List.empty[QFunction] var tempDirectories = List.empty[File] // Only depend on input fields that have a value @@ -112,7 +112,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { } // Create the clone functions for running the parallel jobs - var cloneFunctions = List.empty[CommandLineFunction] + var cloneFunctions = List.empty[ScatterGatherableFunction] for (i <- 1 to this.scatterCount) { val cloneFunction = this.newCloneFunction() initCloneFunction(cloneFunction, i) @@ -290,7 +290,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { if (this.setupCloneFunction != null) if (this.setupCloneFunction.isDefinedAt(cloneFunction, index)) this.setupCloneFunction(cloneFunction, index) - cloneFunction.isGather = false } /** @@ -361,7 +360,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param Sub directory under the scatter gather directory. * @return temporary directory under this scatter gather directory. 
*/ - private def scatterGatherTempDir(subDir: String) = IOUtils.subDir(this.scatterGatherDirectory, this.jobName + "-" + subDir) + private def scatterGatherTempDir(subDir: String) = IOUtils.subDir(this.scatterGatherDirectory, this.jobName + "-sg/" + subDir) } /** diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala index 9a5681e4d..6e35221a5 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala @@ -1,6 +1,11 @@ package org.broadinstitute.sting.queue.function.scattergather import org.broadinstitute.sting.commandline.Argument +import org.broadinstitute.sting.queue.function.InProcessFunction +import org.apache.commons.io.FileUtils +import org.broadinstitute.sting.queue.QException +import collection.JavaConversions._ +import java.io.PrintWriter /** * Merges a text file. @@ -8,9 +13,50 @@ import org.broadinstitute.sting.commandline.Argument * By default uses mergeText.sh in Sting/shell. * The format of the call is [.. 
] */ -class SimpleTextGatherFunction extends GatherFunction { +class SimpleTextGatherFunction extends GatherFunction with InProcessFunction { @Argument(doc="merge text script") var mergeTextScript = "mergeText.sh" - def commandLine = "%s %s%s".format(mergeTextScript, originalOutput, repeat(" ", gatherParts)) + + def run() = { + if (gatherParts.size < 1) { + throw new QException("No files to gather to output: " + originalOutput) + } else if (gatherParts.size == 1) { + FileUtils.copyFile(gatherParts(0), originalOutput) + } else { + val writer = new PrintWriter(originalOutput) + var startLine = 0 + + val readerA = FileUtils.lineIterator(gatherParts(0)) + val readerB = FileUtils.lineIterator(gatherParts(1)) + var headersMatch = true + while (headersMatch) { + if (readerA.hasNext && readerB.hasNext) { + val headerA = readerA.nextLine + val headerB = readerB.nextLine + headersMatch = headerA == headerB + if (headersMatch) { + startLine += 1 + writer.println(headerA) + } + } else { + headersMatch = false + } + } + readerA.close + readerB.close + + for (file <- gatherParts) { + val reader = FileUtils.lineIterator(file) + var lineNum = 0 + while (reader.hasNext && lineNum < startLine) { + reader.nextLine + lineNum += 1 + } + while (reader.hasNext) + writer.println(reader.nextLine) + } + writer.close + } + } } diff --git a/scala/src/org/broadinstitute/sting/queue/util/CommandLineJob.scala b/scala/src/org/broadinstitute/sting/queue/util/CommandLineJob.scala index 0b322301e..59a0a600d 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/CommandLineJob.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/CommandLineJob.scala @@ -3,7 +3,7 @@ package org.broadinstitute.sting.queue.util import java.io.File /** - * Base class for a command line job. + * Base utility class for a command line job. 
*/ abstract class CommandLineJob { var command: String = _ diff --git a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala index 52c772827..e61a85677 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala @@ -1,7 +1,7 @@ package org.broadinstitute.sting.queue.util -import java.io.{IOException, File} import org.apache.commons.io.FileUtils +import java.io.{FileReader, IOException, File} /** * A collection of utilities for modifying java.io. @@ -107,8 +107,8 @@ object IOUtils { def writeContents(file: File, content: String) = FileUtils.writeStringToFile(file, content) - def writeTempFile(content: String, prefix: String, suffix: String = "") = { - val tempFile = absolute(File.createTempFile(prefix, suffix, networkTempDir)) + def writeTempFile(content: String, prefix: String, suffix: String = "", directory: File = networkTempDir) = { + val tempFile = absolute(File.createTempFile(prefix, suffix, directory)) writeContents(tempFile, content) tempFile } @@ -133,6 +133,12 @@ object IOUtils { directories(level) } + /** + * A mix of getCanonicalFile and getAbsoluteFile that returns the + * absolute path to the file without deferencing symbolic links. + * @param file the file. + * @return the absolute path to the file. + */ def absolute(file: File) = { var fileAbs = file.getAbsoluteFile var names = List.empty[String] @@ -167,4 +173,30 @@ object IOUtils { new File(names.mkString("/", "/", "")) } + + /** + * Returns the last lines of the file. + * NOTE: This is only safe to run on smaller files! + * @param file File to read. + * @param count Maximum number of lines to return. + * @return The last count lines from file. 
+ */ + def tail(file: File, count: Int) = { + var tailLines = List.empty[String] + var reader = new FileReader(file) + try { + val iterator = org.apache.commons.io.IOUtils.lineIterator(reader) + var lineCount = 0 + while (iterator.hasNext) { + val line = iterator.nextLine + lineCount += 1 + if (lineCount > count) + tailLines = tailLines.tail + tailLines :+= line + } + } finally { + org.apache.commons.io.IOUtils.closeQuietly(reader) + } + tailLines + } } diff --git a/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala b/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala new file mode 100644 index 000000000..39c12d2a6 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala @@ -0,0 +1,11 @@ +package org.broadinstitute.sting.queue.util + +import org.broadinstitute.sting.queue.QException + +/** + * Captures the exit code and error text from a failed process. + */ +class JobExitException(var exitText: String, var commandLine: Array[String], var exitCode: Int, var stdErr: String) + extends QException("%s%nCommand line:%n%s%nExit code: %s%nStandard error contained: %n%s" + .format(exitText, commandLine.mkString(" "), exitCode, stdErr)) { +} diff --git a/scala/src/org/broadinstitute/sting/queue/util/LsfJob.scala b/scala/src/org/broadinstitute/sting/queue/util/LsfJob.scala index f18ad4304..4a19f9fa4 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/LsfJob.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/LsfJob.scala @@ -2,10 +2,9 @@ package org.broadinstitute.sting.queue.util import java.util.regex.Pattern import collection.JavaConversions._ -import org.broadinstitute.sting.queue.QException /** - * An job submitted to LSF. This class is designed to work somewhat like + * A job submitted to LSF. This class is designed to work somewhat like * java.lang.Process, but has some extensions. 
* * @author A subset of the original BroadCore ported to scala by Khalid Shakir @@ -24,7 +23,7 @@ class LsfJob extends CommandLineJob with Logging { * Starts the job. Command must exist. The job will be submitted to LSF. */ def run() = { - assert(bsubJobId == null, "LSF job was already started") + assert(bsubJobId == null, "LSF job was already submitted") assert(command != null, "Command was not set on LSF job") assert(outputFile != null, "Output file must be set on LSF job") @@ -33,32 +32,20 @@ class LsfJob extends CommandLineJob with Logging { val stdoutSettings = new ProcessController.OutputStreamSettings(FIVE_MB, null, false) val stderrSettings = new ProcessController.OutputStreamSettings(FIVE_MB, null, false) - // This is really nice for debugging, but spits out way too much stuff otherwise! - // log.info("About to execute LSF command: " + StringUtils.join(argArray, " ")); - - // Get environment vars and strip out LD_ASSUME_KERNEL - // This is necessary since GAP servers on linux 2.4.x kernel and can be removed when - // its no longer true. Only 'classic' LSF queue has 2.4 kernel-based machines. - // launch the bsub job from the current directory val processSettings = new ProcessController.ProcessSettings( bsubCommand, environmentVariables, null, stdinSettings, stdoutSettings, stderrSettings, false) val bsubOutput = processController.exec(processSettings) if (bsubOutput.exitValue != 0) { - logger.error("Failed to submit LSF job, got exit code %s. 
Standard error contained: %n%s" - .format(bsubOutput.exitValue, content(bsubOutput.stderr))) - throw new QException("Failed to submit LSF job, got exit code %s.".format(bsubOutput.exitValue)) + throw new JobExitException("Failed to submit LSF job.", bsubCommand, + bsubOutput.exitValue, content(bsubOutput.stderr)) } // get the LSF job ID val matcher = LsfJob.JOB_ID.matcher(bsubOutput.stdout.content) matcher.find() bsubJobId = matcher.group - - // set job name to LSF_ if not set already - if (name == null) - name = "lsf_job_" + bsubJobId } /** @@ -136,6 +123,12 @@ class LsfJob extends CommandLineJob with Logging { .toMap } +/** + * A job submitted to LSF. This class is designed to work somewhat like + * java.lang.Process, but has some extensions. + * + * @author A subset of the original BroadCore ported to scala by Khalid Shakir + */ object LsfJob { /** Used to search the stdout for the job id. */ private val JOB_ID = Pattern.compile("\\d+") diff --git a/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala b/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala new file mode 100644 index 000000000..e53681652 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala @@ -0,0 +1,26 @@ +package org.broadinstitute.sting.queue.util + +/** + * bkills a list of lsf jobs. 
+ */ +class LsfKillJob(jobs: List[LsfJob]) extends CommandLineJob with Logging { + command = "bkill " + jobs.map(_.bsubJobId).mkString(" ") + + def run() = { + // capture the output for debugging + val stdinSettings = new ProcessController.InputStreamSettings(null, null) + val stdoutSettings = new ProcessController.OutputStreamSettings(FIVE_MB, null, false) + val stderrSettings = new ProcessController.OutputStreamSettings(FIVE_MB, null, false) + + val bkillCommand = (List("bkill") ++ jobs.map(_.bsubJobId)).toArray + + // launch the bkill job from the current directory + val processSettings = new ProcessController.ProcessSettings( + bkillCommand, null, null, stdinSettings, stdoutSettings, stderrSettings, false) + val bkillOutput = processController.exec(processSettings) + + if (bkillOutput.exitValue != 0) { + throw new JobExitException("Failed to kill LSF jobs.", bkillCommand, bkillOutput.exitValue, content(bkillOutput.stderr)) + } + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/util/ProcessController.scala b/scala/src/org/broadinstitute/sting/queue/util/ProcessController.scala index 9e278302b..92b63c628 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/ProcessController.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/ProcessController.scala @@ -253,14 +253,14 @@ object ProcessController extends Logging { private val STDERR_KEY = "stderr" /** Tracks running processes so that they can be killed as the JVM shuts down. */ - private val running = new HashSet[Process]() - Runtime.getRuntime.addShutdownHook(new Thread { - /** Kills running processes as the JVM shuts down. */ - override def run = for (process <- running.clone) { + private val running = new HashSet[Process] + + def shutdown() = { + for (process <- running.clone) { logger.warn("Killing: " + process) process.destroy } - }) + } /** Empty stream settings used when no output is requested. 
*/ private object EmptyStreamSettings extends OutputStreamSettings(0, null, false) diff --git a/scala/src/org/broadinstitute/sting/queue/util/ShellJob.scala b/scala/src/org/broadinstitute/sting/queue/util/ShellJob.scala index b9458f4c6..863d2a2ea 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/ShellJob.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/ShellJob.scala @@ -17,16 +17,15 @@ class ShellJob extends CommandLineJob with Logging { val stdinSettings = new ProcessController.InputStreamSettings(null, this.inputFile) val stdoutSettings = new ProcessController.OutputStreamSettings(bufferSize, this.outputFile, true) val stderrSettings = new ProcessController.OutputStreamSettings(FIVE_MB, errorFile, true) + val commandLine = Array("sh", "-c", command) val processSettings = new ProcessController.ProcessSettings( - Array("sh", "-c", command), null, this.workingDir, stdinSettings, stdoutSettings, stderrSettings, redirectError) + commandLine, null, this.workingDir, stdinSettings, stdoutSettings, stderrSettings, redirectError) val output = processController.exec(processSettings) if (output.exitValue != 0) { val streamOutput = if (redirectError) output.stdout else output.stderr - logger.error("Failed to run job, got exit code %s. 
Error contained: %n%s" - .format(output.exitValue, content(streamOutput))) - throw new QException("Failed to run job, got exit code %s.".format(output.exitValue)) + throw new JobExitException("Failed to run job.", commandLine, output.exitValue, content(streamOutput)) } } } diff --git a/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala index 255ebe79b..6982f4978 100644 --- a/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala @@ -99,4 +99,18 @@ class IOUtilsUnitTest extends BaseTest { dir = IOUtils.absolute(new File("/./.directory/")) Assert.assertEquals(new File("/.directory"), dir) } + + @Test + def testTail = { + val lines = List( + "chr18_random 4262 3154410390 50 51", + "chr19_random 301858 3154414752 50 51", + "chr21_random 1679693 3154722662 50 51", + "chr22_random 257318 3156435963 50 51", + "chrX_random 1719168 3156698441 50 51") + val tail = IOUtils.tail(new File(BaseTest.hg18Reference + ".fai"), 5) + Assert.assertEquals(5, tail.size) + for (i <- 0 until 5) + Assert.assertEquals(lines(i), tail(i)) + } }