Minor updates to previous ParallelShell commit.

Changed `--maximumNumberOfJobsToRunConcurrently`/`-maxConcurrentRun` to `Option[Int]`.
Updated licenses.
Added basic tests.
Removed some IntelliJ warnings.
This commit is contained in:
Khalid Shakir 2015-09-28 22:45:50 -03:00
parent b045f2d4aa
commit 384a09e991
7 changed files with 70 additions and 30 deletions

View File

@ -149,4 +149,23 @@ class HelloWorldQueueTest {
spec.expectedFilePaths = Seq("pipelineLogDir/HelloWorld-1.out") spec.expectedFilePaths = Seq("pipelineLogDir/HelloWorld-1.out")
QueueTest.executeTest(spec) QueueTest.executeTest(spec)
} }
@Test(timeOut=36000000)
def testHelloWorldParallelShell() {
val spec = new QueueTestSpec
spec.name = "HelloWorldWithLogDirectory"
spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala"
spec.jobRunners = Seq("ParallelShell")
QueueTest.executeTest(spec)
}
@Test(timeOut=36000000)
def testHelloWorldParallelShellMaxConcurrentRun() {
val spec = new QueueTestSpec
spec.name = "HelloWorldWithLogDirectory"
spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala" +
" -maxConcurrentRun 10"
spec.jobRunners = Seq("ParallelShell")
QueueTest.executeTest(spec)
}
} }

View File

@ -437,8 +437,8 @@ class QGraph extends Logging {
def startJobs: Boolean = { def startJobs: Boolean = {
def canRunMoreConcurrentJobs: Boolean = def canRunMoreConcurrentJobs: Boolean =
if(settings.maximumNumberOfConcurrentJobs > 0) if(settings.maximumNumberOfConcurrentJobs.isDefined)
runningJobs.size + startedJobs.size < settings.maximumNumberOfConcurrentJobs runningJobs.size + startedJobs.size < settings.maximumNumberOfConcurrentJobs.get
else else
true true

View File

@ -28,7 +28,7 @@ package org.broadinstitute.gatk.queue.engine
import java.io.File import java.io.File
import org.broadinstitute.gatk.queue.QSettings import org.broadinstitute.gatk.queue.QSettings
import org.broadinstitute.gatk.queue.util.{EmailSettings, SystemUtils} import org.broadinstitute.gatk.queue.util.{EmailSettings, SystemUtils}
import org.broadinstitute.gatk.utils.commandline.{Advanced, ArgumentCollection, Argument} import org.broadinstitute.gatk.utils.commandline.{ClassType, Advanced, ArgumentCollection, Argument}
/** /**
* Command line options for a QGraph. * Command line options for a QGraph.
@ -81,8 +81,9 @@ class QGraphSettings {
var disableJobReport: Boolean = false var disableJobReport: Boolean = false
@Advanced @Advanced
@ClassType(classOf[Int])
@Argument(fullName="maximumNumberOfJobsToRunConcurrently", shortName="maxConcurrentRun", doc="The maximum number of jobs to start at any given time. (Default is no limit)", required=false) @Argument(fullName="maximumNumberOfJobsToRunConcurrently", shortName="maxConcurrentRun", doc="The maximum number of jobs to start at any given time. (Default is no limit)", required=false)
var maximumNumberOfConcurrentJobs: Int = -1 var maximumNumberOfConcurrentJobs: Option[Int] = None
@ArgumentCollection @ArgumentCollection
val emailSettings = new EmailSettings val emailSettings = new EmailSettings

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012 The Broad Institute * Copyright 2012-2015 Broad Institute, Inc.
* *
* Permission is hereby granted, free of charge, to any person * Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation * obtaining a copy of this software and associated documentation
@ -67,4 +67,4 @@ class ParallelShellJobManager extends CommandLineJobManager[ParallelShellJobRunn
*/ */
override def tryStop(runners: Set[ParallelShellJobRunner]) = override def tryStop(runners: Set[ParallelShellJobRunner]) =
runners.foreach(_.tryStop()) runners.foreach(_.tryStop())
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012 The Broad Institute * Copyright 2012-2015 Broad Institute, Inc.
* *
* Permission is hereby granted, free of charge, to any person * Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation * obtaining a copy of this software and associated documentation
@ -63,7 +63,7 @@ class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandL
val commandLine = Array("sh", jobScript.getAbsolutePath) val commandLine = Array("sh", jobScript.getAbsolutePath)
val stdoutSettings = new OutputStreamSettings val stdoutSettings = new OutputStreamSettings
val stderrSettings = new OutputStreamSettings val stderrSettings = new OutputStreamSettings
val mergeError = (function.jobErrorFile == null) val mergeError = function.jobErrorFile == null
stdoutSettings.setOutputFile(function.jobOutputFile, true) stdoutSettings.setOutputFile(function.jobOutputFile, true)
if (function.jobErrorFile != null) if (function.jobErrorFile != null)
@ -90,25 +90,20 @@ class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandL
// Register a callback on the completion of the future, making sure that // Register a callback on the completion of the future, making sure that
// the status of the job is updated accordingly. // the status of the job is updated accordingly.
executedFuture.onComplete { tryExitStatus => executedFuture.onComplete {
case Success(exitStatus) =>
tryExitStatus match { logger.debug(commandLine.mkString(" ") + " :: Got return on exit status in future: " + exitStatus)
case Success(exitStatus) => { finalExitStatus.success(exitStatus)
logger.debug(commandLine.mkString(" ") + " :: Got return on exit status in future: " + exitStatus) getRunInfo.doneTime = new Date()
finalExitStatus.success(exitStatus) exitStatusUpdateJobRunnerStatus(exitStatus)
getRunInfo.doneTime = new Date() case Failure(throwable) =>
exitStatusUpdateJobRunnerStatus(exitStatus) logger.debug(
} "Failed in return from run with: " +
case Failure(throwable) => { throwable.getClass.getCanonicalName + " :: " +
logger.debug( throwable.getMessage)
"Failed in return from run with: " + finalExitStatus.failure(throwable)
throwable.getClass.getCanonicalName + " :: " + getRunInfo.doneTime = new Date()
throwable.getMessage) updateStatus(RunnerStatus.FAILED)
finalExitStatus.failure(throwable)
getRunInfo.doneTime = new Date()
updateStatus(RunnerStatus.FAILED)
}
}
} }
} }
@ -153,4 +148,4 @@ class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandL
false false
} }
} }
} }

View File

@ -1,3 +1,28 @@
/*
* Copyright 2012-2015 Broad Institute, Inc.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.gatk.queue.engine.parallelshell package org.broadinstitute.gatk.queue.engine.parallelshell
import java.io.PrintWriter import java.io.PrintWriter
@ -78,4 +103,4 @@ class ThreadSafeProcessController extends Logging {
}.destroy() }.destroy()
} }
} }

View File

@ -53,7 +53,7 @@ object QueueTest extends BaseTest with Logging {
/** /**
* All the job runners configured to run QueueTests at The Broad. * All the job runners configured to run QueueTests at The Broad.
*/ */
final val allJobRunners = Seq("GridEngine", "Shell") final val allJobRunners = Seq("GridEngine", "Shell", "ParallelShell")
/** /**
* The default job runners to run. * The default job runners to run.