From b045f2d4aa4db357044ad736ebc05a2db73f5ab7 Mon Sep 17 00:00:00 2001 From: Johan Dahlberg Date: Thu, 7 May 2015 15:51:54 +0200 Subject: [PATCH 1/2] ParallelShell added as a new JobRunner The ParallelShell job runner will run jobs locally on one node concurrently as specified by the DAG, with the option to limit the maximum number of concurrently running jobs using the flag `maximumNumberOfJobsToRunConcurrently`. Signed-off-by: Khalid Shakir --- .../gatk/queue/engine/QGraph.scala | 15 +- .../gatk/queue/engine/QGraphSettings.scala | 4 + .../ParallelShellJobManager.scala | 70 ++++++++ .../ParallelShellJobRunner.scala | 156 ++++++++++++++++++ .../ThreadSafeProcessController.scala | 81 +++++++++ 5 files changed, 325 insertions(+), 1 deletion(-) create mode 100644 public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala create mode 100644 public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala create mode 100644 public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala index 6a3e600bb..27dbc346f 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala @@ -434,7 +434,20 @@ class QGraph extends Logging { var doneJobs = Set.empty[FunctionEdge] var failedJobs = Set.empty[FunctionEdge] - while (running && readyJobs.size > 0 && !readyRunningCheck(lastRunningCheck)) { + def startJobs: Boolean = { + + def canRunMoreConcurrentJobs: Boolean = + if(settings.maximumNumberOfConcurrentJobs > 0) + runningJobs.size + startedJobs.size < settings.maximumNumberOfConcurrentJobs + else + true + + running && readyJobs.size > 0 && + !readyRunningCheck(lastRunningCheck) && + canRunMoreConcurrentJobs + } + + while (startJobs) { val edge = readyJobs.head edge.runner = newRunner(edge.function) edge.start() diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala index eb610aa28..d6be1602b 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala @@ -80,6 +80,10 @@ class QGraphSettings { @Argument(fullName="disableJobReport", shortName="disableJobReport", doc="If provided, we will not create a job report", required=false) var disableJobReport: Boolean = false + @Advanced + @Argument(fullName="maximumNumberOfJobsToRunConcurrently", shortName="maxConcurrentRun", doc="The maximum number of jobs to start at any given time. (Default is no limit)", required=false) + var maximumNumberOfConcurrentJobs: Int = -1 + @ArgumentCollection val emailSettings = new EmailSettings diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala new file mode 100644 index 000000000..64d51c123 --- /dev/null +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala @@ -0,0 +1,70 @@ +/* +* Copyright (c) 2012 The Broad Institute +* +* Permission is hereby granted, free of charge, to any person +* obtaining a copy of this software and associated documentation +* files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, +* copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following +* conditions: +* +* The above copyright notice and this permission notice shall be +* included in all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package org.broadinstitute.gatk.queue.engine.parallelshell + +import org.broadinstitute.gatk.queue.function.CommandLineFunction +import org.broadinstitute.gatk.queue.engine.CommandLineJobManager + +/** + * Runs multiple jobs locally without blocking. + * Use this with care as it might not be the most efficient way to run things. + * However, for some scenarios, such as running multiple single threaded + * programs concurrently it can be quite useful. + * + * All this code is based on the normal shell runner in GATK Queue and all + * credits for everything except the concurrency part goes to the GATK team. + * + * @author Johan Dahlberg + * + */ +class ParallelShellJobManager extends CommandLineJobManager[ParallelShellJobRunner] { + + def runnerType = classOf[ParallelShellJobRunner] + + /** + * Create new ParallelShellJobRunner + * @param function Function for the runner. + * @return a new ParallelShellJobRunner instance + */ + def create(function: CommandLineFunction) = + new ParallelShellJobRunner(function) + + /** + * Update the status of the specified jobrunners. + * @param runners Runners to update. + * @return runners which were updated. + */ + override def updateStatus( + runners: Set[ParallelShellJobRunner]): Set[ParallelShellJobRunner] = + runners.filter { runner => runner.updateJobStatus() } + + /** + * Stop the specified runners. + * @param runners Runners to stop. + */ + override def tryStop(runners: Set[ParallelShellJobRunner]) = + runners.foreach(_.tryStop()) +} \ No newline at end of file diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala new file mode 100644 index 000000000..ae9ffeea2 --- /dev/null +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala @@ -0,0 +1,156 @@ +/* +* Copyright (c) 2012 The Broad Institute +* +* Permission is hereby granted, free of charge, to any person +* obtaining a copy of this software and associated documentation +* files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, +* copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following +* conditions: +* +* The above copyright notice and this permission notice shall be +* included in all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package org.broadinstitute.gatk.queue.engine.parallelshell + +import org.broadinstitute.gatk.queue.function.CommandLineFunction +import org.broadinstitute.gatk.queue.engine.{ RunnerStatus, CommandLineJobRunner } +import java.util.Date +import org.broadinstitute.gatk.utils.Utils +import org.broadinstitute.gatk.utils.runtime.{ ProcessSettings, OutputStreamSettings } +import scala.concurrent._ +import ExecutionContext.Implicits.global +import scala.util.{ Success, Failure } +import org.broadinstitute.gatk.queue.util.Logging + +/** + * Runs multiple jobs locally without blocking. + * Use this with care as it might not be the most efficient way to run things. + * However, for some scenarios, such as running multiple single threaded + * programs concurrently it can be quite useful. + * + * All this code is based on the normal shell runner in GATK Queue and all + * credits for everything except the concurrency part goes to the GATK team. + * + * @author Johan Dahlberg - 20150611 + * + * @param function Command to run. + */ +class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging { + + // Controller on the thread that started the job + val controller: ThreadSafeProcessController = new ThreadSafeProcessController() + + // Once the application exits this promise will be fulfilled. + val finalExitStatus = Promise[Int]() + + /** + * Runs the function on the local shell. + */ + def start() { + val commandLine = Array("sh", jobScript.getAbsolutePath) + val stdoutSettings = new OutputStreamSettings + val stderrSettings = new OutputStreamSettings + val mergeError = (function.jobErrorFile == null) + + stdoutSettings.setOutputFile(function.jobOutputFile, true) + if (function.jobErrorFile != null) + stderrSettings.setOutputFile(function.jobErrorFile, true) + + if (logger.isDebugEnabled) { + stdoutSettings.printStandard(true) + stderrSettings.printStandard(true) + } + + val processSettings = new ProcessSettings( + commandLine, mergeError, function.commandDirectory, null, + null, stdoutSettings, stderrSettings) + + updateJobRun(processSettings) + + getRunInfo.startTime = new Date() + getRunInfo.exechosts = Utils.resolveHostname() + updateStatus(RunnerStatus.RUNNING) + + // Run the command line process in a future. + val executedFuture = + future { controller.exec(processSettings) } + + // Register a callback on the completion of the future, making sure that + // the status of the job is updated accordingly. + executedFuture.onComplete { tryExitStatus => + + tryExitStatus match { + case Success(exitStatus) => { + logger.debug(commandLine.mkString(" ") + " :: Got return on exit status in future: " + exitStatus) + finalExitStatus.success(exitStatus) + getRunInfo.doneTime = new Date() + exitStatusUpdateJobRunnerStatus(exitStatus) + } + case Failure(throwable) => { + logger.debug( + "Failed in return from run with: " + + throwable.getClass.getCanonicalName + " :: " + + throwable.getMessage) + finalExitStatus.failure(throwable) + getRunInfo.doneTime = new Date() + updateStatus(RunnerStatus.FAILED) + } + } + } + } + + /** + * Possibly invoked from a shutdown thread, find and + * stop the controller from the originating thread + */ + def tryStop() = { + try { + controller.tryDestroy() + } catch { + case e: Exception => + logger.error("Unable to kill shell job: " + function.description, e) + } + } + + /** + * Update the status of the runner based on the exit status + * of the process. + */ + def exitStatusUpdateJobRunnerStatus(exitStatus: Int): Unit = { + exitStatus match { + case 0 => updateStatus(RunnerStatus.DONE) + case _ => updateStatus(RunnerStatus.FAILED) + } + } + + /** + * Attempts to get the status of a job by looking at if the finalExitStatus + * promise has completed or not. + * @return if the jobRunner has updated it's status or not. + */ + def updateJobStatus(): Boolean = { + if (finalExitStatus.isCompleted) { + val completedExitStatus = finalExitStatus.future.value.get.get + exitStatusUpdateJobRunnerStatus(completedExitStatus) + true + } else { + // Make sure the status is update here, otherwise Queue will think + // it's lots control over the job and kill it after 5 minutes. + updateStatus(status) + false + } + } +} \ No newline at end of file diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala new file mode 100644 index 000000000..4d2bdd476 --- /dev/null +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala @@ -0,0 +1,81 @@ +package org.broadinstitute.gatk.queue.engine.parallelshell + +import java.io.PrintWriter + +import org.broadinstitute.gatk.queue.util.Logging +import org.broadinstitute.gatk.utils.runtime.ProcessSettings +import scala.sys.process._ + +/** + * + */ +class ThreadSafeProcessController extends Logging { + + private var process: Option[Process] = None + + /** + * Construct a process logger writing the stdout and stderr of the + * process controlled by this instance to the files specified in + * the provided ProcessSettings instance. + * @param processSettings specifiying which files to write to + * @return a process logger which can be used by the `scala.sys.process` + */ + private def getProcessLogger(processSettings: ProcessSettings): ProcessLogger = { + + val (stdOutFile, stdErrFile) = { + + val stdOutFile = processSettings.getStdoutSettings.getOutputFile + + if(processSettings.getStderrSettings.getOutputFile != null) { + val stdErrFile = processSettings.getStderrSettings.getOutputFile + (stdOutFile, stdErrFile) + } else { + (stdOutFile, stdOutFile) + } + + } + + val stdOutPrintWriter = new PrintWriter(stdOutFile) + val stdErrPrintWriter = new PrintWriter(stdErrFile) + + def printToWriter(printWriter: PrintWriter)(line: String): Unit = { + printWriter.println(line) + printWriter.flush() + } + + val stringStdOutPrinterFunc = printToWriter(stdOutPrintWriter) _ + val stringStdErrPrinterFunc = printToWriter(stdErrPrintWriter) _ + + val processLogger = ProcessLogger( + stringStdOutPrinterFunc, + stringStdErrPrinterFunc + ) + + processLogger + } + + /** + * Execute the process specified in process settings + * @param processSettings specifying the commandline to run. + * @return the exit status of the process. + */ + def exec(processSettings: ProcessSettings): Int = { + + val commandLine: ProcessBuilder = processSettings.getCommand.mkString(" ") + logger.debug("Trying to start process: " + commandLine) + process = Some(commandLine.run(getProcessLogger(processSettings))) + process.get.exitValue() + + } + + /** + * Attempt to destroy the underlying process. + */ + def tryDestroy(): Unit = { + logger.debug("Trying to kill process") + process.getOrElse { + throw new IllegalStateException("Tried to kill unstarted job.") + }.destroy() + } + +} \ No newline at end of file From 384a09e991b8140508e3fe003d580253df0833f8 Mon Sep 17 00:00:00 2001 From: Khalid Shakir Date: Mon, 28 Sep 2015 22:45:50 -0300 Subject: [PATCH 2/2] Minor updates to previous ParallelShell commit. Changed `--maximumNumberOfJobsToRunConcurrently`/`-maxConcurrentRun` to `Option[Int]`. Updated licenses. Added basic tests. Removed some IntelliJ warnings. --- .../examples/HelloWorldQueueTest.scala | 19 +++++++++ .../gatk/queue/engine/QGraph.scala | 4 +- .../gatk/queue/engine/QGraphSettings.scala | 5 ++- .../ParallelShellJobManager.scala | 4 +- .../ParallelShellJobRunner.scala | 39 ++++++++----------- .../ThreadSafeProcessController.scala | 27 ++++++++++++- .../gatk/queue/pipeline/QueueTest.scala | 2 +- 7 files changed, 70 insertions(+), 30 deletions(-) diff --git a/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala b/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala index f72bdee22..1e51d75e2 100644 --- a/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala +++ b/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala @@ -149,4 +149,23 @@ class HelloWorldQueueTest { spec.expectedFilePaths = Seq("pipelineLogDir/HelloWorld-1.out") QueueTest.executeTest(spec) } + + @Test(timeOut=36000000) + def testHelloWorldParallelShell() { + val spec = new QueueTestSpec + spec.name = "HelloWorldWithLogDirectory" + spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala" + spec.jobRunners = Seq("ParallelShell") + QueueTest.executeTest(spec) + } + + @Test(timeOut=36000000) + def testHelloWorldParallelShellMaxConcurrentRun() { + val spec = new QueueTestSpec + spec.name = "HelloWorldWithLogDirectory" + spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala" + + " -maxConcurrentRun 10" + spec.jobRunners = Seq("ParallelShell") + QueueTest.executeTest(spec) + } } diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala index 27dbc346f..a34b9e6f9 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala @@ -437,8 +437,8 @@ class QGraph extends Logging { def startJobs: Boolean = { def canRunMoreConcurrentJobs: Boolean = - if(settings.maximumNumberOfConcurrentJobs > 0) - runningJobs.size + startedJobs.size < settings.maximumNumberOfConcurrentJobs + if(settings.maximumNumberOfConcurrentJobs.isDefined) + runningJobs.size + startedJobs.size < settings.maximumNumberOfConcurrentJobs.get else true diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala index d6be1602b..bc068682b 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala @@ -28,7 +28,7 @@ package org.broadinstitute.gatk.queue.engine import java.io.File import org.broadinstitute.gatk.queue.QSettings import org.broadinstitute.gatk.queue.util.{EmailSettings, SystemUtils} -import org.broadinstitute.gatk.utils.commandline.{Advanced, ArgumentCollection, Argument} +import org.broadinstitute.gatk.utils.commandline.{ClassType, Advanced, ArgumentCollection, Argument} /** * Command line options for a QGraph. @@ -81,8 +81,9 @@ class QGraphSettings { var disableJobReport: Boolean = false @Advanced + @ClassType(classOf[Int]) @Argument(fullName="maximumNumberOfJobsToRunConcurrently", shortName="maxConcurrentRun", doc="The maximum number of jobs to start at any given time. (Default is no limit)", required=false) - var maximumNumberOfConcurrentJobs: Int = -1 + var maximumNumberOfConcurrentJobs: Option[Int] = None @ArgumentCollection val emailSettings = new EmailSettings diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala index 64d51c123..c200538c6 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala @@ -1,5 +1,5 @@ /* -* Copyright (c) 2012 The Broad Institute +* Copyright 2012-2015 Broad Institute, Inc. * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation @@ -67,4 +67,4 @@ class ParallelShellJobManager extends CommandLineJobManager[ParallelShellJobRunn */ override def tryStop(runners: Set[ParallelShellJobRunner]) = runners.foreach(_.tryStop()) -} \ No newline at end of file +} diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala index ae9ffeea2..8afb80a25 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala @@ -1,5 +1,5 @@ /* -* Copyright (c) 2012 The Broad Institute +* Copyright 2012-2015 Broad Institute, Inc. * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation @@ -63,7 +63,7 @@ class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandL val commandLine = Array("sh", jobScript.getAbsolutePath) val stdoutSettings = new OutputStreamSettings val stderrSettings = new OutputStreamSettings - val mergeError = (function.jobErrorFile == null) + val mergeError = function.jobErrorFile == null stdoutSettings.setOutputFile(function.jobOutputFile, true) if (function.jobErrorFile != null) @@ -90,25 +90,20 @@ class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandL // Register a callback on the completion of the future, making sure that // the status of the job is updated accordingly. - executedFuture.onComplete { tryExitStatus => - - tryExitStatus match { - case Success(exitStatus) => { - logger.debug(commandLine.mkString(" ") + " :: Got return on exit status in future: " + exitStatus) - finalExitStatus.success(exitStatus) - getRunInfo.doneTime = new Date() - exitStatusUpdateJobRunnerStatus(exitStatus) - } - case Failure(throwable) => { - logger.debug( - "Failed in return from run with: " + - throwable.getClass.getCanonicalName + " :: " + - throwable.getMessage) - finalExitStatus.failure(throwable) - getRunInfo.doneTime = new Date() - updateStatus(RunnerStatus.FAILED) - } - } + executedFuture.onComplete { + case Success(exitStatus) => + logger.debug(commandLine.mkString(" ") + " :: Got return on exit status in future: " + exitStatus) + finalExitStatus.success(exitStatus) + getRunInfo.doneTime = new Date() + exitStatusUpdateJobRunnerStatus(exitStatus) + case Failure(throwable) => + logger.debug( + "Failed in return from run with: " + + throwable.getClass.getCanonicalName + " :: " + + throwable.getMessage) + finalExitStatus.failure(throwable) + getRunInfo.doneTime = new Date() + updateStatus(RunnerStatus.FAILED) } } @@ -153,4 +148,4 @@ class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandL false } } -} \ No newline at end of file +} diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala index 4d2bdd476..4bf3f994c 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala @@ -1,3 +1,28 @@ +/* +* Copyright 2012-2015 Broad Institute, Inc. +* +* Permission is hereby granted, free of charge, to any person +* obtaining a copy of this software and associated documentation +* files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, +* copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following +* conditions: +* +* The above copyright notice and this permission notice shall be +* included in all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + package org.broadinstitute.gatk.queue.engine.parallelshell import java.io.PrintWriter @@ -78,4 +103,4 @@ class ThreadSafeProcessController extends Logging { }.destroy() } -} \ No newline at end of file +} diff --git a/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala b/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala index eb5b14f27..ed1c1c77f 100644 --- a/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala +++ b/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala @@ -53,7 +53,7 @@ object QueueTest extends BaseTest with Logging { /** * All the job runners configured to run QueueTests at The Broad. */ - final val allJobRunners = Seq("GridEngine", "Shell") + final val allJobRunners = Seq("GridEngine", "Shell", "ParallelShell") /** * The default job runners to run.