diff --git a/scala/src/org/broadinstitute/sting/queue/QArguments.scala b/scala/src/org/broadinstitute/sting/queue/QArguments.scala index 869ef9adf..5c921231b 100755 --- a/scala/src/org/broadinstitute/sting/queue/QArguments.scala +++ b/scala/src/org/broadinstitute/sting/queue/QArguments.scala @@ -9,6 +9,7 @@ import java.util.Properties class QArguments(args: Array[String]) { var bsubAllJobs = false + var bsubWaitJobs = false var dryRun = false val scripts = new ListBuffer[String] var inputPaths = List.empty[File] @@ -28,6 +29,8 @@ class QArguments(args: Array[String]) { dryRun = true if (isFlagged(filtered, "-bsub")) bsubAllJobs = true + if (isFlagged(filtered, "-bsubWait")) + bsubWaitJobs = true for (arg <- getArgs(filtered, "-P")) addProperties(arg) for (arg <- getArgs(filtered, "-I")) diff --git a/scala/src/org/broadinstitute/sting/queue/QScript.scala b/scala/src/org/broadinstitute/sting/queue/QScript.scala index a1ff6d29b..aa2861e17 100755 --- a/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -97,6 +97,8 @@ object QScript { val qGraph = new QGraph qGraph.dryRun = qArgs.dryRun qGraph.bsubAllJobs = qArgs.bsubAllJobs + qGraph.bsubWaitJobs = qArgs.bsubWaitJobs + qGraph.properties = qArgs.properties for (function <- functions) qGraph.add(function) qGraph.fillIn diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala index 5df3d17b0..88f48c1e5 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala @@ -2,15 +2,28 @@ package org.broadinstitute.sting.queue.engine import collection.JavaConversions._ import org.broadinstitute.sting.queue.function.{DispatchFunction, QFunction} +import scala.collection.immutable.ListSet trait DispatchJobRunner { type DispatchJobType private var dispatchJobs = Map.empty[DispatchFunction, DispatchJobType] + private var waitJobsByGraph = Map.empty[QGraph, ListSet[DispatchJobType]] + /** + * Dispatches a function to the queue and returns immediately, unless the function is a DispatchWaitFunction + * in which case it waits for all other terminal functions to complete. + */ def dispatch(function: DispatchFunction, qGraph: QGraph) - protected def addJob(function: DispatchFunction, dispatchJob: DispatchJobType) = + protected def addJob(function: DispatchFunction, qGraph: QGraph, + dispatchJob: DispatchJobType, previousJobs: List[DispatchJobType]) = { dispatchJobs += function -> dispatchJob + var waitJobs = getWaitJobs(qGraph) + for (previousJob <- previousJobs) + waitJobs -= previousJob + waitJobs += dispatchJob + waitJobsByGraph += qGraph -> waitJobs + } /** * Walks up the graph looking for the previous LsfJobs @@ -31,4 +44,13 @@ trait DispatchJobRunner { } previous } + + /** + * Returns a set of jobs that have no following jobs in the graph. + */ + protected def getWaitJobs(qGraph: QGraph) = { + if (!waitJobsByGraph.contains(qGraph)) + waitJobsByGraph += qGraph -> ListSet.empty[DispatchJobType] + waitJobsByGraph(qGraph) + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index 5f29f35f0..dc0780527 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -2,9 +2,9 @@ package org.broadinstitute.sting.queue.engine import collection.JavaConversions._ import edu.mit.broad.core.lsf.LocalLsfJob -import org.broadinstitute.sting.queue.function.DispatchFunction import java.util.ArrayList import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.function.{DispatchWaitFunction, DispatchFunction} trait LsfJobRunner extends DispatchJobRunner with Logging { type DispatchJobType = LocalLsfJob @@ -24,13 +24,20 @@ trait LsfJobRunner extends DispatchJobRunner with Logging { if (function.memoryLimit.isDefined) extraArgs :::= List("-R", "rusage[mem=" + function.memoryLimit.get + "]") - val previous = previousJobs(function, qGraph) + val previous = + if (function.isInstanceOf[DispatchWaitFunction]) { + extraArgs :+= "-K" + getWaitJobs(qGraph).toList + } else { + previousJobs(function, qGraph) + } + if (previous.size > 0) extraArgs :::= List("-w", dependencyExpression(previous)) job.setExtraBsubArgs(new ArrayList(extraArgs)) - addJob(function, job) + addJob(function, qGraph, job, previous) if (logger.isDebugEnabled) { logger.debug(function.commandDirectory + " > " + job.getBsubCommand.mkString(" ")) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index b952e13cf..5ebaba6d9 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -13,6 +13,8 @@ import org.jgrapht.EdgeFactory class QGraph extends Logging { var dryRun = true var bsubAllJobs = false + var bsubWaitJobs = false + var properties = Map.empty[String, String] val jobGraph = newGraph def numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size diff --git a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala index 3e2c5a46c..0831e184f 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala @@ -4,7 +4,7 @@ import org.jgrapht.traverse.TopologicalOrderIterator import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter} import collection.JavaConversions._ import org.broadinstitute.sting.queue.util.Logging -import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, DispatchFunction, QFunction} +import org.broadinstitute.sting.queue.function._ /** * Loops over the job graph running jobs as the edges are traversed @@ -35,5 +35,13 @@ abstract class TopologicalJobScheduler(private val qGraph: QGraph) } if (logger.isTraceEnabled) logger.trace("Done walking %s nodes.".format(numNodes)) + + if (qGraph.bsubAllJobs && qGraph.bsubWaitJobs) { + logger.info("Waiting for jobs to complete.") + val wait = new DispatchWaitFunction + wait.properties = qGraph.properties + wait.freeze + dispatch(wait, qGraph) + } } } diff --git a/scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala new file mode 100644 index 000000000..6bcafa87a --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/function/DispatchWaitFunction.scala @@ -0,0 +1,11 @@ +package org.broadinstitute.sting.queue.function + +import java.io.File + +class DispatchWaitFunction extends CommandLineFunction { + def commandLine = "echo" + + jobQueue = "short" + jobOutputFile = File.createTempFile("Q-wait", ".out") + jobErrorFile = File.createTempFile("Q-wait", ".err") +}