From 618c69f8dcf75b1bf458b1869c35f636f7acffbb Mon Sep 17 00:00:00 2001 From: kshakir Date: Tue, 17 Aug 2010 14:59:42 +0000 Subject: [PATCH] More updates to the CleanBamFile pipeline. Added a CommandLineFunction.jobDependencies field that will explicitly force a function to wait for a file, even if the value isn't otherwise listed on an @Input. More bug fixes and refactoring of functions. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4048 348d0f76-0448-11de-a6fe-93d51630548a --- scala/qscript/kshakir/CleanBamFile.scala | 66 +++++++++++++++++-- scala/qscript/recalibrate.scala | 3 +- .../firehose/ImportSingleValueFunction.scala | 45 +++++++++++++ .../picard/PicardBamJarFunction.scala | 2 +- .../SamtoolsIndexFunction.scala} | 15 ++--- .../queue/function/CommandLineFunction.scala | 4 ++ 6 files changed, 118 insertions(+), 17 deletions(-) create mode 100644 scala/src/org/broadinstitute/sting/queue/extensions/firehose/ImportSingleValueFunction.scala rename scala/src/org/broadinstitute/sting/queue/extensions/{gatk/BamIndexFunction.scala => samtools/SamtoolsIndexFunction.scala} (61%) diff --git a/scala/qscript/kshakir/CleanBamFile.scala b/scala/qscript/kshakir/CleanBamFile.scala index 729cea827..cc64c2977 100644 --- a/scala/qscript/kshakir/CleanBamFile.scala +++ b/scala/qscript/kshakir/CleanBamFile.scala @@ -1,4 +1,6 @@ +import org.broadinstitute.sting.queue.extensions.firehose.ImportSingleValueFunction import org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction +import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction import org.broadinstitute.sting.queue.QScript import org.broadinstitute.sting.queue.extensions.gatk._ @@ -8,6 +10,9 @@ class CleanBamFile extends QScript { @Argument(doc="gatk jar", shortName="gatk") var gatkJar: File = _ + @Argument(doc="samtools binary", shortName="samtools") + var samtoolsBinary: String = _ + @Argument(doc="fix mates jar", shortName="fixMates") var fixMatesJar: File = _ @@ 
-29,7 +34,7 @@ class CleanBamFile extends QScript { @Argument(doc="read group blacklist", shortName="RGBL", required=false) var readGroupBlackList: String = _ - @Argument(doc="intervals", shortName="L", required=false) + @Argument(doc="intervals", shortName="L") var intervals: File = _ @Argument(doc="Script that can split the interval file by contig, for example Sting/python/splitIntervalsByContig.py.", shortName="RTCSS") @@ -49,6 +54,33 @@ class CleanBamFile extends QScript { @Input(doc="dbsnp file", shortName="D") var dbsnpFile: File = _ + @Argument(doc="firehose import jar", shortName="importJar") + var firehoseImportJar: File = _ + + @Argument(doc="short job queue", shortName="shortQueue", required=false) + var shortJobQueue: String = _ + + @Argument(doc="firehose host", shortName="FHHost") + var firehoseHost: String = _ + + @Argument(doc="firehose port", shortName="FHPort") + var firehosePort: Int = _ + + @Argument(doc="firehose domain", shortName="FHDom") + var firehoseDomain: String = _ + + @Argument(doc="clean bam firehose entity type", shortName="bamFHEType") + var bamFirehoseEntityType: String = _ + + @Argument(doc="clean bam firehose entity id", shortName="bamFHEID") + var bamFirehoseEntityID: String = _ + + @Argument(doc="clean bam firehose annotation type name", shortName="bamFHAnn") + var bamFirehoseAnnotationTypeName: String = _ + + @Argument(doc="clean bam firehose security token", shortName="bamFHToken") + var bamFirehoseSecurityToken: String = _ + trait GATKCommonArgs extends CommandLineGATK { this.jarFile = qscript.gatkJar this.reference_sequence = qscript.referenceFile @@ -96,7 +128,7 @@ class CleanBamFile extends QScript { realigner.DBSNP = dbsnpFile realigner.scatterCount = indelRealignerScatterCount - val bamIndex = new BamIndexFunction + var fixedBam: File = null if (realigner.scatterCount > 1) { realigner.output = baseFile(".cleaned.bam") @@ -105,8 +137,12 @@ class CleanBamFile extends QScript { case (scatter: IntervalScatterFunction, _) => 
scatter.splitIntervalsScript = indelRealignerScatterScript } + realigner.gatherClass = { + case source if (source.field.getName=="output") => + classOf[BamGatherFunction] + } realigner.setupGatherFunction = { - case (gather: PicardBamJarFunction, _) => + case (gather: BamGatherFunction, _) => gather.memoryLimit = Some(4) gather.jarFile = fixMatesJar // Don't pass this AS=true to fix mates! @@ -115,7 +151,7 @@ class CleanBamFile extends QScript { gather.mergeTextScript = mergeTextScript } - bamIndex.bamFile = realigner.output + fixedBam = realigner.output } else { realigner.output = baseFile(".unfixed.cleaned.bam") @@ -132,12 +168,30 @@ class CleanBamFile extends QScript { fixMates.unfixed = realigner.output fixMates.fixed = baseFile(".cleaned.bam") - bamIndex.bamFile = fixMates.fixed + fixedBam = fixMates.fixed // Add the fix mates explicitly add(fixMates) } - add(targetCreator, realigner, bamIndex) + val bamIndex = new SamtoolsIndexFunction + bamIndex.samtools = samtoolsBinary + bamIndex.bamFile = fixedBam + bamIndex.bamFileIndex = swapExt(fixedBam, "bam", "bam.bai") + + val importer = new ImportSingleValueFunction + importer.jobQueue = shortJobQueue + importer.jarFile = firehoseImportJar + importer.host = firehoseHost + importer.port = firehosePort + importer.domain = firehoseDomain + importer.entityType = bamFirehoseEntityType + importer.entityID = bamFirehoseEntityID + importer.annotationTypeName = bamFirehoseAnnotationTypeName + importer.securityToken = bamFirehoseSecurityToken + importer.importValue = fixedBam + importer.jobDependencies :+= bamIndex.bamFileIndex + + add(targetCreator, realigner, bamIndex, importer) } } diff --git a/scala/qscript/recalibrate.scala b/scala/qscript/recalibrate.scala index 8640494eb..182bed82d 100755 --- a/scala/qscript/recalibrate.scala +++ b/scala/qscript/recalibrate.scala @@ -1,4 +1,5 @@ import org.broadinstitute.sting.queue.extensions.gatk._ +import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction 
import org.broadinstitute.sting.queue.QScript import org.apache.commons.io.FilenameUtils; @@ -44,7 +45,7 @@ def script = { def bai(bam: File) = new File(bam + ".bai") -class Index(bamIn: File) extends BamIndexFunction { +class Index(bamIn: File) extends SamtoolsIndexFunction { bamFile = bamIn } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/firehose/ImportSingleValueFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/firehose/ImportSingleValueFunction.scala new file mode 100644 index 000000000..07f158224 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/extensions/firehose/ImportSingleValueFunction.scala @@ -0,0 +1,45 @@ +package org.broadinstitute.sting.queue.extensions.firehose + +import org.broadinstitute.sting.queue.function.JarCommandLineFunction +import org.broadinstitute.sting.commandline.{Input, Argument} +import java.io.File + +/** + * Runs the Firehose ImportSingleValue jar file. + */ +class ImportSingleValueFunction extends JarCommandLineFunction { + @Argument(doc="firehose host") + var host: String = _ + + @Argument(doc="firehose port") + var port: Int = _ + + @Argument(doc="firehose domain") + var domain: String = _ + + @Argument(doc="firehose entity type") + var entityType: String = _ + + @Argument(doc="firehose entity id") + var entityID: String = _ + + @Argument(doc="firehose annotation type name", shortName="bamFHAnn", required=false) + var annotationTypeName: String = _ + + @Argument(doc="clean bam firehose security token", shortName="bamFHToken", required=false) + var securityToken: String = _ + + @Input(doc="imports the path to this file", exclusiveOf="importValueInFile") + var importValue: File = _ + + @Input(doc="imports the value contained in the file", exclusiveOf="importValue") + var importValueInFile: File = _ + + override def commandLine = super.commandLine + ("" + + " PORT=%s HOST=%s DOMAIN=%s ENTITY_TYPE=%s" + + " ENTITY_ID=%s ANNOTATION_TYPE_NAME=%s SECURITY_TOKEN=%s" + + "%s%s" + ).format( 
+ port, host, domain, entityType, entityID, annotationTypeName, securityToken, + optional(" VALUE=", importValue), optional(" VALUE_FILE=", importValueInFile)) +} diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/picard/PicardBamJarFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/picard/PicardBamJarFunction.scala index efd861f9b..c86a389c1 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/picard/PicardBamJarFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/picard/PicardBamJarFunction.scala @@ -20,7 +20,7 @@ trait PicardBamJarFunction extends JarCommandLineFunction { protected def inputBams: List[File] protected def outputBam: File - override def commandLine = super.commandLine + "%s%s%s".format( + override def commandLine = super.commandLine + "%s%s%s%s%s%s%s%s".format( optional(" COMPRESSION_LEVEL=", compressionLevel), optional(" VALIDATION_STRINGENCY=", validationStringency), optional(" SO=", sortOrder), optional( " MAX_RECORDS_IN_RAM=", maxRecordsInRam), optional(" ASSUME_SORTED=", assumeSorted), " OUTPUT=" + outputBam, repeat(" INPUT=", inputBams), " TMP_DIR=" + jobTempDir) diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamIndexFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsIndexFunction.scala similarity index 61% rename from scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamIndexFunction.scala rename to scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsIndexFunction.scala index 82ef24b2d..6eb5ebfa5 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/BamIndexFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsIndexFunction.scala @@ -1,18 +1,15 @@ -package org.broadinstitute.sting.queue.extensions.gatk +package org.broadinstitute.sting.queue.extensions.samtools import org.broadinstitute.sting.queue.function.CommandLineFunction import java.io.File 
import org.broadinstitute.sting.commandline.{Argument, Output, Input} /** - * Indexes a BAM file. - * By default uses samtools index. - * The syntax of the script must be: - * + * Indexes a BAM file using samtools. */ -class BamIndexFunction extends CommandLineFunction { - @Argument(doc="BAM file script") - var bamIndexScript: String = "samtools index" +class SamtoolsIndexFunction extends CommandLineFunction { + @Argument(doc="samtools path") + var samtools: String = "samtools" @Input(doc="BAM file to index") var bamFile: File = _ @@ -29,7 +26,7 @@ class BamIndexFunction extends CommandLineFunction { bamFileIndex = new File(bamFile.getPath + ".bai") } - def commandLine = "%s %s %s".format(bamIndexScript, bamFile, bamFileIndex) + def commandLine = "%s index %s %s".format(samtools, bamFile, bamFileIndex) override def dotString = "Index: %s".format(bamFile.getName) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 4a97c5a49..6f0858715 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -44,6 +44,10 @@ trait CommandLineFunction extends QFunction with Logging { /** If true this function will run only if the jobs it is dependent on succeed. */ var jobRunOnlyIfPreviousSucceed = true + /** Files that this job should wait on before running. */ + @Input(doc="Explicit job dependencies", required=false) + var jobDependencies: List[File] = Nil + /** File to redirect any output. Defaults to .out */ @Output(doc="File to redirect any output", required=false) @Gather(classOf[SimpleTextGatherFunction])