Bam gathering passes on the compression_level and the create_index flag to MergeSamFiles.

VCF gathering passes on the no_header and sites_only flags to CombineVariants.
Fixed deletion of gathered log files. Although they are intermediate and do not need to be re-run if not present, they should not be deleted.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5508 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-03-25 03:58:38 +00:00
parent 47279ee56e
commit 3e3ff4a9e7
6 changed files with 69 additions and 24 deletions

View File

@ -42,13 +42,13 @@ import java.io.OutputStream;
* Insert a SAMFileWriterStub instead of a full-fledged concrete OutputStream implementations. * Insert a SAMFileWriterStub instead of a full-fledged concrete OutputStream implementations.
*/ */
public class SAMFileWriterArgumentTypeDescriptor extends ArgumentTypeDescriptor { public class SAMFileWriterArgumentTypeDescriptor extends ArgumentTypeDescriptor {
private static final String DEFAULT_ARGUMENT_FULLNAME = "outputBAM"; public static final String DEFAULT_ARGUMENT_FULLNAME = "outputBAM";
private static final String DEFAULT_ARGUMENT_SHORTNAME = "ob"; public static final String DEFAULT_ARGUMENT_SHORTNAME = "ob";
private static final String COMPRESSION_FULLNAME = "bam_compression"; public static final String COMPRESSION_FULLNAME = "bam_compression";
private static final String COMPRESSION_SHORTNAME = "compress"; public static final String COMPRESSION_SHORTNAME = "compress";
private static final String CREATE_INDEX_FULLNAME = "index_output_bam_on_the_fly"; public static final String CREATE_INDEX_FULLNAME = "index_output_bam_on_the_fly";
/** /**
* The engine into which output stubs should be fed. * The engine into which output stubs should be fed.

View File

@ -41,9 +41,9 @@ import java.util.*;
* @version 0.1 * @version 0.1
*/ */
public class VCFWriterArgumentTypeDescriptor extends ArgumentTypeDescriptor { public class VCFWriterArgumentTypeDescriptor extends ArgumentTypeDescriptor {
private static final String NO_HEADER_ARG_NAME = "NO_HEADER"; public static final String NO_HEADER_ARG_NAME = "NO_HEADER";
private static final String SITES_ONLY_ARG_NAME = "sites_only"; public static final String SITES_ONLY_ARG_NAME = "sites_only";
private static final HashSet<String> SUPPORTED_ZIPPED_SUFFIXES = new HashSet<String>(); public static final HashSet<String> SUPPORTED_ZIPPED_SUFFIXES = new HashSet<String>();
// //
// static list of zipped suffixes supported by this system. // static list of zipped suffixes supported by this system.

View File

@ -60,7 +60,7 @@ class QGraph extends Logging {
private var running = true private var running = true
private val runningLock = new Object private val runningLock = new Object
private var runningJobs = Set.empty[FunctionEdge] private var runningJobs = Set.empty[FunctionEdge]
private var intermediatesJobs = Set.empty[FunctionEdge] private var cleanupJobs = Set.empty[FunctionEdge]
private val nl = "%n".format() private val nl = "%n".format()
@ -394,7 +394,7 @@ class QGraph extends Logging {
logStatusCounts logStatusCounts
logNextStatusCounts = false logNextStatusCounts = false
deleteDoneIntermediates(lastRunningCheck) deleteCleanup(lastRunningCheck)
if (readyJobs.size == 0 && runningJobs.size > 0) if (readyJobs.size == 0 && runningJobs.size > 0)
Thread.sleep(nextRunningCheck(lastRunningCheck)) Thread.sleep(nextRunningCheck(lastRunningCheck))
@ -410,8 +410,8 @@ class QGraph extends Logging {
runningJobs --= doneJobs runningJobs --= doneJobs
runningJobs --= failedJobs runningJobs --= failedJobs
if (!settings.keepIntermediates)
intermediatesJobs ++= doneJobs.filter(_.function.isIntermediate) addCleanup(doneJobs)
statusCounts.running -= doneJobs.size statusCounts.running -= doneJobs.size
statusCounts.running -= failedJobs.size statusCounts.running -= failedJobs.size
@ -430,7 +430,7 @@ class QGraph extends Logging {
} }
logStatusCounts logStatusCounts
deleteDoneIntermediates(-1) deleteCleanup(-1)
} catch { } catch {
case e => case e =>
logger.error("Uncaught error running jobs.", e) logger.error("Uncaught error running jobs.", e)
@ -503,19 +503,36 @@ class QGraph extends Logging {
if (edge.status == RunnerStatus.DONE || edge.status == RunnerStatus.SKIPPED) { if (edge.status == RunnerStatus.DONE || edge.status == RunnerStatus.SKIPPED) {
logger.debug("Already done: " + edge.function.description) logger.debug("Already done: " + edge.function.description)
if (!settings.keepIntermediates && edge.function.isIntermediate) addCleanup(edge)
intermediatesJobs += edge
} }
} }
/**
* Checks if the functions should have their outputs removed after they finish running.
* Each edge is passed to the single-edge addCleanup overload.
* @param edges Functions to check
*/
private def addCleanup(edges: Traversable[FunctionEdge]) {
edges.foreach(addCleanup(_))
}
/**
* Checks if the function should have its outputs removed after it finishes running.
* The edge is added to cleanupJobs only when settings.keepIntermediates is false and the
* function both isIntermediate and has deleteIntermediateOutputs set.
* @param edge Function to check
*/
private def addCleanup(edge: FunctionEdge) {
// Nothing is ever queued for cleanup when the user asked to keep intermediates.
if (!settings.keepIntermediates)
// Both flags are required: being intermediate alone does not queue the edge.
if (edge.function.isIntermediate && edge.function.deleteIntermediateOutputs)
cleanupJobs += edge
}
/** /**
* Continues deleting the outputs of intermediate jobs that are no longer needed until it's time to recheck running status. * Continues deleting the outputs of intermediate jobs that are no longer needed until it's time to recheck running status.
* @param lastRunningCheck The last time the status was checked. * @param lastRunningCheck The last time the status was checked.
*/ */
private def deleteDoneIntermediates(lastRunningCheck: Long) { private def deleteCleanup(lastRunningCheck: Long) {
var doneJobs = Set.empty[FunctionEdge] var doneJobs = Set.empty[FunctionEdge]
for (edge <- intermediatesJobs) { for (edge <- cleanupJobs) {
val nextDone = nextFunctions(edge).forall(next => { val nextDone = nextFunctions(edge).forall(next => {
val status = next.status val status = next.status
(status == RunnerStatus.DONE || status == RunnerStatus.SKIPPED) (status == RunnerStatus.DONE || status == RunnerStatus.SKIPPED)
@ -529,7 +546,7 @@ class QGraph extends Logging {
if (running && !readyRunningCheck(lastRunningCheck)) { if (running && !readyRunningCheck(lastRunningCheck)) {
logger.debug("Deleting intermediates:" + edge.function.description) logger.debug("Deleting intermediates:" + edge.function.description)
edge.function.deleteOutputs() edge.function.deleteOutputs()
intermediatesJobs -= edge cleanupJobs -= edge
} }
} }
} }

View File

@ -26,9 +26,11 @@ package org.broadinstitute.sting.queue.extensions.gatk
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
import org.broadinstitute.sting.queue.extensions.picard.PicardBamFunction import org.broadinstitute.sting.queue.extensions.picard.PicardBamFunction
import org.broadinstitute.sting.queue.function.QFunction
import org.broadinstitute.sting.gatk.io.stubs.SAMFileWriterArgumentTypeDescriptor
/** /**
* Merges BAM files using Picards net.sf.picard.sam.MergeSamFiles. * Merges BAM files using net.sf.picard.sam.MergeSamFiles.
*/ */
class BamGatherFunction extends GatherFunction with PicardBamFunction { class BamGatherFunction extends GatherFunction with PicardBamFunction {
this.javaMainClass = "net.sf.picard.sam.MergeSamFiles" this.javaMainClass = "net.sf.picard.sam.MergeSamFiles"
@ -36,8 +38,21 @@ class BamGatherFunction extends GatherFunction with PicardBamFunction {
protected def inputBams = gatherParts protected def inputBams = gatherParts
protected def outputBam = originalOutput protected def outputBam = originalOutput
override def init() { override def freezeFieldValues {
val originalGATK = originalFunction.asInstanceOf[CommandLineGATK]
// Whatever the original function can handle, merging *should* do less. // Whatever the original function can handle, merging *should* do less.
this.memoryLimit = originalFunction.memoryLimit this.memoryLimit = originalFunction.memoryLimit
// bam_compression and index_output_bam_on_the_fly from SAMFileWriterArgumentTypeDescriptor
// are added by the GATKExtensionsGenerator to the subclass of CommandLineGATK
val compression = QFunction.findField(originalFunction.getClass, SAMFileWriterArgumentTypeDescriptor.COMPRESSION_FULLNAME)
this.compressionLevel = originalGATK.getFieldValue(compression).asInstanceOf[Option[Int]]
val indexBam = QFunction.findField(originalFunction.getClass, SAMFileWriterArgumentTypeDescriptor.CREATE_INDEX_FULLNAME)
this.createIndex = originalGATK.getFieldValue(indexBam).asInstanceOf[Option[Boolean]]
super.freezeFieldValues
} }
} }

View File

@ -25,6 +25,8 @@
package org.broadinstitute.sting.queue.extensions.gatk package org.broadinstitute.sting.queue.extensions.gatk
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
import org.broadinstitute.sting.queue.function.QFunction
import org.broadinstitute.sting.gatk.io.stubs.VCFWriterArgumentTypeDescriptor
/** /**
* Merges a vcf text file. * Merges a vcf text file.
@ -33,7 +35,7 @@ class VcfGatherFunction extends CombineVariants with GatherFunction {
private lazy val originalGATK = this.originalFunction.asInstanceOf[CommandLineGATK] private lazy val originalGATK = this.originalFunction.asInstanceOf[CommandLineGATK]
override def freezeFieldValues = { override def freezeFieldValues {
this.memoryLimit = Some(1) this.memoryLimit = Some(1)
this.jarFile = this.originalGATK.jarFile this.jarFile = this.originalGATK.jarFile
@ -46,6 +48,15 @@ class VcfGatherFunction extends CombineVariants with GatherFunction {
this.out = this.originalOutput this.out = this.originalOutput
this.assumeIdenticalSamples = true this.assumeIdenticalSamples = true
// NO_HEADER and sites_only from VCFWriterArgumentTypeDescriptor
// are added by the GATKExtensionsGenerator to the subclass of CommandLineGATK
val noHeader = QFunction.findField(originalFunction.getClass, VCFWriterArgumentTypeDescriptor.NO_HEADER_ARG_NAME)
this.NO_HEADER = originalGATK.getFieldValue(noHeader).asInstanceOf[Boolean]
val sitesOnly = QFunction.findField(originalFunction.getClass, VCFWriterArgumentTypeDescriptor.SITES_ONLY_ARG_NAME)
this.sites_only = originalGATK.getFieldValue(sitesOnly).asInstanceOf[Boolean]
super.freezeFieldValues super.freezeFieldValues
} }
} }

View File

@ -37,9 +37,10 @@ import net.sf.samtools.SAMFileHeader.SortOrder
* some values are optional. * some values are optional.
*/ */
trait PicardBamFunction extends JavaCommandLineFunction { trait PicardBamFunction extends JavaCommandLineFunction {
var validationStringency: ValidationStringency = ValidationStringency.SILENT var validationStringency = ValidationStringency.SILENT
var sortOrder: SortOrder = SortOrder.coordinate var sortOrder = SortOrder.coordinate
var compressionLevel: Option[Int] = None var compressionLevel: Option[Int] = None
var createIndex: Option[Boolean] = None
var maxRecordsInRam: Option[Int] = None var maxRecordsInRam: Option[Int] = None
var assumeSorted: Option[Boolean] = None var assumeSorted: Option[Boolean] = None
@ -55,5 +56,6 @@ trait PicardBamFunction extends JavaCommandLineFunction {
optional(" VALIDATION_STRINGENCY=", validationStringency), optional(" VALIDATION_STRINGENCY=", validationStringency),
optional(" SO=", sortOrder), optional(" SO=", sortOrder),
optional(" MAX_RECORDS_IN_RAM=", maxRecordsInRam), optional(" MAX_RECORDS_IN_RAM=", maxRecordsInRam),
optional(" ASSUME_SORTED=", assumeSorted)).mkString optional(" ASSUME_SORTED=", assumeSorted),
optional(" CREATE_INDEX=", createIndex)).mkString
} }