diff --git a/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala b/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala index f72bdee22..1e51d75e2 100644 --- a/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala +++ b/public/gatk-queue-extensions-public/src/test/scala/org/broadinstitute/gatk/queue/pipeline/examples/HelloWorldQueueTest.scala @@ -149,4 +149,23 @@ class HelloWorldQueueTest { spec.expectedFilePaths = Seq("pipelineLogDir/HelloWorld-1.out") QueueTest.executeTest(spec) } + + @Test(timeOut=36000000) + def testHelloWorldParallelShell() { + val spec = new QueueTestSpec + spec.name = "HelloWorldWithLogDirectory" + spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala" + spec.jobRunners = Seq("ParallelShell") + QueueTest.executeTest(spec) + } + + @Test(timeOut=36000000) + def testHelloWorldParallelShellMaxConcurrentRun() { + val spec = new QueueTestSpec + spec.name = "HelloWorldWithLogDirectory" + spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala" + + " -maxConcurrentRun 10" + spec.jobRunners = Seq("ParallelShell") + QueueTest.executeTest(spec) + } } diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala index 27dbc346f..a34b9e6f9 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraph.scala @@ -437,8 +437,8 @@ class QGraph extends Logging { def startJobs: Boolean = { def canRunMoreConcurrentJobs: Boolean = - if(settings.maximumNumberOfConcurrentJobs > 0) - runningJobs.size + startedJobs.size < settings.maximumNumberOfConcurrentJobs + if(settings.maximumNumberOfConcurrentJobs.isDefined) + runningJobs.size + startedJobs.size < settings.maximumNumberOfConcurrentJobs.get else true diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala index d6be1602b..bc068682b 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/QGraphSettings.scala @@ -28,7 +28,7 @@ package org.broadinstitute.gatk.queue.engine import java.io.File import org.broadinstitute.gatk.queue.QSettings import org.broadinstitute.gatk.queue.util.{EmailSettings, SystemUtils} -import org.broadinstitute.gatk.utils.commandline.{Advanced, ArgumentCollection, Argument} +import org.broadinstitute.gatk.utils.commandline.{ClassType, Advanced, ArgumentCollection, Argument} /** * Command line options for a QGraph. @@ -81,8 +81,9 @@ class QGraphSettings { var disableJobReport: Boolean = false @Advanced + @ClassType(classOf[Int]) @Argument(fullName="maximumNumberOfJobsToRunConcurrently", shortName="maxConcurrentRun", doc="The maximum number of jobs to start at any given time. (Default is no limit)", required=false) - var maximumNumberOfConcurrentJobs: Int = -1 + var maximumNumberOfConcurrentJobs: Option[Int] = None @ArgumentCollection val emailSettings = new EmailSettings diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala index 64d51c123..c200538c6 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobManager.scala @@ -1,5 +1,5 @@ /* -* Copyright (c) 2012 The Broad Institute +* Copyright 2012-2015 Broad Institute, Inc. * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation @@ -67,4 +67,4 @@ class ParallelShellJobManager extends CommandLineJobManager[ParallelShellJobRunn */ override def tryStop(runners: Set[ParallelShellJobRunner]) = runners.foreach(_.tryStop()) -} \ No newline at end of file +} diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala index ae9ffeea2..8afb80a25 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ParallelShellJobRunner.scala @@ -1,5 +1,5 @@ /* -* Copyright (c) 2012 The Broad Institute +* Copyright 2012-2015 Broad Institute, Inc. * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation @@ -63,7 +63,7 @@ class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandL val commandLine = Array("sh", jobScript.getAbsolutePath) val stdoutSettings = new OutputStreamSettings val stderrSettings = new OutputStreamSettings - val mergeError = (function.jobErrorFile == null) + val mergeError = function.jobErrorFile == null stdoutSettings.setOutputFile(function.jobOutputFile, true) if (function.jobErrorFile != null) @@ -90,25 +90,20 @@ class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandL // Register a callback on the completion of the future, making sure that // the status of the job is updated accordingly. - executedFuture.onComplete { tryExitStatus => - - tryExitStatus match { - case Success(exitStatus) => { - logger.debug(commandLine.mkString(" ") + " :: Got return on exit status in future: " + exitStatus) - finalExitStatus.success(exitStatus) - getRunInfo.doneTime = new Date() - exitStatusUpdateJobRunnerStatus(exitStatus) - } - case Failure(throwable) => { - logger.debug( - "Failed in return from run with: " + - throwable.getClass.getCanonicalName + " :: " + - throwable.getMessage) - finalExitStatus.failure(throwable) - getRunInfo.doneTime = new Date() - updateStatus(RunnerStatus.FAILED) - } - } + executedFuture.onComplete { + case Success(exitStatus) => + logger.debug(commandLine.mkString(" ") + " :: Got return on exit status in future: " + exitStatus) + finalExitStatus.success(exitStatus) + getRunInfo.doneTime = new Date() + exitStatusUpdateJobRunnerStatus(exitStatus) + case Failure(throwable) => + logger.debug( + "Failed in return from run with: " + + throwable.getClass.getCanonicalName + " :: " + + throwable.getMessage) + finalExitStatus.failure(throwable) + getRunInfo.doneTime = new Date() + updateStatus(RunnerStatus.FAILED) } } @@ -153,4 +148,4 @@ class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandL false } } -} \ No newline at end of file +} diff --git a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala index 4d2bdd476..4bf3f994c 100644 --- a/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala +++ b/public/gatk-queue/src/main/scala/org/broadinstitute/gatk/queue/engine/parallelshell/ThreadSafeProcessController.scala @@ -1,3 +1,28 @@ +/* +* Copyright 2012-2015 Broad Institute, Inc. +* +* Permission is hereby granted, free of charge, to any person +* obtaining a copy of this software and associated documentation +* files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, +* copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following +* conditions: +* +* The above copyright notice and this permission notice shall be +* included in all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + package org.broadinstitute.gatk.queue.engine.parallelshell import java.io.PrintWriter @@ -78,4 +103,4 @@ class ThreadSafeProcessController extends Logging { }.destroy() } -} \ No newline at end of file +} diff --git a/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala b/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala index eb5b14f27..ed1c1c77f 100644 --- a/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala +++ b/public/gatk-queue/src/test/scala/org/broadinstitute/gatk/queue/pipeline/QueueTest.scala @@ -53,7 +53,7 @@ object QueueTest extends BaseTest with Logging { /** * All the job runners configured to run QueueTests at The Broad. */ - final val allJobRunners = Seq("GridEngine", "Shell") + final val allJobRunners = Seq("GridEngine", "Shell", "ParallelShell") /** * The default job runners to run.