RetryMemoryLimit now works with Scatter/Gather.
This commit is contained in:
parent
e9b9e2318c
commit
f66284658d
|
|
@ -10,13 +10,17 @@ class ExampleRetryMemoryLimit extends QScript {
|
||||||
var bamFile: File = _
|
var bamFile: File = _
|
||||||
|
|
||||||
def script() {
|
def script() {
|
||||||
val ug = new UnifiedGenotyper with RetryMemoryLimit
|
for (scatterCount <- 1 to 2) {
|
||||||
// First run with 1m
|
val ug = new UnifiedGenotyper with RetryMemoryLimit
|
||||||
ug.memoryLimit = .001
|
// First run with 1m
|
||||||
// On retry run with 1g
|
ug.memoryLimit = .001
|
||||||
ug.retryMemoryFunction = (d => d * 1000)
|
// On retry run with 1g
|
||||||
ug.reference_sequence = referenceFile
|
ug.retryMemoryFunction = (d => d * 1000)
|
||||||
ug.input_file = Seq(bamFile)
|
ug.reference_sequence = referenceFile
|
||||||
add(ug)
|
ug.input_file = Seq(bamFile)
|
||||||
|
ug.out = swapExt(bamFile, ".bam", ".scattered_%d.vcf".format(scatterCount))
|
||||||
|
ug.scatterCount = scatterCount
|
||||||
|
add(ug)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,19 +26,19 @@ package org.broadinstitute.sting.queue.extensions.gatk
|
||||||
|
|
||||||
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
|
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
|
||||||
import org.broadinstitute.sting.queue.extensions.picard.PicardBamFunction
|
import org.broadinstitute.sting.queue.extensions.picard.PicardBamFunction
|
||||||
import org.broadinstitute.sting.queue.function.QFunction
|
import org.broadinstitute.sting.queue.function.{RetryMemoryLimit, QFunction}
|
||||||
import org.broadinstitute.sting.gatk.io.stubs.SAMFileWriterArgumentTypeDescriptor
|
import org.broadinstitute.sting.gatk.io.stubs.SAMFileWriterArgumentTypeDescriptor
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merges BAM files using net.sf.picard.sam.MergeSamFiles.
|
* Merges BAM files using net.sf.picard.sam.MergeSamFiles.
|
||||||
*/
|
*/
|
||||||
class BamGatherFunction extends GatherFunction with PicardBamFunction {
|
class BamGatherFunction extends GatherFunction with PicardBamFunction with RetryMemoryLimit {
|
||||||
this.javaMainClass = "net.sf.picard.sam.MergeSamFiles"
|
this.javaMainClass = "net.sf.picard.sam.MergeSamFiles"
|
||||||
this.assumeSorted = Some(true)
|
this.assumeSorted = Some(true)
|
||||||
protected def inputBams = gatherParts
|
protected def inputBams = gatherParts
|
||||||
protected def outputBam = originalOutput
|
protected def outputBam = originalOutput
|
||||||
|
|
||||||
override def freezeFieldValues {
|
override def freezeFieldValues() {
|
||||||
val originalGATK = originalFunction.asInstanceOf[CommandLineGATK]
|
val originalGATK = originalFunction.asInstanceOf[CommandLineGATK]
|
||||||
|
|
||||||
// Whatever the original function can handle, merging *should* do less.
|
// Whatever the original function can handle, merging *should* do less.
|
||||||
|
|
|
||||||
|
|
@ -25,13 +25,13 @@
|
||||||
package org.broadinstitute.sting.queue.extensions.gatk
|
package org.broadinstitute.sting.queue.extensions.gatk
|
||||||
|
|
||||||
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
|
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
|
||||||
import org.broadinstitute.sting.queue.function.QFunction
|
import org.broadinstitute.sting.queue.function.{RetryMemoryLimit, QFunction}
|
||||||
import org.broadinstitute.sting.gatk.io.stubs.VCFWriterArgumentTypeDescriptor
|
import org.broadinstitute.sting.gatk.io.stubs.VCFWriterArgumentTypeDescriptor
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merges a vcf text file.
|
* Merges a vcf text file.
|
||||||
*/
|
*/
|
||||||
class VcfGatherFunction extends CombineVariants with GatherFunction {
|
class VcfGatherFunction extends CombineVariants with GatherFunction with RetryMemoryLimit {
|
||||||
this.assumeIdenticalSamples = true
|
this.assumeIdenticalSamples = true
|
||||||
this.suppressCommandLineHeader = true
|
this.suppressCommandLineHeader = true
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@
|
||||||
package org.broadinstitute.sting.queue.function
|
package org.broadinstitute.sting.queue.function
|
||||||
|
|
||||||
import org.broadinstitute.sting.queue.util._
|
import org.broadinstitute.sting.queue.util._
|
||||||
|
import org.broadinstitute.sting.commandline.Argument
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A command line that will be run in a pipeline.
|
* A command line that will be run in a pipeline.
|
||||||
|
|
@ -33,12 +34,15 @@ trait CommandLineFunction extends QFunction with Logging {
|
||||||
def commandLine: String
|
def commandLine: String
|
||||||
|
|
||||||
/** Upper memory limit */
|
/** Upper memory limit */
|
||||||
|
@Argument(doc="Memory limit", required=false)
|
||||||
var memoryLimit: Option[Double] = None
|
var memoryLimit: Option[Double] = None
|
||||||
|
|
||||||
/** Resident memory limit */
|
/** Resident memory limit */
|
||||||
|
@Argument(doc="Resident memory limit", required=false)
|
||||||
var residentLimit: Option[Double] = None
|
var residentLimit: Option[Double] = None
|
||||||
|
|
||||||
/** Resident memory request */
|
/** Resident memory request */
|
||||||
|
@Argument(doc="Resident memory request", required=false)
|
||||||
var residentRequest: Option[Double] = None
|
var residentRequest: Option[Double] = None
|
||||||
|
|
||||||
/** the number of SMP cores this job wants */
|
/** the number of SMP cores this job wants */
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,7 @@ trait JavaCommandLineFunction extends CommandLineFunction {
|
||||||
/**
|
/**
|
||||||
* Memory limit for the java executable, or if None will use the default memoryLimit.
|
* Memory limit for the java executable, or if None will use the default memoryLimit.
|
||||||
*/
|
*/
|
||||||
|
@Argument(doc="Java memory limit", required=false)
|
||||||
var javaMemoryLimit: Option[Double] = None
|
var javaMemoryLimit: Option[Double] = None
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -113,11 +113,13 @@ trait QFunction extends Logging with QJobReport {
|
||||||
var jobErrorFile: File = _
|
var jobErrorFile: File = _
|
||||||
|
|
||||||
/** Errors (if any) from the last failed run of jobErrorFiles. */
|
/** Errors (if any) from the last failed run of jobErrorFiles. */
|
||||||
|
@Argument(doc="Job error lines", required=false)
|
||||||
var jobErrorLines: Seq[String] = Nil
|
var jobErrorLines: Seq[String] = Nil
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The number of times this function has previously been run.
|
* The number of times this function has previously been run.
|
||||||
*/
|
*/
|
||||||
|
@Argument(doc="Job retries", required=false)
|
||||||
var retries = 0
|
var retries = 0
|
||||||
|
|
||||||
/** Change settings for the next run. Retries will be set to the number of times the function was run and jobErrorLines may contain the error text. */
|
/** Change settings for the next run. Retries will be set to the number of times the function was run and jobErrorLines may contain the error text. */
|
||||||
|
|
@ -541,4 +543,11 @@ object QFunction {
|
||||||
classFields
|
classFields
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Seq of fields for a QFunction class.
|
||||||
|
* @param clazz Class to retrieve fields for.
|
||||||
|
* @return the fields of the class.
|
||||||
|
*/
|
||||||
|
def classFunctionFields(clazz: Class[_]) = classFields(clazz).functionFields
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,17 +24,26 @@
|
||||||
|
|
||||||
package org.broadinstitute.sting.queue.function
|
package org.broadinstitute.sting.queue.function
|
||||||
|
|
||||||
|
import org.broadinstitute.sting.commandline.Argument
|
||||||
|
|
||||||
|
object RetryMemoryLimit {
|
||||||
|
private val defaultRetryMemoryFunction: (Double => Double) = ( 2 * _ )
|
||||||
|
private val defaultMemoryLimitErrorText = Seq("OutOfMemory", "you did not provide enough memory", "TERM_MEMLIMIT")
|
||||||
|
}
|
||||||
|
|
||||||
/** A mixin that on retry increases the memory limit when certain text is found. */
|
/** A mixin that on retry increases the memory limit when certain text is found. */
|
||||||
trait RetryMemoryLimit extends CommandLineFunction {
|
trait RetryMemoryLimit extends CommandLineFunction {
|
||||||
|
|
||||||
/** How to increase the memory. By default doubles the memory. */
|
/** How to increase the memory. By default doubles the memory. */
|
||||||
var retryMemoryFunction: (Double => Double) = (2 * _)
|
var retryMemoryFunction: (Double => Double) = RetryMemoryLimit.defaultRetryMemoryFunction
|
||||||
|
|
||||||
/** Once the threshold is passed, no more memory will be added to memory limit. */
|
/** Once the threshold is passed, no more memory will be added to memory limit. */
|
||||||
|
@Argument(doc="threshold to stop doubling the memory", required=false)
|
||||||
var memoryLimitThreshold: Option[Double] = None
|
var memoryLimitThreshold: Option[Double] = None
|
||||||
|
|
||||||
/** Various strings to look for to determine we ran out of memory. */
|
/** Various strings to look for to determine we ran out of memory. */
|
||||||
var memoryLimitErrorText = Seq("OutOfMemory", "you did not provide enough memory", "TERM_MEMLIMIT")
|
@Argument(doc="text to look for in the errors", required = false)
|
||||||
|
var memoryLimitErrorText = RetryMemoryLimit.defaultMemoryLimitErrorText
|
||||||
|
|
||||||
override def freezeFieldValues() {
|
override def freezeFieldValues() {
|
||||||
super.freezeFieldValues()
|
super.freezeFieldValues()
|
||||||
|
|
@ -42,6 +51,21 @@ trait RetryMemoryLimit extends CommandLineFunction {
|
||||||
this.memoryLimitThreshold = this.qSettings.memoryLimitThreshold
|
this.memoryLimitThreshold = this.qSettings.memoryLimitThreshold
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
override def copySettingsTo(function: QFunction) {
|
||||||
|
super.copySettingsTo(function)
|
||||||
|
function match {
|
||||||
|
case retryMemoryLimit: RetryMemoryLimit =>
|
||||||
|
if (retryMemoryLimit.memoryLimitThreshold.isEmpty)
|
||||||
|
retryMemoryLimit.memoryLimitThreshold = this.memoryLimitThreshold
|
||||||
|
if (retryMemoryLimit.retryMemoryFunction == RetryMemoryLimit.defaultRetryMemoryFunction)
|
||||||
|
retryMemoryLimit.retryMemoryFunction = this.retryMemoryFunction
|
||||||
|
if (retryMemoryLimit.memoryLimitErrorText == RetryMemoryLimit.defaultMemoryLimitErrorText)
|
||||||
|
retryMemoryLimit.memoryLimitErrorText = this.memoryLimitErrorText
|
||||||
|
case _ => /* ignore */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override def setupRetry() {
|
override def setupRetry() {
|
||||||
super.setupRetry()
|
super.setupRetry()
|
||||||
if (this.memoryLimitThreshold.isDefined && this.memoryLimit.isDefined) {
|
if (this.memoryLimitThreshold.isDefined && this.memoryLimit.isDefined) {
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,10 @@ import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction}
|
||||||
/**
|
/**
|
||||||
* Shadow clones another command line function.
|
* Shadow clones another command line function.
|
||||||
*/
|
*/
|
||||||
|
object CloneFunction {
|
||||||
|
private lazy val cloneFunctionFields = QFunction.classFunctionFields(classOf[CloneFunction])
|
||||||
|
}
|
||||||
|
|
||||||
class CloneFunction extends CommandLineFunction {
|
class CloneFunction extends CommandLineFunction {
|
||||||
var originalFunction: ScatterGatherableFunction = _
|
var originalFunction: ScatterGatherableFunction = _
|
||||||
var cloneIndex: Int = _
|
var cloneIndex: Int = _
|
||||||
|
|
@ -41,10 +45,10 @@ class CloneFunction extends CommandLineFunction {
|
||||||
var originalValues = Map.empty[ArgumentSource, Any]
|
var originalValues = Map.empty[ArgumentSource, Any]
|
||||||
withScatterPartCount += 1
|
withScatterPartCount += 1
|
||||||
if (withScatterPartCount == 1) {
|
if (withScatterPartCount == 1) {
|
||||||
overriddenFields.foreach{
|
originalFunction.functionFields.foreach {
|
||||||
case (field, overrideValue) => {
|
case (field) => {
|
||||||
originalValues += field -> originalFunction.getFieldValue(field)
|
originalValues += field -> originalFunction.getFieldValue(field)
|
||||||
originalFunction.setFieldValue(field, overrideValue)
|
originalFunction.setFieldValue(field, getFieldValue(field))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -52,9 +56,11 @@ class CloneFunction extends CommandLineFunction {
|
||||||
f()
|
f()
|
||||||
} finally {
|
} finally {
|
||||||
if (withScatterPartCount == 1) {
|
if (withScatterPartCount == 1) {
|
||||||
originalValues.foreach{
|
originalFunction.functionFields.foreach {
|
||||||
case (name, value) =>
|
case (field) => {
|
||||||
originalFunction.setFieldValue(name, value)
|
setFieldValue(field, originalFunction.getFieldValue(field))
|
||||||
|
originalFunction.setFieldValue(field, originalValues(field))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
withScatterPartCount -= 1
|
withScatterPartCount -= 1
|
||||||
|
|
@ -63,6 +69,8 @@ class CloneFunction extends CommandLineFunction {
|
||||||
|
|
||||||
override def description = withScatterPart(() => originalFunction.description)
|
override def description = withScatterPart(() => originalFunction.description)
|
||||||
override def shortDescription = withScatterPart(() => originalFunction.shortDescription)
|
override def shortDescription = withScatterPart(() => originalFunction.shortDescription)
|
||||||
|
override def setupRetry() { withScatterPart(() => originalFunction.setupRetry()) }
|
||||||
|
|
||||||
override protected def functionFieldClass = originalFunction.getClass
|
override protected def functionFieldClass = originalFunction.getClass
|
||||||
|
|
||||||
def commandLine = withScatterPart(() => originalFunction.commandLine)
|
def commandLine = withScatterPart(() => originalFunction.commandLine)
|
||||||
|
|
@ -73,13 +81,19 @@ class CloneFunction extends CommandLineFunction {
|
||||||
}
|
}
|
||||||
|
|
||||||
override def getFieldValue(source: ArgumentSource): AnyRef = {
|
override def getFieldValue(source: ArgumentSource): AnyRef = {
|
||||||
overriddenFields.get(source) match {
|
CloneFunction.cloneFunctionFields.find(_.field.getName == source.field.getName) match {
|
||||||
case Some(value) => value.asInstanceOf[AnyRef]
|
case Some(cloneSource) =>
|
||||||
case None => {
|
super.getFieldValue(cloneSource)
|
||||||
val value = originalFunction.getFieldValue(source)
|
case None =>
|
||||||
overriddenFields += source -> value
|
overriddenFields.get(source) match {
|
||||||
value
|
case Some(value) =>
|
||||||
}
|
value.asInstanceOf[AnyRef]
|
||||||
|
case None => {
|
||||||
|
val value = originalFunction.getFieldValue(source)
|
||||||
|
overriddenFields += source -> value
|
||||||
|
value
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue