From ac3f1be7f014c00117aebe15cb9feeae3c911846 Mon Sep 17 00:00:00 2001 From: kshakir Date: Fri, 3 Jun 2011 22:20:38 +0000 Subject: [PATCH] Added a samtools merge CLF. Using samtools to merge the low pass bams before cleaning to avoid "Too many open files." with 1500+ bams. Other minor cleanup as pointed out by the IntelliJ scala plugin. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5942 348d0f76-0448-11de-a6fe-93d51630548a --- .../playground/WholeGenomePipeline.scala | 24 +++++++++- .../SamtoolsCommandLineFunction.scala | 36 ++++++++++++++ .../samtools/SamtoolsIndexFunction.scala | 36 +++++++++++--- .../samtools/SamtoolsMergeFunction.scala | 48 +++++++++++++++++++ .../queue/function/CommandLineFunction.scala | 2 +- .../sting/queue/function/QFunction.scala | 23 ++++----- 6 files changed, 148 insertions(+), 21 deletions(-) create mode 100644 scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsCommandLineFunction.scala create mode 100644 scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsMergeFunction.scala diff --git a/scala/qscript/playground/WholeGenomePipeline.scala b/scala/qscript/playground/WholeGenomePipeline.scala index 0968f4e9b..344124f71 100644 --- a/scala/qscript/playground/WholeGenomePipeline.scala +++ b/scala/qscript/playground/WholeGenomePipeline.scala @@ -22,6 +22,8 @@ * OTHER DEALINGS IN THE SOFTWARE. */ +import io.Source +import org.broadinstitute.sting.queue.extensions.samtools.{SamtoolsIndexFunction, SamtoolsMergeFunction} import org.broadinstitute.sting.queue.QScript import org.broadinstitute.sting.queue.extensions.gatk._ import org.broadinstitute.sting.utils.interval.IntervalUtils @@ -93,6 +95,24 @@ class WholeGenomePipeline extends QScript { val project = Array(".bams.list", ".bam.list", ".list").foldLeft(bamList.getName)(_.stripSuffix(_)) val projectBase = project + "." 
+ runType + val mergeBam = new SamtoolsMergeFunction + mergeBam.inputBams = Source.fromFile(bamList).getLines().toList /* NOTE(review): the Source returned by fromFile is not bound to a name, so this statement never closes it */ + if (runType != "wg") + mergeBam.region = intervals.head.toString /* only the first interval is used as the merge region */ + mergeBam.memoryLimit = pipelineMemoryLimit + mergeBam.outputBam = cleanerTmpDir + "/" + projectBase + ".unclean.bam" + mergeBam.jobOutputFile = projectBase + ".unclean.bam.out" + mergeBam.isIntermediate = true + mergeBam.memoryLimit = pipelineMemoryLimit /* NOTE(review): repeats the identical memoryLimit assignment made a few statements above */ + add(mergeBam) + + val indexBam = new SamtoolsIndexFunction + indexBam.bamFile = mergeBam.outputBam + indexBam.memoryLimit = pipelineMemoryLimit + indexBam.jobOutputFile = projectBase + ".unclean.bam.bai.out" + indexBam.isIntermediate = true + add(indexBam) + var chunkVcfs = List.empty[File] for (interval <- intervals) { val chr = interval.chr @@ -111,7 +131,7 @@ class WholeGenomePipeline extends QScript { val chunkInterval = List("%s:%d-%d".format(chr, start, stop)) val target = new RealignerTargetCreator with CommandLineGATKArgs - target.input_file :+= bamList + target.input_file :+= mergeBam.outputBam target.intervalsString = chunkInterval target.excludeIntervals = excludeIntervals target.mismatchFraction = 0.0 @@ -123,7 +143,7 @@ class WholeGenomePipeline extends QScript { add(target) val clean = new IndelRealigner with CommandLineGATKArgs - clean.input_file :+= bamList + clean.input_file :+= mergeBam.outputBam clean.intervalsString = chunkInterval clean.excludeIntervals = excludeIntervals clean.targetIntervals = target.out diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsCommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsCommandLineFunction.scala new file mode 100644 index 000000000..909215509 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsCommandLineFunction.scala @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * 
obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +package org.broadinstitute.sting.queue.extensions.samtools + +import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.commandline.Argument + +/** + * Abstract base class for command line functions that run samtools; it declares the samtools argument (the samtools path), which defaults to "samtools". + */ +abstract class SamtoolsCommandLineFunction extends CommandLineFunction { + @Argument(doc="samtools path") + var samtools: String = "samtools" +} diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsIndexFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsIndexFunction.scala index 6eb5ebfa5..801a152ec 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsIndexFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsIndexFunction.scala @@ -1,15 +1,37 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + package org.broadinstitute.sting.queue.extensions.samtools -import org.broadinstitute.sting.queue.function.CommandLineFunction import java.io.File -import org.broadinstitute.sting.commandline.{Argument, Output, Input} +import org.broadinstitute.sting.commandline.{Output, Input} /** * Indexes a BAM file using samtools. */ -class SamtoolsIndexFunction extends CommandLineFunction { - @Argument(doc="samtools path") - var samtools: String = "samtools" +class SamtoolsIndexFunction extends SamtoolsCommandLineFunction { + analysisName = "samtools index" @Input(doc="BAM file to index") var bamFile: File = _ @@ -20,8 +42,8 @@ class SamtoolsIndexFunction extends CommandLineFunction { /** * Sets the bam file index to the bam file name + ".bai". */ - override def freezeFieldValues = { - super.freezeFieldValues + override def freezeFieldValues() { + super.freezeFieldValues() if (bamFileIndex == null && bamFile != null) bamFileIndex = new File(bamFile.getPath + ".bai") } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsMergeFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsMergeFunction.scala new file mode 100644 index 000000000..d88484660 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/extensions/samtools/SamtoolsMergeFunction.scala @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the 
Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.extensions.samtools + +import java.io.File +import org.broadinstitute.sting.commandline.{Argument, Output, Input} + +/** + * Merges the BAM files listed in inputBams into outputBam by running the samtools merge sub-command. + */ +class SamtoolsMergeFunction extends SamtoolsCommandLineFunction { + analysisName = "samtools merge" + + @Input(doc="BAM file input") + var inputBams: List[File] = Nil + + @Output(doc="BAM file output") + var outputBam: File = _ + + @Argument(doc="region", required=false) + var region: String = _ + + def commandLine = "%s merge%s %s%s".format( + samtools, optional(" -R ", region), + outputBam, repeat(" ", inputBams)) /* command layout: the samtools path, " merge", the optional(" -R ", region) text, a space, outputBam, then the repeat(" ", inputBams) text; presumably optional() yields an empty string when region is unset (TODO confirm) */ +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 2457f3fea..5ea4f4818 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -46,7 +46,7 @@ trait CommandLineFunction extends QFunction with Logging { /** * Sets all field values. 
*/ - override def freezeFieldValues = { + override def freezeFieldValues() { if (jobQueue == null) jobQueue = qSettings.jobQueue diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 3cdd7a913..7048b6413 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -225,7 +225,7 @@ trait QFunction extends Logging { /** * Deletes the output files and all the status files for this function. */ - def deleteOutputs() = { + def deleteOutputs() { commandOutputs.foreach(file => IOUtils.tryDelete(file)) doneOutputs.foreach(file => IOUtils.tryDelete(file)) failOutputs.foreach(file => IOUtils.tryDelete(file)) @@ -234,7 +234,7 @@ trait QFunction extends Logging { /** * Creates the output directories for this function if it doesn't exist. */ - def mkOutputDirectories() = { + def mkOutputDirectories() { outputDirectories.foreach(dir => { if (!dir.exists && !dir.mkdirs) throw new QException("Unable to create directory: " + dir) @@ -322,15 +322,15 @@ trait QFunction extends Logging { * The function is allow to make necessary updates internally to make sure * the inputs and outputs will be equal to other inputs and outputs. */ - final def freeze = { - freezeFieldValues - canonFieldValues + final def freeze() { + freezeFieldValues() + canonFieldValues() } /** * Sets all field values. */ - def freezeFieldValues = { + def freezeFieldValues() { if (jobNamePrefix == null) jobNamePrefix = qSettings.jobNamePrefix @@ -355,14 +355,15 @@ trait QFunction extends Logging { /** * If the command directory is relative, insert the run directory ahead of it. 
*/ - def absoluteCommandDirectory() = + def absoluteCommandDirectory() { commandDirectory = IOUtils.absolute(qSettings.runDirectory, commandDirectory) + } /** * Makes all field values canonical so that the graph can match the * inputs of one function to the output of another using equals(). */ - def canonFieldValues = { + def canonFieldValues() { for (field <- this.functionFields) { var fieldValue = this.getFieldValue(field) fieldValue = CollectionUtils.updated(fieldValue, canon).asInstanceOf[AnyRef] @@ -413,7 +414,7 @@ trait QFunction extends Logging { * @return the isRequired value from the field annotation. */ private def isRequired(field: ArgumentSource, annotation: Class[_ <: Annotation]) = - ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].required + ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].required() /** * Returns an array of ArgumentSources from functionFields listed in the exclusiveOf of the original field @@ -422,7 +423,7 @@ trait QFunction extends Logging { * @return the Array[ArgumentSource] that may be set instead of the field. */ private def exclusiveOf(field: ArgumentSource, annotation: Class[_ <: Annotation]) = - ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].exclusiveOf + ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].exclusiveOf() .split(",").map(_.trim).filter(_.length > 0) .map(fieldName => functionFields.find(fieldName == _.field.getName) match { case Some(x) => x @@ -436,7 +437,7 @@ trait QFunction extends Logging { * @return the doc value from the field annotation. */ private def doc(field: ArgumentSource, annotation: Class[_ <: Annotation]) = - ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].doc + ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].doc() /** * Returns true if the field has a value.