Merge branch 'master' of ssh://gsa2.broadinstitute.org/humgen/gsa-scr1/gsa-engineering/git/unstable

This commit is contained in:
Ryan Poplin 2012-10-10 10:47:40 -04:00
commit 15b405d458
9 changed files with 85 additions and 29 deletions

View File

@ -10,13 +10,17 @@ class ExampleRetryMemoryLimit extends QScript {
var bamFile: File = _
def script() {
val ug = new UnifiedGenotyper with RetryMemoryLimit
// First run with 1m
ug.memoryLimit = .001
// On retry run with 1g
ug.retryMemoryFunction = (d => d * 1000)
ug.reference_sequence = referenceFile
ug.input_file = Seq(bamFile)
add(ug)
for (scatterCount <- 1 to 2) {
val ug = new UnifiedGenotyper with RetryMemoryLimit
// First run with 1m
ug.memoryLimit = .001
// On retry run with 1g
ug.retryMemoryFunction = (d => d * 1000)
ug.reference_sequence = referenceFile
ug.input_file = Seq(bamFile)
ug.out = swapExt(bamFile, ".bam", ".scattered_%d.vcf".format(scatterCount))
ug.scatterCount = scatterCount
add(ug)
}
}
}

View File

@ -26,19 +26,19 @@ package org.broadinstitute.sting.queue.extensions.gatk
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
import org.broadinstitute.sting.queue.extensions.picard.PicardBamFunction
import org.broadinstitute.sting.queue.function.QFunction
import org.broadinstitute.sting.queue.function.{RetryMemoryLimit, QFunction}
import org.broadinstitute.sting.gatk.io.stubs.SAMFileWriterArgumentTypeDescriptor
/**
* Merges BAM files using net.sf.picard.sam.MergeSamFiles.
*/
class BamGatherFunction extends GatherFunction with PicardBamFunction {
class BamGatherFunction extends GatherFunction with PicardBamFunction with RetryMemoryLimit {
this.javaMainClass = "net.sf.picard.sam.MergeSamFiles"
this.assumeSorted = Some(true)
protected def inputBams = gatherParts
protected def outputBam = originalOutput
override def freezeFieldValues {
override def freezeFieldValues() {
val originalGATK = originalFunction.asInstanceOf[CommandLineGATK]
// Whatever the original function can handle, merging *should* do less.

View File

@ -25,13 +25,13 @@
package org.broadinstitute.sting.queue.extensions.gatk
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
import org.broadinstitute.sting.queue.function.QFunction
import org.broadinstitute.sting.queue.function.{RetryMemoryLimit, QFunction}
import org.broadinstitute.sting.gatk.io.stubs.VCFWriterArgumentTypeDescriptor
/**
* Merges a vcf text file.
*/
class VcfGatherFunction extends CombineVariants with GatherFunction {
class VcfGatherFunction extends CombineVariants with GatherFunction with RetryMemoryLimit {
this.assumeIdenticalSamples = true
this.suppressCommandLineHeader = true

View File

@ -50,7 +50,7 @@ class SortSam extends org.broadinstitute.sting.queue.function.JavaCommandLineFun
override def freezeFieldValues() {
super.freezeFieldValues()
if (outputIndex == null && output != null)
outputIndex = new File(output.getName.stripSuffix(".bam") + ".bai")
outputIndex = new File(output.getAbsolutePath.stripSuffix(".bam") + ".bai")
}

View File

@ -25,6 +25,7 @@
package org.broadinstitute.sting.queue.function
import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.commandline.Argument
/**
* A command line that will be run in a pipeline.
@ -33,12 +34,15 @@ trait CommandLineFunction extends QFunction with Logging {
def commandLine: String
/** Upper memory limit */
@Argument(doc="Memory limit", required=false)
var memoryLimit: Option[Double] = None
/** Resident memory limit */
@Argument(doc="Resident memory limit", required=false)
var residentLimit: Option[Double] = None
/** Resident memory request */
@Argument(doc="Resident memory request", required=false)
var residentRequest: Option[Double] = None
/** the number of SMP cores this job wants */

View File

@ -47,6 +47,7 @@ trait JavaCommandLineFunction extends CommandLineFunction {
/**
* Memory limit for the java executable, or if None will use the default memoryLimit.
*/
@Argument(doc="Java memory limit", required=false)
var javaMemoryLimit: Option[Double] = None
/**

View File

@ -113,11 +113,13 @@ trait QFunction extends Logging with QJobReport {
var jobErrorFile: File = _
/** Errors (if any) from the last failed run of jobErrorFiles. */
@Argument(doc="Job error lines", required=false)
var jobErrorLines: Seq[String] = Nil
/**
* The number of times this function has previously been run.
*/
@Argument(doc="Job retries", required=false)
var retries = 0
/** Change settings for the next run. Retries will be set to the number of times the function was run and jobErrorLines may contain the error text. */
@ -541,4 +543,11 @@ object QFunction {
classFields
}
}
/**
* Returns the Seq of fields for a QFunction class.
* @param clazz Class to retrieve fields for.
* @return the fields of the class.
*/
def classFunctionFields(clazz: Class[_]) = classFields(clazz).functionFields
}

View File

@ -24,17 +24,26 @@
package org.broadinstitute.sting.queue.function
import org.broadinstitute.sting.commandline.Argument
object RetryMemoryLimit {
private val defaultRetryMemoryFunction: (Double => Double) = ( 2 * _ )
private val defaultMemoryLimitErrorText = Seq("OutOfMemory", "you did not provide enough memory", "TERM_MEMLIMIT")
}
/** A mixin that on retry increases the memory limit when certain text is found. */
trait RetryMemoryLimit extends CommandLineFunction {
/** How to increase the memory. By default doubles the memory. */
var retryMemoryFunction: (Double => Double) = (2 * _)
var retryMemoryFunction: (Double => Double) = RetryMemoryLimit.defaultRetryMemoryFunction
/** Once the threshold is passed, no more memory will be added to memory limit. */
@Argument(doc="threshold to stop doubling the memory", required=false)
var memoryLimitThreshold: Option[Double] = None
/** Various strings to look for to determine we ran out of memory. */
var memoryLimitErrorText = Seq("OutOfMemory", "you did not provide enough memory", "TERM_MEMLIMIT")
@Argument(doc="text to look for in the errors", required = false)
var memoryLimitErrorText = RetryMemoryLimit.defaultMemoryLimitErrorText
override def freezeFieldValues() {
super.freezeFieldValues()
@ -42,6 +51,21 @@ trait RetryMemoryLimit extends CommandLineFunction {
this.memoryLimitThreshold = this.qSettings.memoryLimitThreshold
}
override def copySettingsTo(function: QFunction) {
super.copySettingsTo(function)
function match {
case retryMemoryLimit: RetryMemoryLimit =>
if (retryMemoryLimit.memoryLimitThreshold.isEmpty)
retryMemoryLimit.memoryLimitThreshold = this.memoryLimitThreshold
if (retryMemoryLimit.retryMemoryFunction == RetryMemoryLimit.defaultRetryMemoryFunction)
retryMemoryLimit.retryMemoryFunction = this.retryMemoryFunction
if (retryMemoryLimit.memoryLimitErrorText == RetryMemoryLimit.defaultMemoryLimitErrorText)
retryMemoryLimit.memoryLimitErrorText = this.memoryLimitErrorText
case _ => /* ignore */
}
}
override def setupRetry() {
super.setupRetry()
if (this.memoryLimitThreshold.isDefined && this.memoryLimit.isDefined) {

View File

@ -30,6 +30,10 @@ import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction}
/**
* Shadow clones another command line function.
*/
object CloneFunction {
private lazy val cloneFunctionFields = QFunction.classFunctionFields(classOf[CloneFunction])
}
class CloneFunction extends CommandLineFunction {
var originalFunction: ScatterGatherableFunction = _
var cloneIndex: Int = _
@ -41,10 +45,10 @@ class CloneFunction extends CommandLineFunction {
var originalValues = Map.empty[ArgumentSource, Any]
withScatterPartCount += 1
if (withScatterPartCount == 1) {
overriddenFields.foreach{
case (field, overrideValue) => {
originalFunction.functionFields.foreach {
case (field) => {
originalValues += field -> originalFunction.getFieldValue(field)
originalFunction.setFieldValue(field, overrideValue)
originalFunction.setFieldValue(field, getFieldValue(field))
}
}
}
@ -52,9 +56,11 @@ class CloneFunction extends CommandLineFunction {
f()
} finally {
if (withScatterPartCount == 1) {
originalValues.foreach{
case (name, value) =>
originalFunction.setFieldValue(name, value)
originalFunction.functionFields.foreach {
case (field) => {
setFieldValue(field, originalFunction.getFieldValue(field))
originalFunction.setFieldValue(field, originalValues(field))
}
}
}
withScatterPartCount -= 1
@ -63,6 +69,8 @@ class CloneFunction extends CommandLineFunction {
override def description = withScatterPart(() => originalFunction.description)
override def shortDescription = withScatterPart(() => originalFunction.shortDescription)
override def setupRetry() { withScatterPart(() => originalFunction.setupRetry()) }
override protected def functionFieldClass = originalFunction.getClass
def commandLine = withScatterPart(() => originalFunction.commandLine)
@ -73,13 +81,19 @@ class CloneFunction extends CommandLineFunction {
}
override def getFieldValue(source: ArgumentSource): AnyRef = {
overriddenFields.get(source) match {
case Some(value) => value.asInstanceOf[AnyRef]
case None => {
val value = originalFunction.getFieldValue(source)
overriddenFields += source -> value
value
}
CloneFunction.cloneFunctionFields.find(_.field.getName == source.field.getName) match {
case Some(cloneSource) =>
super.getFieldValue(cloneSource)
case None =>
overriddenFields.get(source) match {
case Some(value) =>
value.asInstanceOf[AnyRef]
case None => {
val value = originalFunction.getFieldValue(source)
overriddenFields += source -> value
value
}
}
}
}