diff --git a/java/src/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBat.java b/java/src/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBat.java index 1beb8fb86..98d8792c6 100644 --- a/java/src/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBat.java +++ b/java/src/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBat.java @@ -6409,7 +6409,28 @@ public class LibBat { /** * < Detail reservation information for each kind of resource */ - public reserveItem.ByReference items; + /* + TODO: Go against the JNA recommendations at + http://jna.java.net/#structure_use and instead change each structure + to use Pointers instead of Structure.ByReference while also providing + Structure(Pointer) constructors for use for future API users. + + LSF will often reuse memory for structure arrays but will set the + array size / count (reserveCnt above) to zero when the array should + not be accessed. When LSF has reused memory and points to a non-null + structure pointer (items) the inner structure may contain further + garbage pointers (especially items->resName). + + When JNA sees a non-null Structure.ByReference it will autoRead() the + member. When autoRead() eventually gets to the items->resName trying + to run strlen on the bad memory address causes a SIGSEGV. + + By using a Pointer instead of the Structure.ByReference JNA will not + automatically autoRead(), and the API user will have to pass the + pointer to the Structure on their own. + */ + //public reserveItem.ByReference items; + public Pointer items; /** * < Absolute priority scheduling (APS) string set by administrators to denote ADMIN factor APS value. 
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala index 863da1d4f..553116ce0 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala @@ -15,15 +15,6 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { /** Which directory to use for the job status files. */ protected def jobStatusDir = function.jobTempDir - /** The last time the status returned unknown. */ - protected var firstUnknownTime: Option[Long] = None - - /** Amount of time the status can return unknown before giving up. */ - protected val unknownStatusMaxSeconds = 5 * 60 - - /** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. */ - protected val retryExpiredSeconds = 5 * 60 - /** * Writes the function command line to an exec file. */ diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index 0a165f10e..53a555aa6 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -18,6 +18,11 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod */ var retries = 0 + /** + * The depth of this edge in the graph. + */ + var depth = -1 + /** * Initializes with the current status of the function. 
*/ diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala index a55e7822b..b8a4dd5a7 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala @@ -17,9 +17,16 @@ trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] { */ def create(function: TFunction): TRunner + /** + * Updates the status on a list of functions. + * @param runners Runners to update. + */ + def updateStatus(runners: List[TRunner]) { + } + /** * Stops a list of functions. - * @param runner Runners to stop. + * @param runners Runners to stop. */ def tryStop(runners: List[TRunner]) { } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala index de85b66b2..b0809ddd2 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobManager.scala @@ -9,5 +9,7 @@ class Lsf706JobManager extends JobManager[CommandLineFunction, Lsf706JobRunner] def runnerType = classOf[Lsf706JobRunner] def functionType = classOf[CommandLineFunction] def create(function: CommandLineFunction) = new Lsf706JobRunner(function) + + override def updateStatus(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) } override def tryStop(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala index 08540fbd3..d4bead82d 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala @@ -7,7 +7,6 @@ import org.broadinstitute.sting.queue.QException import 
org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.jna.clibrary.LibC -import java.util.Date import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} import com.sun.jna.ptr.IntByReference import com.sun.jna.{StringArray, NativeLong} @@ -21,10 +20,13 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR Lsf706JobRunner /** Job Id of the currently executing job. */ - var jobId = -1L + private var jobId = -1L - /** Last known run status */ - private var runStatus: RunnerStatus.Value = _ + /** Last known status */ + private var lastStatus: RunnerStatus.Value = _ + + /** The last time the status was updated */ + protected var lastStatusUpdate: Long = _ /** * Dispatches the function on the LSF cluster. @@ -82,7 +84,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR // Allow advanced users to update the request. updateJobRun(request) - runStatus = RunnerStatus.RUNNING + updateStatus(RunnerStatus.RUNNING) Retry.attempt(() => { val reply = new submitReply jobId = LibBat.lsb_submit(request, reply) @@ -93,93 +95,24 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR } } - /** - * Updates and returns the status. 
- */ - def status = { - Lsf706JobRunner.lsfLibLock.synchronized { - var jobStatus = LibBat.JOB_STAT_UNKWN - var exitStatus = 0 - var exitInfo = 0 - var endTime: NativeLong = null + def status = this.lastStatus - var result = 0 - Retry.attempt(() => { - result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) - if (result < 0) - throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId)) - }, 0.5, 1, 2) - try { - if (result > 1) - throw new QException(LibBat.lsb_sperror("Recieved " + result + " LSF results for job id: " + jobId)) - else if (result == 1) { - val more = new IntByReference(result) - val jobInfo = LibBat.lsb_readjobinfo(more) - if (jobInfo == null) - throw new QException(LibBat.lsb_sperror("lsb_readjobinfo returned null for job id: " + jobId)) - jobStatus = jobInfo.status - exitStatus = jobInfo.exitStatus - exitInfo = jobInfo.exitInfo - endTime = jobInfo.endTime - } - } finally { - LibBat.lsb_closejobinfo() - } - - logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo)) - - if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) { - val now = new Date().getTime - - if (firstUnknownTime.isEmpty) { - firstUnknownTime = Some(now) - logger.debug("First unknown status for job id: " + jobId) - } - - if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) { - // Unknown status has been returned for a while now. - runStatus = RunnerStatus.FAILED - logger.error("Unknown status for %d seconds: job id %d: %s".format(unknownStatusMaxSeconds, jobId, function.description)) - } - } else { - // Reset the last time an unknown status was seen. - firstUnknownTime = None - - if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { - // Exited function that (probably) won't be retried. 
- runStatus = RunnerStatus.FAILED - } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { - // Done successfully. - runStatus = RunnerStatus.DONE - } - } - - runStatus - } + private def updateStatus(updatedStatus: RunnerStatus.Value) = { + this.lastStatus = updatedStatus + this.lastStatusUpdate = System.currentTimeMillis } - - /** - * Returns true if LSF is expected to retry running the function. - * @param exitInfo The reason the job exited. - * @param endTime THe time the job exited. - * @return true if LSF is expected to retry running the function. - */ - private def willRetry(exitInfo: Int, endTime: NativeLong) = { - exitInfo match { - case LibBat.EXIT_NORMAL => false - case _ => { - val seconds = LibC.difftime(LibC.time(null), endTime) - (seconds <= retryExpiredSeconds) - } - } - } - } object Lsf706JobRunner extends Logging { private val lsfLibLock = new Object private val SIGTERM = 15 + /** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. */ + private val retryExpiredSeconds = 5 * 60 + + /** Amount of time a job can go without status before giving up. */ + private val unknownStatusMaxSeconds = 5 * 60 + init() /** The name of the default queue. */ @@ -238,6 +171,44 @@ object Lsf706JobRunner extends Logging { } } + /** + * Updates the status of a list of jobs. 
+ */ + def updateStatus(runners: List[Lsf706JobRunner]) { + var updatedRunners = List.empty[Lsf706JobRunner] + + Lsf706JobRunner.lsfLibLock.synchronized { + val result = LibBat.lsb_openjobinfo(0L, null, null, null, null, LibBat.ALL_JOB) + if (result < 0) { + logger.error(LibBat.lsb_sperror("Unable to check LSF job info")) + } else { + try { + val more = new IntByReference(result) + while (more.getValue > 0) { + val jobInfo = LibBat.lsb_readjobinfo(more) + if (jobInfo == null) { + logger.error(LibBat.lsb_sperror("Unable to read LSF job info")) + more.setValue(0) + } else { + runners.find(runner => runner.jobId == jobInfo.jobId) match { + case Some(runner) => + updateRunnerStatus(runner, jobInfo) + updatedRunners :+= runner + case None => /* not our job */ + } + } + } + } finally { + LibBat.lsb_closejobinfo() + } + } + } + + for (runner <- runners.diff(updatedRunners)) { + checkUnknownStatus(runner) + } + } + /** * Tries to stop any running jobs. * @param runners Runners to stop. @@ -246,15 +217,67 @@ object Lsf706JobRunner extends Logging { lsfLibLock.synchronized { // lsb_killbulkjobs does not seem to forward SIGTERM, // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. 
- for (jobRunner <- runners.filterNot(_.jobId < 0)) { + for (runner <- runners.filterNot(_.jobId < 0)) { try { - if (LibBat.lsb_signaljob(jobRunner.jobId, SIGTERM) < 0) - logger.error(LibBat.lsb_sperror("Unable to kill job " + jobRunner.jobId)) + if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0) + logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId)) } catch { case e => - logger.error("Unable to kill job " + jobRunner.jobId, e) + logger.error("Unable to kill job " + runner.jobId, e) } } } } + + private def updateRunnerStatus(runner: Lsf706JobRunner, jobInfo: LibBat.jobInfoEnt) { + val jobStatus = jobInfo.status + val exitStatus = jobInfo.exitStatus + val exitInfo = jobInfo.exitInfo + val endTime = jobInfo.endTime + + logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo)) + + runner.updateStatus( + if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { + // Done successfully. + RunnerStatus.DONE + } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { + // Exited function that (probably) won't be retried. + RunnerStatus.FAILED + } else { + // Note that we still saw the job in the system. + RunnerStatus.RUNNING + } + ) + } + + private def checkUnknownStatus(runner: Lsf706JobRunner) { + // TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs. + // Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod + // LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct) + // LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist) + logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(runner.jobId)) + val unknownStatusSeconds = (System.currentTimeMillis - runner.lastStatusUpdate) + if (unknownStatusSeconds > (unknownStatusMaxSeconds * 1000L)) { + // Unknown status has been returned for a while now. 
+ runner.updateStatus(RunnerStatus.FAILED) + logger.error("Unable to read LSF status for %d minutes: job id %d: %s".format(unknownStatusSeconds/60, runner.jobId, runner.function.description)) + } + } + + /** + * Returns true if LSF is expected to retry running the function. + * @param exitInfo The reason the job exited. + * @param endTime THe time the job exited. + * @return true if LSF is expected to retry running the function. + */ + private def willRetry(exitInfo: Int, endTime: NativeLong) = { + exitInfo match { + case LibBat.EXIT_NORMAL => false + case _ => { + val seconds = LibC.difftime(LibC.time(null), endTime) + (seconds <= retryExpiredSeconds) + } + } + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index ed71b586c..51df7267d 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -13,7 +13,7 @@ import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFu import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction} import org.apache.commons.lang.StringUtils import org.broadinstitute.sting.queue.util._ -import collection.immutable.TreeMap +import collection.immutable.{TreeSet, TreeMap} /** * The internal dependency tracker between sets of function input and output files. @@ -26,18 +26,24 @@ class QGraph extends Logging { private var numMissingValues = 0 private val jobGraph = newGraph + private val functionOrdering = Ordering.by[FunctionEdge, Iterable[Int]](edge => -graphDepth(edge) +: edge.function.addOrder) + private val fileOrdering = Ordering.by[File,String](_.getAbsolutePath) // A map of nodes by list of files. 
- private var nodeMap = TreeMap.empty[Iterable[File], QNode](Ordering.Iterable(Ordering.by[File,String](_.getAbsolutePath))) + private var nodeMap = TreeMap.empty[Iterable[File], QNode](Ordering.Iterable(fileOrdering)) // The next unique id for a node if not found in the nodeMap. private var nextNodeId = 0 private var running = true private val runningLock = new Object + private var runningJobs = Set.empty[FunctionEdge] + private val nl = "%n".format() private val inProcessManager = new InProcessJobManager private var commandLineManager: JobManager[CommandLineFunction, _<:JobRunner[CommandLineFunction]] = _ + private def managers = List[Any](inProcessManager, commandLineManager) + /** * Adds a QScript created CommandLineFunction to the graph. * @param command Function to add to the graph. @@ -48,8 +54,8 @@ class QGraph extends Logging { if (running) { command.qSettings = settings.qSettings command.freeze - val inputs = getQNode(command.inputs.toList.sortWith(_.compareTo(_) < 0)) - val outputs = getQNode(command.outputs.toList.sortWith(_.compareTo(_) < 0)) + val inputs = getQNode(command.inputs.toList.sorted(fileOrdering)) + val outputs = getQNode(command.outputs.toList.sorted(fileOrdering)) addEdge(new FunctionEdge(command, inputs, outputs)) } } @@ -143,19 +149,17 @@ class QGraph extends Logging { } /** - * Walks up the graph looking for the previous command line edges. - * @param function Function to examine for a previous command line job. - * @param qGraph The graph that contains the jobs. - * @return A list of prior jobs. + * Walks up the graph looking for the previous function edges. + * @param edge Graph edge to examine for the previous functions. + * @return A list of prior function edges. 
*/ private def previousFunctions(edge: QEdge): List[FunctionEdge] = { var previous = List.empty[FunctionEdge] - val source = this.jobGraph.getEdgeSource(edge) for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) { incomingEdge match { - // Stop recursing when we find a job along the edge and return its job id + // Stop recursing when we find a function edge and return it case functionEdge: FunctionEdge => previous :+= functionEdge // For any other type of edge find the jobs preceding the edge @@ -190,19 +194,12 @@ class QGraph extends Logging { }) } - private def getReadyJobs = { + private def getReadyJobs(): Set[FunctionEdge] = { jobGraph.edgeSet.filter{ case f: FunctionEdge => this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING case _ => false - }.toList.asInstanceOf[List[FunctionEdge]].sortWith(compare(_,_)) - } - - private def getRunningJobs = { - jobGraph.edgeSet.filter{ - case f: FunctionEdge => f.status == RunnerStatus.RUNNING - case _ => false - }.toList.asInstanceOf[List[FunctionEdge]].sortWith(compare(_,_)) + }.toSet.asInstanceOf[Set[FunctionEdge]] } /** @@ -260,8 +257,13 @@ class QGraph extends Logging { * Dry-runs the jobs by traversing the graph. 
*/ private def dryRunJobs() { - updateGraphStatus(false) - var readyJobs = getReadyJobs + if (settings.startFromScratch) { + logger.info("Will remove outputs from previous runs.") + foreachFunction(_.resetToPending(false)) + } else + updateGraphStatus(false) + + var readyJobs = getReadyJobs() while (running && readyJobs.size > 0) { logger.debug("+++++++") readyJobs.foreach(edge => { @@ -270,7 +272,7 @@ class QGraph extends Logging { edge.markAsDone } }) - readyJobs = getReadyJobs + readyJobs = getReadyJobs() } } @@ -311,39 +313,45 @@ class QGraph extends Logging { } else updateGraphStatus(true) - var readyJobs = getReadyJobs - var runningJobs = Set.empty[FunctionEdge] + var readyJobs = TreeSet.empty[FunctionEdge](functionOrdering) + readyJobs ++= getReadyJobs() + runningJobs = Set.empty[FunctionEdge] + var lastRunningCheck = System.currentTimeMillis + while (running && readyJobs.size + runningJobs.size > 0) { - var exitedJobs = List.empty[FunctionEdge] + + while (running && readyJobs.size > 0 && nextRunningCheck(lastRunningCheck) > 0) { + val edge = readyJobs.head + edge.runner = newRunner(edge.function) + edge.start() + runningJobs += edge + readyJobs -= edge + } + + if (readyJobs.size == 0 && runningJobs.size > 0) + Thread.sleep(nextRunningCheck(lastRunningCheck)) + + lastRunningCheck = System.currentTimeMillis + updateStatus() + + var doneJobs = List.empty[FunctionEdge] var failedJobs = List.empty[FunctionEdge] - runningJobs.foreach(runner => runner.status match { + runningJobs.foreach(edge => edge.status match { + case RunnerStatus.DONE => doneJobs :+= edge + case RunnerStatus.FAILED => failedJobs :+= edge case RunnerStatus.RUNNING => /* do nothing while still running */ - case RunnerStatus.FAILED => exitedJobs :+= runner; failedJobs :+= runner - case RunnerStatus.DONE => exitedJobs :+= runner - }) - exitedJobs.foreach(runner => runningJobs -= runner) - - readyJobs.foreach(f => { - if (running) { - f.runner = newRunner(f.function) - f.start() - f.status match { 
- case RunnerStatus.RUNNING => runningJobs += f - case RunnerStatus.FAILED => failedJobs :+= f - case RunnerStatus.DONE => /* do nothing and move on */ - } - } }) - if (failedJobs.size > 0) { + runningJobs --= doneJobs + runningJobs --= failedJobs + + if (running && failedJobs.size > 0) { emailFailedJobs(failedJobs) checkRetryJobs(failedJobs) } - if (readyJobs.size == 0 && runningJobs.size > 0) - Thread.sleep(30000L) - readyJobs = getReadyJobs + readyJobs ++= getReadyJobs() } deleteIntermediateOutputs() @@ -356,6 +364,8 @@ class QGraph extends Logging { } } + private def nextRunningCheck(lastRunningCheck: Long) = 0L max ((30 * 1000L) - (System.currentTimeMillis - lastRunningCheck)) + /** * Updates the status of edges in the graph. * @param cleanOutputs If true will delete outputs when setting edges to pending. @@ -388,6 +398,22 @@ class QGraph extends Logging { } } + /** + * Returns the graph depth for the function. + * @param edge Function edge to get the edge for. + * @return the graph depth for the function. + */ + private def graphDepth(edge: FunctionEdge): Int = { + if (edge.depth < 0) { + val previous = previousFunctions(edge) + if (previous.size == 0) + edge.depth = 0 + else + edge.depth = previous.map(f => graphDepth(f)).max + 1 + } + edge.depth + } + /** * From the previous edges, resets any that are marked as skipped to pending. 
* If those that are reset have skipped edges, those skipped edges are recursively also set @@ -415,7 +441,7 @@ class QGraph extends Logging { } private def emailFailedJobs(failed: List[FunctionEdge]) { - if (running && settings.statusEmailTo.size > 0) { + if (settings.statusEmailTo.size > 0) { val emailMessage = new EmailMessage emailMessage.from = settings.statusEmailFrom emailMessage.to = settings.statusEmailTo @@ -700,32 +726,10 @@ class QGraph extends Logging { jobGraph.edgeSet.toList .filter(_.isInstanceOf[FunctionEdge]) .asInstanceOf[List[FunctionEdge]] - .sortWith(compare(_,_)) + .sorted(functionOrdering) .foreach(edge => if (running) f(edge)) } - private def compare(f1: FunctionEdge, f2: FunctionEdge): Boolean = - compare(f1.function, f2.function) - - private def compare(f1: QFunction, f2: QFunction): Boolean = { - val len1 = f1.addOrder.size - val len2 = f2.addOrder.size - val len = len1 min len2 - - for (i <- 0 until len) { - val order1 = f1.addOrder(i) - val order2 = f2.addOrder(i) - if (order1 < order2) - return true - if (order1 > order2) - return false - } - if (len1 < len2) - return true - else - return false - } - /** * Utility function for running a method over all functions, but traversing the nodes in order of dependency. * @param edgeFunction Function to run for each FunctionEdge. @@ -800,6 +804,25 @@ class QGraph extends Logging { }) } + + private def updateStatus() { + val runners = runningJobs.map(_.runner) + for (mgr <- managers) { + if (mgr != null) { + val manager = mgr.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]] + val managerRunners = runners + .filter(runner => manager.runnerType.isAssignableFrom(runner.getClass)) + .asInstanceOf[Set[JobRunner[QFunction]]] + if (managerRunners.size > 0) + try { + manager.updateStatus(managerRunners.toList) + } catch { + case e => /* ignore */ + } + } + } + } + /** * Returns true if the graph was shutdown instead of exiting on its own. 
*/ @@ -813,26 +836,29 @@ class QGraph extends Logging { running = false // Wait for the thread to finish and exit normally. runningLock.synchronized { - val runners = getRunningJobs.map(_.runner) - val manager = commandLineManager.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]] - if (manager != null) { - val managerRunners = runners - .filter(runner => manager.runnerType.isAssignableFrom(runner.getClass)) - .asInstanceOf[List[JobRunner[QFunction]]] - if (managerRunners.size > 0) - try { - manager.tryStop(managerRunners) - } catch { - case e => /* ignore */ + val runners = runningJobs.map(_.runner) + runningJobs = Set.empty[FunctionEdge] + for (mgr <- managers) { + if (mgr != null) { + val manager = mgr.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]] + val managerRunners = runners + .filter(runner => manager.runnerType.isAssignableFrom(runner.getClass)) + .asInstanceOf[Set[JobRunner[QFunction]]] + if (managerRunners.size > 0) + try { + manager.tryStop(managerRunners.toList) + } catch { + case e => /* ignore */ + } + for (runner <- managerRunners) { + try { + runner.removeTemporaryFiles() + } catch { + case e => /* ignore */ + } } - } - runners.foreach(runner => - try { - runner.removeTemporaryFiles() - } catch { - case e => /* ignore */ } - ) + } } } } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala index d880fa524..f1fd4057b 100755 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/ContigScatterFunction.scala @@ -35,12 +35,8 @@ class ContigScatterFunction extends GATKScatterFunction with InProcessFunction { // Include unmapped reads by default. 
this.includeUnmapped = true - protected override def maxIntervals = { - if (this.intervalFilesExist) - IntervalUtils.countIntervalArguments(this.referenceSequence, this.intervals, true) - else - this.scatterCount - } + protected override def maxIntervals = + IntervalUtils.countIntervalArguments(this.referenceSequence, this.intervals, true) def run() { IntervalUtils.scatterIntervalArguments(this.referenceSequence, this.intervals, this.scatterOutputFiles, true) diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/DistributedScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/DistributedScatterFunction.scala index 953fe0830..1cef4959d 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/DistributedScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/DistributedScatterFunction.scala @@ -25,23 +25,24 @@ package org.broadinstitute.sting.queue.extensions.gatk import java.io.File -import org.broadinstitute.sting.queue.function.scattergather.CloneFunction import org.broadinstitute.sting.queue.function.InProcessFunction +import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction} +import org.broadinstitute.sting.commandline.Output +import util.Random /** * An scatter function that uses the Distributed GATK. 
*/ -class DistributedScatterFunction extends GATKScatterFunction with InProcessFunction { - private final val processingTracker = "processingTracker" +class DistributedScatterFunction extends ScatterFunction with InProcessFunction { + @Output(doc="processingTracker") + var processingTracker: File = _ - this.scatterOutputFiles = List(new File(processingTracker)) - - override def initCloneInputs(cloneFunction: CloneFunction, index: Int) { - cloneFunction.setFieldValue("processingTracker", new File(this.commandDirectory, this.processingTracker)) + override def init() { + this.processingTracker = new File(this.commandDirectory, "processingTracker.%8d".format(Random.nextInt(100000000))) } - override def bindCloneInputs(cloneFunction: CloneFunction, index: Int) { - /* no further work needed after init. */ + override def initCloneInputs(cloneFunction: CloneFunction, index: Int) { + cloneFunction.setFieldValue("processingTracker", this.processingTracker) } def run() { diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/GATKScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/GATKScatterFunction.scala index 4abac23d3..f753cd341 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/GATKScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/GATKScatterFunction.scala @@ -28,64 +28,53 @@ import org.broadinstitute.sting.utils.interval.IntervalUtils import java.io.File import collection.JavaConversions._ import org.broadinstitute.sting.queue.util.IOUtils -import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, ScatterGatherableFunction, ScatterFunction} +import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, ScatterFunction} import org.broadinstitute.sting.commandline.Output trait GATKScatterFunction extends ScatterFunction { - /** The total number of clone jobs that will be created. 
*/ - var scatterCount: Int = _ - - /** The reference sequence for the GATK function. */ - protected var referenceSequence: File = _ - /** The runtime field to set for specifying an interval file. */ private final val intervalsField = "intervals" /** The runtime field to set for specifying an interval string. */ private final val intervalsStringField = "intervalsString" + @Output(doc="Scatter function outputs") + var scatterOutputFiles: List[File] = Nil + + /** The original GATK function. */ + protected var originalGATK: CommandLineGATK = _ + + /** The reference sequence for the GATK function. */ + protected var referenceSequence: File = _ + /** The list of interval files ("/path/to/interval.list") or interval strings ("chr1", "chr2") to parse into smaller parts. */ protected var intervals: List[String] = Nil /** Whether the last scatter job should also include any unmapped reads. */ protected var includeUnmapped: Boolean = _ - @Output(doc="Scatter function outputs") - var scatterOutputFiles: List[File] = Nil + /** The total number of clone jobs that will be created. */ + override def scatterCount = if (intervalFilesExist) super.scatterCount min this.maxIntervals else super.scatterCount - /** - * Checks if the function is scatter gatherable. - * @param originalFunction Function to check. - * @return true if the function is a GATK function with the reference sequence set. - * @throws IllegalArgumentException if -BTI or -BTIMR are set. QScripts should not try to scatter gather with those option set. - */ - override def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean = { - val gatk = originalFunction.asInstanceOf[CommandLineGATK] - if (gatk.BTI != null && gatk.BTIMR == null) - throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)") - gatk.reference_sequence != null - } - - /** - * Sets the scatter gatherable function. - * @param originalFunction Function to bind. 
- */ - override def setScatterGatherable(originalFunction: ScatterGatherableFunction) = { - val gatk = originalFunction.asInstanceOf[CommandLineGATK] - this.referenceSequence = gatk.reference_sequence - if (gatk.intervals.isEmpty && gatk.intervalsString.isEmpty) { + override def init() { + this.originalGATK = this.originalFunction.asInstanceOf[CommandLineGATK] + this.referenceSequence = this.originalGATK.reference_sequence + if (this.originalGATK.intervals.isEmpty && this.originalGATK.intervalsString.isEmpty) { this.intervals ++= IntervalUtils.distinctContigs(this.referenceSequence).toList } else { - this.intervals ++= gatk.intervals.map(_.toString) - this.intervals ++= gatk.intervalsString.filterNot(interval => IntervalUtils.isUnmapped(interval)) - this.includeUnmapped = gatk.intervalsString.exists(interval => IntervalUtils.isUnmapped(interval)) + this.intervals ++= this.originalGATK.intervals.map(_.toString) + this.intervals ++= this.originalGATK.intervalsString.filterNot(interval => IntervalUtils.isUnmapped(interval)) + this.includeUnmapped = this.originalGATK.intervalsString.exists(interval => IntervalUtils.isUnmapped(interval)) } - - this.scatterCount = originalFunction.scatterCount - this.scatterCount = this.scatterCount min this.maxIntervals } - def initCloneInputs(cloneFunction: CloneFunction, index: Int) = { + override def isScatterGatherable = { + if (this.originalGATK.BTI != null && this.originalGATK.BTIMR.isEmpty) + throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)") + this.originalGATK.reference_sequence != null + } + + override def initCloneInputs(cloneFunction: CloneFunction, index: Int) { cloneFunction.setFieldValue(this.intervalsField, List(new File("scatter.intervals"))) if (index == this.scatterCount && this.includeUnmapped) cloneFunction.setFieldValue(this.intervalsStringField, List("unmapped")) @@ -93,7 +82,7 @@ trait GATKScatterFunction extends ScatterFunction { 
cloneFunction.setFieldValue(this.intervalsStringField, List.empty[String]) } - def bindCloneInputs(cloneFunction: CloneFunction, index: Int) = { + override def bindCloneInputs(cloneFunction: CloneFunction, index: Int) { val scatterPart = cloneFunction.getFieldValue(this.intervalsField) .asInstanceOf[List[File]] .map(file => IOUtils.absolute(cloneFunction.commandDirectory, file)) @@ -112,5 +101,5 @@ trait GATKScatterFunction extends ScatterFunction { * Returns the maximum number of intervals or this.scatterCount if the maximum can't be determined ahead of time. * @return the maximum number of intervals or this.scatterCount if the maximum can't be determined ahead of time. */ - protected def maxIntervals = this.scatterCount + protected def maxIntervals: Int } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala index 94d284877..a651b3c12 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala @@ -32,12 +32,8 @@ import org.broadinstitute.sting.queue.function.InProcessFunction * An interval scatter function. 
*/ class IntervalScatterFunction extends GATKScatterFunction with InProcessFunction { - protected override def maxIntervals = { - if (this.intervalFilesExist) - IntervalUtils.countIntervalArguments(this.referenceSequence, this.intervals, false) - else - this.scatterCount - } + protected override def maxIntervals = + IntervalUtils.countIntervalArguments(this.referenceSequence, this.intervals, false) def run() { IntervalUtils.scatterIntervalArguments(this.referenceSequence, this.intervals, this.scatterOutputFiles, false) diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala index 95f311505..9ee7b8087 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala @@ -1,17 +1,37 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.extensions.gatk -import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, GatherFunction} +import org.broadinstitute.sting.queue.function.scattergather.GatherFunction /** * Merges a vcf text file. */ class VcfGatherFunction extends CombineVariants with GatherFunction { - private var originalGATK: CommandLineGATK = _ - - override def setScatterGatherable(originalFunction: ScatterGatherableFunction) { - this.originalGATK = originalFunction.asInstanceOf[CommandLineGATK] - } + private lazy val originalGATK = this.originalFunction.asInstanceOf[CommandLineGATK] override def freezeFieldValues = { this.memoryLimit = Some(1) diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala index bb983e6c0..fa627a7de 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala @@ -1,8 +1,31 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all 
copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.function.scattergather import org.broadinstitute.sting.commandline.ArgumentSource import java.io.File -import org.broadinstitute.sting.queue.QException import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} /** @@ -10,7 +33,7 @@ import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} */ class CloneFunction extends CommandLineFunction { var originalFunction: ScatterGatherableFunction = _ - var index: Int = _ + var cloneIndex: Int = _ private var overriddenFields = Map.empty[ArgumentSource, Any] private var withScatterPartCount = 0 diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index cd60cbe02..16d31b777 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -10,20 +10,21 @@ import org.broadinstitute.sting.queue.util.IOUtils * Base class for Gather command line functions. 
*/ trait GatherFunction extends QFunction { + var originalFunction: ScatterGatherableFunction = _ + + @Output(doc="The original output of the scattered function") + var originalOutput: File = _ + @Input(doc="Parts to gather back into the original output") var gatherParts: List[File] = Nil @Input(doc="Other log files that will be gathered before this output", required=false) var originalLogFiles: List[File] = Nil - @Output(doc="The original output of the scattered function") - var originalOutput: File = _ - /** - * Sets the original ScatterGatherableFunction to be gathered. - * @param originalFunction The original function to with inputs bind to this scatter function. + * Called to initialize the gather function values after all other values have been set up for this function. */ - def setScatterGatherable(originalFunction: ScatterGatherableFunction) {} + def init() {} /** * Waits for gather parts to propagate over NFS or throws an exception. diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala index 2639521d9..632e2d39f 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala @@ -1,3 +1,27 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or
substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.function.scattergather import java.io.File @@ -8,26 +32,27 @@ import org.broadinstitute.sting.queue.function.QFunction * Base class for Scatter functions. */ trait ScatterFunction extends QFunction { + var originalFunction: ScatterGatherableFunction = _ + @Input(doc="Original inputs to scatter") var originalInputs: Set[File] = _ /** - * Returns true if the scatter function can scatter this original function. - * @param originalFunction The original function to check. - * @return true if the scatter function can scatter this original function. - */ - def isScatterGatherable(originalFunction: ScatterGatherableFunction) = true - - /** - * Sets the original ScatterGatherableFunction to be scattered. + * Called to initialize scatter function values after all other values have been set up for this function. * @param originalFunction The original function to with inputs bind to this scatter function. */ - def setScatterGatherable(originalFunction: ScatterGatherableFunction) {} + def init() {} /** - * After a call to setScatterGatherable(), returns the number of clones that should be created. + * Returns true if the scatter function can scatter this original function. + * @return true if the scatter function can scatter this original function. */ - def scatterCount: Int + def isScatterGatherable = true + + /** + * Returns the number of clones that should be created.
+ */ + def scatterCount: Int = originalFunction.scatterCount /** * Initializes the input fields for the clone function. @@ -36,7 +61,7 @@ trait ScatterFunction extends QFunction { * @param cloneFunction CloneFunction to initialize. * @param index The one based scatter index. */ - def initCloneInputs(cloneFunction: CloneFunction, index: Int) + def initCloneInputs(cloneFunction: CloneFunction, index: Int) {} /** * Binds the input fields for the clone function to this scatter function. @@ -45,5 +70,5 @@ trait ScatterFunction extends QFunction { * @param cloneFunction CloneFunction to bind. * @param index The one based scatter index. */ - def bindCloneInputs(cloneFunction: CloneFunction, index: Int) + def bindCloneInputs(cloneFunction: CloneFunction, index: Int) {} } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 202d7230a..e9f034535 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -1,3 +1,27 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. 
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.function.scattergather import java.io.File @@ -53,7 +77,40 @@ trait ScatterGatherableFunction extends CommandLineFunction { * and that the scatter function can scatter this instance. * @return true if the function is ready to be scatter / gathered. */ - def scatterGatherable = this.scatterCount > 1 && scatterFunction.isScatterGatherable(this) + def scatterGatherable = this.scatterCount > 1 && scatterFunction.isScatterGatherable + + /** + * Sets the scatter gather directory to the command directory if it is not already set. + */ + override def freezeFieldValues = { + super.freezeFieldValues + + if (this.scatterGatherDirectory == null) { + if (qSettings.jobScatterGatherDirectory != null) { + this.scatterGatherDirectory = IOUtils.absolute(qSettings.jobScatterGatherDirectory) + } else { + this.scatterGatherDirectory = IOUtils.absolute(this.commandDirectory, "queueScatterGather") + } + } + } + + /** + * The scatter function. 
+ */ + private lazy val scatterFunction = { + val scatterFunction = newScatterFunction() + this.copySettingsTo(scatterFunction) + scatterFunction.originalFunction = this + scatterFunction.originalInputs = this.inputs + scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter") + scatterFunction.isIntermediate = true + scatterFunction.addOrder = this.addOrder :+ 1 + + initScatterFunction(scatterFunction) + scatterFunction.absoluteCommandDirectory() + scatterFunction.init() + scatterFunction + } /** * Returns a list of scatter / gather and clones of this function @@ -70,14 +127,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { val outputFieldsWithValues = this.outputFields.filter(hasFieldValue(_)) // Create the scatter function based on @Scatter - this.copySettingsTo(scatterFunction) - scatterFunction.addOrder = this.addOrder :+ 1 - scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter") - scatterFunction.originalInputs = this.inputs - scatterFunction.isIntermediate = true - scatterFunction.setScatterGatherable(this) - initScatterFunction(scatterFunction) - scatterFunction.absoluteCommandDirectory() functions :+= scatterFunction // Ask the scatter function how many clones to create. 
@@ -93,20 +142,13 @@ trait ScatterGatherableFunction extends CommandLineFunction { var gatherOutputs = Map.empty[ArgumentSource, File] var gatherAddOrder = numClones + 2 for (gatherField <- outputFieldsWithValues) { - val gatherFunction = this.newGatherFunction(gatherField) val gatherOutput = getFieldFile(gatherField) - this.copySettingsTo(gatherFunction) - gatherFunction.addOrder = this.addOrder :+ gatherAddOrder - gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) - gatherFunction.originalOutput = gatherOutput - gatherFunction.setScatterGatherable(this) - initGatherFunction(gatherFunction, gatherField) - gatherFunction.absoluteCommandDirectory() - functions :+= gatherFunction - gatherFunctions += gatherField -> gatherFunction - gatherOutputs += gatherField -> gatherOutput - gatherAddOrder += 1 + val gatherFunction = this.newGatherFunction(gatherField) + this.copySettingsTo(gatherFunction) + gatherFunction.originalFunction = this + gatherFunction.originalOutput = gatherOutput + gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) // If this is a gather for a log file, make the gather intermediate just in case the log file name changes // Otherwise have the regular output function wait on the log files to gather if (isLogFile(gatherOutput)) { @@ -117,6 +159,17 @@ trait ScatterGatherableFunction extends CommandLineFunction { } else { gatherFunction.originalLogFiles = logFiles } + gatherFunction.addOrder = this.addOrder :+ gatherAddOrder + + initGatherFunction(gatherFunction, gatherField) + gatherFunction.absoluteCommandDirectory() + gatherFunction.init() + + functions :+= gatherFunction + gatherFunctions += gatherField -> gatherFunction + gatherOutputs += gatherField -> gatherOutput + + gatherAddOrder += 1 } // Create the clone functions for running the parallel jobs @@ -126,12 +179,12 @@ trait ScatterGatherableFunction extends CommandLineFunction { 
this.copySettingsTo(cloneFunction) cloneFunction.originalFunction = this - cloneFunction.index = i + cloneFunction.cloneIndex = i + cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i) cloneFunction.addOrder = this.addOrder :+ (i+1) cloneFunction.isIntermediate = true // Setup the fields on the clone function, outputting each as a relative file in the sg directory. - cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i) scatterFunction.initCloneInputs(cloneFunction, i) for (gatherField <- outputFieldsWithValues) { val gatherPart = new File(gatherOutputs(gatherField).getName) @@ -160,21 +213,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { functions } - /** - * Sets the scatter gather directory to the command directory if it is not already set. - */ - override def freezeFieldValues = { - super.freezeFieldValues - - if (this.scatterGatherDirectory == null) { - if (qSettings.jobScatterGatherDirectory != null) { - this.scatterGatherDirectory = IOUtils.absolute(qSettings.jobScatterGatherDirectory) - } else { - this.scatterGatherDirectory = IOUtils.absolute(this.commandDirectory, "queueScatterGather") - } - } - } - /** * Creates a new ScatterFunction. * @return A ScatterFunction instantiated scatterClass @@ -242,11 +280,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { this.setupCloneFunction(cloneFunction, index) } - /** - * The scatter function. - */ - private lazy val scatterFunction = newScatterFunction() - /** * Returns a temporary directory under this scatter gather directory. * @param Sub directory under the scatter gather directory.