From db47230dd95d28845e448878637e2538aa69883d Mon Sep 17 00:00:00 2001 From: kshakir Date: Thu, 7 Oct 2010 01:19:18 +0000 Subject: [PATCH] Wrapping ScatterGatherableFunctions with a facade instead of using slower clone library. Will require keeping Clone's facade code in sync with CommandLineFunction but runs *much* faster. Shell invoking scripts so that even really long shell scripts make it through LSF. Using the command line, truncated to at most 1000 characters, as the job name for use with bjobs. Switched the default from re-running everything to re-running only files that need to be regenerated. --skip_up_to_date replaced with --start_clean for those who want to regenerate everything. Updated logging to let users know when the scatter gather generator is running, which still takes a while but is orders of magnitude faster for large lists of functions. (40s for a 100 function graph exploding to a 2500 function graph) git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4448 348d0f76-0448-11de-a6fe-93d51630548a --- build.xml | 2 +- ivy.xml | 3 - .../sting/queue/QCommandLine.scala | 8 +- .../sting/queue/engine/InProcessRunner.scala | 8 +- .../sting/queue/engine/LsfJobRunner.scala | 34 ++++- .../sting/queue/engine/QGraph.scala | 116 +++++++++--------- .../sting/queue/engine/ShellJobRunner.scala | 11 +- .../queue/function/CommandLineFunction.scala | 2 +- .../queue/function/InProcessFunction.scala | 2 +- .../sting/queue/function/QFunction.scala | 13 +- .../scattergather/CloneFunction.scala | 84 +++++++++++++ .../CreateTempDirsFunction.scala | 2 +- .../scattergather/GatherFunction.scala | 4 +- .../scattergather/ScatterFunction.scala | 4 +- .../ScatterGatherableFunction.scala | 39 +++--- .../sting/queue/util/LsfKillJob.scala | 2 +- settings/ivysettings.xml | 2 - 17 files changed, 221 insertions(+), 115 deletions(-) create mode 100644 scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala diff --git a/build.xml 
b/build.xml index 070eab192..dea30a7a9 100644 --- a/build.xml +++ b/build.xml @@ -549,7 +549,7 @@ - + diff --git a/ivy.xml b/ivy.xml index a078c0f40..9bb536839 100644 --- a/ivy.xml +++ b/ivy.xml @@ -40,9 +40,6 @@ - - - diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index 12e24af96..bd361c610 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -26,8 +26,8 @@ class QCommandLine extends CommandLineProgram with Logging { @Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false) private var expandedDotFile: File = _ - @Argument(fullName="skip_up_to_date", shortName="skipUpToDate", doc="Does not run command line functions that don't depend on other jobs if the outputs exist and are older than the inputs.", required=false) - private var skipUpToDate = false + @Argument(fullName="start_clean", shortName="clean", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false) + private var startClean = false @Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden private var runScripts = false @@ -47,7 +47,7 @@ class QCommandLine extends CommandLineProgram with Logging { val qGraph = new QGraph qGraph.dryRun = !(run || runScripts) qGraph.bsubAllJobs = bsubAllJobs - qGraph.skipUpToDateJobs = skipUpToDate + qGraph.startClean = startClean qGraph.dotFile = dotFile qGraph.expandedDotFile = expandedDotFile qGraph.qSettings = qSettings @@ -71,10 +71,8 @@ class QCommandLine extends CommandLineProgram with Logging { }) if ( ! 
getStatus ) { - logger.info("Running generated graph") qGraph.run } else { - logger.info("Checking pipeline status") qGraph.checkStatus } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala index 22c65bf2e..ea372a439 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -16,19 +16,19 @@ class InProcessRunner(function: InProcessFunction) extends JobRunner with Loggin logger.info("Starting: " + function.description) } - function.doneOutputs.foreach(_.delete) - function.failOutputs.foreach(_.delete) + function.doneOutputs.foreach(_.delete()) + function.failOutputs.foreach(_.delete()) runStatus = RunnerStatus.RUNNING try { function.run() - function.doneOutputs.foreach(_.createNewFile) + function.doneOutputs.foreach(_.createNewFile()) runStatus = RunnerStatus.DONE logger.info("Done: " + function.description) } catch { case e => { runStatus = RunnerStatus.FAILED try { - function.failOutputs.foreach(_.createNewFile) + function.failOutputs.foreach(_.createNewFile()) } catch { case _ => /* ignore errors in the exception handler */ } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index f8262bd46..3a486993b 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -21,6 +21,9 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with /** A temporary job done file to let Queue know that the process exited with an error. */ private lazy val jobFailFile = new File(jobStatusPath + ".fail") + /** A generated exec shell script. */ + private var exec: File = _ + /** A generated pre-exec shell script. 
*/ private var preExec: File = _ @@ -38,7 +41,6 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with job.errorFile = function.jobErrorFile job.project = function.jobProject job.queue = function.jobQueue - job.command = function.commandLine if (!IOUtils.CURRENT_DIR.getCanonicalFile.equals(function.commandDirectory)) job.workingDir = function.commandDirectory @@ -49,10 +51,16 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with if (function.memoryLimit.isDefined) job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]") - preExec = writePreExec(function) + job.name = function.commandLine.take(1000) + + // TODO: Look into passing in a single chained script as recommended by Doug instead of pre, exec, and post. + exec = writeExec() + job.command = "sh " + exec + + preExec = writePreExec() job.preExecCommand = "sh " + preExec - postExec = writePostExec(function) + postExec = writePostExec() job.postExecCommand = "sh " + postExec if (logger.isDebugEnabled) { @@ -61,6 +69,10 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with logger.info("Starting: " + job.bsubCommand.mkString(" ")) } + function.jobOutputFile.delete() + if (function.jobErrorFile != null) + function.jobErrorFile.delete() + runStatus = RunnerStatus.RUNNING job.run() jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." + job.bsubJobId)).toString @@ -98,7 +110,8 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with /** * Removes all temporary files used for this LSF job. 
*/ - private def removeTemporaryFiles() = { + def removeTemporaryFiles() = { + exec.delete() preExec.delete() postExec.delete() jobDoneFile.delete() @@ -115,12 +128,21 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) } + /** + * Writes an exec file containing the function's command line, + * created as a temporary file in the function's command directory. + * @return the file path to the exec. + */ + private def writeExec() = { + IOUtils.writeTempFile(function.commandLine, ".exec", "", function.commandDirectory) + } + /** * Writes a pre-exec file to cleanup any status files and * optionally mount any automount directories on the node. * @return the file path to the pre-exec. */ - private def writePreExec(function: CommandLineFunction): File = { + private def writePreExec() = { val preExec = new StringBuilder preExec.append("rm -f '%s/'.$LSB_JOBID.done%n".format(function.commandDirectory)) @@ -138,7 +160,7 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with * Writes a post-exec file to create the status files. * @return the file path to the post-exec. 
*/ - private def writePostExec(function: CommandLineFunction): File = { + private def writePostExec() = { val postExec = new StringBuilder val touchDone = function.doneOutputs.map("touch '%s'%n".format(_)).mkString diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 43923746b..9dda763cf 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -10,9 +10,9 @@ import org.jgrapht.ext.DOTExporter import java.io.File import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} import org.broadinstitute.sting.queue.{QSettings, QException} -import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, ScatterGatherableFunction} import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction} import org.broadinstitute.sting.queue.util.{JobExitException, LsfKillJob, Logging} +import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction} /** * The internal dependency tracker between sets of function input and output files. @@ -20,7 +20,7 @@ import org.broadinstitute.sting.queue.util.{JobExitException, LsfKillJob, Loggin class QGraph extends Logging { var dryRun = true var bsubAllJobs = false - var skipUpToDateJobs = false + var startClean = false var dotFile: File = _ var expandedDotFile: File = _ var qSettings: QSettings = _ @@ -42,15 +42,6 @@ class QGraph extends Logging { } } - - private def scatterGatherable(edge: FunctionEdge) = { - edge.function match { - case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) => true - case _ => false - } - } - - /** * Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph. 
*/ @@ -59,6 +50,7 @@ class QGraph extends Logging { val isReady = numMissingValues == 0 if (isReady || this.dryRun) { + logger.info("Running jobs.") runJobs() } @@ -73,31 +65,34 @@ class QGraph extends Logging { } private def fillGraph = { + logger.info("Generating graph.") fill if (dotFile != null) renderToDot(dotFile) var numMissingValues = validate if (numMissingValues == 0 && bsubAllJobs) { - logger.debug("Scatter gathering jobs.") - var scatterGathers = List.empty[FunctionEdge] - loop({ - case edge: FunctionEdge if (scatterGatherable(edge)) => - scatterGathers :+= edge - }) + logger.info("Generating scatter gather jobs.") + val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge)) var addedFunctions = List.empty[QFunction] for (scatterGather <- scatterGathers) { - val functions = scatterGather.function.asInstanceOf[ScatterGatherableFunction].generateFunctions() + val functions = scatterGather.asInstanceOf[FunctionEdge] + .function.asInstanceOf[ScatterGatherableFunction] + .generateFunctions() if (this.debugMode) logger.debug("Scattered into %d parts: %n%s".format(functions.size, functions.mkString("%n".format()))) addedFunctions ++= functions } + logger.info("Removing original jobs.") this.jobGraph.removeAllEdges(scatterGathers) prune + + logger.info("Adding scatter gather jobs.") addedFunctions.foreach(this.add(_)) + logger.info("Regenerating graph.") fill val scatterGatherDotFile = if (expandedDotFile != null) expandedDotFile else dotFile if (scatterGatherDotFile != null) @@ -108,9 +103,22 @@ class QGraph extends Logging { numMissingValues } + private def scatterGatherable(edge: QEdge) = { + edge match { + case functionEdge: FunctionEdge => { + functionEdge.function match { + case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) => true + case _ => false + } + } + case _ => false + } + } + def checkStatus = { // build up the full DAG with scatter-gather jobs fillGraph + logger.info("Checking pipeline status.") 
logStatus } @@ -161,25 +169,18 @@ class QGraph extends Logging { } private def getReadyJobs = { - var readyJobs = List.empty[FunctionEdge] - loop({ - case f: FunctionEdge => { - if (this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING) - readyJobs :+= f - } - }) - readyJobs + jobGraph.edgeSet.filter{ + case f: FunctionEdge => + this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING + case _ => false + }.map(_.asInstanceOf[FunctionEdge]) } private def getRunningJobs = { - var runningJobs = List.empty[FunctionEdge] - loop({ - case f: FunctionEdge => { - if (f.status == RunnerStatus.RUNNING) - runningJobs :+= f - } - }) - runningJobs + jobGraph.edgeSet.filter{ + case f: FunctionEdge => f.status == RunnerStatus.RUNNING + case _ => false + }.map(_.asInstanceOf[FunctionEdge]) } /** @@ -232,13 +233,13 @@ class QGraph extends Logging { * Runs the jobs by traversing the graph. */ private def runJobs() = { - loop({ case f: FunctionEdge => { - val isDone = this.skipUpToDateJobs && + foreachFunction(f => { + val isDone = !this.startClean && f.status == RunnerStatus.DONE && this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) if (!isDone) f.resetPending() - }}) + }) var readyJobs = getReadyJobs var runningJobs = Set.empty[FunctionEdge] @@ -305,8 +306,8 @@ class QGraph extends Logging { */ private def logStatus = { var statuses = Map.empty[String, AnalysisStatus] - loop({ - case edgeCLF: FunctionEdge if (edgeCLF.function.analysisName != null) => + foreachFunction(edgeCLF => { + if (edgeCLF.function.analysisName != null) { updateStatus(statuses.get(edgeCLF.function.analysisName) match { case Some(status) => status case None => @@ -314,6 +315,7 @@ class QGraph extends Logging { statuses += edgeCLF.function.analysisName -> status status }, edgeCLF) + } }) statuses.values.toList.sortBy(_.analysisName).foreach(status => { @@ -343,7 +345,7 @@ class QGraph extends Logging { private 
def updateStatus(stats: AnalysisStatus, edge: FunctionEdge) = { if (edge.function.isInstanceOf[GatherFunction]) { updateSGStatus(stats.gather, edge) - } else if (edge.function.isInstanceOf[ScatterGatherableFunction]) { + } else if (edge.function.isInstanceOf[CloneFunction]) { updateSGStatus(stats.scatter, edge) } else { stats.status = edge.status @@ -456,19 +458,14 @@ class QGraph extends Logging { (jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0 /** - * Utility function for looping over the internal graph and running functions. - * @param edgeFunction Optional function to run for each edge visited. - * @param nodeFunction Optional function to run for each node visited. + * Utility function for running a method over all function edges. + * @param edgeFunction Function to run for each FunctionEdge. */ - private def loop(edgeFunction: PartialFunction[QEdge, Unit] = null, nodeFunction: PartialFunction[QNode, Unit] = null) = { - val iterator = new TopologicalOrderIterator(this.jobGraph) - iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] { - override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = event.getEdge match { - case cmd: FunctionEdge => if (edgeFunction != null && edgeFunction.isDefinedAt(cmd)) edgeFunction(cmd) - case map: MappingEdge => /* do nothing for mapping functions */ - } - }) - iterator.foreach(node => if (nodeFunction != null && nodeFunction.isDefinedAt(node)) nodeFunction(node)) + private def foreachFunction(f: (FunctionEdge) => Unit) = { + jobGraph.edgeSet.foreach{ + case functionEdge: FunctionEdge => f(functionEdge) + case _ => + } } /** @@ -482,7 +479,7 @@ class QGraph extends Logging { // todo -- we need a nice way to visualize the key pieces of information about commands. 
Perhaps a // todo -- visualizeString() command, or something that shows inputs / outputs val ve = new org.jgrapht.ext.EdgeNameProvider[QEdge] { - def getEdgeName(function: QEdge) = if (function.dotString == null) "" else function.dotString.replace("\"", "\\\"") + def getEdgeName(function: QEdge) = if (function.dotString == null) "" else function.dotString.replace("\"", "\\\"") } //val iterator = new TopologicalOrderIterator(qGraph.jobGraph) @@ -505,11 +502,11 @@ class QGraph extends Logging { * Kills any forked jobs still running. */ def shutdown() { - val lsfJobs = getRunningJobs.filter(_.runner.isInstanceOf[LsfJobRunner]).map(_.runner.asInstanceOf[LsfJobRunner].job) - if (lsfJobs.size > 0) { - for (jobs <- lsfJobs.grouped(10)) { + val lsfJobRunners = getRunningJobs.filter(_.runner.isInstanceOf[LsfJobRunner]).map(_.runner.asInstanceOf[LsfJobRunner]) + if (lsfJobRunners.size > 0) { + for (jobRunners <- lsfJobRunners.filterNot(_.job.bsubJobId == null).grouped(10)) { try { - val bkill = new LsfKillJob(jobs) + val bkill = new LsfKillJob(jobRunners.map(_.job)) logger.info(bkill.command) bkill.run() } catch { @@ -518,6 +515,11 @@ class QGraph extends Logging { case e => logger.error("Unable to kill jobs.", e) } + try { + jobRunners.foreach(_.removeTemporaryFiles()) + } catch { + case e => /* ignore */ + } } } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala index 88bc24a63..a1afd7e40 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala @@ -34,19 +34,22 @@ class ShellJobRunner(function: CommandLineFunction) extends JobRunner with Loggi logger.info("Errors also written to " + function.jobOutputFile) } - function.doneOutputs.foreach(_.delete) - function.failOutputs.foreach(_.delete) + function.jobOutputFile.delete() + if (function.jobErrorFile != null) + 
function.jobErrorFile.delete() + function.doneOutputs.foreach(_.delete()) + function.failOutputs.foreach(_.delete()) runStatus = RunnerStatus.RUNNING try { job.run() - function.doneOutputs.foreach(_.createNewFile) + function.doneOutputs.foreach(_.createNewFile()) runStatus = RunnerStatus.DONE logger.info("Done: " + function.commandLine) } catch { case e: JobExitException => runStatus = RunnerStatus.FAILED try { - function.failOutputs.foreach(_.createNewFile) + function.failOutputs.foreach(_.createNewFile()) } catch { case _ => /* ignore errors in the exception handler */ } diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 07d35c1e1..564b65941 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -57,7 +57,7 @@ trait CommandLineFunction extends QFunction with Logging { dirs } - override protected def useStatusOutput(file: File) = + override def useStatusOutput(file: File) = file != jobOutputFile && file != jobErrorFile override def description = commandLine diff --git a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala index 42da4774d..754d2761b 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala @@ -7,6 +7,6 @@ import java.io.File */ trait InProcessFunction extends QFunction { def run() - protected def useStatusOutput(file: File) = true + def useStatusOutput(file: File) = true def description = this.getClass.getSimpleName } diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 4bc120d39..a599c9b08 100644 --- 
a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -34,7 +34,11 @@ trait QFunction { */ def dotString = "" - protected def useStatusOutput(file: File): Boolean + /** + * Returns true if the file should be used for status output. + * @return true if the file should be used for status output. + */ + def useStatusOutput(file: File): Boolean /** * Returns the output files for this function. @@ -57,7 +61,7 @@ trait QFunction { def failOutputs = statusPaths.map(path => new File(path + ".fail")) /** The complete list of fields on this CommandLineFunction. */ - lazy val functionFields: List[ArgumentSource] = ParsingEngine.extractArgumentSources(this.getClass).toList + lazy val functionFields: List[ArgumentSource] = initFunctionFields /** The @Input fields on this CommandLineFunction. */ lazy val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input])) /** The @Output fields on this CommandLineFunction. */ @@ -65,6 +69,11 @@ trait QFunction { /** The @Argument fields on this CommandLineFunction. */ lazy val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument])) + /** + * Called at most once, returns the list of fields for this function. + */ + protected def initFunctionFields = ParsingEngine.extractArgumentSources(this.getClass).toList + /** * Returns the input files for this function. * @return Set[File] inputs for this function. 
diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala new file mode 100644 index 000000000..85ea2606e --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala @@ -0,0 +1,84 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.commandline.ArgumentSource +import java.io.File + +/** + * Shadow clones another command line function. + */ +class CloneFunction extends CommandLineFunction { + var originalFunction: ScatterGatherableFunction = _ + var index: Int = _ + + private var overriddenFields = Map.empty[ArgumentSource, Any] + + private def withScatterPart[A](f: () => A): A = { + var originalValues = Map.empty[ArgumentSource, Any] + overriddenFields.foreach{ + case (field, overrideValue) => { + originalValues += field -> originalFunction.getFieldValue(field) + originalFunction.setFieldValue(field, overrideValue) + } + } + try { + f() + } finally { + originalValues.foreach{ + case (name, value) => + originalFunction.setFieldValue(name, value) + } + } + } + + override def dotString = originalFunction.dotString + override def description = originalFunction.description + override protected def initFunctionFields = originalFunction.functionFields + override def useStatusOutput(file: File) = + file != jobOutputFile && file != jobErrorFile && originalFunction.useStatusOutput(file) + + override def freezeFieldValues = { + if (this.analysisName == null) + this.analysisName = originalFunction.analysisName + if (this.qSettings == null) + this.qSettings = originalFunction.qSettings + if (this.memoryLimit.isEmpty && originalFunction.memoryLimit.isDefined) + this.memoryLimit = originalFunction.memoryLimit + if (this.jobTempDir == null) + this.jobTempDir = originalFunction.jobTempDir + if 
(this.jobQueue == null) + this.jobQueue = originalFunction.jobQueue + if (this.jobProject == null) + this.jobProject = originalFunction.jobProject + if (this.jobName == null) + this.jobName = originalFunction.jobName + if (this.jobOutputFile == null) + this.jobOutputFile = overriddenFile("jobOutputFile").get + if (this.jobErrorFile == null) + this.jobErrorFile = overriddenFile("jobErrorFile").getOrElse(null) + super.freezeFieldValues + } + + def commandLine = withScatterPart(() => originalFunction.commandLine) + + override def getFieldValue(source: ArgumentSource) = { + overriddenFields.get(source) match { + case Some(value) => value.asInstanceOf[AnyRef] + case None => { + val value = originalFunction.getFieldValue(source) + overriddenFields += source -> value + value + } + } + } + + override def setFieldValue(source: ArgumentSource, value: Any) = { + overriddenFields += source -> value + } + + private def overriddenFile(name: String) = { + overriddenFields + .find{case (key, _) => key.field.getName == name} + .map{case (_, value) => value.asInstanceOf[File]} + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala index b257616d1..e2d22d834 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala @@ -17,7 +17,7 @@ class CreateTempDirsFunction extends InProcessFunction { @Output(doc="Temporary directories to create") var tempDirectories: List[File] = Nil - override protected def useStatusOutput(file: File) = false + override def useStatusOutput(file: File) = false def run() = tempDirectories.foreach(_.mkdirs) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala 
b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index 96030e99f..e9649adc8 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -23,9 +23,9 @@ trait GatherFunction extends QFunction { /** * Sets the clone function creating one of the inputs for this gather function. - * @param cloneFunction The clone of the ScatterGatherableFunction. + * @param cloneFunction The clone wrapper for the original ScatterGatherableFunction. * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. * @param gatherField The field to be gathered. */ - def setCloneFunction(cloneFunction: ScatterGatherableFunction, index: Int, gatherField: ArgumentSource) = {} + def setCloneFunction(cloneFunction: CloneFunction, index: Int, gatherField: ArgumentSource) = {} } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala index d1cfd4916..9203a693d 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala @@ -26,9 +26,9 @@ trait ScatterFunction extends QFunction { /** * Sets the clone function using one of the outputs of this scatter function. - * @param cloneFunction The clone of the ScatterGatherableFunction. + * @param cloneFunction The clone wrapper for the original ScatterGatherableFunction. * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. * @param scatterField The field being scattered. 
*/ - def setCloneFunction(cloneFunction: ScatterGatherableFunction, index: Int, scatterField: ArgumentSource) = {} + def setCloneFunction(cloneFunction: CloneFunction, index: Int, scatterField: ArgumentSource) = {} } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index a003b2737..01ba6fe95 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -3,7 +3,6 @@ package org.broadinstitute.sting.queue.function.scattergather import java.io.File import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.commandline.ArgumentSource -import com.rits.cloning.Cloner import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} /** @@ -59,10 +58,10 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Allows external modification of the cloned function. - * @param cloneFunction The clone of this ScatterGatherableFunction + * @param cloneFunction A clone wrapper of this ScatterGatherableFunction * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. */ - var setupCloneFunction: PartialFunction[(ScatterGatherableFunction, Int), Unit] = _ + var setupCloneFunction: PartialFunction[(CloneFunction, Int), Unit] = _ /** * Allows external modification of the CleanupTempDirsFunction that will remove the temporary directories. 
@@ -112,7 +111,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { } // Create the clone functions for running the parallel jobs - var cloneFunctions = List.empty[ScatterGatherableFunction] + var cloneFunctions = List.empty[CloneFunction] for (i <- 1 to this.scatterCount) { val cloneFunction = this.newCloneFunction() initCloneFunction(cloneFunction, i) @@ -168,6 +167,11 @@ trait ScatterGatherableFunction extends CommandLineFunction { protected lazy val scatterField = this.inputFields.find(field => ReflectionUtils.hasAnnotation(field.field, classOf[Scatter])).get + /** + * Retrieves the original field value for the scatter field. + */ + protected lazy val originalInput = getFieldFile(scatterField) + /** * Creates a new initialize CreateTempDirsFunction that will create the temporary directories. * @return A CreateTempDirsFunction that will create the temporary directories. @@ -224,7 +228,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { protected def initScatterFunction(scatterFunction: ScatterFunction, scatterField: ArgumentSource) = { scatterFunction.qSettings = this.qSettings scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName) - scatterFunction.originalInput = this.getFieldFile(scatterField) + scatterFunction.originalInput = this.originalInput scatterFunction.setOriginalFunction(this, scatterField) if (this.setupScatterFunction != null) if (this.setupScatterFunction.isDefinedAt(scatterFunction, scatterField)) @@ -270,14 +274,9 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Creates a new clone of this ScatterGatherableFunction, setting the scatterCount to 1 so it doesn't infinitely scatter. 
- * @return A clone of this ScatterGatherableFunction + * @return An uninitialized clone wrapper for ScatterGatherableFunction */ - protected def newCloneFunction(): ScatterGatherableFunction = { - val cloneFunction = ScatterGatherableFunction.cloner.deepClone(this) - // Make sure clone doesn't get scattered - cloneFunction.scatterCount = 1 - cloneFunction - } + protected def newCloneFunction() = new CloneFunction /** * Initializes the cloned function created by newCloneFunction() by setting it's commandDirectory to a temporary directory under scatterDirectory. @@ -285,7 +284,9 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param cloneFunction The clone of this ScatterGatherableFunction * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. */ - protected def initCloneFunction(cloneFunction: ScatterGatherableFunction, index: Int) = { + protected def initCloneFunction(cloneFunction: CloneFunction, index: Int) = { + cloneFunction.originalFunction = this + cloneFunction.index = index cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+index) if (this.setupCloneFunction != null) if (this.setupCloneFunction.isDefinedAt(cloneFunction, index)) @@ -303,7 +304,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param cloneFunction Clone of this ScatterGatherableFunction. * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. 
*/ - protected def bindCloneFunctionScatter(scatterFunction: ScatterFunction, scatterField: ArgumentSource, cloneFunction: ScatterGatherableFunction, index: Int) = { + protected def bindCloneFunctionScatter(scatterFunction: ScatterFunction, scatterField: ArgumentSource, cloneFunction: CloneFunction, index: Int) = { // Reset the input of the clone to the the scatterGatherTempDir dir and add it as an output of the scatter val scatterPart = IOUtils.resetParent(cloneFunction.commandDirectory, scatterFunction.originalInput) scatterFunction.scatterParts :+= scatterPart @@ -318,7 +319,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param gatherFunction Function that will create the pieces including the piece that will go to cloneFunction. * @param gatherField The field to be gathered. */ - protected def bindCloneFunctionGather(gatherFunction: GatherFunction, gatherField: ArgumentSource, cloneFunction: ScatterGatherableFunction, index: Int) = { + protected def bindCloneFunctionGather(gatherFunction: GatherFunction, gatherField: ArgumentSource, cloneFunction: CloneFunction, index: Int) = { val gatherPart = cloneFunction.resetFieldFile(gatherField, cloneFunction.commandDirectory) gatherFunction.gatherParts :+= gatherPart gatherFunction.setCloneFunction(cloneFunction, index, gatherField) @@ -362,11 +363,3 @@ trait ScatterGatherableFunction extends CommandLineFunction { */ private def scatterGatherTempDir(subDir: String) = IOUtils.subDir(this.scatterGatherDirectory, this.jobName + "-sg/" + subDir) } - -/** - * A function that can be run faster by splitting it up into pieces and then joining together the results. - */ -object ScatterGatherableFunction { - /** Used to deep clone a ScatterGatherableFunction. 
*/ - private lazy val cloner = new Cloner -} diff --git a/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala b/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala index e53681652..1ea1e8df5 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala @@ -3,7 +3,7 @@ package org.broadinstitute.sting.queue.util /** * bkills a list of lsf jobs. */ -class LsfKillJob(jobs: List[LsfJob]) extends CommandLineJob with Logging { +class LsfKillJob(jobs: Traversable[LsfJob]) extends CommandLineJob with Logging { command = "bkill " + jobs.map(_.bsubJobId).mkString(" ") def run() = { diff --git a/settings/ivysettings.xml b/settings/ivysettings.xml index 383882a2a..244cd1075 100644 --- a/settings/ivysettings.xml +++ b/settings/ivysettings.xml @@ -8,7 +8,6 @@ - @@ -18,7 +17,6 @@ -