diff --git a/build.xml b/build.xml index 070eab192..dea30a7a9 100644 --- a/build.xml +++ b/build.xml @@ -549,7 +549,7 @@ - + diff --git a/ivy.xml b/ivy.xml index a078c0f40..9bb536839 100644 --- a/ivy.xml +++ b/ivy.xml @@ -40,9 +40,6 @@ - - - diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index 12e24af96..bd361c610 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -26,8 +26,8 @@ class QCommandLine extends CommandLineProgram with Logging { @Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false) private var expandedDotFile: File = _ - @Argument(fullName="skip_up_to_date", shortName="skipUpToDate", doc="Does not run command line functions that don't depend on other jobs if the outputs exist and are older than the inputs.", required=false) - private var skipUpToDate = false + @Argument(fullName="start_clean", shortName="clean", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false) + private var startClean = false @Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden private var runScripts = false @@ -47,7 +47,7 @@ class QCommandLine extends CommandLineProgram with Logging { val qGraph = new QGraph qGraph.dryRun = !(run || runScripts) qGraph.bsubAllJobs = bsubAllJobs - qGraph.skipUpToDateJobs = skipUpToDate + qGraph.startClean = startClean qGraph.dotFile = dotFile qGraph.expandedDotFile = expandedDotFile qGraph.qSettings = qSettings @@ -71,10 +71,8 @@ class QCommandLine extends CommandLineProgram with Logging { }) if ( ! getStatus ) { - logger.info("Running generated graph") qGraph.run } else { - logger.info("Checking pipeline status") qGraph.checkStatus } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala index 22c65bf2e..ea372a439 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -16,19 +16,19 @@ class InProcessRunner(function: InProcessFunction) extends JobRunner with Loggin logger.info("Starting: " + function.description) } - function.doneOutputs.foreach(_.delete) - function.failOutputs.foreach(_.delete) + function.doneOutputs.foreach(_.delete()) + function.failOutputs.foreach(_.delete()) runStatus = RunnerStatus.RUNNING try { function.run() - function.doneOutputs.foreach(_.createNewFile) + function.doneOutputs.foreach(_.createNewFile()) runStatus = RunnerStatus.DONE logger.info("Done: " + function.description) } catch { case e => { runStatus = RunnerStatus.FAILED try { - function.failOutputs.foreach(_.createNewFile) + function.failOutputs.foreach(_.createNewFile()) } catch { case _ => /* ignore errors in the exception handler */ } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index f8262bd46..3a486993b 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -21,6 +21,9 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with /** A temporary job done file to let Queue know that the process exited with an error. */ private lazy val jobFailFile = new File(jobStatusPath + ".fail") + /** A generated exec shell script. */ + private var exec: File = _ + /** A generated pre-exec shell script. */ private var preExec: File = _ @@ -38,7 +41,6 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with job.errorFile = function.jobErrorFile job.project = function.jobProject job.queue = function.jobQueue - job.command = function.commandLine if (!IOUtils.CURRENT_DIR.getCanonicalFile.equals(function.commandDirectory)) job.workingDir = function.commandDirectory @@ -49,10 +51,16 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with if (function.memoryLimit.isDefined) job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]") - preExec = writePreExec(function) + job.name = function.commandLine.take(1000) + + // TODO: Look into passing in a single chained script as recommended by Doug instead of pre, exec, and post. + exec = writeExec() + job.command = "sh " + exec + + preExec = writePreExec() job.preExecCommand = "sh " + preExec - postExec = writePostExec(function) + postExec = writePostExec() job.postExecCommand = "sh " + postExec if (logger.isDebugEnabled) { @@ -61,6 +69,10 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with logger.info("Starting: " + job.bsubCommand.mkString(" ")) } + function.jobOutputFile.delete() + if (function.jobErrorFile != null) + function.jobErrorFile.delete() + runStatus = RunnerStatus.RUNNING job.run() jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." + job.bsubJobId)).toString @@ -98,7 +110,8 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with /** * Removes all temporary files used for this LSF job. */ - private def removeTemporaryFiles() = { + def removeTemporaryFiles() = { + exec.delete() preExec.delete() postExec.delete() jobDoneFile.delete() @@ -115,12 +128,21 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) } + /** + * Writes an exec file to cleanup any status files and + * optionally mount any automount directories on the node. + * @return the file path to the pre-exec. + */ + private def writeExec() = { + IOUtils.writeTempFile(function.commandLine, ".exec", "", function.commandDirectory) + } + /** * Writes a pre-exec file to cleanup any status files and * optionally mount any automount directories on the node. * @return the file path to the pre-exec. */ - private def writePreExec(function: CommandLineFunction): File = { + private def writePreExec() = { val preExec = new StringBuilder preExec.append("rm -f '%s/'.$LSB_JOBID.done%n".format(function.commandDirectory)) @@ -138,7 +160,7 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with * Writes a post-exec file to create the status files. * @return the file path to the post-exec. */ - private def writePostExec(function: CommandLineFunction): File = { + private def writePostExec() = { val postExec = new StringBuilder val touchDone = function.doneOutputs.map("touch '%s'%n".format(_)).mkString diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 43923746b..9dda763cf 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -10,9 +10,9 @@ import org.jgrapht.ext.DOTExporter import java.io.File import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} import org.broadinstitute.sting.queue.{QSettings, QException} -import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, ScatterGatherableFunction} import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction} import org.broadinstitute.sting.queue.util.{JobExitException, LsfKillJob, Logging} +import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction} /** * The internal dependency tracker between sets of function input and output files. @@ -20,7 +20,7 @@ import org.broadinstitute.sting.queue.util.{JobExitException, LsfKillJob, Loggin class QGraph extends Logging { var dryRun = true var bsubAllJobs = false - var skipUpToDateJobs = false + var startClean = false var dotFile: File = _ var expandedDotFile: File = _ var qSettings: QSettings = _ @@ -42,15 +42,6 @@ class QGraph extends Logging { } } - - private def scatterGatherable(edge: FunctionEdge) = { - edge.function match { - case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) => true - case _ => false - } - } - - /** * Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph. */ @@ -59,6 +50,7 @@ class QGraph extends Logging { val isReady = numMissingValues == 0 if (isReady || this.dryRun) { + logger.info("Running jobs.") runJobs() } @@ -73,31 +65,34 @@ class QGraph extends Logging { } private def fillGraph = { + logger.info("Generating graph.") fill if (dotFile != null) renderToDot(dotFile) var numMissingValues = validate if (numMissingValues == 0 && bsubAllJobs) { - logger.debug("Scatter gathering jobs.") - var scatterGathers = List.empty[FunctionEdge] - loop({ - case edge: FunctionEdge if (scatterGatherable(edge)) => - scatterGathers :+= edge - }) + logger.info("Generating scatter gather jobs.") + val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge)) var addedFunctions = List.empty[QFunction] for (scatterGather <- scatterGathers) { - val functions = scatterGather.function.asInstanceOf[ScatterGatherableFunction].generateFunctions() + val functions = scatterGather.asInstanceOf[FunctionEdge] + .function.asInstanceOf[ScatterGatherableFunction] + .generateFunctions() if (this.debugMode) logger.debug("Scattered into %d parts: %n%s".format(functions.size, functions.mkString("%n".format()))) addedFunctions ++= functions } + logger.info("Removing original jobs.") this.jobGraph.removeAllEdges(scatterGathers) prune + + logger.info("Adding scatter gather jobs.") addedFunctions.foreach(this.add(_)) + logger.info("Regenerating graph.") fill val scatterGatherDotFile = if (expandedDotFile != null) expandedDotFile else dotFile if (scatterGatherDotFile != null) @@ -108,9 +103,22 @@ class QGraph extends Logging { numMissingValues } + private def scatterGatherable(edge: QEdge) = { + edge match { + case functionEdge: FunctionEdge => { + functionEdge.function match { + case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) => true + case _ => false + } + } + case _ => false + } + } + def checkStatus = { // build up the full DAG with scatter-gather jobs fillGraph + logger.info("Checking pipeline status.") logStatus } @@ -161,25 +169,18 @@ class QGraph extends Logging { } private def getReadyJobs = { - var readyJobs = List.empty[FunctionEdge] - loop({ - case f: FunctionEdge => { - if (this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING) - readyJobs :+= f - } - }) - readyJobs + jobGraph.edgeSet.filter{ + case f: FunctionEdge => + this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING + case _ => false + }.map(_.asInstanceOf[FunctionEdge]) } private def getRunningJobs = { - var runningJobs = List.empty[FunctionEdge] - loop({ - case f: FunctionEdge => { - if (f.status == RunnerStatus.RUNNING) - runningJobs :+= f - } - }) - runningJobs + jobGraph.edgeSet.filter{ + case f: FunctionEdge => f.status == RunnerStatus.RUNNING + case _ => false + }.map(_.asInstanceOf[FunctionEdge]) } /** @@ -232,13 +233,13 @@ class QGraph extends Logging { * Runs the jobs by traversing the graph. */ private def runJobs() = { - loop({ case f: FunctionEdge => { - val isDone = this.skipUpToDateJobs && + foreachFunction(f => { + val isDone = !this.startClean && f.status == RunnerStatus.DONE && this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) if (!isDone) f.resetPending() - }}) + }) var readyJobs = getReadyJobs var runningJobs = Set.empty[FunctionEdge] @@ -305,8 +306,8 @@ class QGraph extends Logging { */ private def logStatus = { var statuses = Map.empty[String, AnalysisStatus] - loop({ - case edgeCLF: FunctionEdge if (edgeCLF.function.analysisName != null) => + foreachFunction(edgeCLF => { + if (edgeCLF.function.analysisName != null) { updateStatus(statuses.get(edgeCLF.function.analysisName) match { case Some(status) => status case None => @@ -314,6 +315,7 @@ class QGraph extends Logging { statuses += edgeCLF.function.analysisName -> status status }, edgeCLF) + } }) statuses.values.toList.sortBy(_.analysisName).foreach(status => { @@ -343,7 +345,7 @@ class QGraph extends Logging { private def updateStatus(stats: AnalysisStatus, edge: FunctionEdge) = { if (edge.function.isInstanceOf[GatherFunction]) { updateSGStatus(stats.gather, edge) - } else if (edge.function.isInstanceOf[ScatterGatherableFunction]) { + } else if (edge.function.isInstanceOf[CloneFunction]) { updateSGStatus(stats.scatter, edge) } else { stats.status = edge.status @@ -456,19 +458,14 @@ class QGraph extends Logging { (jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0 /** - * Utility function for looping over the internal graph and running functions. - * @param edgeFunction Optional function to run for each edge visited. - * @param nodeFunction Optional function to run for each node visited. + * Utility function for running a method over all function edges. + * @param edgeFunction Function to run for each FunctionEdge. */ - private def loop(edgeFunction: PartialFunction[QEdge, Unit] = null, nodeFunction: PartialFunction[QNode, Unit] = null) = { - val iterator = new TopologicalOrderIterator(this.jobGraph) - iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] { - override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = event.getEdge match { - case cmd: FunctionEdge => if (edgeFunction != null && edgeFunction.isDefinedAt(cmd)) edgeFunction(cmd) - case map: MappingEdge => /* do nothing for mapping functions */ - } - }) - iterator.foreach(node => if (nodeFunction != null && nodeFunction.isDefinedAt(node)) nodeFunction(node)) + private def foreachFunction(f: (FunctionEdge) => Unit) = { + jobGraph.edgeSet.foreach{ + case functionEdge: FunctionEdge => f(functionEdge) + case _ => + } } /** @@ -482,7 +479,7 @@ class QGraph extends Logging { // todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a // todo -- visualizeString() command, or something that shows inputs / outputs val ve = new org.jgrapht.ext.EdgeNameProvider[QEdge] { - def getEdgeName(function: QEdge) = if (function.dotString == null) "" else function.dotString.replace("\"", "\\\"") + def getEdgeName(function: QEdge) = if (function.dotString == null) "" else function.dotString.replace("\"", "\\\"") } //val iterator = new TopologicalOrderIterator(qGraph.jobGraph) @@ -505,11 +502,11 @@ class QGraph extends Logging { * Kills any forked jobs still running. */ def shutdown() { - val lsfJobs = getRunningJobs.filter(_.runner.isInstanceOf[LsfJobRunner]).map(_.runner.asInstanceOf[LsfJobRunner].job) - if (lsfJobs.size > 0) { - for (jobs <- lsfJobs.grouped(10)) { + val lsfJobRunners = getRunningJobs.filter(_.runner.isInstanceOf[LsfJobRunner]).map(_.runner.asInstanceOf[LsfJobRunner]) + if (lsfJobRunners.size > 0) { + for (jobRunners <- lsfJobRunners.filterNot(_.job.bsubJobId == null).grouped(10)) { try { - val bkill = new LsfKillJob(jobs) + val bkill = new LsfKillJob(jobRunners.map(_.job)) logger.info(bkill.command) bkill.run() } catch { @@ -518,6 +515,11 @@ class QGraph extends Logging { case e => logger.error("Unable to kill jobs.", e) } + try { + jobRunners.foreach(_.removeTemporaryFiles()) + } catch { + case e => /* ignore */ + } } } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala index 88bc24a63..a1afd7e40 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala @@ -34,19 +34,22 @@ class ShellJobRunner(function: CommandLineFunction) extends JobRunner with Loggi logger.info("Errors also written to " + function.jobOutputFile) } - function.doneOutputs.foreach(_.delete) - function.failOutputs.foreach(_.delete) + function.jobOutputFile.delete() + if (function.jobErrorFile != null) + function.jobErrorFile.delete() + function.doneOutputs.foreach(_.delete()) + function.failOutputs.foreach(_.delete()) runStatus = RunnerStatus.RUNNING try { job.run() - function.doneOutputs.foreach(_.createNewFile) + function.doneOutputs.foreach(_.createNewFile()) runStatus = RunnerStatus.DONE logger.info("Done: " + function.commandLine) } catch { case e: JobExitException => runStatus = RunnerStatus.FAILED try { - function.failOutputs.foreach(_.createNewFile) + function.failOutputs.foreach(_.createNewFile()) } catch { case _ => /* ignore errors in the exception handler */ } diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 07d35c1e1..564b65941 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -57,7 +57,7 @@ trait CommandLineFunction extends QFunction with Logging { dirs } - override protected def useStatusOutput(file: File) = + override def useStatusOutput(file: File) = file != jobOutputFile && file != jobErrorFile override def description = commandLine diff --git a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala index 42da4774d..754d2761b 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala @@ -7,6 +7,6 @@ import java.io.File */ trait InProcessFunction extends QFunction { def run() - protected def useStatusOutput(file: File) = true + def useStatusOutput(file: File) = true def description = this.getClass.getSimpleName } diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 4bc120d39..a599c9b08 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -34,7 +34,11 @@ trait QFunction { */ def dotString = "" - protected def useStatusOutput(file: File): Boolean + /** + * Returns true if the file should be used for status output. + * @return true if the file should be used for status output. + */ + def useStatusOutput(file: File): Boolean /** * Returns the output files for this function. @@ -57,7 +61,7 @@ trait QFunction { def failOutputs = statusPaths.map(path => new File(path + ".fail")) /** The complete list of fields on this CommandLineFunction. */ - lazy val functionFields: List[ArgumentSource] = ParsingEngine.extractArgumentSources(this.getClass).toList + lazy val functionFields: List[ArgumentSource] = initFunctionFields /** The @Input fields on this CommandLineFunction. */ lazy val inputFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Input])) /** The @Output fields on this CommandLineFunction. */ @@ -65,6 +69,11 @@ trait QFunction { /** The @Argument fields on this CommandLineFunction. */ lazy val argumentFields = functionFields.filter(source => ReflectionUtils.hasAnnotation(source.field, classOf[Argument])) + /** + * Called at most once, returns the list of fields for this function. + */ + protected def initFunctionFields = ParsingEngine.extractArgumentSources(this.getClass).toList + /** * Returns the input files for this function. * @return Set[File] inputs for this function. diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala new file mode 100644 index 000000000..85ea2606e --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala @@ -0,0 +1,84 @@ +package org.broadinstitute.sting.queue.function.scattergather + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.commandline.ArgumentSource +import java.io.File + +/** + * Shadow clones another command line function. + */ +class CloneFunction extends CommandLineFunction { + var originalFunction: ScatterGatherableFunction = _ + var index: Int = _ + + private var overriddenFields = Map.empty[ArgumentSource, Any] + + private def withScatterPart[A](f: () => A): A = { + var originalValues = Map.empty[ArgumentSource, Any] + overriddenFields.foreach{ + case (field, overrideValue) => { + originalValues += field -> originalFunction.getFieldValue(field) + originalFunction.setFieldValue(field, overrideValue) + } + } + try { + f() + } finally { + originalValues.foreach{ + case (name, value) => + originalFunction.setFieldValue(name, value) + } + } + } + + override def dotString = originalFunction.dotString + override def description = originalFunction.description + override protected def initFunctionFields = originalFunction.functionFields + override def useStatusOutput(file: File) = + file != jobOutputFile && file != jobErrorFile && originalFunction.useStatusOutput(file) + + override def freezeFieldValues = { + if (this.analysisName == null) + this.analysisName = originalFunction.analysisName + if (this.qSettings == null) + this.qSettings = originalFunction.qSettings + if (this.memoryLimit.isEmpty && originalFunction.memoryLimit.isDefined) + this.memoryLimit = originalFunction.memoryLimit + if (this.jobTempDir == null) + this.jobTempDir = originalFunction.jobTempDir + if (this.jobQueue == null) + this.jobQueue = originalFunction.jobQueue + if (this.jobProject == null) + this.jobProject = originalFunction.jobProject + if (this.jobName == null) + this.jobName = originalFunction.jobName + if (this.jobOutputFile == null) + this.jobOutputFile = overriddenFile("jobOutputFile").get + if (this.jobErrorFile == null) + this.jobErrorFile = overriddenFile("jobErrorFile").getOrElse(null) + super.freezeFieldValues + } + + def commandLine = withScatterPart(() => originalFunction.commandLine) + + override def getFieldValue(source: ArgumentSource) = { + overriddenFields.get(source) match { + case Some(value) => value.asInstanceOf[AnyRef] + case None => { + val value = originalFunction.getFieldValue(source) + overriddenFields += source -> value + value + } + } + } + + override def setFieldValue(source: ArgumentSource, value: Any) = { + overriddenFields += source -> value + } + + private def overriddenFile(name: String) = { + overriddenFields + .find{case (key, _) => key.field.getName == name} + .map{case (_, value) => value.asInstanceOf[File]} + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala index b257616d1..e2d22d834 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala @@ -17,7 +17,7 @@ class CreateTempDirsFunction extends InProcessFunction { @Output(doc="Temporary directories to create") var tempDirectories: List[File] = Nil - override protected def useStatusOutput(file: File) = false + override def useStatusOutput(file: File) = false def run() = tempDirectories.foreach(_.mkdirs) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index 96030e99f..e9649adc8 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -23,9 +23,9 @@ trait GatherFunction extends QFunction { /** * Sets the clone function creating one of the inputs for this gather function. - * @param cloneFunction The clone of the ScatterGatherableFunction. + * @param cloneFunction The clone wrapper for the original ScatterGatherableFunction. * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. * @param gatherField The field to be gathered. */ - def setCloneFunction(cloneFunction: ScatterGatherableFunction, index: Int, gatherField: ArgumentSource) = {} + def setCloneFunction(cloneFunction: CloneFunction, index: Int, gatherField: ArgumentSource) = {} } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala index d1cfd4916..9203a693d 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala @@ -26,9 +26,9 @@ trait ScatterFunction extends QFunction { /** * Sets the clone function using one of the outputs of this scatter function. - * @param cloneFunction The clone of the ScatterGatherableFunction. + * @param cloneFunction The clone wrapper for the original ScatterGatherableFunction. * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. * @param scatterField The field being scattered. */ - def setCloneFunction(cloneFunction: ScatterGatherableFunction, index: Int, scatterField: ArgumentSource) = {} + def setCloneFunction(cloneFunction: CloneFunction, index: Int, scatterField: ArgumentSource) = {} } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index a003b2737..01ba6fe95 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -3,7 +3,6 @@ package org.broadinstitute.sting.queue.function.scattergather import java.io.File import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.commandline.ArgumentSource -import com.rits.cloning.Cloner import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} /** @@ -59,10 +58,10 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Allows external modification of the cloned function. - * @param cloneFunction The clone of this ScatterGatherableFunction + * @param cloneFunction A clone wrapper of this ScatterGatherableFunction * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. */ - var setupCloneFunction: PartialFunction[(ScatterGatherableFunction, Int), Unit] = _ + var setupCloneFunction: PartialFunction[(CloneFunction, Int), Unit] = _ /** * Allows external modification of the CleanupTempDirsFunction that will remove the temporary directories. @@ -112,7 +111,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { } // Create the clone functions for running the parallel jobs - var cloneFunctions = List.empty[ScatterGatherableFunction] + var cloneFunctions = List.empty[CloneFunction] for (i <- 1 to this.scatterCount) { val cloneFunction = this.newCloneFunction() initCloneFunction(cloneFunction, i) @@ -168,6 +167,11 @@ trait ScatterGatherableFunction extends CommandLineFunction { protected lazy val scatterField = this.inputFields.find(field => ReflectionUtils.hasAnnotation(field.field, classOf[Scatter])).get + /** + * Retrieves the original field value for the scatter field. + */ + protected lazy val originalInput = getFieldFile(scatterField) + /** * Creates a new initialize CreateTempDirsFunction that will create the temporary directories. * @return A CreateTempDirsFunction that will create the temporary directories. @@ -224,7 +228,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { protected def initScatterFunction(scatterFunction: ScatterFunction, scatterField: ArgumentSource) = { scatterFunction.qSettings = this.qSettings scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName) - scatterFunction.originalInput = this.getFieldFile(scatterField) + scatterFunction.originalInput = this.originalInput scatterFunction.setOriginalFunction(this, scatterField) if (this.setupScatterFunction != null) if (this.setupScatterFunction.isDefinedAt(scatterFunction, scatterField)) @@ -270,14 +274,9 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Creates a new clone of this ScatterGatherableFunction, setting the scatterCount to 1 so it doesn't infinitely scatter. - * @return A clone of this ScatterGatherableFunction + * @return An uninitialized clone wrapper for ScatterGatherableFunction */ - protected def newCloneFunction(): ScatterGatherableFunction = { - val cloneFunction = ScatterGatherableFunction.cloner.deepClone(this) - // Make sure clone doesn't get scattered - cloneFunction.scatterCount = 1 - cloneFunction - } + protected def newCloneFunction() = new CloneFunction /** * Initializes the cloned function created by newCloneFunction() by setting it's commandDirectory to a temporary directory under scatterDirectory. @@ -285,7 +284,9 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param cloneFunction The clone of this ScatterGatherableFunction * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. */ - protected def initCloneFunction(cloneFunction: ScatterGatherableFunction, index: Int) = { + protected def initCloneFunction(cloneFunction: CloneFunction, index: Int) = { + cloneFunction.originalFunction = this + cloneFunction.index = index cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+index) if (this.setupCloneFunction != null) if (this.setupCloneFunction.isDefinedAt(cloneFunction, index)) @@ -303,7 +304,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param cloneFunction Clone of this ScatterGatherableFunction. * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. */ - protected def bindCloneFunctionScatter(scatterFunction: ScatterFunction, scatterField: ArgumentSource, cloneFunction: ScatterGatherableFunction, index: Int) = { + protected def bindCloneFunctionScatter(scatterFunction: ScatterFunction, scatterField: ArgumentSource, cloneFunction: CloneFunction, index: Int) = { // Reset the input of the clone to the the scatterGatherTempDir dir and add it as an output of the scatter val scatterPart = IOUtils.resetParent(cloneFunction.commandDirectory, scatterFunction.originalInput) scatterFunction.scatterParts :+= scatterPart @@ -318,7 +319,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param gatherFunction Function that will create the pieces including the piece that will go to cloneFunction. * @param gatherField The field to be gathered. */ - protected def bindCloneFunctionGather(gatherFunction: GatherFunction, gatherField: ArgumentSource, cloneFunction: ScatterGatherableFunction, index: Int) = { + protected def bindCloneFunctionGather(gatherFunction: GatherFunction, gatherField: ArgumentSource, cloneFunction: CloneFunction, index: Int) = { val gatherPart = cloneFunction.resetFieldFile(gatherField, cloneFunction.commandDirectory) gatherFunction.gatherParts :+= gatherPart gatherFunction.setCloneFunction(cloneFunction, index, gatherField) @@ -362,11 +363,3 @@ trait ScatterGatherableFunction extends CommandLineFunction { */ private def scatterGatherTempDir(subDir: String) = IOUtils.subDir(this.scatterGatherDirectory, this.jobName + "-sg/" + subDir) } - -/** - * A function that can be run faster by splitting it up into pieces and then joining together the results. - */ -object ScatterGatherableFunction { - /** Used to deep clone a ScatterGatherableFunction. */ - private lazy val cloner = new Cloner -} diff --git a/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala b/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala index e53681652..1ea1e8df5 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/LsfKillJob.scala @@ -3,7 +3,7 @@ package org.broadinstitute.sting.queue.util /** * bkills a list of lsf jobs. */ -class LsfKillJob(jobs: List[LsfJob]) extends CommandLineJob with Logging { +class LsfKillJob(jobs: Traversable[LsfJob]) extends CommandLineJob with Logging { command = "bkill " + jobs.map(_.bsubJobId).mkString(" ") def run() = { diff --git a/settings/ivysettings.xml b/settings/ivysettings.xml index 383882a2a..244cd1075 100644 --- a/settings/ivysettings.xml +++ b/settings/ivysettings.xml @@ -8,7 +8,6 @@ - @@ -18,7 +17,6 @@ -