From e9b9e2318cccfce10930080defcd072c72a5a918 Mon Sep 17 00:00:00 2001 From: Johan Dahlberg Date: Wed, 3 Oct 2012 11:35:43 +0200 Subject: [PATCH 1/2] Fixed SortSam bug, for .done file The *.bai.done file for the .bai file was written in the run directory instead of in the specified output directory. Changing getName() to getAbsolutePath() fixes this. Signed-off-by: Joel Thibault --- .../broadinstitute/sting/queue/extensions/picard/SortSam.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/extensions/picard/SortSam.scala b/public/scala/src/org/broadinstitute/sting/queue/extensions/picard/SortSam.scala index 9257cc7c2..b22bb2b59 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/extensions/picard/SortSam.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/extensions/picard/SortSam.scala @@ -50,7 +50,7 @@ class SortSam extends org.broadinstitute.sting.queue.function.JavaCommandLineFun override def freezeFieldValues() { super.freezeFieldValues() if (outputIndex == null && output != null) - outputIndex = new File(output.getName.stripSuffix(".bam") + ".bai") + outputIndex = new File(output.getAbsolutePath.stripSuffix(".bam") + ".bai") } From f66284658d611f9fde78f3211dbfff5682cb4886 Mon Sep 17 00:00:00 2001 From: Khalid Shakir Date: Tue, 9 Oct 2012 18:31:56 -0400 Subject: [PATCH 2/2] RetryMemoryLimit now works with Scatter/Gather. --- .../examples/ExampleRetryMemoryLimit.scala | 20 ++++++---- .../extensions/gatk/BamGatherFunction.scala | 6 +-- .../extensions/gatk/VcfGatherFunction.scala | 4 +- .../queue/function/CommandLineFunction.scala | 4 ++ .../function/JavaCommandLineFunction.scala | 1 + .../sting/queue/function/QFunction.scala | 9 +++++ .../queue/function/RetryMemoryLimit.scala | 28 ++++++++++++- .../scattergather/CloneFunction.scala | 40 +++++++++++++------ 8 files changed, 84 insertions(+), 28 deletions(-) diff --git a/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/ExampleRetryMemoryLimit.scala b/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/ExampleRetryMemoryLimit.scala index 09a24e782..1cd5a7512 100644 --- a/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/ExampleRetryMemoryLimit.scala +++ b/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/ExampleRetryMemoryLimit.scala @@ -10,13 +10,17 @@ class ExampleRetryMemoryLimit extends QScript { var bamFile: File = _ def script() { - val ug = new UnifiedGenotyper with RetryMemoryLimit - // First run with 1m - ug.memoryLimit = .001 - // On retry run with 1g - ug.retryMemoryFunction = (d => d * 1000) - ug.reference_sequence = referenceFile - ug.input_file = Seq(bamFile) - add(ug) + for (scatterCount <- 1 to 2) { + val ug = new UnifiedGenotyper with RetryMemoryLimit + // First run with 1m + ug.memoryLimit = .001 + // On retry run with 1g + ug.retryMemoryFunction = (d => d * 1000) + ug.reference_sequence = referenceFile + ug.input_file = Seq(bamFile) + ug.out = swapExt(bamFile, ".bam", ".scattered_%d.vcf".format(scatterCount)) + ug.scatterCount = scatterCount + add(ug) + } } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamGatherFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamGatherFunction.scala index 6cd4b06bc..9522ec86c 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamGatherFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamGatherFunction.scala @@ -26,19 +26,19 @@ package org.broadinstitute.sting.queue.extensions.gatk import org.broadinstitute.sting.queue.function.scattergather.GatherFunction import org.broadinstitute.sting.queue.extensions.picard.PicardBamFunction -import org.broadinstitute.sting.queue.function.QFunction +import org.broadinstitute.sting.queue.function.{RetryMemoryLimit, QFunction} import org.broadinstitute.sting.gatk.io.stubs.SAMFileWriterArgumentTypeDescriptor /** * Merges BAM files using net.sf.picard.sam.MergeSamFiles. */ -class BamGatherFunction extends GatherFunction with PicardBamFunction { +class BamGatherFunction extends GatherFunction with PicardBamFunction with RetryMemoryLimit { this.javaMainClass = "net.sf.picard.sam.MergeSamFiles" this.assumeSorted = Some(true) protected def inputBams = gatherParts protected def outputBam = originalOutput - override def freezeFieldValues { + override def freezeFieldValues() { val originalGATK = originalFunction.asInstanceOf[CommandLineGATK] // Whatever the original function can handle, merging *should* do less. diff --git a/public/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala index 739e6cc91..75be4d773 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala @@ -25,13 +25,13 @@ package org.broadinstitute.sting.queue.extensions.gatk import org.broadinstitute.sting.queue.function.scattergather.GatherFunction -import org.broadinstitute.sting.queue.function.QFunction +import org.broadinstitute.sting.queue.function.{RetryMemoryLimit, QFunction} import org.broadinstitute.sting.gatk.io.stubs.VCFWriterArgumentTypeDescriptor /** * Merges a vcf text file. */ -class VcfGatherFunction extends CombineVariants with GatherFunction { +class VcfGatherFunction extends CombineVariants with GatherFunction with RetryMemoryLimit { this.assumeIdenticalSamples = true this.suppressCommandLineHeader = true diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 84b625760..eb426d301 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -25,6 +25,7 @@ package org.broadinstitute.sting.queue.function import org.broadinstitute.sting.queue.util._ +import org.broadinstitute.sting.commandline.Argument /** * A command line that will be run in a pipeline. @@ -33,12 +34,15 @@ trait CommandLineFunction extends QFunction with Logging { def commandLine: String /** Upper memory limit */ + @Argument(doc="Memory limit", required=false) var memoryLimit: Option[Double] = None /** Resident memory limit */ + @Argument(doc="Resident memory limit", required=false) var residentLimit: Option[Double] = None /** Resident memory request */ + @Argument(doc="Resident memory request", required=false) var residentRequest: Option[Double] = None /** the number of SMP cores this job wants */ diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala index b9cb8540f..6500360c0 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala @@ -47,6 +47,7 @@ trait JavaCommandLineFunction extends CommandLineFunction { /** * Memory limit for the java executable, or if None will use the default memoryLimit. */ + @Argument(doc="Java memory limit", required=false) var javaMemoryLimit: Option[Double] = None /** diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 9f7932d39..aae846534 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -113,11 +113,13 @@ trait QFunction extends Logging with QJobReport { var jobErrorFile: File = _ /** Errors (if any) from the last failed run of jobErrorFiles. */ + @Argument(doc="Job error lines", required=false) var jobErrorLines: Seq[String] = Nil /** * The number of times this function has previously been run. */ + @Argument(doc="Job retries", required=false) var retries = 0 /** Change settings for the next run. Retries will be set to the number of times the function was run and jobErrorLines may contain the error text. */ @@ -541,4 +543,11 @@ object QFunction { classFields } } + + /** + * Returns the Seq of fields for a QFunction class. + * @param clazz Class to retrieve fields for. + * @return the fields of the class. + */ + def classFunctionFields(clazz: Class[_]) = classFields(clazz).functionFields } diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/RetryMemoryLimit.scala b/public/scala/src/org/broadinstitute/sting/queue/function/RetryMemoryLimit.scala index 8bba5551f..acc9a7203 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/RetryMemoryLimit.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/RetryMemoryLimit.scala @@ -24,17 +24,26 @@ package org.broadinstitute.sting.queue.function +import org.broadinstitute.sting.commandline.Argument + +object RetryMemoryLimit { + private val defaultRetryMemoryFunction: (Double => Double) = ( 2 * _ ) + private val defaultMemoryLimitErrorText = Seq("OutOfMemory", "you did not provide enough memory", "TERM_MEMLIMIT") +} + /** A mixin that on retry increases the memory limit when certain text is found. */ trait RetryMemoryLimit extends CommandLineFunction { /** How to increase the memory. By default doubles the memory. */ - var retryMemoryFunction: (Double => Double) = (2 * _) + var retryMemoryFunction: (Double => Double) = RetryMemoryLimit.defaultRetryMemoryFunction /** Once the threshold is passed, no more memory will be added to memory limit. */ + @Argument(doc="threshold to stop doubling the memory", required=false) var memoryLimitThreshold: Option[Double] = None /** Various strings to look for to determine we ran out of memory. */ - var memoryLimitErrorText = Seq("OutOfMemory", "you did not provide enough memory", "TERM_MEMLIMIT") + @Argument(doc="text to look for in the errors", required = false) + var memoryLimitErrorText = RetryMemoryLimit.defaultMemoryLimitErrorText override def freezeFieldValues() { super.freezeFieldValues() @@ -42,6 +51,21 @@ trait RetryMemoryLimit extends CommandLineFunction { this.memoryLimitThreshold = this.qSettings.memoryLimitThreshold } + + override def copySettingsTo(function: QFunction) { + super.copySettingsTo(function) + function match { + case retryMemoryLimit: RetryMemoryLimit => + if (retryMemoryLimit.memoryLimitThreshold.isEmpty) + retryMemoryLimit.memoryLimitThreshold = this.memoryLimitThreshold + if (retryMemoryLimit.retryMemoryFunction == RetryMemoryLimit.defaultRetryMemoryFunction) + retryMemoryLimit.retryMemoryFunction = this.retryMemoryFunction + if (retryMemoryLimit.memoryLimitErrorText == RetryMemoryLimit.defaultMemoryLimitErrorText) + retryMemoryLimit.memoryLimitErrorText = this.memoryLimitErrorText + case _ => /* ignore */ + } + } + override def setupRetry() { super.setupRetry() if (this.memoryLimitThreshold.isDefined && this.memoryLimit.isDefined) { diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala index 5b4f2b7e6..686188e72 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala @@ -30,6 +30,10 @@ import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} /** * Shadow clones another command line function. */ +object CloneFunction { + private lazy val cloneFunctionFields = QFunction.classFunctionFields(classOf[CloneFunction]) +} + class CloneFunction extends CommandLineFunction { var originalFunction: ScatterGatherableFunction = _ var cloneIndex: Int = _ @@ -41,10 +45,10 @@ class CloneFunction extends CommandLineFunction { var originalValues = Map.empty[ArgumentSource, Any] withScatterPartCount += 1 if (withScatterPartCount == 1) { - overriddenFields.foreach{ - case (field, overrideValue) => { + originalFunction.functionFields.foreach { + case (field) => { originalValues += field -> originalFunction.getFieldValue(field) - originalFunction.setFieldValue(field, overrideValue) + originalFunction.setFieldValue(field, getFieldValue(field)) } } } @@ -52,9 +56,11 @@ class CloneFunction extends CommandLineFunction { f() } finally { if (withScatterPartCount == 1) { - originalValues.foreach{ - case (name, value) => - originalFunction.setFieldValue(name, value) + originalFunction.functionFields.foreach { + case (field) => { + setFieldValue(field, originalFunction.getFieldValue(field)) + originalFunction.setFieldValue(field, originalValues(field)) + } } } withScatterPartCount -= 1 @@ -63,6 +69,8 @@ class CloneFunction extends CommandLineFunction { override def description = withScatterPart(() => originalFunction.description) override def shortDescription = withScatterPart(() => originalFunction.shortDescription) + override def setupRetry() { withScatterPart(() => originalFunction.setupRetry()) } + override protected def functionFieldClass = originalFunction.getClass def commandLine = withScatterPart(() => originalFunction.commandLine) @@ -73,13 +81,19 @@ class CloneFunction extends CommandLineFunction { } override def getFieldValue(source: ArgumentSource): AnyRef = { - overriddenFields.get(source) match { - case Some(value) => value.asInstanceOf[AnyRef] - case None => { - val value = originalFunction.getFieldValue(source) - overriddenFields += source -> value - value - } + CloneFunction.cloneFunctionFields.find(_.field.getName == source.field.getName) match { + case Some(cloneSource) => + super.getFieldValue(cloneSource) + case None => + overriddenFields.get(source) match { + case Some(value) => + value.asInstanceOf[AnyRef] + case None => { + val value = originalFunction.getFieldValue(source) + overriddenFields += source -> value + value + } + } } }