diff --git a/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java b/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java index b4d96a124..a6893636e 100644 --- a/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java +++ b/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java @@ -116,8 +116,9 @@ public class GATKExtensionsGenerator extends CommandLineProgram { argumentFields.addAll(RodBindField.getRodArguments(walkerType, trackBuilder)); argumentFields.addAll(ReadFilterField.getFilterArguments(walkerType)); - writeClass(COMMANDLINE_PACKAGE_NAME + "." + clpClassName, WALKER_PACKAGE_NAME, - walkerName, String.format("analysis_type = \"%s\"%n", walkerName), argumentFields); + writeClass(COMMANDLINE_PACKAGE_NAME + "." + clpClassName, WALKER_PACKAGE_NAME, walkerName, + String.format("analysisName = \"%1$s\"%nanalysis_type = \"%1$s\"%n", walkerName), + argumentFields); } } } diff --git a/java/test/org/broadinstitute/sting/BaseTest.java b/java/test/org/broadinstitute/sting/BaseTest.java index 1da296e42..374ac0157 100755 --- a/java/test/org/broadinstitute/sting/BaseTest.java +++ b/java/test/org/broadinstitute/sting/BaseTest.java @@ -42,16 +42,16 @@ public abstract class BaseTest { /** our log, which we want to capture anything from org.broadinstitute.sting */ public static Logger logger = Logger.getRootLogger(); - protected static String hg18Reference = "/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta"; - protected static String hg19Reference = "/seq/references/Homo_sapiens_assembly19/v0/Homo_sapiens_assembly19.fasta"; - protected static String b36KGReference = "/humgen/1kg/reference/human_b36_both.fasta"; - protected static String b37KGReference = "/humgen/1kg/reference/human_g1k_v37.fasta"; - protected static String GATKDataLocation = "/humgen/gsa-hpprojects/GATK/data/"; - protected static String validationDataLocation = 
GATKDataLocation + "Validation_Data/"; - protected static String evaluationDataLocation = GATKDataLocation + "Evaluation_Data/"; - protected static String comparisonDataLocation = GATKDataLocation + "Comparisons/"; + public static String hg18Reference = "/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta"; + public static String hg19Reference = "/seq/references/Homo_sapiens_assembly19/v0/Homo_sapiens_assembly19.fasta"; + public static String b36KGReference = "/humgen/1kg/reference/human_b36_both.fasta"; + public static String b37KGReference = "/humgen/1kg/reference/human_g1k_v37.fasta"; + public static String GATKDataLocation = "/humgen/gsa-hpprojects/GATK/data/"; + public static String validationDataLocation = GATKDataLocation + "Validation_Data/"; + public static String evaluationDataLocation = GATKDataLocation + "Evaluation_Data/"; + public static String comparisonDataLocation = GATKDataLocation + "Comparisons/"; - protected static String testDir = "testdata/"; + public String testDir = "testdata/"; protected static boolean alreadySetup = false; diff --git a/scala/qscript/kshakir/CleanBamFile.scala b/scala/qscript/kshakir/CleanBamFile.scala index cf2f4c32e..77da34e1a 100644 --- a/scala/qscript/kshakir/CleanBamFile.scala +++ b/scala/qscript/kshakir/CleanBamFile.scala @@ -175,7 +175,11 @@ class CleanBamFile extends QScript { bamIndex.bamFile = fixedBam bamIndex.bamFileIndex = swapExt(fixedBam, "bam", "bam.bai") - val importer = new ImportSingleValueFunction + val importer = new ImportSingleValueFunction { + /** Files that this job should wait on before running. 
*/ + @Input(doc="Explicit job dependencies", required=false) + var jobDependencies: List[File] = Nil + } importer.jobQueue = shortJobQueue importer.jarFile = firehoseImportJar importer.host = firehoseHost diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index a18a8ef69..12e24af96 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -3,8 +3,8 @@ package org.broadinstitute.sting.queue import java.io.File import java.util.Arrays import org.broadinstitute.sting.queue.engine.QGraph -import org.broadinstitute.sting.queue.util.{Logging, ScalaCompoundArgumentTypeDescriptor} import org.broadinstitute.sting.commandline._ +import org.broadinstitute.sting.queue.util.{ProcessController, Logging, ScalaCompoundArgumentTypeDescriptor} /** * Entry point of Queue. Compiles and runs QScripts passed in to the command line. @@ -17,9 +17,6 @@ class QCommandLine extends CommandLineProgram with Logging { @Argument(fullName="bsub_all_jobs", shortName="bsub", doc="Use bsub to submit jobs", required=false) private var bsubAllJobs = false - @Argument(fullName="bsub_wait_jobs", shortName="bsubWait", doc="Wait for bsub submitted jobs before exiting", required=false) - private var bsubWaitJobs = false - @Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. Without this flag set only performs a dry run.", required=false) private var run = false @@ -46,10 +43,10 @@ class QCommandLine extends CommandLineProgram with Logging { * functions, and then builds and runs a QGraph based on the dependencies. 
*/ def execute = { + val qGraph = new QGraph qGraph.dryRun = !(run || runScripts) qGraph.bsubAllJobs = bsubAllJobs - qGraph.bsubWaitJobs = bsubWaitJobs qGraph.skipUpToDateJobs = skipUpToDate qGraph.dotFile = dotFile qGraph.expandedDotFile = expandedDotFile @@ -65,6 +62,14 @@ class QCommandLine extends CommandLineProgram with Logging { logger.info("Added " + script.functions.size + " functions") } + Runtime.getRuntime.addShutdownHook(new Thread { + /** Kills running processes as the JVM shuts down. */ + override def run = { + qGraph.shutdown() + ProcessController.shutdown() + } + }) + if ( ! getStatus ) { logger.info("Running generated graph") qGraph.run @@ -73,8 +78,13 @@ class QCommandLine extends CommandLineProgram with Logging { qGraph.checkStatus } - logger.info("Done") - 0 + if (qGraph.hasFailed) { + logger.info("Done with errors") + 1 + } else { + logger.info("Done") + 0 + } } /** diff --git a/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/scala/src/org/broadinstitute/sting/queue/QSettings.scala index b97bf176f..808a5ea81 100644 --- a/scala/src/org/broadinstitute/sting/queue/QSettings.scala +++ b/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -22,9 +22,6 @@ class QSettings { @Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false) var memoryLimit: Option[Int] = None - - @Argument(fullName="runJobsIfPrecedingFail", shortName="runIfFail", doc="If this flag is set then ALL jobs will run even if the previous jobs fail.", required=false) - var runJobsIfPrecedingFail = false } /** diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala index ec9d0d819..c07ff2ff2 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala @@ -1,7 +1,6 @@ package 
org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.function.{CommandLineFunction, QFunction} -import scala.collection.immutable.ListSet +import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util.IOUtils import java.io.File @@ -9,62 +8,17 @@ import java.io.File * Dispatches jobs to a compute cluster. */ trait DispatchJobRunner extends JobRunner { - /** Type of the job. */ - type DispatchJobType - /** An internal cache of all the jobs that have run by command line function. */ - private var dispatchJobs = Map.empty[CommandLineFunction, DispatchJobType] - /** An internal list of functions that have no other dependencies. */ - private var waitJobsByGraph = Map.empty[QGraph, ListSet[DispatchJobType]] - - /** - * Adds the job to the internal cache of previous jobs and removes the previous jobs that - * the job was dependent on from the list of function that have no dependencies. - * @param function CommandLineFunction to add to the list. - * @param qGraph Current qGraph being iterated over. - * @param dispatchJob The job that is being added to the cache. - * @param previousJobs The previous jobs that the job was dependent one. - */ - protected def addJob(function: CommandLineFunction, qGraph: QGraph, - dispatchJob: DispatchJobType, previousJobs: Iterable[DispatchJobType]) = { - dispatchJobs += function -> dispatchJob - var waitJobs = getWaitJobs(qGraph) - for (previousJob <- previousJobs) - waitJobs -= previousJob - waitJobs += dispatchJob - waitJobsByGraph += qGraph -> waitJobs - } - - /** - * Walks up the graph looking for the previous LsfJobs. - * @param function Function to examine for a previous command line job. - * @param qGraph The graph that contains the jobs. - * @return A list of prior jobs. 
- */ - protected def previousJobs(function: CommandLineFunction, qGraph: QGraph) : List[DispatchJobType] = - qGraph.previousJobs(function).map(dispatchJobs(_)) - - /** - * Returns a set of jobs that have no following jobs in the graph. - * @param qGraph The graph that contains the jobs. - * @return ListSet[DispatchJobType] of previous jobs that have no dependent jobs. - */ - protected def getWaitJobs(qGraph: QGraph) = { - if (!waitJobsByGraph.contains(qGraph)) - waitJobsByGraph += qGraph -> ListSet.empty[DispatchJobType] - waitJobsByGraph(qGraph) - } - /** * Builds a command line that can be run to force an automount of the directories. * @param function Function to look jobDirectories. - * @return A "cd [&& cd ]" command. + * @return A "cd '' [&& cd '']" command. */ protected def mountCommand(function: CommandLineFunction) = { var dirs = Set.empty[File] for (dir <- function.jobDirectories) dirs += IOUtils.dirLevel(dir, 2) if (dirs.size > 0) - Some(dirs.mkString("cd ", " && cd ", "")) + Some(dirs.mkString("cd '", "' && cd '", "'")) else None } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala new file mode 100644 index 000000000..28d73c7fb --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala @@ -0,0 +1,38 @@ +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} + +/** + * Only logs the command to run. Doesn't actually run it. + */ +class DryRunner(function: QFunction) extends JobRunner with Logging { + /** + * Dry runs the function logging the command lines. + * @param function Command to run. + */ + // TODO: Why do we need the dry runner? Can we just use a dryRun() method to log per JobRunner? 
+ def start() = { + function match { + case clf: CommandLineFunction => { + if (logger.isDebugEnabled) { + logger.debug(clf.commandDirectory + " > " + clf.commandLine) + } else { + logger.info(clf.commandLine) + } + logger.info("Output written to " + clf.jobOutputFile) + if (clf.jobErrorFile != null) { + logger.info("Errors written to " + clf.jobErrorFile) + } else { + if (logger.isDebugEnabled) + logger.info("Errors also written to " + clf.jobOutputFile) + } + } + case qFunction => { + logger.info(qFunction.description) + } + } + } + + def status = RunnerStatus.DONE +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala new file mode 100644 index 000000000..ce300b7a2 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -0,0 +1,41 @@ +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.queue.function.QFunction + +/** + * An edge in the QGraph that runs a QFunction. + * The edge is created first to determine inter-node dependencies, + * and then the runner is specified later when the time comes to + * execute the function in the edge. 
+ */ +class FunctionEdge(var function: QFunction) extends QEdge { + var runner: JobRunner =_ + + private var currentStatus = { + val doneOutputs = function.doneOutputs + val failOutputs = function.failOutputs + if (failOutputs.exists(_.exists)) + RunnerStatus.FAILED + else if (doneOutputs.forall(_.exists)) + RunnerStatus.DONE + else + RunnerStatus.PENDING + } + + def status = { + if (currentStatus == RunnerStatus.PENDING || currentStatus == RunnerStatus.RUNNING) + if (runner != null) + currentStatus = runner.status + currentStatus + } + + def resetPending() = { + currentStatus = RunnerStatus.PENDING + function.doneOutputs.foreach(_.delete) + function.doneOutputs.foreach(_.delete) + } + + def inputs = function.inputs + def outputs = function.outputs + override def dotString = function.dotString +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala new file mode 100644 index 000000000..22c65bf2e --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -0,0 +1,41 @@ +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.queue.function.InProcessFunction +import org.broadinstitute.sting.queue.util.Logging + +/** + * Runs a function that executes in process and does not fork out an external process. 
+ */ +class InProcessRunner(function: InProcessFunction) extends JobRunner with Logging { + private var runStatus: RunnerStatus.Value = _ + + def start() = { + if (logger.isDebugEnabled) { + logger.debug("Starting: " + function.commandDirectory + " > " + function.description) + } else { + logger.info("Starting: " + function.description) + } + + function.doneOutputs.foreach(_.delete) + function.failOutputs.foreach(_.delete) + runStatus = RunnerStatus.RUNNING + try { + function.run() + function.doneOutputs.foreach(_.createNewFile) + runStatus = RunnerStatus.DONE + logger.info("Done: " + function.description) + } catch { + case e => { + runStatus = RunnerStatus.FAILED + try { + function.failOutputs.foreach(_.createNewFile) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error: " + function.description, e) + } + } + } + + def status = runStatus +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index 93ff4cc1c..74625992f 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -1,16 +1,21 @@ package org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.function.CommandLineFunction - /** * Base interface for job runners. */ trait JobRunner { /** - * Dispatches a function to the queue and returns immediately, unless the function is a DispatchWaitFunction - * in which case it waits for all other terminal functions to complete. + * Runs the function. + * After the function returns the status of the function should + * be RUNNING, FAILED, or DONE. * @param function Command to run. - * @param qGraph graph that holds the job, and if this is a dry run. */ - def run(function: CommandLineFunction, qGraph: QGraph) -} \ No newline at end of file + def start() + + /** + * Returns the current run status. 
+ * Must only be called AFTER start(). + * @return RUNNING, DONE, or FAILED. + */ + def status: RunnerStatus.Value +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index 61761e82c..f8262bd46 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -1,22 +1,39 @@ package org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.function.{CommandLineFunction, DispatchWaitFunction} +import java.io.File import org.broadinstitute.sting.queue.util.{IOUtils, LsfJob, Logging} +import org.broadinstitute.sting.queue.function.CommandLineFunction /** * Runs jobs on an LSF compute cluster. */ -class LsfJobRunner extends DispatchJobRunner with Logging { - type DispatchJobType = LsfJob +class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with Logging { + private var runStatus: RunnerStatus.Value = _ + + var job: LsfJob = _ + + /** A file to look for to validate that the function ran to completion. */ + private var jobStatusPath: String = _ + + /** A temporary job done file to let Queue know that the process ran successfully. */ + private lazy val jobDoneFile = new File(jobStatusPath + ".done") + + /** A temporary job done file to let Queue know that the process exited with an error. */ + private lazy val jobFailFile = new File(jobStatusPath + ".fail") + + /** A generated pre-exec shell script. */ + private var preExec: File = _ + + /** A generated post-exec shell script. */ + private var postExec: File = _ /** * Dispatches the function on the LSF cluster. * @param function Command to run. - * @param qGraph graph that holds the job, and if this is a dry run. 
*/ - def run(function: CommandLineFunction, qGraph: QGraph) = { - val job = new LsfJob - job.name = function.jobName + def start() = { + job = new LsfJob + // job.name = function.jobName TODO: Make setting the job name optional. job.outputFile = function.jobOutputFile job.errorFile = function.jobErrorFile job.project = function.jobProject @@ -32,56 +49,114 @@ class LsfJobRunner extends DispatchJobRunner with Logging { if (function.memoryLimit.isDefined) job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]") - if ( ! function.commandLine.contains("mkdir")) // wild hack -- ignore mkdirs ?? - job.postExecCommand = function.doneOutputs.foldLeft("python /humgen/gsa-scr1/chartl/sting/python/lsf_post_touch.py ")((b,a) => b + a.getAbsolutePath+" ") - // hacky trailing space, so sue me -- CH + preExec = writePreExec(function) + job.preExecCommand = "sh " + preExec - val previous: Iterable[LsfJob] = - if (function.isInstanceOf[DispatchWaitFunction]) { - job.waitForCompletion = true - getWaitJobs(qGraph) - } else { - previousJobs(function, qGraph) - } - - mountCommand(function) match { - case Some(command) => job.preExecCommand = command - case None => /* ignore */ - } - - if (previous.size > 0) - job.extraBsubArgs ++= List("-w", dependencyExpression(previous, - function.jobRunOnlyIfPreviousSucceed, qGraph.dryRun)) - - addJob(function, qGraph, job, previous) + postExec = writePostExec(function) + job.postExecCommand = "sh " + postExec if (logger.isDebugEnabled) { - logger.debug(function.commandDirectory + " > " + job.bsubCommand.mkString(" ")) + logger.debug("Starting: " + function.commandDirectory + " > " + job.bsubCommand.mkString(" ")) } else { - logger.info(job.bsubCommand.mkString(" ")) + logger.info("Starting: " + job.bsubCommand.mkString(" ")) } - if (!qGraph.dryRun) - job.run + runStatus = RunnerStatus.RUNNING + job.run() + jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." 
+ job.bsubJobId)).toString + logger.info("Submitted LSF job id: " + job.bsubJobId) } /** - * Returns the dependency expression for the prior jobs. - * @param jobs Previous jobs this job is dependent on. - * @param runOnSuccess Run the job only if the previous jobs succeed. - * @param dryRun If the current run is a dry run. - * @return The dependency expression for the prior jobs. + * Updates and returns the status by looking for job status files. + * After the job status files are detected they are cleaned up from + * the file system and the status is cached. + * + * Note, these temporary job status files are currently different from the + * .done files used to determine if a file has been created successfully. */ - private def dependencyExpression(jobs: Iterable[LsfJob], - runOnSuccess: Boolean, dryRun: Boolean) = { - val jobIds = if (dryRun) - jobs.toSet[LsfJob].map("\"" + _.name + "\"") - else - jobs.toSet[LsfJob].map(_.bsubJobId) + def status = { + if (logger.isDebugEnabled) { + logger.debug("Done %s exists = %s".format(jobDoneFile, jobDoneFile.exists)) + logger.debug("Fail %s exists = %s".format(jobFailFile, jobFailFile.exists)) + } - if (runOnSuccess) - jobIds.mkString("done(", ") && done(", ")") - else - jobIds.mkString("ended(", ") && ended(", ")") + if (jobFailFile.exists) { + removeTemporaryFiles() + runStatus = RunnerStatus.FAILED + logger.info("Error: " + job.bsubCommand.mkString(" ")) + tailError() + } else if (jobDoneFile.exists) { + removeTemporaryFiles() + runStatus = RunnerStatus.DONE + logger.info("Done: " + job.bsubCommand.mkString(" ")) + } + + runStatus + } + + /** + * Removes all temporary files used for this LSF job. + */ + private def removeTemporaryFiles() = { + preExec.delete() + postExec.delete() + jobDoneFile.delete() + jobFailFile.delete() + } + + /** + * Outputs the last lines of the error logs. 
+ */ + private def tailError() = { + val errorFile = if (job.errorFile != null) job.errorFile else job.outputFile + val tailLines = IOUtils.tail(errorFile, 100) + val nl = "%n".format() + logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) + } + + /** + * Writes a pre-exec file to cleanup any status files and + * optionally mount any automount directories on the node. + * @return the file path to the pre-exec. + */ + private def writePreExec(function: CommandLineFunction): File = { + val preExec = new StringBuilder + + preExec.append("rm -f '%s/'.$LSB_JOBID.done%n".format(function.commandDirectory)) + function.doneOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file))) + preExec.append("rm -f '%s/'.$LSB_JOBID.fail%n".format(function.commandDirectory)) + function.failOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file))) + + mountCommand(function).foreach(command => + preExec.append("%s%n".format(command))) + + IOUtils.writeTempFile(preExec.toString, ".preExec", "", function.commandDirectory) + } + + /** + * Writes a post-exec file to create the status files. + * @return the file path to the post-exec. 
+ */ + private def writePostExec(function: CommandLineFunction): File = { + val postExec = new StringBuilder + + val touchDone = function.doneOutputs.map("touch '%s'%n".format(_)).mkString + val touchFail = function.failOutputs.map("touch '%s'%n".format(_)).mkString + + postExec.append("""| + |if [ "${LSB_JOBPEND:-unset}" != "unset" ]; then + | exit 0 + |fi + | + |JOB_STAT_ROOT='%s/'.$LSB_JOBID + |if [ "$LSB_JOBEXIT_STAT" == "0" ]; then + |%stouch "$JOB_STAT_ROOT".done + |else + |%stouch "$JOB_STAT_ROOT".fail + |fi + |""".stripMargin.format(function.commandDirectory, touchDone, touchFail)) + + IOUtils.writeTempFile(postExec.toString, ".postExec", "", function.commandDirectory) } } diff --git a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala b/scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala similarity index 63% rename from scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala rename to scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala index c43567809..6f42a705b 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.queue.function +package org.broadinstitute.sting.queue.engine import java.io.File @@ -6,10 +6,11 @@ import java.io.File * Utility class to map a set of inputs to set of outputs. * The QGraph uses this function internally to map between user defined functions. */ -class MappingFunction(val inputs: Set[File], val outputs: Set[File]) extends QFunction { +class MappingEdge(val inputs: Set[File], val outputs: Set[File]) extends QEdge { /** * For debugging purposes returns . 
* @return */ override def toString = "" + override def dotString = "" } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala new file mode 100644 index 000000000..265d60a74 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala @@ -0,0 +1,23 @@ +package org.broadinstitute.sting.queue.engine + +import java.io.File + +/** + * An edge in the QGraph + */ +trait QEdge { + /** + * Set of inputs for this function. + */ + def inputs: Set[File] + + /** + * Set of outputs for this function. + */ + def outputs: Set[File] + + /** + * The function description in .dot files + */ + def dotString = "" +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 1756ef2c8..43923746b 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -4,25 +4,22 @@ import org.jgrapht.traverse.TopologicalOrderIterator import org.jgrapht.graph.SimpleDirectedGraph import scala.collection.JavaConversions import scala.collection.JavaConversions._ -import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction -import org.broadinstitute.sting.queue.util.Logging import org.jgrapht.alg.CycleDetector import org.jgrapht.EdgeFactory import org.jgrapht.ext.DOTExporter import java.io.File import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} import org.broadinstitute.sting.queue.{QSettings, QException} -import org.broadinstitute.sting.queue.function.{DispatchWaitFunction, MappingFunction, CommandLineFunction, QFunction} -import collection.mutable.HashMap +import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, ScatterGatherableFunction} +import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction} +import 
org.broadinstitute.sting.queue.util.{JobExitException, LsfKillJob, Logging} /** * The internal dependency tracker between sets of function input and output files. */ class QGraph extends Logging { - var status = false var dryRun = true var bsubAllJobs = false - var bsubWaitJobs = false var skipUpToDateJobs = false var dotFile: File = _ var expandedDotFile: File = _ @@ -34,14 +31,48 @@ class QGraph extends Logging { * Adds a QScript created CommandLineFunction to the graph. * @param command Function to add to the graph. */ - def add(command: CommandLineFunction) { - addFunction(command) + def add(command: QFunction) { + try { + command.qSettings = this.qSettings + command.freeze + addEdge(new FunctionEdge(command)) + } catch { + case e: Exception => + throw new QException("Error adding function: " + command, e) + } } + + private def scatterGatherable(edge: FunctionEdge) = { + edge.function match { + case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) => true + case _ => false + } + } + + /** * Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph. 
*/ def run = { + val numMissingValues = fillGraph + val isReady = numMissingValues == 0 + + if (isReady || this.dryRun) { + runJobs() + } + + if (numMissingValues > 0) { + logger.error("Total missing values: " + numMissingValues) + } + + if (isReady && this.dryRun) { + logger.info("Dry run completed successfully!") + logger.info("Re-run with \"-run\" to execute the functions.") + } + } + + private def fillGraph = { fill if (dotFile != null) renderToDot(dotFile) @@ -49,15 +80,15 @@ class QGraph extends Logging { if (numMissingValues == 0 && bsubAllJobs) { logger.debug("Scatter gathering jobs.") - var scatterGathers = List.empty[ScatterGatherableFunction] + var scatterGathers = List.empty[FunctionEdge] loop({ - case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) => - scatterGathers :+= scatterGather + case edge: FunctionEdge if (scatterGatherable(edge)) => + scatterGathers :+= edge }) - var addedFunctions = List.empty[CommandLineFunction] + var addedFunctions = List.empty[QFunction] for (scatterGather <- scatterGathers) { - val functions = scatterGather.generateFunctions() + val functions = scatterGather.function.asInstanceOf[ScatterGatherableFunction].generateFunctions() if (this.debugMode) logger.debug("Scattered into %d parts: %n%s".format(functions.size, functions.mkString("%n".format()))) addedFunctions ++= functions @@ -65,7 +96,7 @@ class QGraph extends Logging { this.jobGraph.removeAllEdges(scatterGathers) prune - addedFunctions.foreach(this.addFunction(_)) + addedFunctions.foreach(this.add(_)) fill val scatterGatherDotFile = if (expandedDotFile != null) expandedDotFile else dotFile @@ -74,46 +105,33 @@ class QGraph extends Logging { numMissingValues = validate } - val isReady = numMissingValues == 0 - - if ( (isReady || this.dryRun) && ! this.status ) - runJobs - - if (numMissingValues > 0) { - logger.error("Total missing values: " + numMissingValues) - } - - if (isReady && this.dryRun && ! 
this.status) { - logger.info("Dry run completed successfully!") - logger.info("Re-run with \"-run\" to execute the functions.") - } + numMissingValues } def checkStatus = { // build up the full DAG with scatter-gather jobs - this.status = true - run - runStatus + fillGraph + logStatus } /** - * Walks up the graph looking for the previous LsfJobs. + * Walks up the graph looking for the previous command line edges. * @param function Function to examine for a previous command line job. * @param qGraph The graph that contains the jobs. * @return A list of prior jobs. */ - def previousJobs(function: QFunction) : List[CommandLineFunction] = { - var previous = List.empty[CommandLineFunction] + def previousFunctions(edge: QEdge) : List[FunctionEdge] = { + var previous = List.empty[FunctionEdge] - val source = this.jobGraph.getEdgeSource(function) + val source = this.jobGraph.getEdgeSource(edge) for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) { incomingEdge match { // Stop recursing when we find a job along the edge and return its job id - case commandLineFunction: CommandLineFunction => previous :+= commandLineFunction + case functionEdge: FunctionEdge => previous :+= functionEdge - // For any other type of edge find the LSF jobs preceding the edge - case qFunction: QFunction => previous ++= previousJobs(qFunction) + // For any other type of edge find the jobs preceding the edge + case edge: QEdge => previous ++= previousFunctions(edge) } } previous @@ -125,8 +143,6 @@ class QGraph extends Logging { */ private def fill = { fillIn - if (skipUpToDateJobs) - removeUpToDate prune } @@ -136,34 +152,34 @@ class QGraph extends Logging { private def fillIn = { // clone since edgeSet is backed by the graph JavaConversions.asSet(jobGraph.edgeSet).clone.foreach { - case cmd: CommandLineFunction => { + case cmd: FunctionEdge => { addCollectionOutputs(cmd.outputs) addCollectionInputs(cmd.inputs) } - case map: MappingFunction => /* do nothing for mapping functions */ + case 
map: MappingEdge => /* do nothing for mapping edges */ } } - /** - * Removes functions that are up to date. - */ - private def removeUpToDate = { - var upToDateJobs = Set.empty[CommandLineFunction] + private def getReadyJobs = { + var readyJobs = List.empty[FunctionEdge] loop({ - case f if (upToDate(f, upToDateJobs)) => { - logger.info("Skipping command because it is up to date: %n%s".format(f.commandLine)) - upToDateJobs += f + case f: FunctionEdge => { + if (this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING) + readyJobs :+= f } }) - for (upToDateJob <- upToDateJobs) - jobGraph.removeEdge(upToDateJob) + readyJobs } - /** - * Returns true if the all previous functions in the graph are up to date, and the function is up to date. - */ - private def upToDate(commandLineFunction: CommandLineFunction, upToDateJobs: Set[CommandLineFunction]) = { - this.previousJobs(commandLineFunction).forall(upToDateJobs.contains(_)) && commandLineFunction.upToDate + private def getRunningJobs = { + var runningJobs = List.empty[FunctionEdge] + loop({ + case f: FunctionEdge => { + if (f.status == RunnerStatus.RUNNING) + runningJobs :+= f + } + }) + runningJobs } /** @@ -190,15 +206,15 @@ class QGraph extends Logging { private def validate = { var numMissingValues = 0 JavaConversions.asSet(jobGraph.edgeSet).foreach { - case cmd: CommandLineFunction => - val missingFieldValues = cmd.missingFields + case cmd: FunctionEdge => + val missingFieldValues = cmd.function.missingFields if (missingFieldValues.size > 0) { numMissingValues += missingFieldValues.size - logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.commandLine)) + logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.function.description)) for (missing <- missingFieldValues) logger.error(" " + missing) } - case map: MappingFunction => /* do nothing for mapping functions */ + case map: MappingEdge => /* do nothing for 
mapping edges */ } val detector = new CycleDetector(jobGraph) @@ -215,157 +231,170 @@ class QGraph extends Logging { /** * Runs the jobs by traversing the graph. */ - private def runJobs = { - val runner = if (bsubAllJobs) new LsfJobRunner else new ShellJobRunner + private def runJobs() = { + loop({ case f: FunctionEdge => { + val isDone = this.skipUpToDateJobs && + f.status == RunnerStatus.DONE && + this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) + if (!isDone) + f.resetPending() + }}) - val numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size + var readyJobs = getReadyJobs + var runningJobs = Set.empty[FunctionEdge] + while (readyJobs.size + runningJobs.size > 0) { + var exitedJobs = List.empty[FunctionEdge] + runningJobs.foreach(runner => { + if (runner.status != RunnerStatus.RUNNING) + exitedJobs :+= runner + }) + exitedJobs.foreach(runner => runningJobs -= runner) - logger.info("Number of jobs: %s".format(numJobs)) - if (this.debugMode) { - val numNodes = jobGraph.vertexSet.size - logger.debug("Number of nodes: %s".format(numNodes)) - } - var numNodes = 0 - - loop( - edgeFunction = { case f => runner.run(f, this) }, - nodeFunction = { - case node => { - if (this.debugMode) - logger.debug("Visiting: " + node) - numNodes += 1 + readyJobs.foreach(f => { + f.runner = newRunner(f.function) + f.runner.start() + if (f.status == RunnerStatus.RUNNING) { + runningJobs += f } }) - if (this.debugMode) - logger.debug("Done walking %s nodes.".format(numNodes)) - - if (bsubAllJobs && bsubWaitJobs) { - logger.info("Waiting for jobs to complete.") - val wait = new DispatchWaitFunction - wait.qSettings = this.qSettings - wait.freeze - runner.run(wait, this) + if (readyJobs.size == 0 && runningJobs.size > 0) + Thread.sleep(30000L) + readyJobs = getReadyJobs } } + private def newRunner(f: QFunction) = { + if (this.dryRun) + new DryRunner(f) + else { + f match { + case cmd: CommandLineFunction => + if (this.bsubAllJobs) 
+ new LsfJobRunner(cmd) + else + new ShellJobRunner(cmd) + case inProc: InProcessFunction => + new InProcessRunner(inProc) + case _ => + throw new QException("Unexpected function: " + f) + } + } + } + + /** + * Tracks analysis status. + */ + private class AnalysisStatus(val analysisName: String) { + var status = RunnerStatus.PENDING + var scatter = new ScatterGatherStatus + var gather = new ScatterGatherStatus + } + + /** + * Tracks scatter gather status. + */ + private class ScatterGatherStatus { + var total = 0 + var done = 0 + var failed = 0 + } + /** * Gets job statuses by traversing the graph and looking for status-related files */ - private def runStatus = { - var statuses: HashMap[String,HashMap[String,Int]] = new HashMap[String,HashMap[String,Int]] - loop( - edgeFunction = { case edgeCLF => { - if ( edgeCLF.analysisName != null && ! edgeCLF.outputs.forall(file => file.getName.endsWith(".out") || file.getName.endsWith(".err") )) { - if ( ! statuses.keySet.contains(edgeCLF.analysisName) ) { - statuses.put(edgeCLF.analysisName,emptyStatusMap) - } - updateMap(statuses(edgeCLF.analysisName),edgeCLF) - } - } - }) - formatStatus(statuses) - } + private def logStatus = { + var statuses = Map.empty[String, AnalysisStatus] + loop({ + case edgeCLF: FunctionEdge if (edgeCLF.function.analysisName != null) => + updateStatus(statuses.get(edgeCLF.function.analysisName) match { + case Some(status) => status + case None => + val status = new AnalysisStatus(edgeCLF.function.analysisName) + statuses += edgeCLF.function.analysisName -> status + status + }, edgeCLF) + }) - /** - * Creates an empty map with keys for status updates, todo -- make this nicer somehow - */ - private def emptyStatusMap = { - var sMap = new HashMap[String,Int] - sMap.put("status",-1) - sMap.put("sgTotal",0) - sMap.put("sgDone",0) - sMap.put("sgRunning",0) - sMap.put("sgFailed",0) - // note -- pending = total - done - run - failed - sMap + statuses.values.toList.sortBy(_.analysisName).foreach(status => { 
+ if (status.scatter.total + status.gather.total > 0) { + var sgStatus = RunnerStatus.PENDING + if (status.scatter.failed + status.gather.failed > 0) + sgStatus = RunnerStatus.FAILED + else if (status.scatter.done + status.gather.done == status.scatter.total + status.gather.total) + sgStatus = RunnerStatus.DONE + else if (status.scatter.done + status.gather.done > 0) + sgStatus = RunnerStatus.RUNNING + status.status = sgStatus + } + + var info = status.analysisName + ": [" + status.status + "]" + if (status.scatter.total + status.gather.total > 1) { + info += formatSGStatus(status.scatter, "s") + info += formatSGStatus(status.gather, "g") + } + logger.info(info) + }) } /** * Updates a status map with scatter/gather status information (e.g. counts) */ - private def updateMap(stats: HashMap[String,Int], clf: CommandLineFunction) = { - if ( clf.isGather ) { - logger.debug(clf.analysisName+": "+clf.doneOutputs.mkString(", ")) - if ( clf.isDone ) { - stats("status") = 1 - } else { - stats("status") = 0 - } + private def updateStatus(stats: AnalysisStatus, edge: FunctionEdge) = { + if (edge.function.isInstanceOf[GatherFunction]) { + updateSGStatus(stats.gather, edge) + } else if (edge.function.isInstanceOf[ScatterGatherableFunction]) { + updateSGStatus(stats.scatter, edge) } else { - stats("sgTotal") = (stats("sgTotal") + 1) - if ( clf.isDone ) { - stats("sgDone") = (stats("sgDone") + 1) + stats.status = edge.status + } + } + + private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) = { + stats.total += 1 + edge.status match { + case RunnerStatus.DONE => { + stats.done += 1 } + case RunnerStatus.FAILED => { + stats.failed += 1 + } + /* can't tell the difference between pending and running right now! 
*/ + case RunnerStatus.PENDING => + case RunnerStatus.RUNNING => } } /** - * Formats a complete status map (analysis name --> map {string, int}) into nice strings + * Formats a status into nice strings */ - private def formatStatus(stats: HashMap[String,HashMap[String,Int]]) = { - stats.foreach{ case(analysisName, status) => { - var infoStr = analysisName - val doneInt = status("status") - if ( doneInt == 1 ) { - infoStr += " [DONE]" - } else if ( doneInt == 0 ) { - infoStr += " [NOT DONE]" - } else { - infoStr += " [UNKNOWN]" - } - - if ( status("sgTotal") > 0 ) { - val sgTot = status("sgTotal") - val sgDone = status("sgDone") - val sgRun = status("sgRunning") - val sgFailed = status("sgFailed") - val sgPend = (sgTot - sgDone - sgRun - sgFailed) - infoStr += " %dt/%dd/%dr/%dp/%df".format(sgTot,sgDone,sgRun,sgPend,sgFailed) - } - - logger.info(infoStr) - } - - } + private def formatSGStatus(stats: ScatterGatherStatus, prefix: String) = { + " %s:%dt/%dd/%df".format( + prefix, stats.total, stats.done, stats.failed) } /** * Creates a new graph where if new edges are needed (for cyclic dependency checking) they can be automatically created using a generic MappingFunction. * @return A new graph */ - private def newGraph = new SimpleDirectedGraph[QNode, QFunction](new EdgeFactory[QNode, QFunction] { - def createEdge(input: QNode, output: QNode) = new MappingFunction(input.files, output.files)}) + private def newGraph = new SimpleDirectedGraph[QNode, QEdge](new EdgeFactory[QNode, QEdge] { + def createEdge(input: QNode, output: QNode) = new MappingEdge(input.files, output.files)}) - /** - * Adds a generic QFunction to the graph. - * @param f Generic QFunction to add to the graph. 
- */ - private def addFunction(f: QFunction): Unit = { - try { - f match { - case cmd: CommandLineFunction => cmd.qSettings = this.qSettings - case map: MappingFunction => /* do nothing for mapping functions */ - } - f.freeze - val inputs = QNode(f.inputs) - val outputs = QNode(f.outputs) - val newSource = jobGraph.addVertex(inputs) - val newTarget = jobGraph.addVertex(outputs) - val removedEdges = jobGraph.removeAllEdges(inputs, outputs) - val added = jobGraph.addEdge(inputs, outputs, f) - if (this.debugMode) { - logger.debug("Mapped from: " + inputs) - logger.debug("Mapped to: " + outputs) - logger.debug("Mapped via: " + f) - logger.debug("Removed edges: " + removedEdges) - logger.debug("New source?: " + newSource) - logger.debug("New target?: " + newTarget) - logger.debug("") - } - } catch { - case e: Exception => - throw new QException("Error adding function: " + f, e) + private def addEdge(edge: QEdge) = { + val inputs = QNode(edge.inputs) + val outputs = QNode(edge.outputs) + val newSource = jobGraph.addVertex(inputs) + val newTarget = jobGraph.addVertex(outputs) + val removedEdges = jobGraph.removeAllEdges(inputs, outputs) + val added = jobGraph.addEdge(inputs, outputs, edge) + if (this.debugMode) { + logger.debug("Mapped from: " + inputs) + logger.debug("Mapped to: " + outputs) + logger.debug("Mapped via: " + edge) + logger.debug("Removed edges: " + removedEdges) + logger.debug("New source?: " + newSource) + logger.debug("New target?: " + newTarget) + logger.debug("") } } @@ -399,25 +428,17 @@ class QGraph extends Logging { jobGraph.getEdge(QNode(input), QNode(output)) != null || jobGraph.getEdge(QNode(output), QNode(input)) != null if (!hasEdge) - addFunction(new MappingFunction(input, output)) + addEdge(new MappingEdge(input, output)) } - /** - * Returns true if the edge is an internal mapping edge. - * @param edge Edge to check. - * @return true if the edge is an internal mapping edge. 
- */ - private def isMappingEdge(edge: QFunction) = - edge.isInstanceOf[MappingFunction] - /** * Returns true if the edge is mapping edge that is not needed because it does * not direct input or output from a user generated CommandLineFunction. * @param edge Edge to check. * @return true if the edge is not needed in the graph. */ - private def isFiller(edge: QFunction) = { - if (isMappingEdge(edge)) { + private def isFiller(edge: QEdge) = { + if (edge.isInstanceOf[MappingEdge]) { if (jobGraph.outgoingEdgesOf(jobGraph.getEdgeTarget(edge)).size == 0) true else if (jobGraph.incomingEdgesOf(jobGraph.getEdgeSource(edge)).size == 0) @@ -428,7 +449,7 @@ class QGraph extends Logging { /** * Returns true if the node is not connected to any edges. - * @param node Node (set of files) to check + * @param node Node (set of files) to check. * @return true if this set of files is not needed in the graph. */ private def isOrphan(node: QNode) = @@ -439,12 +460,12 @@ class QGraph extends Logging { * @param edgeFunction Optional function to run for each edge visited. * @param nodeFunction Optional function to run for each node visited. 
*/ - private def loop(edgeFunction: PartialFunction[CommandLineFunction, Unit] = null, nodeFunction: PartialFunction[QNode, Unit] = null) = { + private def loop(edgeFunction: PartialFunction[QEdge, Unit] = null, nodeFunction: PartialFunction[QNode, Unit] = null) = { val iterator = new TopologicalOrderIterator(this.jobGraph) - iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] { - override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match { - case cmd: CommandLineFunction => if (edgeFunction != null && edgeFunction.isDefinedAt(cmd)) edgeFunction(cmd) - case map: MappingFunction => /* do nothing for mapping functions */ + iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] { + override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = event.getEdge match { + case cmd: FunctionEdge => if (edgeFunction != null && edgeFunction.isDefinedAt(cmd)) edgeFunction(cmd) + case map: MappingEdge => /* do nothing for mapping functions */ } }) iterator.foreach(node => if (nodeFunction != null && nodeFunction.isDefinedAt(node)) nodeFunction(node)) @@ -460,8 +481,8 @@ class QGraph extends Logging { // todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a // todo -- visualizeString() command, or something that shows inputs / outputs - val ve = new org.jgrapht.ext.EdgeNameProvider[QFunction] { - def getEdgeName( function: QFunction ) = if (function.dotString == null) "" else function.dotString.replace("\"", "\\\"") + val ve = new org.jgrapht.ext.EdgeNameProvider[QEdge] { + def getEdgeName(function: QEdge) = if (function.dotString == null) "" else function.dotString.replace("\"", "\\\"") } //val iterator = new TopologicalOrderIterator(qGraph.jobGraph) @@ -469,4 +490,35 @@ class QGraph extends Logging { out.close } + + /** + * Returns true if any of the jobs in the graph have a status of failed. 
+ * @return true if any of the jobs in the graph have a status of failed. + */ + def hasFailed = { + this.jobGraph.edgeSet.exists(edge => { + edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED + }) + } + + /** + * Kills any forked jobs still running. + */ + def shutdown() { + val lsfJobs = getRunningJobs.filter(_.runner.isInstanceOf[LsfJobRunner]).map(_.runner.asInstanceOf[LsfJobRunner].job) + if (lsfJobs.size > 0) { + for (jobs <- lsfJobs.grouped(10)) { + try { + val bkill = new LsfKillJob(jobs) + logger.info(bkill.command) + bkill.run() + } catch { + case jee: JobExitException => + logger.error("Unable to kill all jobs:%n%s".format(jee.getMessage)) + case e => + logger.error("Unable to kill jobs.", e) + } + } + } + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala b/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala new file mode 100644 index 000000000..78c834616 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala @@ -0,0 +1,8 @@ +package org.broadinstitute.sting.queue.engine + +object RunnerStatus extends Enumeration { + val PENDING = Value("pending") + val RUNNING = Value("running") + val FAILED = Value("failed") + val DONE = Value("done") +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala index e2088c6b4..88bc24a63 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala @@ -1,18 +1,19 @@ package org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.util.{Logging, ShellJob} +import org.broadinstitute.sting.queue.util.{JobExitException, Logging, ShellJob} import org.broadinstitute.sting.queue.function.CommandLineFunction /** * Runs jobs one at a time locally */ -class ShellJobRunner extends JobRunner with 
Logging { +class ShellJobRunner(function: CommandLineFunction) extends JobRunner with Logging { + private var runStatus: RunnerStatus.Value = _ + /** * Runs the function on the local shell. * @param function Command to run. - * @param qGraph graph that holds the job, and if this is a dry run. */ - def run(function: CommandLineFunction, qGraph: QGraph) = { + def start() = { val job = new ShellJob job.command = function.commandLine job.workingDir = function.commandDirectory @@ -20,10 +21,11 @@ class ShellJobRunner extends JobRunner with Logging { job.errorFile = function.jobErrorFile if (logger.isDebugEnabled) { - logger.debug(function.commandDirectory + " > " + function.commandLine) + logger.debug("Starting: " + function.commandDirectory + " > " + function.commandLine) } else { - logger.info(function.commandLine) + logger.info("Starting: " + function.commandLine) } + logger.info("Output written to " + function.jobOutputFile) if (function.jobErrorFile != null) { logger.info("Errors written to " + function.jobErrorFile) @@ -32,7 +34,25 @@ class ShellJobRunner extends JobRunner with Logging { logger.info("Errors also written to " + function.jobOutputFile) } - if (!qGraph.dryRun) - job.run + function.doneOutputs.foreach(_.delete) + function.failOutputs.foreach(_.delete) + runStatus = RunnerStatus.RUNNING + try { + job.run() + function.doneOutputs.foreach(_.createNewFile) + runStatus = RunnerStatus.DONE + logger.info("Done: " + function.commandLine) + } catch { + case e: JobExitException => + runStatus = RunnerStatus.FAILED + try { + function.failOutputs.foreach(_.createNewFile) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error: " + function.commandLine, e) + } } + + def status = runStatus } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala index 01de9c8f9..40beec1fb 100755 --- 
a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala @@ -4,5 +4,5 @@ package org.broadinstitute.sting.queue.extensions.gatk * Splits intervals by contig instead of evenly. */ class ContigScatterFunction extends IntervalScatterFunction { - splitIntervalsScript = "splitIntervalsByContig.py" + splitByContig = true } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala index dfb94d48f..ff8cc4c7c 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala @@ -1,16 +1,82 @@ package org.broadinstitute.sting.queue.extensions.gatk -import org.broadinstitute.sting.commandline.Argument -import org.broadinstitute.sting.queue.function.scattergather.ScatterFunction +import org.broadinstitute.sting.queue.function.InProcessFunction +import org.broadinstitute.sting.commandline.ArgumentSource +import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, ScatterFunction} +import org.broadinstitute.sting.utils.interval.IntervalUtils +import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceDataSource +import java.io.File +import net.sf.picard.util.IntervalList +import net.sf.samtools.SAMFileHeader +import collection.JavaConversions._ +import org.broadinstitute.sting.utils.{GenomeLoc, GenomeLocSortedSet, GenomeLocParser} /** - * An interval scatter function that allows the script to be swapped out. - * The syntax of the script must be: - * [.. ] + * An interval scatter function. 
*/ -class IntervalScatterFunction extends ScatterFunction { - @Argument(doc="Interval split script") - var splitIntervalsScript: String = "splitIntervals.sh" +class IntervalScatterFunction extends ScatterFunction with InProcessFunction { + var splitByContig = false - def commandLine = "%s %s%s".format(splitIntervalsScript, originalInput, repeat(" ", scatterParts)) + private var referenceSequence: File = _ + private var intervals: List[String] = Nil + + override def setOriginalFunction(originalFunction: ScatterGatherableFunction, scatterField: ArgumentSource) = { + val gatk = originalFunction.asInstanceOf[CommandLineGATK] + referenceSequence = gatk.reference_sequence + intervals = gatk.intervalsString + if (gatk.intervals != null) + intervals ::= gatk.intervals.toString + } + + def run() = { + val referenceSource = new ReferenceDataSource(referenceSequence) + GenomeLocParser.setupRefContigOrdering(referenceSource.getReference); + val locs = { + // TODO: Abstract genome analysis engine has richer logic for parsing. We need to use it! 
+ if (intervals.size == 0) { + GenomeLocSortedSet.createSetFromSequenceDictionary(referenceSource.getReference.getSequenceDictionary).toList + } else { + IntervalUtils.parseIntervalArguments(intervals, false) + } + } + + val fileHeader = new SAMFileHeader + fileHeader.setSequenceDictionary(referenceSource.getReference.getSequenceDictionary) + + var intervalList: IntervalList = null + var fileIndex = -1 + var locIndex = 0 + + if (splitByContig) { + var contig: String = null + for (loc <- locs) { + if (contig != loc.getContig && (fileIndex + 1) < scatterParts.size) { + if (fileIndex >= 0) + intervalList.write(scatterParts(fileIndex)) + fileIndex += 1 + intervalList = new IntervalList(fileHeader) + } + locIndex += 1 + intervalList.add(toInterval(loc, locIndex)) + } + intervalList.write(scatterParts(fileIndex)) + } else { + var locsPerFile = locs.size / this.scatterParts.size + if (locs.size % this.scatterParts.size != 0) locsPerFile += 1 + for (loc <- locs) { + if (locIndex % locsPerFile == 0) { + if (fileIndex >= 0) + intervalList.write(scatterParts(fileIndex)) + fileIndex += 1 + intervalList = new IntervalList(fileHeader) + } + locIndex += 1 + intervalList.add(toInterval(loc, locIndex)) + } + intervalList.write(scatterParts(fileIndex)) + } + } + + private def toInterval(loc: GenomeLoc, locIndex: Int) = + new net.sf.picard.util.Interval(loc.getContig, loc.getStart.toInt, loc.getStop.toInt, true, "interval_" + locIndex) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 31d0d5541..07d35c1e1 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -1,12 +1,10 @@ package org.broadinstitute.sting.queue.function import org.broadinstitute.sting.queue.util._ -import java.lang.annotation.Annotation import 
org.broadinstitute.sting.commandline._ import java.io.File import collection.JavaConversions._ import org.broadinstitute.sting.queue.function.scattergather.{SimpleTextGatherFunction, Gather} -import org.broadinstitute.sting.queue.{QSettings, QException} /** * A command line that will be run in a pipeline. @@ -14,24 +12,12 @@ import org.broadinstitute.sting.queue.{QSettings, QException} trait CommandLineFunction extends QFunction with Logging { def commandLine: String - /** Analysis function name */ - var analysisName: String = _ - - /** Set to true if this is a standalone or gather function */ - var isGather: Boolean = true - - /** Default settings */ - var qSettings: QSettings = _ - /** Upper memory limit */ var memoryLimit: Option[Int] = None /** Whether a job is restartable */ var jobRestartable = true - /** Directory to run the command in. */ - var commandDirectory: File = IOUtils.CURRENT_DIR - /** Prefix for automatic job name creation */ var jobNamePrefix: String = _ @@ -45,14 +31,7 @@ trait CommandLineFunction extends QFunction with Logging { var jobQueue: String = _ /** Temporary directory to write any files */ - var jobTempDir: File = new File(System.getProperty("java.io.tmpdir")) - - /** If true this function will run only if the jobs it is dependent on succeed. */ - var jobRunOnlyIfPreviousSucceed = true - - /** Files that this job should wait on before running. */ - @Input(doc="Explicit job dependencies", required=false) - var jobDependencies: List[File] = Nil + var jobTempDir: File = IOUtils.javaTempDir /** File to redirect any output. Defaults to .out */ @Output(doc="File to redirect any output", required=false) @@ -64,15 +43,6 @@ trait CommandLineFunction extends QFunction with Logging { @Gather(classOf[SimpleTextGatherFunction]) var jobErrorFile: File = _ - /** The complete list of fields on this CommandLineFunction. 
*/ - lazy val functionFields: List[ArgumentSource] = ParsingEngine.extractArgumentSources(this.getClass).toList - /** The @Input fields on this CommandLineFunction. */ - lazy val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input])) - /** The @Output fields on this CommandLineFunction. */ - lazy val outputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Output])) - /** The @Argument fields on this CommandLineFunction. */ - lazy val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument])) - /** * Returns set of directories required to run the command. * @return Set of directories required to run the command. @@ -87,179 +57,20 @@ trait CommandLineFunction extends QFunction with Logging { dirs } - /** - * Returns the input files for this function. - * @return Set[File] inputs for this function. - */ - def inputs = getFieldFiles(inputFields) + override protected def useStatusOutput(file: File) = + file != jobOutputFile && file != jobErrorFile - /** - * Returns the output files for this function. - * @return Set[File] outputs for this function. - */ - def outputs = getFieldFiles(outputFields) - - def doneOutputs = getDoneFiles(outputFields) - - /** - * Gets the files from the fields. The fields must be a File, a FileExtension, or a List or Set of either. - * @param fields Fields to get files. - * @return Set[File] for the fields. - */ - private def getFieldFiles(fields: List[ArgumentSource]): Set[File] = { - var files = Set.empty[File] - for (field <- fields) - files ++= getFieldFiles(field) - files - } - - /** - * Returns true if all outputs already exist and are older that the inputs. - * If there are no outputs then returns false. - * @return true if all outputs already exist and are older that the inputs. 
- */ - def upToDate = { - val inputFiles = inputs - if ( doneOutputs.size > 0 && doneOutputs.forall(_.exists) ) { - val maxInput = inputFiles.foldLeft(Long.MinValue)((date,file) => date.max(file.lastModified)) - val minDone = doneOutputs.foldLeft(Long.MaxValue)((date,file) => date.min(file.lastModified)) - maxInput < minDone - } else false - //val inputFiles = inputs - //val outputFiles = outputs.filterNot(file => (file == jobOutputFile || file == jobErrorFile)) - //if (outputFiles.size > 0 && outputFiles.forall(_.exists)) { - // val maxInput = inputFiles.foldLeft(Long.MinValue)((date, file) => date.max(file.lastModified)) - // val minOutput = outputFiles.foldLeft(Long.MaxValue)((date, file) => date.min(file.lastModified)) - // maxInput < minOutput - //} else false - } - - /** - * Gets the files from the field. The field must be a File, a FileExtension, or a List or Set of either. - * @param fields Field to get files. - * @return Set[File] for the field. - */ - def getFieldFiles(field: ArgumentSource): Set[File] = { - var files = Set.empty[File] - CollectionUtils.foreach(getFieldValue(field), (fieldValue) => { - val file = fieldValueToFile(field, fieldValue) - if (file != null) - files += file - }) - files - } - - /** - * Gets the done files from the field. The field must be a File, a FileExtension, or a List or Set of either. - * @param fields Field to get files. - * @return Set[File] set of done files for the field. - */ - private def getDoneFiles(fields: List[ArgumentSource]): Set[File] = { - var doneFiles = Set.empty[File] - for ( field <- fields ) { - CollectionUtils.foreach(getFieldValue(field), (fieldValue) => { - val outFile = fieldValueToFile(field,fieldValue) - if ( outFile != null && filesAreDifferent(outFile,jobOutputFile) && filesAreDifferent(outFile,jobErrorFile) && ! outFile.isDirectory && ! outFile.getName.endsWith(".out")) { - doneFiles += new File(outFile.getParent + "/." 
+ outFile.getName + ".done") - } - }) - } - doneFiles - - //for ( outFile <- outFiles ) { - // if ( outFile != null && filesAreDifferent(outFile,jobOutputFile) && filesAreDifferent(outFile,jobErrorFile) && ! outFile.isDirectory ) - // doneFiles += new File(outFile.getParent + "." + outFile.getName + ".done") - //} - //doneFiles - } - - def isDone = { - doneOutputs.size == 0 || doneOutputs.forall(_.exists) - } - - /** - * Silly utility function which compresses if statement in getDoneFiles; returns true if two files are different - * @return boolean -- if files are different - */ - private def filesAreDifferent(a: File, b: File): Boolean = { - if ( b == null ) - if ( a == null ) - return false - else - return true - else - if ( a == null ) - return true - else - return ! b.getAbsolutePath.equals(a.getAbsolutePath) - } - - /** - * Gets the file from the field. The field must be a File or a FileExtension and not a List or Set. - * @param field Field to get the file. - * @return File for the field. - */ - def getFieldFile(field: ArgumentSource): File = - fieldValueToFile(field, getFieldValue(field)) - - /** - * Converts the field value to a file. The field must be a File or a FileExtension. - * @param field Field to get the file. - * @param value Value of the File or FileExtension or null. - * @return Null if value is null, otherwise the File. - * @throws QException if the value is not a File or FileExtension. - */ - private def fieldValueToFile(field: ArgumentSource, value: Any): File = value match { - case file: File => file - case null => null - case unknown => throw new QException("Non-file found. Try removing the annotation, change the annotation to @Argument, or extend File with FileExtension: %s: %s".format(field.field, unknown)) - } - - /** - * Resets the field to the temporary directory. - * @param field Field to get and set the file. - * @param tempDir new root for the file. 
- */ - def resetFieldFile(field: ArgumentSource, tempDir: File): File = { - getFieldValue(field) match { - case fileExtension: FileExtension => { - val newFile = IOUtils.resetParent(tempDir, fileExtension) - val newFileExtension = fileExtension.withPath(newFile.getPath) - setFieldValue(field, newFileExtension) - newFileExtension - } - case file: File => { - if (file.getClass != classOf[File]) - throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified."); - val newFile = IOUtils.resetParent(tempDir, file) - setFieldValue(field, newFile) - newFile - } - case null => null - case unknown => - throw new QException("Unable to set file from %s: %s".format(field, unknown)) - } - } + override def description = commandLine /** * The function description in .dot files */ override def dotString = jobName + " => " + commandLine - /** - * Sets all field values and makes them canonical so that the graph can - * match the inputs of one function to the output of another using equals(). - */ - final override def freeze = { - freezeFieldValues - canonFieldValues - super.freeze - } - /** * Sets all field values. */ - def freezeFieldValues = { + override def freezeFieldValues = { if (jobNamePrefix == null) jobNamePrefix = qSettings.jobNamePrefix @@ -272,57 +83,15 @@ trait CommandLineFunction extends QFunction with Logging { if (memoryLimit.isEmpty && qSettings.memoryLimit.isDefined) memoryLimit = qSettings.memoryLimit - if (qSettings.runJobsIfPrecedingFail) - jobRunOnlyIfPreviousSucceed = false - if (jobName == null) jobName = CommandLineFunction.nextJobName(jobNamePrefix) if (jobOutputFile == null) jobOutputFile = new File(jobName + ".out") - commandDirectory = IOUtils.subDir(IOUtils.CURRENT_DIR, commandDirectory) + super.freezeFieldValues } - /** - * Makes all field values canonical so that the graph can match the - * inputs of one function to the output of another using equals(). 
- */ - def canonFieldValues = { - for (field <- this.functionFields) { - var fieldValue = this.getFieldValue(field) - fieldValue = CollectionUtils.updated(fieldValue, canon).asInstanceOf[AnyRef] - this.setFieldValue(field, fieldValue) - } - } - - /** - * Set value to a uniform value across functions. - * Base implementation changes any relative path to an absolute path. - * @param value to be updated - * @return the modified value, or a copy if the value is immutable - */ - protected def canon(value: Any) = { - value match { - case fileExtension: FileExtension => - val newFile = absolute(fileExtension); - val newFileExtension = fileExtension.withPath(newFile.getPath) - newFileExtension - case file: File => - if (file.getClass != classOf[File]) - throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified."); - absolute(file) - case x => x - } - } - - /** - * Returns the absolute path to the file relative to the job command directory. - * @param file File to root relative to the command directory if it is not already absolute. - * @return The absolute path to file. - */ - private def absolute(file: File) = IOUtils.subDir(commandDirectory, file) - /** * Repeats parameters with a prefix/suffix if they are set otherwise returns "". * Skips null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString. @@ -350,89 +119,6 @@ trait CommandLineFunction extends QFunction with Logging { format: (String, Any, String) => String = formatValue("%s")) = if (hasValue(param)) format(prefix, param, suffix) else "" - /** - * Returns fields that do not have values which are required. - * @return List[String] names of fields missing values. 
- */ - def missingFields: List[String] = { - val missingInputs = missingFields(inputFields, classOf[Input]) - val missingOutputs = missingFields(outputFields, classOf[Output]) - val missingArguments = missingFields(argumentFields, classOf[Argument]) - (missingInputs | missingOutputs | missingArguments).toList.sorted - } - - /** - * Returns fields that do not have values which are required. - * @param sources Fields to check. - * @param annotation Annotation. - * @return Set[String] names of fields missing values. - */ - private def missingFields(sources: List[ArgumentSource], annotation: Class[_ <: Annotation]): Set[String] = { - var missing = Set.empty[String] - for (source <- sources) { - if (isRequired(source, annotation)) - if (!hasFieldValue(source)) - if (!exclusiveOf(source, annotation).exists(otherSource => hasFieldValue(otherSource))) - missing += "@%s: %s - %s".format(annotation.getSimpleName, source.field.getName, doc(source, annotation)) - } - missing - } - - /** - * Scala sugar type for checking annotation required and exclusiveOf. - */ - private type ArgumentAnnotation = { - def required(): Boolean - def exclusiveOf(): String - def doc(): String - } - - /** - * Returns the isRequired value from the field. - * @param field Field to check. - * @param annotation Annotation. - * @return the isRequired value from the field annotation. - */ - private def isRequired(field: ArgumentSource, annotation: Class[_ <: Annotation]) = - ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].required - - /** - * Returns an array of ArgumentSources from functionFields listed in the exclusiveOf of the original field - * @param field Field to check. - * @param annotation Annotation. - * @return the Array[ArgumentSource] that may be set instead of the field. 
- */ - private def exclusiveOf(field: ArgumentSource, annotation: Class[_ <: Annotation]) = - ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].exclusiveOf - .split(",").map(_.trim).filter(_.length > 0) - .map(fieldName => functionFields.find(fieldName == _.field.getName) match { - case Some(x) => x - case None => throw new QException("Unable to find exclusion field %s on %s".format(fieldName, this.getClass.getSimpleName)) - }) - - /** - * Returns the doc value from the field. - * @param field Field to check. - * @param annotation Annotation. - * @return the doc value from the field annotation. - */ - private def doc(field: ArgumentSource, annotation: Class[_ <: Annotation]) = - ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].doc - - /** - * Returns true if the field has a value. - * @param source Field to check for a value. - * @return true if the field has a value. - */ - protected def hasFieldValue(source: ArgumentSource) = this.hasValue(this.getFieldValue(source)) - - /** - * Returns false if the value is null or an empty collection. - * @param value Value to test for null, or a collection to test if it is empty. - * @return false if the value is null, or false if the collection is empty, otherwise true. - */ - private def hasValue(param: Any) = CollectionUtils.isNotNullOrNotEmpty(param) - /** * Returns "" if the value is null or an empty collection, otherwise return the value.toString. * @param format Format string if the value has a value @@ -450,27 +136,6 @@ trait CommandLineFunction extends QFunction with Logging { case x => format.format(x) }) + suffix - /** - * Gets the value of a field. - * @param source Field to get the value for. - * @return value of the field. - */ - def getFieldValue(source: ArgumentSource) = ReflectionUtils.getValue(invokeObj(source), source.field) - - /** - * Gets the value of a field. - * @param source Field to set the value for. 
- * @return value of the field. - */ - def setFieldValue(source: ArgumentSource, value: Any) = ReflectionUtils.setValue(invokeObj(source), source.field, value) - - /** - * Walks gets the fields in this object or any collections in that object - * recursively to find the object holding the field to be retrieved or set. - * @param source Field find the invoke object for. - * @return Object to invoke the field on. - */ - private def invokeObj(source: ArgumentSource) = source.parentFields.foldLeft[AnyRef](this)(ReflectionUtils.getValue(_, _)) } /** diff --git a/scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala deleted file mode 100644 index 893345ad7..000000000 --- a/scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala +++ /dev/null @@ -1,15 +0,0 @@ -package org.broadinstitute.sting.queue.function - -import java.io.File - -/** An internal class that is used by bsub to wait on all other jobs before exiting. */ -class DispatchWaitFunction extends CommandLineFunction { - /** - * Returns the command line "echo". - * @return echo - */ - def commandLine = "echo" - - jobQueue = "hour" - jobOutputFile = File.createTempFile("Q-wait", ".out") -} diff --git a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala new file mode 100644 index 000000000..42da4774d --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala @@ -0,0 +1,12 @@ +package org.broadinstitute.sting.queue.function + +import java.io.File + +/** + * Runs a function in process. 
+ */ +trait InProcessFunction extends QFunction { + def run() + protected def useStatusOutput(file: File) = true + def description = this.getClass.getSimpleName +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 68a4bf4bc..4bc120d39 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -1,6 +1,11 @@ package org.broadinstitute.sting.queue.function import java.io.File +import java.lang.annotation.Annotation +import org.broadinstitute.sting.commandline._ +import org.broadinstitute.sting.queue.util.{CollectionUtils, IOUtils, ReflectionUtils} +import org.broadinstitute.sting.queue.{QException, QSettings} +import collection.JavaConversions._ /** * The base interface for all functions in Queue. @@ -9,24 +14,300 @@ import java.io.File */ trait QFunction { /** - * After a function is frozen no more updates are allowed by the user. - * The function is allow to make necessary updates internally to make sure - * the inputs and outputs will be equal to other inputs and outputs. + * Analysis function name */ - def freeze = {} + var analysisName: String = _ + + /** Default settings */ + var qSettings: QSettings = _ + + /** Directory to run the command in. */ + var commandDirectory: File = IOUtils.CURRENT_DIR /** - * Set of inputs for this function. + * Description of this command line function. */ - def inputs: Set[File] - - /** - * Set of outputs for this function. - */ - def outputs: Set[File] + def description: String /** * The function description in .dot files */ def dotString = "" + + protected def useStatusOutput(file: File): Boolean + + /** + * Returns the output files for this function. + * @return Set[File] outputs for this function. + */ + private def statusPaths = outputs + .filter(file => useStatusOutput(file)) + .map(file => file.getParentFile + "/." 
+ file.getName) + + /** + * Returns the output files for this function. + * @return Set[File] outputs for this function. + */ + def doneOutputs = statusPaths.map(path => new File(path + ".done")) + + /** + * Returns the output files for this function. + * @return Set[File] outputs for this function. + */ + def failOutputs = statusPaths.map(path => new File(path + ".fail")) + + /** The complete list of fields on this CommandLineFunction. */ + lazy val functionFields: List[ArgumentSource] = ParsingEngine.extractArgumentSources(this.getClass).toList + /** The @Input fields on this CommandLineFunction. */ + lazy val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input])) + /** The @Output fields on this CommandLineFunction. */ + lazy val outputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Output])) + /** The @Argument fields on this CommandLineFunction. */ + lazy val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument])) + + /** + * Returns the input files for this function. + * @return Set[File] inputs for this function. + */ + def inputs = getFieldFiles(inputFields) + + /** + * Returns the output files for this function. + * @return Set[File] outputs for this function. + */ + def outputs = getFieldFiles(outputFields) + + /** + * Returns fields that do not have values which are required. + * @return List[String] names of fields missing values. + */ + def missingFields: List[String] = { + val missingInputs = missingFields(inputFields, classOf[Input]) + val missingOutputs = missingFields(outputFields, classOf[Output]) + val missingArguments = missingFields(argumentFields, classOf[Argument]) + (missingInputs | missingOutputs | missingArguments).toList.sorted + } + + /** + * Returns fields that do not have values which are required. + * @param sources Fields to check. + * @param annotation Annotation. 
+ * @return Set[String] names of fields missing values. + */ + private def missingFields(sources: List[ArgumentSource], annotation: Class[_ <: Annotation]): Set[String] = { + var missing = Set.empty[String] + for (source <- sources) { + if (isRequired(source, annotation)) + if (!hasFieldValue(source)) + if (!exclusiveOf(source, annotation).exists(otherSource => hasFieldValue(otherSource))) + missing += "@%s: %s - %s".format(annotation.getSimpleName, source.field.getName, doc(source, annotation)) + } + missing + } + + /** + * Gets the files from the fields. The fields must be a File, a FileExtension, or a List or Set of either. + * @param fields Fields to get files. + * @return Set[File] for the fields. + */ + private def getFieldFiles(fields: List[ArgumentSource]): Set[File] = { + var files = Set.empty[File] + for (field <- fields) + files ++= getFieldFiles(field) + files + } + + /** + * Gets the files from the field. The field must be a File, a FileExtension, or a List or Set of either. + * @param fields Field to get files. + * @return Set[File] for the field. + */ + def getFieldFiles(field: ArgumentSource): Set[File] = { + var files = Set.empty[File] + CollectionUtils.foreach(getFieldValue(field), (fieldValue) => { + val file = fieldValueToFile(field, fieldValue) + if (file != null) + files += file + }) + files + } + + /** + * Gets the file from the field. The field must be a File or a FileExtension and not a List or Set. + * @param field Field to get the file. + * @return File for the field. + */ + def getFieldFile(field: ArgumentSource): File = + fieldValueToFile(field, getFieldValue(field)) + + /** + * Converts the field value to a file. The field must be a File or a FileExtension. + * @param field Field to get the file. + * @param value Value of the File or FileExtension or null. + * @return Null if value is null, otherwise the File. + * @throws QException if the value is not a File or FileExtension. 
+ */ + private def fieldValueToFile(field: ArgumentSource, value: Any): File = value match { + case file: File => file + case null => null + case unknown => throw new QException("Non-file found. Try removing the annotation, change the annotation to @Argument, or extend File with FileExtension: %s: %s".format(field.field, unknown)) + } + + /** + * Resets the field to the temporary directory. + * @param field Field to get and set the file. + * @param tempDir new root for the file. + */ + def resetFieldFile(field: ArgumentSource, tempDir: File): File = { + getFieldValue(field) match { + case fileExtension: FileExtension => { + val newFile = IOUtils.resetParent(tempDir, fileExtension) + val newFileExtension = fileExtension.withPath(newFile.getPath) + setFieldValue(field, newFileExtension) + newFileExtension + } + case file: File => { + if (file.getClass != classOf[File]) + throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified."); + val newFile = IOUtils.resetParent(tempDir, file) + setFieldValue(field, newFile) + newFile + } + case null => null + case unknown => + throw new QException("Unable to set file from %s: %s".format(field, unknown)) + } + } + + + /** + * After a function is frozen no more updates are allowed by the user. + * The function is allow to make necessary updates internally to make sure + * the inputs and outputs will be equal to other inputs and outputs. + */ + final def freeze = { + freezeFieldValues + canonFieldValues + } + + def freezeFieldValues = { + commandDirectory = IOUtils.subDir(IOUtils.CURRENT_DIR, commandDirectory) + } + + /** + * Makes all field values canonical so that the graph can match the + * inputs of one function to the output of another using equals(). 
+ */ + def canonFieldValues = { + for (field <- this.functionFields) { + var fieldValue = this.getFieldValue(field) + fieldValue = CollectionUtils.updated(fieldValue, canon).asInstanceOf[AnyRef] + this.setFieldValue(field, fieldValue) + } + } + + /** + * Set value to a uniform value across functions. + * Base implementation changes any relative path to an absolute path. + * @param value to be updated + * @return the modified value, or a copy if the value is immutable + */ + protected def canon(value: Any) = { + value match { + case fileExtension: FileExtension => + val newFile = absolute(fileExtension); + val newFileExtension = fileExtension.withPath(newFile.getPath) + newFileExtension + case file: File => + if (file.getClass != classOf[File]) + throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified."); + absolute(file) + case x => x + } + } + + /** + * Returns the absolute path to the file relative to the job command directory. + * @param file File to root relative to the command directory if it is not already absolute. + * @return The absolute path to file. + */ + private def absolute(file: File) = IOUtils.subDir(commandDirectory, file) + + + /** + * Scala sugar type for checking annotation required and exclusiveOf. + */ + private type ArgumentAnnotation = { + def required(): Boolean + def exclusiveOf(): String + def doc(): String + } + + /** + * Returns the isRequired value from the field. + * @param field Field to check. + * @param annotation Annotation. + * @return the isRequired value from the field annotation. + */ + private def isRequired(field: ArgumentSource, annotation: Class[_ <: Annotation]) = + ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].required + + /** + * Returns an array of ArgumentSources from functionFields listed in the exclusiveOf of the original field + * @param field Field to check. + * @param annotation Annotation. 
+ * @return the Array[ArgumentSource] that may be set instead of the field. + */ + private def exclusiveOf(field: ArgumentSource, annotation: Class[_ <: Annotation]) = + ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].exclusiveOf + .split(",").map(_.trim).filter(_.length > 0) + .map(fieldName => functionFields.find(fieldName == _.field.getName) match { + case Some(x) => x + case None => throw new QException("Unable to find exclusion field %s on %s".format(fieldName, this.getClass.getSimpleName)) + }) + + /** + * Returns the doc value from the field. + * @param field Field to check. + * @param annotation Annotation. + * @return the doc value from the field annotation. + */ + private def doc(field: ArgumentSource, annotation: Class[_ <: Annotation]) = + ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].doc + + /** + * Returns true if the field has a value. + * @param source Field to check for a value. + * @return true if the field has a value. + */ + protected def hasFieldValue(source: ArgumentSource) = this.hasValue(this.getFieldValue(source)) + + /** + * Returns false if the value is null or an empty collection. + * @param value Value to test for null, or a collection to test if it is empty. + * @return false if the value is null, or false if the collection is empty, otherwise true. + */ + protected def hasValue(param: Any) = CollectionUtils.isNotNullOrNotEmpty(param) + + /** + * Gets the value of a field. + * @param source Field to get the value for. + * @return value of the field. + */ + def getFieldValue(source: ArgumentSource) = ReflectionUtils.getValue(invokeObj(source), source.field) + + /** + * Gets the value of a field. + * @param source Field to set the value for. + * @return value of the field. 
+ */ + def setFieldValue(source: ArgumentSource, value: Any) = ReflectionUtils.setValue(invokeObj(source), source.field, value) + + /** + * Walks gets the fields in this object or any collections in that object + * recursively to find the object holding the field to be retrieved or set. + * @param source Field find the invoke object for. + * @return Object to invoke the field on. + */ + private def invokeObj(source: ArgumentSource) = source.parentFields.foldLeft[AnyRef](this)(ReflectionUtils.getValue(_, _)) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala index 7b8e49501..0e0123b16 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala @@ -1,8 +1,9 @@ package org.broadinstitute.sting.queue.function.scattergather -import org.broadinstitute.sting.queue.function.CommandLineFunction import java.io.File -import org.broadinstitute.sting.commandline.{Argument, Input} +import org.broadinstitute.sting.commandline.Input +import org.broadinstitute.sting.queue.function.InProcessFunction +import org.apache.commons.io.FileUtils /** * Removes the temporary directories for scatter / gather. @@ -10,17 +11,12 @@ import org.broadinstitute.sting.commandline.{Argument, Input} * By default uses rm -rf. * The format of the call is [.. 
] */ -class CleanupTempDirsFunction extends CommandLineFunction { +class CleanupTempDirsFunction extends InProcessFunction { @Input(doc="Original outputs of the gather functions") var originalOutputs: Set[File] = Set.empty[File] @Input(doc="Temporary directories to be deleted") var tempDirectories: List[File] = Nil - @Argument(doc="rmdir script or command") - var rmdirScript = "rm -rf" - - override def upToDate = tempDirectories.forall(!_.exists) - - def commandLine = "%s%s".format(rmdirScript, repeat(" '", tempDirectories, "'")) + def run() = tempDirectories.foreach(FileUtils.deleteDirectory(_)) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala index 2b2ae420f..b257616d1 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala @@ -1,8 +1,8 @@ package org.broadinstitute.sting.queue.function.scattergather import java.io.File -import org.broadinstitute.sting.queue.function.CommandLineFunction -import org.broadinstitute.sting.commandline.{Argument, Output, Input} +import org.broadinstitute.sting.commandline.{Output, Input} +import org.broadinstitute.sting.queue.function.InProcessFunction /** * Creates the temporary directories for scatter / gather. @@ -10,22 +10,14 @@ import org.broadinstitute.sting.commandline.{Argument, Output, Input} * By default uses mkdir -pv * The format of the call is [.. 
] */ -class CreateTempDirsFunction extends CommandLineFunction { +class CreateTempDirsFunction extends InProcessFunction { @Input(doc="Original inputs to the scattered function") var originalInputs: Set[File] = Set.empty[File] @Output(doc="Temporary directories to create") var tempDirectories: List[File] = Nil - @Argument(doc="mkdir script or command") - var mkdirScript = "mkdir -pv" + override protected def useStatusOutput(file: File) = false - override def upToDate = tempDirectories.forall(_.exists) - - def commandLine = "%s%s".format(mkdirScript, repeat(" '", tempDirectories, "'")) - - /** - * This function is creating the directories, so returns just this command directory. - */ - override def jobDirectories = Set(commandDirectory) + def run() = tempDirectories.foreach(_.mkdirs) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index f5886865a..96030e99f 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -1,13 +1,13 @@ package org.broadinstitute.sting.queue.function.scattergather -import org.broadinstitute.sting.queue.function.{CommandLineFunction} import java.io.File import org.broadinstitute.sting.commandline.{ArgumentSource, Input, Output} +import org.broadinstitute.sting.queue.function.QFunction /** * Base class for Gather command line functions. 
*/ -trait GatherFunction extends CommandLineFunction { +trait GatherFunction extends QFunction { @Input(doc="Parts to gather back into the original output") var gatherParts: List[File] = Nil diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala index b0a8ab794..d1cfd4916 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala @@ -1,13 +1,13 @@ package org.broadinstitute.sting.queue.function.scattergather -import org.broadinstitute.sting.queue.function.CommandLineFunction import java.io.File import org.broadinstitute.sting.commandline.{ArgumentSource, Input, Output} +import org.broadinstitute.sting.queue.function.QFunction /** * Base class for Scatter command line functions. */ -trait ScatterFunction extends CommandLineFunction { +trait ScatterFunction extends QFunction { @Input(doc="Original input to scatter") var originalInput: File = _ diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 883e6f95e..a003b2737 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -3,8 +3,8 @@ package org.broadinstitute.sting.queue.function.scattergather import java.io.File import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.commandline.ArgumentSource -import org.broadinstitute.sting.queue.function.CommandLineFunction import com.rits.cloning.Cloner +import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} /** * A function that can be run faster by splitting 
it up into pieces and then joining together the results. @@ -84,10 +84,10 @@ trait ScatterGatherableFunction extends CommandLineFunction { * Returns a list of scatter / gather and clones of this function * that can be run in parallel to produce the same output as this * command line function. - * @return List[CommandLineFunction] to run instead of this function. + * @return List[QFunction] to run instead of this function. */ def generateFunctions() = { - var functions = List.empty[CommandLineFunction] + var functions = List.empty[QFunction] var tempDirectories = List.empty[File] // Only depend on input fields that have a value @@ -112,7 +112,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { } // Create the clone functions for running the parallel jobs - var cloneFunctions = List.empty[CommandLineFunction] + var cloneFunctions = List.empty[ScatterGatherableFunction] for (i <- 1 to this.scatterCount) { val cloneFunction = this.newCloneFunction() initCloneFunction(cloneFunction, i) @@ -290,7 +290,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { if (this.setupCloneFunction != null) if (this.setupCloneFunction.isDefinedAt(cloneFunction, index)) this.setupCloneFunction(cloneFunction, index) - cloneFunction.isGather = false } /** @@ -361,7 +360,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param Sub directory under the scatter gather directory. * @return temporary directory under this scatter gather directory. 
*/ - private def scatterGatherTempDir(subDir: String) = IOUtils.subDir(this.scatterGatherDirectory, this.jobName + "-" + subDir) + private def scatterGatherTempDir(subDir: String) = IOUtils.subDir(this.scatterGatherDirectory, this.jobName + "-sg/" + subDir) } /** diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala index 9a5681e4d..6e35221a5 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala @@ -1,6 +1,11 @@ package org.broadinstitute.sting.queue.function.scattergather import org.broadinstitute.sting.commandline.Argument +import org.broadinstitute.sting.queue.function.InProcessFunction +import org.apache.commons.io.FileUtils +import org.broadinstitute.sting.queue.QException +import collection.JavaConversions._ +import java.io.PrintWriter /** * Merges a text file. @@ -8,9 +13,50 @@ import org.broadinstitute.sting.commandline.Argument * By default uses mergeText.sh in Sting/shell. * The format of the call is [.. 
] */ -class SimpleTextGatherFunction extends GatherFunction { +class SimpleTextGatherFunction extends GatherFunction with InProcessFunction { @Argument(doc="merge text script") var mergeTextScript = "mergeText.sh" - def commandLine = "%s %s%s".format(mergeTextScript, originalOutput, repeat(" ", gatherParts)) + + def run() = { + if (gatherParts.size < 1) { + throw new QException("No files to gather to output: " + originalOutput) + } else if (gatherParts.size == 1) { + FileUtils.copyFile(gatherParts(0), originalOutput) + } else { + val writer = new PrintWriter(originalOutput) + var startLine = 0 + + val readerA = FileUtils.lineIterator(gatherParts(0)) + val readerB = FileUtils.lineIterator(gatherParts(1)) + var headersMatch = true + while (headersMatch) { + if (readerA.hasNext && readerB.hasNext) { + val headerA = readerA.nextLine + val headerB = readerB.nextLine + headersMatch = headerA == headerB + if (headersMatch) { + startLine += 1 + writer.println(headerA) + } + } else { + headersMatch = false + } + } + readerA.close + readerB.close + + for (file <- gatherParts) { + val reader = FileUtils.lineIterator(file) + var lineNum = 0 + while (reader.hasNext && lineNum < startLine) { + reader.nextLine + lineNum += 1 + } + while (reader.hasNext) + writer.println(reader.nextLine) + } + writer.close + } + } } diff --git a/scala/src/org/broadinstitute/sting/queue/util/CommandLineJob.scala b/scala/src/org/broadinstitute/sting/queue/util/CommandLineJob.scala index 0b322301e..59a0a600d 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/CommandLineJob.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/CommandLineJob.scala @@ -3,7 +3,7 @@ package org.broadinstitute.sting.queue.util import java.io.File /** - * Base class for a command line job. + * Base utility class for a command line job. 
*/ abstract class CommandLineJob { var command: String = _ diff --git a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala index 52c772827..e61a85677 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala @@ -1,7 +1,7 @@ package org.broadinstitute.sting.queue.util -import java.io.{IOException, File} import org.apache.commons.io.FileUtils +import java.io.{FileReader, IOException, File} /** * A collection of utilities for modifying java.io. @@ -107,8 +107,8 @@ object IOUtils { def writeContents(file: File, content: String) = FileUtils.writeStringToFile(file, content) - def writeTempFile(content: String, prefix: String, suffix: String = "") = { - val tempFile = absolute(File.createTempFile(prefix, suffix, networkTempDir)) + def writeTempFile(content: String, prefix: String, suffix: String = "", directory: File = networkTempDir) = { + val tempFile = absolute(File.createTempFile(prefix, suffix, directory)) writeContents(tempFile, content) tempFile } @@ -133,6 +133,12 @@ object IOUtils { directories(level) } + /** + * A mix of getCanonicalFile and getAbsoluteFile that returns the + * absolute path to the file without deferencing symbolic links. + * @param file the file. + * @return the absolute path to the file. + */ def absolute(file: File) = { var fileAbs = file.getAbsoluteFile var names = List.empty[String] @@ -167,4 +173,30 @@ object IOUtils { new File(names.mkString("/", "/", "")) } + + /** + * Returns the last lines of the file. + * NOTE: This is only safe to run on smaller files! + * @param file File to read. + * @param count Maximum number of lines to return. + * @return The last count lines from file. 
+ */ + def tail(file: File, count: Int) = { + var tailLines = List.empty[String] + var reader = new FileReader(file) + try { + val iterator = org.apache.commons.io.IOUtils.lineIterator(reader) + var lineCount = 0 + while (iterator.hasNext) { + val line = iterator.nextLine + lineCount += 1 + if (lineCount > count) + tailLines = tailLines.tail + tailLines :+= line + } + } finally { + org.apache.commons.io.IOUtils.closeQuietly(reader) + } + tailLines + } } diff --git a/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala b/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala new file mode 100644 index 000000000..39c12d2a6 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala @@ -0,0 +1,11 @@ +package org.broadinstitute.sting.queue.util + +import org.broadinstitute.sting.queue.QException + +/** + * Captures the exit code and error text from a failed process. + */ +class JobExitException(var exitText: String, var commandLine: Array[String], var exitCode: Int, var stdErr: String) + extends QException("%s%nCommand line:%n%s%nExit code: %s%nStandard error contained: %n%s" + .format(exitText, commandLine.mkString(" "), exitCode, stdErr)) { +} diff --git a/scala/src/org/broadinstitute/sting/queue/util/LsfJob.scala b/scala/src/org/broadinstitute/sting/queue/util/LsfJob.scala index f18ad4304..4a19f9fa4 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/LsfJob.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/LsfJob.scala @@ -2,10 +2,9 @@ package org.broadinstitute.sting.queue.util import java.util.regex.Pattern import collection.JavaConversions._ -import org.broadinstitute.sting.queue.QException /** - * An job submitted to LSF. This class is designed to work somewhat like + * A job submitted to LSF. This class is designed to work somewhat like * java.lang.Process, but has some extensions. 
* * @author A subset of the original BroadCore ported to scala by Khalid Shakir @@ -24,7 +23,7 @@ class LsfJob extends CommandLineJob with Logging { * Starts the job. Command must exist. The job will be submitted to LSF. */ def run() = { - assert(bsubJobId == null, "LSF job was already started") + assert(bsubJobId == null, "LSF job was already submitted") assert(command != null, "Command was not set on LSF job") assert(outputFile != null, "Output file must be set on LSF job") @@ -33,32 +32,20 @@ class LsfJob extends CommandLineJob with Logging { val stdoutSettings = new ProcessController.OutputStreamSettings(FIVE_MB, null, false) val stderrSettings = new ProcessController.OutputStreamSettings(FIVE_MB, null, false) - // This is really nice for debugging, but spits out way too much stuff otherwise! - // log.info("About to execute LSF command: " + StringUtils.join(argArray, " ")); - - // Get environment vars and strip out LD_ASSUME_KERNEL - // This is necessary since GAP servers on linux 2.4.x kernel and can be removed when - // its no longer true. Only 'classic' LSF queue has 2.4 kernel-based machines. - // launch the bsub job from the current directory val processSettings = new ProcessController.ProcessSettings( bsubCommand, environmentVariables, null, stdinSettings, stdoutSettings, stderrSettings, false) val bsubOutput = processController.exec(processSettings) if (bsubOutput.exitValue != 0) { - logger.error("Failed to submit LSF job, got exit code %s. 
Standard error contained: %n%s"
-        .format(bsubOutput.exitValue, content(bsubOutput.stderr)))
-      throw new QException("Failed to submit LSF job, got exit code %s.".format(bsubOutput.exitValue))
+      throw new JobExitException("Failed to submit LSF job.", bsubCommand,
+        bsubOutput.exitValue, content(bsubOutput.stderr))
     }
 
     // get the LSF job ID
     val matcher = LsfJob.JOB_ID.matcher(bsubOutput.stdout.content)
     matcher.find()
     bsubJobId = matcher.group
-
-    // set job name to LSF_ if not set already
-    if (name == null)
-      name = "lsf_job_" + bsubJobId
   }
 
   /**
@@ -136,6 +123,12 @@ class LsfJob extends CommandLineJob with Logging {
       .toMap
 }
 
+/**
+ * Companion object for LsfJob, holding constants shared by
+ * all LSF jobs submitted via bsub.
+ *
+ * @author A subset of the original BroadCore ported to scala by Khalid Shakir
+ */
 object LsfJob {
   /** Used to search the stdout for the job id. */
   private val JOB_ID = Pattern.compile("\\d+")
diff --git a/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala b/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala
new file mode 100644
index 000000000..e53681652
--- /dev/null
+++ b/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala
@@ -0,0 +1,26 @@
+package org.broadinstitute.sting.queue.util
+
+/**
+ * Runs bkill on a list of LSF jobs.
+ */
+class LsfKillJob(jobs: List[LsfJob]) extends CommandLineJob with Logging {
+  command = "bkill " + jobs.map(_.bsubJobId).mkString(" ")
+
+  def run() = {
+    // capture the output for debugging
+    val stdinSettings = new ProcessController.InputStreamSettings(null, null)
+    val stdoutSettings = new ProcessController.OutputStreamSettings(FIVE_MB, null, false)
+    val stderrSettings = new ProcessController.OutputStreamSettings(FIVE_MB, null, false)
+
+    val bkillCommand = (List("bkill") ++ jobs.map(_.bsubJobId)).toArray
+
+    // launch the bkill command from the current directory
+    val processSettings = new ProcessController.ProcessSettings(
+      bkillCommand, null, null, stdinSettings, stdoutSettings, stderrSettings, false)
+    val bkillOutput = processController.exec(processSettings)
+
+    if (bkillOutput.exitValue != 0) {
+      throw new JobExitException("Failed to kill LSF jobs.", bkillCommand, bkillOutput.exitValue, content(bkillOutput.stderr))
+    }
+  }
+}
diff --git a/scala/src/org/broadinstitute/sting/queue/util/ProcessController.scala b/scala/src/org/broadinstitute/sting/queue/util/ProcessController.scala
index 9e278302b..92b63c628 100644
--- a/scala/src/org/broadinstitute/sting/queue/util/ProcessController.scala
+++ b/scala/src/org/broadinstitute/sting/queue/util/ProcessController.scala
@@ -253,14 +253,14 @@ object ProcessController extends Logging {
   private val STDERR_KEY = "stderr"
 
   /** Tracks running processes so that they can be killed as the JVM shuts down. */
-  private val running = new HashSet[Process]()
-  Runtime.getRuntime.addShutdownHook(new Thread {
-    /** Kills running processes as the JVM shuts down. */
-    override def run = for (process <- running.clone) {
+  private val running = new HashSet[Process]
+
+  def shutdown() = {
+    for (process <- running.clone) {
       logger.warn("Killing: " + process)
       process.destroy
     }
-  })
+  }
 
   /** Empty stream settings used when no output is requested.
*/ private object EmptyStreamSettings extends OutputStreamSettings(0, null, false) diff --git a/scala/src/org/broadinstitute/sting/queue/util/ShellJob.scala b/scala/src/org/broadinstitute/sting/queue/util/ShellJob.scala index b9458f4c6..863d2a2ea 100755 --- a/scala/src/org/broadinstitute/sting/queue/util/ShellJob.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/ShellJob.scala @@ -17,16 +17,15 @@ class ShellJob extends CommandLineJob with Logging { val stdinSettings = new ProcessController.InputStreamSettings(null, this.inputFile) val stdoutSettings = new ProcessController.OutputStreamSettings(bufferSize, this.outputFile, true) val stderrSettings = new ProcessController.OutputStreamSettings(FIVE_MB, errorFile, true) + val commandLine = Array("sh", "-c", command) val processSettings = new ProcessController.ProcessSettings( - Array("sh", "-c", command), null, this.workingDir, stdinSettings, stdoutSettings, stderrSettings, redirectError) + commandLine, null, this.workingDir, stdinSettings, stdoutSettings, stderrSettings, redirectError) val output = processController.exec(processSettings) if (output.exitValue != 0) { val streamOutput = if (redirectError) output.stdout else output.stderr - logger.error("Failed to run job, got exit code %s. 
Error contained: %n%s" - .format(output.exitValue, content(streamOutput))) - throw new QException("Failed to run job, got exit code %s.".format(output.exitValue)) + throw new JobExitException("Failed to run job.", commandLine, output.exitValue, content(streamOutput)) } } } diff --git a/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala index 255ebe79b..6982f4978 100644 --- a/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala @@ -99,4 +99,18 @@ class IOUtilsUnitTest extends BaseTest { dir = IOUtils.absolute(new File("/./.directory/")) Assert.assertEquals(new File("/.directory"), dir) } + + @Test + def testTail = { + val lines = List( + "chr18_random 4262 3154410390 50 51", + "chr19_random 301858 3154414752 50 51", + "chr21_random 1679693 3154722662 50 51", + "chr22_random 257318 3156435963 50 51", + "chrX_random 1719168 3156698441 50 51") + val tail = IOUtils.tail(new File(BaseTest.hg18Reference + ".fai"), 5) + Assert.assertEquals(5, tail.size) + for (i <- 0 until 5) + Assert.assertEquals(lines(i), tail(i)) + } }