Added setupRetry() to modify jobs when Queue is run with '-retry' and jobs are about to restart after an error.
Implemented a mixin called "RetryMemoryLimit" which will by default double the memory. GridEngine memory request parameter can be selected on the command line via '-resMemReqParam mem_free' or '-resMemReqParam virtual_free'. Java optimizations now enabled by default: - Only 4 GC threads instead of each job using java's default O(number of cores) GC threads. Previously on a machine with N cores if you have N jobs running and java allocates N GC threads by default, then the machines are using up to N^2 threads if all jobs are in heavy GC (thanks elauzier). - Exit if GC spends more than 50% of time in GC (thanks ktibbett). - Exit if GC reclaims lest than 10% of max heap (thanks ktibbett). Added a -noGCOpt command line option to disable new java optimizations.
This commit is contained in:
parent
6ad75d2f5c
commit
22b4466cf5
|
|
@ -0,0 +1,22 @@
|
|||
import org.broadinstitute.sting.queue.function.RetryMemoryLimit
|
||||
import org.broadinstitute.sting.queue.QScript
|
||||
import org.broadinstitute.sting.queue.extensions.gatk._
|
||||
|
||||
class ExampleRetryMemoryLimit extends QScript {
|
||||
@Input(doc="The reference file for the bam files.", shortName="R")
|
||||
var referenceFile: File = _
|
||||
|
||||
@Input(doc="Bam file to genotype.", shortName="I")
|
||||
var bamFile: File = _
|
||||
|
||||
def script() {
|
||||
val ug = new UnifiedGenotyper with RetryMemoryLimit
|
||||
// First run with 1m
|
||||
ug.memoryLimit = .001
|
||||
// On retry run with 1g
|
||||
ug.retryMemoryFunction = (d => d * 1000)
|
||||
ug.reference_sequence = referenceFile
|
||||
ug.input_file = Seq(bamFile)
|
||||
add(ug)
|
||||
}
|
||||
}
|
||||
|
|
@ -5,7 +5,8 @@ import org.broadinstitute.sting.queue.extensions.gatk._
|
|||
|
||||
/**
|
||||
* An example building on the intro ExampleCountReads.scala.
|
||||
* Runs an INCOMPLETE version of the UnifiedGenotyper with VariantEval and optional VariantFiltration.
|
||||
* Runs an INCOMPLETE variant calling pipeline with just the UnifiedGenotyper, VariantEval and optional VariantFiltration.
|
||||
* For a complete description of the suggested for a variant calling pipeline see the latest version of the Best Practice Variant Detection document
|
||||
*/
|
||||
class ExampleUnifiedGenotyper extends QScript {
|
||||
// Create an alias 'qscript' to be able to access variables
|
||||
|
|
@ -43,14 +44,12 @@ class ExampleUnifiedGenotyper extends QScript {
|
|||
}
|
||||
|
||||
def script() {
|
||||
// Create the four function that we can run.
|
||||
// Create the four functions that we may run depending on options.
|
||||
val genotyper = new UnifiedGenotyper with UnifiedGenotyperArguments
|
||||
val variantFilter = new VariantFiltration with UnifiedGenotyperArguments
|
||||
val evalUnfiltered = new VariantEval with UnifiedGenotyperArguments
|
||||
val evalFiltered = new VariantEval with UnifiedGenotyperArguments
|
||||
|
||||
// If you are running this on a compute farm, make sure that the Sting/shell
|
||||
// folder is in your path to use mergeText.sh and splitIntervals.sh.
|
||||
genotyper.scatterCount = 3
|
||||
genotyper.input_file :+= qscript.bamFile
|
||||
genotyper.out = swapExt(qscript.bamFile, "bam", "unfiltered.vcf")
|
||||
|
|
|
|||
|
|
@ -55,12 +55,18 @@ class QSettings {
|
|||
@Argument(fullName="memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false)
|
||||
var memoryLimit: Option[Double] = None
|
||||
|
||||
@Argument(fullName="memory_limit_threshold", shortName="memLimitThresh", doc="After passing this threshold stop increasing memory limit for jobs, in gigabytes.", required=false)
|
||||
var memoryLimitThreshold: Option[Double] = None
|
||||
|
||||
@Argument(fullName="resident_memory_limit", shortName="resMemLimit", doc="Default resident memory limit for jobs, in gigabytes.", required=false)
|
||||
var residentLimit: Option[Double] = None
|
||||
|
||||
@Argument(fullName="resident_memory_request", shortName="resMemReq", doc="Default resident memory request for jobs, in gigabytes.", required=false)
|
||||
var residentRequest: Option[Double] = None
|
||||
|
||||
@Argument(fullName="resident_memory_request_parameter", shortName="resMemReqParam", doc="Parameter for resident memory requests. By default not requested.", required=false)
|
||||
var residentRequestParameter: String = _
|
||||
|
||||
/** The name of the parallel environment (required for SGE, for example) */
|
||||
@Argument(fullName="job_parallel_env", shortName="jobParaEnv", doc="An SGE style parallel environment to use for jobs requesting more than 1 core. Equivalent to submitting jobs with -pe ARG nt for jobs with nt > 1", required=false)
|
||||
var parallelEnvironmentName: String = "smp_pe" // Broad default
|
||||
|
|
@ -68,6 +74,9 @@ class QSettings {
|
|||
@Argument(fullName="dontRequestMultipleCores", shortName="multiCoreJerk", doc="If provided, Queue will not request multiple processors for jobs using multiple processors. Sometimes you eat the bear, sometimes the bear eats you.", required=false)
|
||||
var dontRequestMultipleCores: Boolean = false
|
||||
|
||||
@Argument(fullName="disableDefaultJavaGCOptimizations", shortName="noGCOpt", doc="If provided, Queue will not ensure that java GC threads are limited and that the a minimum amount of time is spent in GC.")
|
||||
var disableDefaultJavaGCOptimizations = false
|
||||
|
||||
@Argument(fullName="run_directory", shortName="runDir", doc="Root directory to run functions from.", required=false)
|
||||
var runDirectory = new File(".")
|
||||
|
||||
|
|
|
|||
|
|
@ -40,11 +40,6 @@ import org.apache.commons.lang.StringUtils
|
|||
class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNode) extends QEdge with Logging {
|
||||
var runner: JobRunner[_] =_
|
||||
|
||||
/**
|
||||
* The number of times this edge has been run.
|
||||
*/
|
||||
var retries = 0
|
||||
|
||||
/**
|
||||
* The depth of this edge in the graph.
|
||||
*/
|
||||
|
|
@ -87,14 +82,14 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
|||
runner.init()
|
||||
runner.start()
|
||||
} catch {
|
||||
case e =>
|
||||
case e: Throwable =>
|
||||
currentStatus = RunnerStatus.FAILED
|
||||
try {
|
||||
runner.cleanup()
|
||||
function.failOutputs.foreach(_.createNewFile())
|
||||
writeStackTrace(e)
|
||||
} catch {
|
||||
case _ => /* ignore errors in the exception handler */
|
||||
case _: Throwable => /* ignore errors in the exception handler */
|
||||
}
|
||||
logger.error("Error: " + function.description, e)
|
||||
}
|
||||
|
|
@ -114,7 +109,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
|||
runner.cleanup()
|
||||
function.failOutputs.foreach(_.createNewFile())
|
||||
} catch {
|
||||
case _ => /* ignore errors in the error handler */
|
||||
case _: Throwable => /* ignore errors in the error handler */
|
||||
}
|
||||
logger.error("Error: " + function.description)
|
||||
tailError()
|
||||
|
|
@ -123,19 +118,19 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
|||
runner.cleanup()
|
||||
function.doneOutputs.foreach(_.createNewFile())
|
||||
} catch {
|
||||
case _ => /* ignore errors in the done handler */
|
||||
case _: Throwable => /* ignore errors in the done handler */
|
||||
}
|
||||
logger.info("Done: " + function.description)
|
||||
}
|
||||
} catch {
|
||||
case e =>
|
||||
case e: Throwable =>
|
||||
currentStatus = RunnerStatus.FAILED
|
||||
try {
|
||||
runner.cleanup()
|
||||
function.failOutputs.foreach(_.createNewFile())
|
||||
writeStackTrace(e)
|
||||
} catch {
|
||||
case _ => /* ignore errors in the exception handler */
|
||||
case _: Throwable => /* ignore errors in the exception handler */
|
||||
}
|
||||
logger.error("Error retrieving status: " + function.description, e)
|
||||
}
|
||||
|
|
@ -168,6 +163,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
|||
currentStatus = RunnerStatus.PENDING
|
||||
if (cleanOutputs)
|
||||
function.deleteOutputs()
|
||||
function.jobErrorLines = Nil
|
||||
runner = null
|
||||
}
|
||||
|
||||
|
|
@ -189,6 +185,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
|||
val tailLines = IOUtils.tail(errorFile, maxLines)
|
||||
val nl = "%n".format()
|
||||
val summary = if (tailLines.size > maxLines) "Last %d lines".format(maxLines) else "Contents"
|
||||
this.function.jobErrorLines = collection.JavaConversions.asScalaIterable(tailLines).toSeq
|
||||
logger.error("%s of %s:%n%s".format(summary, errorFile, StringUtils.join(tailLines, nl)))
|
||||
} else {
|
||||
logger.error("Unable to access log file: %s".format(errorFile))
|
||||
|
|
|
|||
|
|
@ -116,7 +116,7 @@ class QGraph extends Logging {
|
|||
val isReady = numMissingValues == 0
|
||||
|
||||
if (this.jobGraph.edgeSet.isEmpty) {
|
||||
logger.warn("Nothing to run! Were any Functions added?");
|
||||
logger.warn("Nothing to run! Were any Functions added?")
|
||||
} else if (settings.getStatus) {
|
||||
logger.info("Checking pipeline status.")
|
||||
logStatus()
|
||||
|
|
@ -320,7 +320,7 @@ class QGraph extends Logging {
|
|||
if (settings.startFromScratch)
|
||||
logger.info("Will remove outputs from previous runs.")
|
||||
|
||||
updateGraphStatus(false)
|
||||
updateGraphStatus(cleanOutputs = false)
|
||||
|
||||
var readyJobs = getReadyJobs
|
||||
while (running && readyJobs.size > 0) {
|
||||
|
|
@ -361,7 +361,7 @@ class QGraph extends Logging {
|
|||
* Logs job statuses by traversing the graph and looking for status-related files
|
||||
*/
|
||||
private def logStatus() {
|
||||
updateGraphStatus(false)
|
||||
updateGraphStatus(cleanOutputs = false)
|
||||
doStatus(status => logger.info(status))
|
||||
}
|
||||
|
||||
|
|
@ -388,7 +388,7 @@ class QGraph extends Logging {
|
|||
if (settings.startFromScratch)
|
||||
logger.info("Removing outputs from previous runs.")
|
||||
|
||||
updateGraphStatus(true)
|
||||
updateGraphStatus(cleanOutputs = true)
|
||||
|
||||
var readyJobs = TreeSet.empty[FunctionEdge](functionOrdering)
|
||||
readyJobs ++= getReadyJobs
|
||||
|
|
@ -473,7 +473,7 @@ class QGraph extends Logging {
|
|||
logStatusCounts()
|
||||
deleteCleanup(-1)
|
||||
} catch {
|
||||
case e =>
|
||||
case e: Throwable =>
|
||||
logger.error("Uncaught error running jobs.", e)
|
||||
throw e
|
||||
} finally {
|
||||
|
|
@ -662,11 +662,12 @@ class QGraph extends Logging {
|
|||
private def checkRetryJobs(failed: Set[FunctionEdge]) {
|
||||
if (settings.retries > 0) {
|
||||
for (failedJob <- failed) {
|
||||
if (failedJob.function.jobRestartable && failedJob.retries < settings.retries) {
|
||||
failedJob.retries += 1
|
||||
failedJob.resetToPending(true)
|
||||
if (failedJob.function.jobRestartable && failedJob.function.retries < settings.retries) {
|
||||
failedJob.function.retries += 1
|
||||
failedJob.function.setupRetry()
|
||||
failedJob.resetToPending(cleanOutputs = true)
|
||||
logger.info("Reset for retry attempt %d of %d: %s".format(
|
||||
failedJob.retries, settings.retries, failedJob.function.description))
|
||||
failedJob.function.retries, settings.retries, failedJob.function.description))
|
||||
statusCounts.failed -= 1
|
||||
statusCounts.pending += 1
|
||||
} else {
|
||||
|
|
@ -733,7 +734,7 @@ class QGraph extends Logging {
|
|||
private def emailDescription(edge: FunctionEdge) = {
|
||||
val description = new StringBuilder
|
||||
if (settings.retries > 0)
|
||||
description.append("Attempt %d of %d.%n".format(edge.retries + 1, settings.retries + 1))
|
||||
description.append("Attempt %d of %d.%n".format(edge.function.retries + 1, settings.retries + 1))
|
||||
description.append(edge.function.description)
|
||||
description.toString()
|
||||
}
|
||||
|
|
@ -1077,7 +1078,7 @@ class QGraph extends Logging {
|
|||
runner.checkUnknownStatus()
|
||||
}
|
||||
} catch {
|
||||
case e => /* ignore */
|
||||
case e: Throwable => /* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1119,20 +1120,20 @@ class QGraph extends Logging {
|
|||
try {
|
||||
manager.tryStop(managerRunners)
|
||||
} catch {
|
||||
case e => /* ignore */
|
||||
case e: Throwable => /* ignore */
|
||||
}
|
||||
for (runner <- managerRunners) {
|
||||
try {
|
||||
runner.cleanup()
|
||||
} catch {
|
||||
case e => /* ignore */
|
||||
case e: Throwable => /* ignore */
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
manager.exit()
|
||||
} catch {
|
||||
case e => /* ignore */
|
||||
case e: Throwable => /* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,13 +52,9 @@ class GridEngineJobRunner(session: Session, function: CommandLineFunction) exten
|
|||
nativeSpec += " -q " + function.jobQueue
|
||||
|
||||
// If the resident set size is requested pass on the memory request
|
||||
// NOTE: 12/20/11: depristo commented this out because mem_free isn't
|
||||
// such a standard feature in SGE (gsa-engineering queue doesn't support it)
|
||||
// requiring it can make SGE not so usable. It's dangerous to not enforce
|
||||
// that we have enough memory to run our jobs, but I'd rather be dangerous
|
||||
// than not be able to run my jobs at all.
|
||||
// if (function.residentRequest.isDefined)
|
||||
// nativeSpec += " -l mem_free=%dM".format(function.residentRequest.map(_ * 1024).get.ceil.toInt)
|
||||
// mem_free is the standard, but may also be virtual_free or even not available
|
||||
if (function.qSettings.residentRequestParameter != null && function.residentRequest.isDefined)
|
||||
nativeSpec += " -l %s=%dM".format(function.qSettings.residentRequestParameter, function.residentRequest.map(_ * 1024).get.ceil.toInt)
|
||||
|
||||
// If the resident set size limit is defined specify the memory limit
|
||||
if (function.residentLimit.isDefined)
|
||||
|
|
|
|||
|
|
@ -137,12 +137,17 @@ trait CommandLineFunction extends QFunction with Logging {
|
|||
if (residentRequest.isEmpty)
|
||||
residentRequest = memoryLimit
|
||||
|
||||
if (residentLimit.isEmpty)
|
||||
residentLimit = residentRequest.map( _ * 1.2 )
|
||||
if (residentLimit.isEmpty || residentLimit == residentRequest)
|
||||
residentLimit = residentRequest.map(residentLimitBuffer)
|
||||
|
||||
super.freezeFieldValues()
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A function that decides how much memory cushion to add to the residentRequest to create the residentLimit
|
||||
*/
|
||||
def residentLimitBuffer: (Double => Double) = (1.2 * _)
|
||||
|
||||
/**
|
||||
* Safely construct a full required command-line argument with consistent quoting, whitespace separation, etc.
|
||||
*
|
||||
|
|
@ -223,7 +228,7 @@ trait CommandLineFunction extends QFunction with Logging {
|
|||
*/
|
||||
protected def conditional( condition: Boolean, param: Any, escape: Boolean = true, format: String = "%s" ): String = {
|
||||
if ( condition ) {
|
||||
" %s ".format(formatArgument("", param, "", false, escape, format))
|
||||
" %s ".format(formatArgument("", param, "", spaceSeparated = false, escape = escape, paramFormat = format))
|
||||
}
|
||||
else {
|
||||
""
|
||||
|
|
|
|||
|
|
@ -54,6 +54,16 @@ trait JavaCommandLineFunction extends CommandLineFunction {
|
|||
*/
|
||||
var javaGCThreads: Option[Int] = None
|
||||
|
||||
/**
|
||||
* Max percent of time spent in garbage collection
|
||||
*/
|
||||
var javaGCTimeLimit: Option[Int] = None
|
||||
|
||||
/**
|
||||
* Min percent of max heap freed during a garbage collection
|
||||
*/
|
||||
var javaGCHeapFreeLimit: Option[Int] = None
|
||||
|
||||
override def freezeFieldValues() {
|
||||
super.freezeFieldValues()
|
||||
|
||||
|
|
@ -62,6 +72,37 @@ trait JavaCommandLineFunction extends CommandLineFunction {
|
|||
|
||||
if (javaMainClass != null && javaClasspath.isEmpty)
|
||||
javaClasspath = JavaCommandLineFunction.currentClasspath
|
||||
|
||||
if (!this.qSettings.disableDefaultJavaGCOptimizations) {
|
||||
// By default set the GC threads to 4
|
||||
if (javaGCThreads.isEmpty)
|
||||
javaGCThreads = Some(4)
|
||||
|
||||
// By default exit if more than 50% of time in GC
|
||||
if (javaGCTimeLimit.isEmpty)
|
||||
javaGCTimeLimit = Some(50)
|
||||
|
||||
// By default exit if GC does not free up 10% of the heap
|
||||
if (javaGCHeapFreeLimit.isEmpty)
|
||||
javaGCHeapFreeLimit = Some(10)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
override def copySettingsTo(function: QFunction) {
|
||||
super.copySettingsTo(function)
|
||||
function match {
|
||||
case java: JavaCommandLineFunction =>
|
||||
if (java.javaMemoryLimit.isEmpty)
|
||||
java.javaMemoryLimit = this.javaMemoryLimit
|
||||
if (java.javaGCThreads.isEmpty)
|
||||
java.javaGCThreads = this.javaGCThreads
|
||||
if (java.javaGCTimeLimit.isEmpty)
|
||||
java.javaGCTimeLimit = this.javaGCTimeLimit
|
||||
if (java.javaGCHeapFreeLimit.isEmpty)
|
||||
java.javaGCHeapFreeLimit = this.javaGCHeapFreeLimit
|
||||
case _ => /* ignore */
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -77,10 +118,13 @@ trait JavaCommandLineFunction extends CommandLineFunction {
|
|||
null
|
||||
}
|
||||
|
||||
def javaOpts = optional("-Xmx", javaMemoryLimit.map(gb => (gb * 1024).ceil.toInt), "m", spaceSeparated=false) +
|
||||
conditional(javaGCThreads.isDefined, "-XX:+UseParallelOldGC") +
|
||||
optional("-XX:ParallelGCThreads=", javaGCThreads, spaceSeparated=false) +
|
||||
required("-Djava.io.tmpdir=", jobTempDir, spaceSeparated=false)
|
||||
def javaOpts = Array(
|
||||
optional("-Xmx", javaMemoryLimit.map(gb => (gb * 1024).ceil.toInt), "m", spaceSeparated=false),
|
||||
conditional(javaGCThreads.isDefined || javaGCTimeLimit.isDefined || javaGCHeapFreeLimit.isDefined, "-XX:+UseParallelOldGC"),
|
||||
optional("-XX:ParallelGCThreads=", javaGCThreads, spaceSeparated=false),
|
||||
optional("-XX:GCTimeLimit=", javaGCTimeLimit, spaceSeparated=false),
|
||||
optional("-XX:GCHeapFreeLimit=", javaGCHeapFreeLimit, spaceSeparated=false),
|
||||
required("-Djava.io.tmpdir=", jobTempDir, spaceSeparated=false)).mkString("")
|
||||
|
||||
def commandLine = required("java") +
|
||||
javaOpts +
|
||||
|
|
|
|||
|
|
@ -112,6 +112,18 @@ trait QFunction extends Logging with QJobReport {
|
|||
/** File to redirect any errors. Defaults to <jobName>.out */
|
||||
var jobErrorFile: File = _
|
||||
|
||||
/** Errors (if any) from the last failed run of jobErrorFiles. */
|
||||
var jobErrorLines: Seq[String] = Nil
|
||||
|
||||
/**
|
||||
* The number of times this function has previously been run.
|
||||
*/
|
||||
var retries = 0
|
||||
|
||||
/** Change settings for the next run. Retries will be set to the number of times the function was run and jobErrorLines may contain the error text. */
|
||||
def setupRetry() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Description of this command line function.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright (c) 2012, The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.queue.function
|
||||
|
||||
/** A mixin that on retry increases the memory limit when certain text is found. */
|
||||
trait RetryMemoryLimit extends CommandLineFunction {
|
||||
|
||||
/** How to increase the memory. By default doubles the memory. */
|
||||
var retryMemoryFunction: (Double => Double) = (2 * _)
|
||||
|
||||
/** Once the threshold is passed, no more memory will be added to memory limit. */
|
||||
var memoryLimitThreshold: Option[Double] = None
|
||||
|
||||
/** Various strings to look for to determine we ran out of memory. */
|
||||
var memoryLimitErrorText = Seq("OutOfMemory", "you did not provide enough memory", "TERM_MEMLIMIT")
|
||||
|
||||
override def freezeFieldValues() {
|
||||
super.freezeFieldValues()
|
||||
if (this.memoryLimitThreshold.isEmpty)
|
||||
this.memoryLimitThreshold = this.qSettings.memoryLimitThreshold
|
||||
}
|
||||
|
||||
override def setupRetry() {
|
||||
super.setupRetry()
|
||||
if (this.memoryLimitThreshold.isDefined && this.memoryLimit.isDefined) {
|
||||
|
||||
// NOTE: If we're already at or above the memoryLimit, don't do anything.
|
||||
if (this.memoryLimit.get < this.memoryLimitThreshold.get) {
|
||||
updateMemoryLimits()
|
||||
}
|
||||
|
||||
} else {
|
||||
updateMemoryLimits()
|
||||
}
|
||||
}
|
||||
|
||||
def updateMemoryLimits() {
|
||||
if (isMemoryError) {
|
||||
this.memoryLimit = this.memoryLimit.map(this.retryMemoryFunction)
|
||||
this.residentRequest = this.residentRequest.map(this.retryMemoryFunction)
|
||||
this.residentLimit = this.residentLimit.map(this.retryMemoryFunction)
|
||||
|
||||
// Rebuffer the memory limit if the limit was set exactly to the request
|
||||
if (this.residentLimit == this.residentRequest)
|
||||
this.residentLimit = this.residentRequest.map(this.residentLimitBuffer)
|
||||
|
||||
this match {
|
||||
case java: JavaCommandLineFunction =>
|
||||
java.javaMemoryLimit = java.javaMemoryLimit.map(this.retryMemoryFunction)
|
||||
case _ => /* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def isMemoryError = this.jobErrorLines.exists(line => this.memoryLimitErrorText.exists(error => line.contains(error)))
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright (c) 2012, The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.queue.pipeline.examples
|
||||
|
||||
import org.testng.annotations.Test
|
||||
import org.broadinstitute.sting.queue.pipeline.{PipelineTest, PipelineTestSpec}
|
||||
import org.broadinstitute.sting.BaseTest
|
||||
|
||||
class ExampleRetryMemoryLimitPipelineTest {
|
||||
@Test
|
||||
def testRetryMemoryLimit() {
|
||||
val spec = new PipelineTestSpec
|
||||
spec.name = "RetryMemoryLimit"
|
||||
spec.args = Array(
|
||||
" -S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/ExampleRetryMemoryLimit.scala",
|
||||
" -R " + BaseTest.publicTestDir + "exampleFASTA.fasta",
|
||||
" -I " + BaseTest.publicTestDir + "exampleBAM.bam",
|
||||
" -retry 1").mkString
|
||||
spec.jobRunners = PipelineTest.allJobRunners
|
||||
PipelineTest.executeTest(spec)
|
||||
}
|
||||
}
|
||||
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
package org.broadinstitute.sting.queue.pipeline.examples
|
||||
|
||||
import org.testng.annotations.Test
|
||||
import org.testng.annotations.{DataProvider, Test}
|
||||
import org.broadinstitute.sting.queue.pipeline.{PipelineTest, PipelineTestSpec}
|
||||
import org.broadinstitute.sting.BaseTest
|
||||
|
||||
|
|
@ -43,42 +43,53 @@ class ExampleUnifiedGenotyperPipelineTest {
|
|||
PipelineTest.executeTest(spec)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUnifiedGenotyperWithGatkIntervals() {
|
||||
@DataProvider(name = "ugIntervals")
|
||||
def getUnifiedGenotyperIntervals =
|
||||
Array(
|
||||
Array("gatk_intervals", BaseTest.validationDataLocation + "intervalTest.intervals"),
|
||||
Array("bed_intervals", BaseTest.validationDataLocation + "intervalTest.bed"),
|
||||
Array("vcf_intervals", BaseTest.validationDataLocation + "intervalTest.1.vcf")
|
||||
).asInstanceOf[Array[Array[Object]]]
|
||||
|
||||
@Test(dataProvider = "ugIntervals")
|
||||
def testUnifiedGenotyperWithIntervals(intervalsName: String, intervalsPath: String) {
|
||||
val spec = new PipelineTestSpec
|
||||
spec.name = "unifiedgenotyper_with_gatk_intervals"
|
||||
spec.name = "unifiedgenotyper_with_" + intervalsName
|
||||
spec.args = Array(
|
||||
" -S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/ExampleUnifiedGenotyper.scala",
|
||||
" -I " + BaseTest.validationDataLocation + "OV-0930.normal.chunk.bam",
|
||||
" -R " + BaseTest.hg18Reference,
|
||||
" -L " + BaseTest.validationDataLocation + "intervalTest.intervals").mkString
|
||||
" -L " + intervalsPath).mkString
|
||||
spec.jobRunners = Seq("Lsf706")
|
||||
PipelineTest.executeTest(spec)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUnifiedGenotyperWithBedIntervals() {
|
||||
def testUnifiedGenotyperNoGCOpt() {
|
||||
val spec = new PipelineTestSpec
|
||||
spec.name = "unifiedgenotyper_with_bed_intervals"
|
||||
spec.name = "unifiedgenotyper_no_gc_opt"
|
||||
spec.args = Array(
|
||||
" -S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/ExampleUnifiedGenotyper.scala",
|
||||
" -I " + BaseTest.validationDataLocation + "OV-0930.normal.chunk.bam",
|
||||
" -R " + BaseTest.hg18Reference,
|
||||
" -L " + BaseTest.validationDataLocation + "intervalTest.bed").mkString
|
||||
spec.jobRunners = Seq("Lsf706")
|
||||
" -R " + BaseTest.publicTestDir + "exampleFASTA.fasta",
|
||||
" -I " + BaseTest.publicTestDir + "exampleBAM.bam",
|
||||
" -noGCOpt").mkString
|
||||
spec.jobRunners = PipelineTest.allJobRunners
|
||||
PipelineTest.executeTest(spec)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUnifiedGenotyperWithVcfIntervals() {
|
||||
@DataProvider(name="resMemReqParams")
|
||||
def getResMemReqParam = Array(Array("mem_free"), Array("virtual_free")).asInstanceOf[Array[Array[Object]]]
|
||||
|
||||
@Test(dataProvider = "resMemReqParams")
|
||||
def testUnifiedGenotyperResMemReqParam(reqParam: String) {
|
||||
val spec = new PipelineTestSpec
|
||||
spec.name = "unifiedgenotyper_with_vcf_intervals"
|
||||
spec.name = "unifiedgenotyper_" + reqParam
|
||||
spec.args = Array(
|
||||
" -S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/ExampleUnifiedGenotyper.scala",
|
||||
" -I " + BaseTest.validationDataLocation + "OV-0930.normal.chunk.bam",
|
||||
" -R " + BaseTest.hg18Reference,
|
||||
" -L " + BaseTest.validationDataLocation + "intervalTest.1.vcf").mkString
|
||||
spec.jobRunners = Seq("Lsf706")
|
||||
" -R " + BaseTest.publicTestDir + "exampleFASTA.fasta",
|
||||
" -I " + BaseTest.publicTestDir + "exampleBAM.bam",
|
||||
" -resMemReqParam " + reqParam).mkString
|
||||
spec.jobRunners = Seq("GridEngine")
|
||||
PipelineTest.executeTest(spec)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue