diff --git a/java/src/org/broadinstitute/sting/gatk/io/stubs/SAMFileWriterArgumentTypeDescriptor.java b/java/src/org/broadinstitute/sting/gatk/io/stubs/SAMFileWriterArgumentTypeDescriptor.java index 5b5270a70..0ba01b22f 100644 --- a/java/src/org/broadinstitute/sting/gatk/io/stubs/SAMFileWriterArgumentTypeDescriptor.java +++ b/java/src/org/broadinstitute/sting/gatk/io/stubs/SAMFileWriterArgumentTypeDescriptor.java @@ -42,13 +42,13 @@ import java.io.OutputStream; * Insert a SAMFileWriterStub instead of a full-fledged concrete OutputStream implementations. */ public class SAMFileWriterArgumentTypeDescriptor extends ArgumentTypeDescriptor { - private static final String DEFAULT_ARGUMENT_FULLNAME = "outputBAM"; - private static final String DEFAULT_ARGUMENT_SHORTNAME = "ob"; + public static final String DEFAULT_ARGUMENT_FULLNAME = "outputBAM"; + public static final String DEFAULT_ARGUMENT_SHORTNAME = "ob"; - private static final String COMPRESSION_FULLNAME = "bam_compression"; - private static final String COMPRESSION_SHORTNAME = "compress"; + public static final String COMPRESSION_FULLNAME = "bam_compression"; + public static final String COMPRESSION_SHORTNAME = "compress"; - private static final String CREATE_INDEX_FULLNAME = "index_output_bam_on_the_fly"; + public static final String CREATE_INDEX_FULLNAME = "index_output_bam_on_the_fly"; /** * The engine into which output stubs should be fed. 
diff --git a/java/src/org/broadinstitute/sting/gatk/io/stubs/VCFWriterArgumentTypeDescriptor.java b/java/src/org/broadinstitute/sting/gatk/io/stubs/VCFWriterArgumentTypeDescriptor.java index 510bcba5e..f36e02f23 100644 --- a/java/src/org/broadinstitute/sting/gatk/io/stubs/VCFWriterArgumentTypeDescriptor.java +++ b/java/src/org/broadinstitute/sting/gatk/io/stubs/VCFWriterArgumentTypeDescriptor.java @@ -41,9 +41,9 @@ import java.util.*; * @version 0.1 */ public class VCFWriterArgumentTypeDescriptor extends ArgumentTypeDescriptor { - private static final String NO_HEADER_ARG_NAME = "NO_HEADER"; - private static final String SITES_ONLY_ARG_NAME = "sites_only"; - private static final HashSet SUPPORTED_ZIPPED_SUFFIXES = new HashSet(); + public static final String NO_HEADER_ARG_NAME = "NO_HEADER"; + public static final String SITES_ONLY_ARG_NAME = "sites_only"; + public static final HashSet SUPPORTED_ZIPPED_SUFFIXES = new HashSet(); // // static list of zipped suffixes supported by this system. 
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index a3f8f76a2..061aa6854 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -60,7 +60,7 @@ class QGraph extends Logging { private var running = true private val runningLock = new Object private var runningJobs = Set.empty[FunctionEdge] - private var intermediatesJobs = Set.empty[FunctionEdge] + private var cleanupJobs = Set.empty[FunctionEdge] private val nl = "%n".format() @@ -394,7 +394,7 @@ class QGraph extends Logging { logStatusCounts logNextStatusCounts = false - deleteDoneIntermediates(lastRunningCheck) + deleteCleanup(lastRunningCheck) if (readyJobs.size == 0 && runningJobs.size > 0) Thread.sleep(nextRunningCheck(lastRunningCheck)) @@ -410,8 +410,8 @@ class QGraph extends Logging { runningJobs --= doneJobs runningJobs --= failedJobs - if (!settings.keepIntermediates) - intermediatesJobs ++= doneJobs.filter(_.function.isIntermediate) + + addCleanup(doneJobs) statusCounts.running -= doneJobs.size statusCounts.running -= failedJobs.size @@ -430,7 +430,7 @@ class QGraph extends Logging { } logStatusCounts - deleteDoneIntermediates(-1) + deleteCleanup(-1) } catch { case e => logger.error("Uncaught error running jobs.", e) @@ -503,19 +503,36 @@ class QGraph extends Logging { if (edge.status == RunnerStatus.DONE || edge.status == RunnerStatus.SKIPPED) { logger.debug("Already done: " + edge.function.description) - if (!settings.keepIntermediates && edge.function.isIntermediate) - intermediatesJobs += edge + addCleanup(edge) } } + /** + * Checks if the functions should have their outputs removed after they finish running + * @param edges Functions to check + */ + private def addCleanup(edges: Traversable[FunctionEdge]) { + edges.foreach(addCleanup(_)) + } + + /** + * Checks if the function should have their outputs removed after they 
finish running + * @param edge Function to check + */ + private def addCleanup(edge: FunctionEdge) { + if (!settings.keepIntermediates) + if (edge.function.isIntermediate && edge.function.deleteIntermediateOutputs) + cleanupJobs += edge + } + /** * Continues deleting the outputs of intermediate jobs that are no longer needed until it's time to recheck running status. * @param lastRunningCheck The last time the status was checked. */ - private def deleteDoneIntermediates(lastRunningCheck: Long) { + private def deleteCleanup(lastRunningCheck: Long) { var doneJobs = Set.empty[FunctionEdge] - for (edge <- intermediatesJobs) { + for (edge <- cleanupJobs) { val nextDone = nextFunctions(edge).forall(next => { val status = next.status (status == RunnerStatus.DONE || status == RunnerStatus.SKIPPED) @@ -529,7 +546,7 @@ class QGraph extends Logging { if (running && !readyRunningCheck(lastRunningCheck)) { logger.debug("Deleting intermediates:" + edge.function.description) edge.function.deleteOutputs() - intermediatesJobs -= edge + cleanupJobs -= edge } } } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamGatherFunction.scala index 91b9ae3fb..10de17bb8 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamGatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamGatherFunction.scala @@ -26,9 +26,11 @@ package org.broadinstitute.sting.queue.extensions.gatk import org.broadinstitute.sting.queue.function.scattergather.GatherFunction import org.broadinstitute.sting.queue.extensions.picard.PicardBamFunction +import org.broadinstitute.sting.queue.function.QFunction +import org.broadinstitute.sting.gatk.io.stubs.SAMFileWriterArgumentTypeDescriptor /** - * Merges BAM files using Picards net.sf.picard.sam.MergeSamFiles. + * Merges BAM files using net.sf.picard.sam.MergeSamFiles. 
*/ class BamGatherFunction extends GatherFunction with PicardBamFunction { this.javaMainClass = "net.sf.picard.sam.MergeSamFiles" @@ -36,8 +38,21 @@ class BamGatherFunction extends GatherFunction with PicardBamFunction { protected def inputBams = gatherParts protected def outputBam = originalOutput - override def init() { + override def freezeFieldValues { + val originalGATK = originalFunction.asInstanceOf[CommandLineGATK] + // Whatever the original function can handle, merging *should* do less. this.memoryLimit = originalFunction.memoryLimit + + // bam_compression and index_output_bam_on_the_fly from SAMFileWriterArgumentTypeDescriptor + // are added by the GATKExtensionsGenerator to the subclass of CommandLineGATK + + val compression = QFunction.findField(originalFunction.getClass, SAMFileWriterArgumentTypeDescriptor.COMPRESSION_FULLNAME) + this.compressionLevel = originalGATK.getFieldValue(compression).asInstanceOf[Option[Int]] + + val indexBam = QFunction.findField(originalFunction.getClass, SAMFileWriterArgumentTypeDescriptor.CREATE_INDEX_FULLNAME) + this.createIndex = originalGATK.getFieldValue(indexBam).asInstanceOf[Option[Boolean]] + + super.freezeFieldValues } } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala index 9ee7b8087..27e186585 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala @@ -25,6 +25,8 @@ package org.broadinstitute.sting.queue.extensions.gatk import org.broadinstitute.sting.queue.function.scattergather.GatherFunction +import org.broadinstitute.sting.queue.function.QFunction +import org.broadinstitute.sting.gatk.io.stubs.VCFWriterArgumentTypeDescriptor /** * Merges a vcf text file. 
@@ -33,7 +35,7 @@ class VcfGatherFunction extends CombineVariants with GatherFunction { private lazy val originalGATK = this.originalFunction.asInstanceOf[CommandLineGATK] - override def freezeFieldValues = { + override def freezeFieldValues { this.memoryLimit = Some(1) this.jarFile = this.originalGATK.jarFile @@ -46,6 +48,15 @@ class VcfGatherFunction extends CombineVariants with GatherFunction { this.out = this.originalOutput this.assumeIdenticalSamples = true + // NO_HEADER and sites_only from VCFWriterArgumentTypeDescriptor + // are added by the GATKExtensionsGenerator to the subclass of CommandLineGATK + + val noHeader = QFunction.findField(originalFunction.getClass, VCFWriterArgumentTypeDescriptor.NO_HEADER_ARG_NAME) + this.NO_HEADER = originalGATK.getFieldValue(noHeader).asInstanceOf[Boolean] + + val sitesOnly = QFunction.findField(originalFunction.getClass, VCFWriterArgumentTypeDescriptor.SITES_ONLY_ARG_NAME) + this.sites_only = originalGATK.getFieldValue(sitesOnly).asInstanceOf[Boolean] + super.freezeFieldValues } } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/picard/PicardBamFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/picard/PicardBamFunction.scala index 386166421..2654e4a3d 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/picard/PicardBamFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/picard/PicardBamFunction.scala @@ -37,9 +37,10 @@ import net.sf.samtools.SAMFileHeader.SortOrder * some values are optional. 
*/ trait PicardBamFunction extends JavaCommandLineFunction { - var validationStringency: ValidationStringency = ValidationStringency.SILENT - var sortOrder: SortOrder = SortOrder.coordinate + var validationStringency = ValidationStringency.SILENT + var sortOrder = SortOrder.coordinate var compressionLevel: Option[Int] = None + var createIndex: Option[Boolean] = None var maxRecordsInRam: Option[Int] = None var assumeSorted: Option[Boolean] = None @@ -55,5 +56,6 @@ trait PicardBamFunction extends JavaCommandLineFunction { optional(" VALIDATION_STRINGENCY=", validationStringency), optional(" SO=", sortOrder), optional(" MAX_RECORDS_IN_RAM=", maxRecordsInRam), - optional(" ASSUME_SORTED=", assumeSorted)).mkString + optional(" ASSUME_SORTED=", assumeSorted), + optional(" CREATE_INDEX=", createIndex)).mkString }