Merge pull request #1172 from broadinstitute/ks_jd_parallel_shell
ParallelShell added as a new JobRunner
This commit is contained in:
commit
a762dcb6cf
|
|
@ -149,4 +149,23 @@ class HelloWorldQueueTest {
|
||||||
spec.expectedFilePaths = Seq("pipelineLogDir/HelloWorld-1.out")
|
spec.expectedFilePaths = Seq("pipelineLogDir/HelloWorld-1.out")
|
||||||
QueueTest.executeTest(spec)
|
QueueTest.executeTest(spec)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeOut=36000000)
|
||||||
|
def testHelloWorldParallelShell() {
|
||||||
|
val spec = new QueueTestSpec
|
||||||
|
spec.name = "HelloWorldWithLogDirectory"
|
||||||
|
spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala"
|
||||||
|
spec.jobRunners = Seq("ParallelShell")
|
||||||
|
QueueTest.executeTest(spec)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeOut=36000000)
|
||||||
|
def testHelloWorldParallelShellMaxConcurrentRun() {
|
||||||
|
val spec = new QueueTestSpec
|
||||||
|
spec.name = "HelloWorldWithLogDirectory"
|
||||||
|
spec.args = "-S " + QueueTest.publicQScriptsPackageDir + "examples/HelloWorld.scala" +
|
||||||
|
" -maxConcurrentRun 10"
|
||||||
|
spec.jobRunners = Seq("ParallelShell")
|
||||||
|
QueueTest.executeTest(spec)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -434,7 +434,20 @@ class QGraph extends Logging {
|
||||||
var doneJobs = Set.empty[FunctionEdge]
|
var doneJobs = Set.empty[FunctionEdge]
|
||||||
var failedJobs = Set.empty[FunctionEdge]
|
var failedJobs = Set.empty[FunctionEdge]
|
||||||
|
|
||||||
while (running && readyJobs.size > 0 && !readyRunningCheck(lastRunningCheck)) {
|
def startJobs: Boolean = {
|
||||||
|
|
||||||
|
def canRunMoreConcurrentJobs: Boolean =
|
||||||
|
if(settings.maximumNumberOfConcurrentJobs.isDefined)
|
||||||
|
runningJobs.size + startedJobs.size < settings.maximumNumberOfConcurrentJobs.get
|
||||||
|
else
|
||||||
|
true
|
||||||
|
|
||||||
|
running && readyJobs.size > 0 &&
|
||||||
|
!readyRunningCheck(lastRunningCheck) &&
|
||||||
|
canRunMoreConcurrentJobs
|
||||||
|
}
|
||||||
|
|
||||||
|
while (startJobs) {
|
||||||
val edge = readyJobs.head
|
val edge = readyJobs.head
|
||||||
edge.runner = newRunner(edge.function)
|
edge.runner = newRunner(edge.function)
|
||||||
edge.start()
|
edge.start()
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ package org.broadinstitute.gatk.queue.engine
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import org.broadinstitute.gatk.queue.QSettings
|
import org.broadinstitute.gatk.queue.QSettings
|
||||||
import org.broadinstitute.gatk.queue.util.{EmailSettings, SystemUtils}
|
import org.broadinstitute.gatk.queue.util.{EmailSettings, SystemUtils}
|
||||||
import org.broadinstitute.gatk.utils.commandline.{Advanced, ArgumentCollection, Argument}
|
import org.broadinstitute.gatk.utils.commandline.{ClassType, Advanced, ArgumentCollection, Argument}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Command line options for a QGraph.
|
* Command line options for a QGraph.
|
||||||
|
|
@ -80,6 +80,11 @@ class QGraphSettings {
|
||||||
@Argument(fullName="disableJobReport", shortName="disableJobReport", doc="If provided, we will not create a job report", required=false)
|
@Argument(fullName="disableJobReport", shortName="disableJobReport", doc="If provided, we will not create a job report", required=false)
|
||||||
var disableJobReport: Boolean = false
|
var disableJobReport: Boolean = false
|
||||||
|
|
||||||
|
@Advanced
|
||||||
|
@ClassType(classOf[Int])
|
||||||
|
@Argument(fullName="maximumNumberOfJobsToRunConcurrently", shortName="maxConcurrentRun", doc="The maximum number of jobs to start at any given time. (Default is no limit)", required=false)
|
||||||
|
var maximumNumberOfConcurrentJobs: Option[Int] = None
|
||||||
|
|
||||||
@ArgumentCollection
|
@ArgumentCollection
|
||||||
val emailSettings = new EmailSettings
|
val emailSettings = new EmailSettings
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,70 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2012-2015 Broad Institute, Inc.
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person
|
||||||
|
* obtaining a copy of this software and associated documentation
|
||||||
|
* files (the "Software"), to deal in the Software without
|
||||||
|
* restriction, including without limitation the rights to use,
|
||||||
|
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the
|
||||||
|
* Software is furnished to do so, subject to the following
|
||||||
|
* conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||||
|
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||||
|
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||||
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
|
||||||
|
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.broadinstitute.gatk.queue.engine.parallelshell
|
||||||
|
|
||||||
|
import org.broadinstitute.gatk.queue.function.CommandLineFunction
|
||||||
|
import org.broadinstitute.gatk.queue.engine.CommandLineJobManager
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs multiple jobs locally without blocking.
|
||||||
|
* Use this with care as it might not be the most efficient way to run things.
|
||||||
|
* However, for some scenarios, such as running multiple single threaded
|
||||||
|
* programs concurrently it can be quite useful.
|
||||||
|
*
|
||||||
|
* All this code is based on the normal shell runner in GATK Queue and all
|
||||||
|
* credits for everything except the concurrency part goes to the GATK team.
|
||||||
|
*
|
||||||
|
* @author Johan Dahlberg
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class ParallelShellJobManager extends CommandLineJobManager[ParallelShellJobRunner] {
|
||||||
|
|
||||||
|
def runnerType = classOf[ParallelShellJobRunner]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create new ParallelShellJobRunner
|
||||||
|
* @param function Function for the runner.
|
||||||
|
* @return a new ParallelShellJobRunner instance
|
||||||
|
*/
|
||||||
|
def create(function: CommandLineFunction) =
|
||||||
|
new ParallelShellJobRunner(function)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the status of the specified jobrunners.
|
||||||
|
* @param runners Runners to update.
|
||||||
|
* @return runners which were updated.
|
||||||
|
*/
|
||||||
|
override def updateStatus(
|
||||||
|
runners: Set[ParallelShellJobRunner]): Set[ParallelShellJobRunner] =
|
||||||
|
runners.filter { runner => runner.updateJobStatus() }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop the specified runners.
|
||||||
|
* @param runners Runners to stop.
|
||||||
|
*/
|
||||||
|
override def tryStop(runners: Set[ParallelShellJobRunner]) =
|
||||||
|
runners.foreach(_.tryStop())
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,151 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2012-2015 Broad Institute, Inc.
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person
|
||||||
|
* obtaining a copy of this software and associated documentation
|
||||||
|
* files (the "Software"), to deal in the Software without
|
||||||
|
* restriction, including without limitation the rights to use,
|
||||||
|
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the
|
||||||
|
* Software is furnished to do so, subject to the following
|
||||||
|
* conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||||
|
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||||
|
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||||
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
|
||||||
|
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.broadinstitute.gatk.queue.engine.parallelshell
|
||||||
|
|
||||||
|
import org.broadinstitute.gatk.queue.function.CommandLineFunction
|
||||||
|
import org.broadinstitute.gatk.queue.engine.{ RunnerStatus, CommandLineJobRunner }
|
||||||
|
import java.util.Date
|
||||||
|
import org.broadinstitute.gatk.utils.Utils
|
||||||
|
import org.broadinstitute.gatk.utils.runtime.{ ProcessSettings, OutputStreamSettings }
|
||||||
|
import scala.concurrent._
|
||||||
|
import ExecutionContext.Implicits.global
|
||||||
|
import scala.util.{ Success, Failure }
|
||||||
|
import org.broadinstitute.gatk.queue.util.Logging
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs multiple jobs locally without blocking.
|
||||||
|
* Use this with care as it might not be the most efficient way to run things.
|
||||||
|
* However, for some scenarios, such as running multiple single threaded
|
||||||
|
* programs concurrently it can be quite useful.
|
||||||
|
*
|
||||||
|
* All this code is based on the normal shell runner in GATK Queue and all
|
||||||
|
* credits for everything except the concurrency part goes to the GATK team.
|
||||||
|
*
|
||||||
|
* @author Johan Dahlberg - 20150611
|
||||||
|
*
|
||||||
|
* @param function Command to run.
|
||||||
|
*/
|
||||||
|
class ParallelShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging {
|
||||||
|
|
||||||
|
// Controller on the thread that started the job
|
||||||
|
val controller: ThreadSafeProcessController = new ThreadSafeProcessController()
|
||||||
|
|
||||||
|
// Once the application exits this promise will be fulfilled.
|
||||||
|
val finalExitStatus = Promise[Int]()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs the function on the local shell.
|
||||||
|
*/
|
||||||
|
def start() {
|
||||||
|
val commandLine = Array("sh", jobScript.getAbsolutePath)
|
||||||
|
val stdoutSettings = new OutputStreamSettings
|
||||||
|
val stderrSettings = new OutputStreamSettings
|
||||||
|
val mergeError = function.jobErrorFile == null
|
||||||
|
|
||||||
|
stdoutSettings.setOutputFile(function.jobOutputFile, true)
|
||||||
|
if (function.jobErrorFile != null)
|
||||||
|
stderrSettings.setOutputFile(function.jobErrorFile, true)
|
||||||
|
|
||||||
|
if (logger.isDebugEnabled) {
|
||||||
|
stdoutSettings.printStandard(true)
|
||||||
|
stderrSettings.printStandard(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
val processSettings = new ProcessSettings(
|
||||||
|
commandLine, mergeError, function.commandDirectory, null,
|
||||||
|
null, stdoutSettings, stderrSettings)
|
||||||
|
|
||||||
|
updateJobRun(processSettings)
|
||||||
|
|
||||||
|
getRunInfo.startTime = new Date()
|
||||||
|
getRunInfo.exechosts = Utils.resolveHostname()
|
||||||
|
updateStatus(RunnerStatus.RUNNING)
|
||||||
|
|
||||||
|
// Run the command line process in a future.
|
||||||
|
val executedFuture =
|
||||||
|
future { controller.exec(processSettings) }
|
||||||
|
|
||||||
|
// Register a callback on the completion of the future, making sure that
|
||||||
|
// the status of the job is updated accordingly.
|
||||||
|
executedFuture.onComplete {
|
||||||
|
case Success(exitStatus) =>
|
||||||
|
logger.debug(commandLine.mkString(" ") + " :: Got return on exit status in future: " + exitStatus)
|
||||||
|
finalExitStatus.success(exitStatus)
|
||||||
|
getRunInfo.doneTime = new Date()
|
||||||
|
exitStatusUpdateJobRunnerStatus(exitStatus)
|
||||||
|
case Failure(throwable) =>
|
||||||
|
logger.debug(
|
||||||
|
"Failed in return from run with: " +
|
||||||
|
throwable.getClass.getCanonicalName + " :: " +
|
||||||
|
throwable.getMessage)
|
||||||
|
finalExitStatus.failure(throwable)
|
||||||
|
getRunInfo.doneTime = new Date()
|
||||||
|
updateStatus(RunnerStatus.FAILED)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Possibly invoked from a shutdown thread, find and
|
||||||
|
* stop the controller from the originating thread
|
||||||
|
*/
|
||||||
|
def tryStop() = {
|
||||||
|
try {
|
||||||
|
controller.tryDestroy()
|
||||||
|
} catch {
|
||||||
|
case e: Exception =>
|
||||||
|
logger.error("Unable to kill shell job: " + function.description, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the status of the runner based on the exit status
|
||||||
|
* of the process.
|
||||||
|
*/
|
||||||
|
def exitStatusUpdateJobRunnerStatus(exitStatus: Int): Unit = {
|
||||||
|
exitStatus match {
|
||||||
|
case 0 => updateStatus(RunnerStatus.DONE)
|
||||||
|
case _ => updateStatus(RunnerStatus.FAILED)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempts to get the status of a job by looking at if the finalExitStatus
|
||||||
|
* promise has completed or not.
|
||||||
|
* @return if the jobRunner has updated it's status or not.
|
||||||
|
*/
|
||||||
|
def updateJobStatus(): Boolean = {
|
||||||
|
if (finalExitStatus.isCompleted) {
|
||||||
|
val completedExitStatus = finalExitStatus.future.value.get.get
|
||||||
|
exitStatusUpdateJobRunnerStatus(completedExitStatus)
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
// Make sure the status is update here, otherwise Queue will think
|
||||||
|
// it's lots control over the job and kill it after 5 minutes.
|
||||||
|
updateStatus(status)
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,106 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2012-2015 Broad Institute, Inc.
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person
|
||||||
|
* obtaining a copy of this software and associated documentation
|
||||||
|
* files (the "Software"), to deal in the Software without
|
||||||
|
* restriction, including without limitation the rights to use,
|
||||||
|
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the
|
||||||
|
* Software is furnished to do so, subject to the following
|
||||||
|
* conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||||
|
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||||
|
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||||
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
|
||||||
|
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.broadinstitute.gatk.queue.engine.parallelshell
|
||||||
|
|
||||||
|
import java.io.PrintWriter
|
||||||
|
|
||||||
|
import org.broadinstitute.gatk.queue.util.Logging
|
||||||
|
import org.broadinstitute.gatk.utils.runtime.ProcessSettings
|
||||||
|
import scala.sys.process._
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class ThreadSafeProcessController extends Logging {
|
||||||
|
|
||||||
|
private var process: Option[Process] = None
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a process logger writing the stdout and stderr of the
|
||||||
|
* process controlled by this instance to the files specified in
|
||||||
|
* the provided ProcessSettings instance.
|
||||||
|
* @param processSettings specifiying which files to write to
|
||||||
|
* @return a process logger which can be used by the `scala.sys.process`
|
||||||
|
*/
|
||||||
|
private def getProcessLogger(processSettings: ProcessSettings): ProcessLogger = {
|
||||||
|
|
||||||
|
val (stdOutFile, stdErrFile) = {
|
||||||
|
|
||||||
|
val stdOutFile = processSettings.getStdoutSettings.getOutputFile
|
||||||
|
|
||||||
|
if(processSettings.getStderrSettings.getOutputFile != null) {
|
||||||
|
val stdErrFile = processSettings.getStderrSettings.getOutputFile
|
||||||
|
(stdOutFile, stdErrFile)
|
||||||
|
} else {
|
||||||
|
(stdOutFile, stdOutFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
val stdOutPrintWriter = new PrintWriter(stdOutFile)
|
||||||
|
val stdErrPrintWriter = new PrintWriter(stdErrFile)
|
||||||
|
|
||||||
|
def printToWriter(printWriter: PrintWriter)(line: String): Unit = {
|
||||||
|
printWriter.println(line)
|
||||||
|
printWriter.flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
val stringStdOutPrinterFunc = printToWriter(stdOutPrintWriter) _
|
||||||
|
val stringStdErrPrinterFunc = printToWriter(stdErrPrintWriter) _
|
||||||
|
|
||||||
|
val processLogger = ProcessLogger(
|
||||||
|
stringStdOutPrinterFunc,
|
||||||
|
stringStdErrPrinterFunc
|
||||||
|
)
|
||||||
|
|
||||||
|
processLogger
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the process specified in process settings
|
||||||
|
* @param processSettings specifying the commandline to run.
|
||||||
|
* @return the exit status of the process.
|
||||||
|
*/
|
||||||
|
def exec(processSettings: ProcessSettings): Int = {
|
||||||
|
|
||||||
|
val commandLine: ProcessBuilder = processSettings.getCommand.mkString(" ")
|
||||||
|
logger.debug("Trying to start process: " + commandLine)
|
||||||
|
process = Some(commandLine.run(getProcessLogger(processSettings)))
|
||||||
|
process.get.exitValue()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempt to destroy the underlying process.
|
||||||
|
*/
|
||||||
|
def tryDestroy(): Unit = {
|
||||||
|
logger.debug("Trying to kill process")
|
||||||
|
process.getOrElse {
|
||||||
|
throw new IllegalStateException("Tried to kill unstarted job.")
|
||||||
|
}.destroy()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -53,7 +53,7 @@ object QueueTest extends BaseTest with Logging {
|
||||||
/**
|
/**
|
||||||
* All the job runners configured to run QueueTests at The Broad.
|
* All the job runners configured to run QueueTests at The Broad.
|
||||||
*/
|
*/
|
||||||
final val allJobRunners = Seq("GridEngine", "Shell")
|
final val allJobRunners = Seq("GridEngine", "Shell", "ParallelShell")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default job runners to run.
|
* The default job runners to run.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue