diff --git a/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala b/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala
index f72bdee22..1e51d75e2 100644
--- a/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala
+++ b/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala
@@ -149,4 +149,23 @@ class HelloWorldQueueTest {
     spec.expectedFilePaths = Seq("pipelineLogDir/HelloWorld-1.out")
     QueueTest.executeTest(spec)
   }
+
+  @Test(timeOut=36000000)
+  def testHelloWorldParallelShell() {
+    val spec = new QueueTestSpec
+    spec.name = "HelloWorldParallelShell"
+    spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala"
+    spec.jobRunners = Seq("ParallelShell")
+    QueueTest.executeTest(spec)
+  }
+
+  @Test(timeOut=36000000)
+  def testHelloWorldParallelShellMaxConcurrentRun() {
+    val spec = new QueueTestSpec
+    spec.name = "HelloWorldParallelShellMaxConcurrentRun"
+    spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala" +
+      " -maxConcurrentRun 10"
+    spec.jobRunners = Seq("ParallelShell")
+    QueueTest.executeTest(spec)
+  }
 }
diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala
index 6a3e600bb..a34b9e6f9 100644
--- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala
+++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala
@@ -434,7 +434,22 @@ class QGraph extends Logging {
         var doneJobs = Set.empty[FunctionEdge]
         var failedJobs = Set.empty[FunctionEdge]
 
-        while (running && readyJobs.size > 0 && !readyRunningCheck(lastRunningCheck)) {
+        // True while one more ready job may be started.
+        def startJobs: Boolean = {
+
+          // No configured maximum means no limit; otherwise runningJobs plus
+          // startedJobs must stay below the configured maximum.
+          def canRunMoreConcurrentJobs: Boolean =
+            settings.maximumNumberOfConcurrentJobs.forall { maximum =>
+              runningJobs.size + startedJobs.size < maximum
+            }
+
+          running && readyJobs.size > 0 &&
+            !readyRunningCheck(lastRunningCheck) &&
+            canRunMoreConcurrentJobs
+        }
+
+        while (startJobs) {
           val edge = readyJobs.head
           edge.runner = newRunner(edge.function)
           edge.start()
diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala
index eb610aa28..bc068682b 100644
--- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala
+++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala
@@ -28,7 +28,7 @@ package org.broadinstitute.gatk.queue.engine
 import java.io.File
 import org.broadinstitute.gatk.queue.QSettings
 import org.broadinstitute.gatk.queue.util.{EmailSettings, SystemUtils}
-import org.broadinstitute.gatk.utils.commandline.{Advanced, ArgumentCollection, Argument}
+import org.broadinstitute.gatk.utils.commandline.{ClassType, Advanced, ArgumentCollection, Argument}
 
 /**
  * Command line options for a QGraph.
@@ -80,6 +80,11 @@ class QGraphSettings {
   @Argument(fullName="disableJobReport", shortName="disableJobReport", doc="If provided, we will not create a job report", required=false)
   var disableJobReport: Boolean = false
 
+  @Advanced
+  @ClassType(classOf[Int])
+  @Argument(fullName="maximumNumberOfJobsToRunConcurrently", shortName="maxConcurrentRun", doc="The maximum number of jobs to start at any given time. (Default is no limit)", required=false)
+  var maximumNumberOfConcurrentJobs: Option[Int] = None
+
   @ArgumentCollection
   val emailSettings = new EmailSettings
 
diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala
new file mode 100644
index 000000000..c200538c6
--- /dev/null
+++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala
@@ -0,0 +1,70 @@
+/*
+* Copyright 2012-2015 Broad Institute, Inc.
+*
+* Permission is hereby granted, free of charge, to any person
+* obtaining a copy of this software and associated documentation
+* files (the "Software"), to deal in the Software without
+* restriction, including without limitation the rights to use,
+* copy, modify, merge, publish, distribute, sublicense, and/or sell
+* copies of the Software, and to permit persons to whom the
+* Software is furnished to do so, subject to the following
+* conditions:
+*
+* The above copyright notice and this permission notice shall be
+* included in all copies or substantial portions of the Software.
+*
+* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+package org.broadinstitute.gatk.queue.engine.parallelshell
+
+import org.broadinstitute.gatk.queue.function.CommandLineFunction
+import org.broadinstitute.gatk.queue.engine.CommandLineJobManager
+
+/**
+ * Runs multiple jobs locally without blocking.
+ * Use this with care as it might not be the most efficient way to run things.
+ * However, for some scenarios, such as running multiple single threaded
+ * programs concurrently it can be quite useful.
+ *
+ * All this code is based on the normal shell runner in GATK Queue and all
+ * credits for everything except the concurrency part goes to the GATK team.
+ *
+ * @author Johan Dahlberg
+ *
+ */
+class ParallelShellJobManager extends CommandLineJobManager[ParallelShellJobRunner] {
+
+  /** The runner class handled by this manager. */
+  def runnerType = classOf[ParallelShellJobRunner]
+
+  /**
+   * Creates the runner for one command line function.
+   * @param function Function for the runner.
+   * @return a new ParallelShellJobRunner instance
+   */
+  def create(function: CommandLineFunction) = new ParallelShellJobRunner(function)
+
+  /**
+   * Calls updateJobStatus() once on each of the specified runners.
+   * @param runners Runners to update.
+   * @return the runners whose updateJobStatus() returned true.
+   */
+  override def updateStatus(
+    runners: Set[ParallelShellJobRunner]): Set[ParallelShellJobRunner] =
+    for (runner <- runners if runner.updateJobStatus()) yield runner
+
+  /**
+   * Calls tryStop() on each of the specified runners.
+   * @param runners Runners to stop.
+   */
+  override def tryStop(runners: Set[ParallelShellJobRunner]) =
+    for (runner <- runners) runner.tryStop()
+}
diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala
new file mode 100644
index 000000000..8afb80a25
--- /dev/null
+++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala
@@ -0,0 +1,151 @@
+/*
+* Copyright 2012-2015 Broad Institute, Inc.
+*
+* Permission is hereby granted, free of charge, to any person
+* obtaining a copy of this software and associated documentation
+* files (the "Software"), to deal in the Software without
+* restriction, including without limitation the rights to use,
+* copy, modify, merge, publish, distribute, sublicense, and/or sell
+* copies of the Software, and to permit persons to whom the
+* Software is furnished to do so, subject to the following
+* conditions:
+*
+* The above copyright notice and this permission notice shall be
+* included in all copies or substantial portions of the Software.
+*
+* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+package org.broadinstitute.gatk.queue.engine.parallelshell
+
+import org.broadinstitute.gatk.queue.function.CommandLineFunction
+import org.broadinstitute.gatk.queue.engine.{ RunnerStatus, CommandLineJobRunner }
+import java.util.Date
+import org.broadinstitute.gatk.utils.Utils
+import org.broadinstitute.gatk.utils.runtime.{ ProcessSettings, OutputStreamSettings }
+import scala.concurrent._
+import ExecutionContext.Implicits.global
+import scala.util.{ Success, Failure }
+import org.broadinstitute.gatk.queue.util.Logging
+
+/**
+ * Runs multiple jobs locally without blocking.
+ * Use this with care as it might not be the most efficient way to run things.
+ * However, for some scenarios, such as running multiple single threaded
+ * programs concurrently it can be quite useful.
+ *
+ * All this code is based on the normal shell runner in GATK Queue and all
+ * credits for everything except the concurrency part goes to the GATK team.
+ *
+ * @author Johan Dahlberg - 20150611
+ *
+ * @param function Command to run.
+ */
+class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging {
+
+  // Controller on the thread that started the job
+  val controller: ThreadSafeProcessController = new ThreadSafeProcessController()
+
+  // Once the application exits this promise will be fulfilled.
+  val finalExitStatus = Promise[Int]()
+
+  /**
+   * Runs the function on the local shell.
+   */
+  def start() {
+    val commandLine = Array("sh", jobScript.getAbsolutePath)
+    val stdoutSettings = new OutputStreamSettings
+    val stderrSettings = new OutputStreamSettings
+    val mergeError = function.jobErrorFile == null
+
+    stdoutSettings.setOutputFile(function.jobOutputFile, true)
+    if (function.jobErrorFile != null)
+      stderrSettings.setOutputFile(function.jobErrorFile, true)
+
+    if (logger.isDebugEnabled) {
+      stdoutSettings.printStandard(true)
+      stderrSettings.printStandard(true)
+    }
+
+    val processSettings = new ProcessSettings(
+      commandLine, mergeError, function.commandDirectory, null,
+      null, stdoutSettings, stderrSettings)
+
+    updateJobRun(processSettings)
+
+    getRunInfo.startTime = new Date()
+    getRunInfo.exechosts = Utils.resolveHostname()
+    updateStatus(RunnerStatus.RUNNING)
+
+    // Run the command line process in a future.
+    val executedFuture = future { controller.exec(processSettings) }
+
+    // Register a callback on the completion of the future, making sure that
+    // the status of the job is updated accordingly. The done time is set before
+    // the promise completes, so observers of the completed promise see it too.
+    executedFuture.onComplete {
+      case Success(exitStatus) =>
+        logger.debug(commandLine.mkString(" ") + " :: Got return on exit status in future: " + exitStatus)
+        getRunInfo.doneTime = new Date()
+        finalExitStatus.success(exitStatus)
+        exitStatusUpdateJobRunnerStatus(exitStatus)
+      case Failure(throwable) =>
+        logger.debug(
+          "Failed in return from run with: " +
+            throwable.getClass.getCanonicalName + " :: " +
+            throwable.getMessage)
+        getRunInfo.doneTime = new Date()
+        finalExitStatus.failure(throwable)
+        updateStatus(RunnerStatus.FAILED)
+    }
+  }
+
+  /**
+   * Possibly invoked from a shutdown thread, find and
+   * stop the controller from the originating thread
+   */
+  def tryStop() = {
+    try {
+      controller.tryDestroy()
+    } catch {
+      case e: Exception =>
+        logger.error("Unable to kill shell job: " + function.description, e)
+    }
+  }
+
+  /**
+   * Update the status of the runner based on the exit status
+   * of the process: 0 becomes DONE, anything else FAILED.
+   */
+  def exitStatusUpdateJobRunnerStatus(exitStatus: Int): Unit = {
+    val newStatus = if (exitStatus == 0) RunnerStatus.DONE else RunnerStatus.FAILED
+    updateStatus(newStatus)
+  }
+
+  /**
+   * Attempts to get the status of a job by looking at if the finalExitStatus
+   * promise has completed or not. A promise completed with a failure marks
+   * the job as FAILED rather than rethrowing that failure to the caller.
+   * @return if the jobRunner has updated its status or not.
+   */
+  def updateJobStatus(): Boolean = finalExitStatus.future.value match {
+    case Some(Success(exitStatus)) =>
+      exitStatusUpdateJobRunnerStatus(exitStatus)
+      true
+    case Some(Failure(_)) =>
+      updateStatus(RunnerStatus.FAILED)
+      true
+    case None =>
+      // Make sure the status is updated here, otherwise Queue will think
+      // it has lost control over the job and kill it after 5 minutes.
+      updateStatus(status)
+      false
+  }
+}
diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala
new file mode 100644
index 000000000..4bf3f994c
--- /dev/null
+++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala
@@ -0,0 +1,106 @@
+/*
+* Copyright 2012-2015 Broad Institute, Inc.
+*
+* Permission is hereby granted, free of charge, to any person
+* obtaining a copy of this software and associated documentation
+* files (the "Software"), to deal in the Software without
+* restriction, including without limitation the rights to use,
+* copy, modify, merge, publish, distribute, sublicense, and/or sell
+* copies of the Software, and to permit persons to whom the
+* Software is furnished to do so, subject to the following
+* conditions:
+*
+* The above copyright notice and this permission notice shall be
+* included in all copies or substantial portions of the Software.
+*
+* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+package org.broadinstitute.gatk.queue.engine.parallelshell
+
+import java.io.PrintWriter
+
+import org.broadinstitute.gatk.queue.util.Logging
+import org.broadinstitute.gatk.utils.runtime.ProcessSettings
+import scala.sys.process._
+
+/**
+ * Starts one external process, waits for it on the calling thread and lets
+ * another thread (e.g. a shutdown hook) destroy it through tryDestroy().
+ */
+class ThreadSafeProcessController extends Logging {
+
+  // Written by the thread in exec(), read by the thread calling tryDestroy().
+  @volatile private var process: Option[Process] = None
+
+  /**
+   * Construct a process logger writing the stdout and stderr of the
+   * process controlled by this instance to the provided writers.
+   * @param stdOutWriter receives every line the process prints on stdout
+   * @param stdErrWriter receives every line the process prints on stderr
+   * @return a process logger which can be used by the `scala.sys.process`
+   */
+  private def getProcessLogger(stdOutWriter: PrintWriter, stdErrWriter: PrintWriter): ProcessLogger = {
+    // Flush after every line so nothing stays buffered in the writer.
+    def printToWriter(printWriter: PrintWriter)(line: String): Unit = {
+      printWriter.println(line)
+      printWriter.flush()
+    }
+    ProcessLogger(printToWriter(stdOutWriter) _, printToWriter(stdErrWriter) _)
+  }
+
+  /**
+   * Execute the process specified in process settings, writing its stdout
+   * and stderr to the files named there, and wait for it to finish.
+   * NOTE(review): the output files are truncated; an append flag in the
+   * settings is not consulted here - confirm that this is intended.
+   * @param processSettings specifying the commandline to run.
+   * @return the exit status of the process.
+   */
+  def exec(processSettings: ProcessSettings): Int = {
+    // Hand the arguments over as a sequence: a space-joined string is split
+    // on whitespace again, which breaks arguments (paths) containing spaces.
+    val commandLine: ProcessBuilder = Process(processSettings.getCommand.toSeq)
+    logger.debug("Trying to start process: " + commandLine)
+
+    val stdOutFile = processSettings.getStdoutSettings.getOutputFile
+    val stdErrFile = processSettings.getStderrSettings.getOutputFile
+    val stdOutWriter = new PrintWriter(stdOutFile)
+    try {
+      // Without a separate stderr file both streams share ONE writer: two
+      // writers on the same file would overwrite each other's output.
+      val stdErrWriter =
+        if (stdErrFile == null || stdErrFile == stdOutFile) stdOutWriter
+        else new PrintWriter(stdErrFile)
+      try {
+        val started = commandLine.run(getProcessLogger(stdOutWriter, stdErrWriter))
+        process = Some(started)
+        // exitValue() blocks until the process has exited.
+        started.exitValue()
+      } finally {
+        stdErrWriter.close()
+      }
+    } finally {
+      stdOutWriter.close()
+    }
+  }
+
+  /**
+   * Attempt to destroy the underlying process.
+   * @throws IllegalStateException if no process has been started yet.
+   */
+  def tryDestroy(): Unit = {
+    logger.debug("Trying to kill process")
+    process.getOrElse {
+      throw new IllegalStateException("Tried to kill unstarted job.")
+    }.destroy()
+  }
+
+}
diff --git a/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala b/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala
index eb5b14f27..ed1c1c77f 100644
--- a/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala
+++ b/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala
@@ -53,7 +53,7 @@ object QueueTest extends BaseTest with Logging {
   /**
    * All the job runners configured to run QueueTests at The Broad.
    */
-  final val allJobRunners = Seq("GridEngine", "Shell")
+  final val allJobRunners = Seq("GridEngine", "Shell", "ParallelShell")
 
   /**
    * The default job runners to run.