Added a samtools merge CLF.

Using samtools to merge the low pass bams before cleaning to avoid "Too many open files." with 1500+ bams.
Other minor cleanup as pointed out by the IntelliJ scala plugin.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5942 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-06-03 22:20:38 +00:00
parent a8faacda4e
commit ac3f1be7f0
6 changed files with 148 additions and 21 deletions

View File

@ -22,6 +22,8 @@
* OTHER DEALINGS IN THE SOFTWARE. * OTHER DEALINGS IN THE SOFTWARE.
*/ */
import io.Source
import org.broadinstitute.sting.queue.extensions.samtools.{SamtoolsIndexFunction, SamtoolsMergeFunction}
import org.broadinstitute.sting.queue.QScript import org.broadinstitute.sting.queue.QScript
import org.broadinstitute.sting.queue.extensions.gatk._ import org.broadinstitute.sting.queue.extensions.gatk._
import org.broadinstitute.sting.utils.interval.IntervalUtils import org.broadinstitute.sting.utils.interval.IntervalUtils
@ -93,6 +95,24 @@ class WholeGenomePipeline extends QScript {
val project = Array(".bams.list", ".bam.list", ".list").foldLeft(bamList.getName)(_.stripSuffix(_)) val project = Array(".bams.list", ".bam.list", ".list").foldLeft(bamList.getName)(_.stripSuffix(_))
val projectBase = project + "." + runType val projectBase = project + "." + runType
val mergeBam = new SamtoolsMergeFunction
mergeBam.inputBams = Source.fromFile(bamList).getLines().toList
if (runType != "wg")
mergeBam.region = intervals.head.toString
mergeBam.memoryLimit = pipelineMemoryLimit
mergeBam.outputBam = cleanerTmpDir + "/" + projectBase + ".unclean.bam"
mergeBam.jobOutputFile = projectBase + ".unclean.bam.out"
mergeBam.isIntermediate = true
mergeBam.memoryLimit = pipelineMemoryLimit
add(mergeBam)
val indexBam = new SamtoolsIndexFunction
indexBam.bamFile = mergeBam.outputBam
indexBam.memoryLimit = pipelineMemoryLimit
indexBam.jobOutputFile = projectBase + ".unclean.bam.bai.out"
indexBam.isIntermediate = true
add(indexBam)
var chunkVcfs = List.empty[File] var chunkVcfs = List.empty[File]
for (interval <- intervals) { for (interval <- intervals) {
val chr = interval.chr val chr = interval.chr
@ -111,7 +131,7 @@ class WholeGenomePipeline extends QScript {
val chunkInterval = List("%s:%d-%d".format(chr, start, stop)) val chunkInterval = List("%s:%d-%d".format(chr, start, stop))
val target = new RealignerTargetCreator with CommandLineGATKArgs val target = new RealignerTargetCreator with CommandLineGATKArgs
target.input_file :+= bamList target.input_file :+= mergeBam.outputBam
target.intervalsString = chunkInterval target.intervalsString = chunkInterval
target.excludeIntervals = excludeIntervals target.excludeIntervals = excludeIntervals
target.mismatchFraction = 0.0 target.mismatchFraction = 0.0
@ -123,7 +143,7 @@ class WholeGenomePipeline extends QScript {
add(target) add(target)
val clean = new IndelRealigner with CommandLineGATKArgs val clean = new IndelRealigner with CommandLineGATKArgs
clean.input_file :+= bamList clean.input_file :+= mergeBam.outputBam
clean.intervalsString = chunkInterval clean.intervalsString = chunkInterval
clean.excludeIntervals = excludeIntervals clean.excludeIntervals = excludeIntervals
clean.targetIntervals = target.out clean.targetIntervals = target.out

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.extensions.samtools
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.commandline.Argument
/**
* samtools command line function
*/
abstract class SamtoolsCommandLineFunction extends CommandLineFunction {
@Argument(doc="samtools path")
var samtools: String = "samtools"
}

View File

@ -1,15 +1,37 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.extensions.samtools package org.broadinstitute.sting.queue.extensions.samtools
import org.broadinstitute.sting.queue.function.CommandLineFunction
import java.io.File import java.io.File
import org.broadinstitute.sting.commandline.{Argument, Output, Input} import org.broadinstitute.sting.commandline.{Output, Input}
/** /**
* Indexes a BAM file using samtools. * Indexes a BAM file using samtools.
*/ */
class SamtoolsIndexFunction extends CommandLineFunction { class SamtoolsIndexFunction extends SamtoolsCommandLineFunction {
@Argument(doc="samtools path") analysisName = "samtools index"
var samtools: String = "samtools"
@Input(doc="BAM file to index") @Input(doc="BAM file to index")
var bamFile: File = _ var bamFile: File = _
@ -20,8 +42,8 @@ class SamtoolsIndexFunction extends CommandLineFunction {
/** /**
* Sets the bam file index to the bam file name + ".bai". * Sets the bam file index to the bam file name + ".bai".
*/ */
override def freezeFieldValues = { override def freezeFieldValues() {
super.freezeFieldValues super.freezeFieldValues()
if (bamFileIndex == null && bamFile != null) if (bamFileIndex == null && bamFile != null)
bamFileIndex = new File(bamFile.getPath + ".bai") bamFileIndex = new File(bamFile.getPath + ".bai")
} }

View File

@ -0,0 +1,48 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.extensions.samtools
import java.io.File
import org.broadinstitute.sting.commandline.{Argument, Output, Input}
/**
* Merges BAM files using samtools.
*/
class SamtoolsMergeFunction extends SamtoolsCommandLineFunction {
analysisName = "samtools merge"
@Input(doc="BAM file input")
var inputBams: List[File] = Nil
@Output(doc="BAM file output")
var outputBam: File = _
@Argument(doc="region", required=false)
var region: String = _
def commandLine = "%s merge%s %s%s".format(
samtools, optional(" -R ", region),
outputBam, repeat(" ", inputBams))
}

View File

@ -46,7 +46,7 @@ trait CommandLineFunction extends QFunction with Logging {
/** /**
* Sets all field values. * Sets all field values.
*/ */
override def freezeFieldValues = { override def freezeFieldValues() {
if (jobQueue == null) if (jobQueue == null)
jobQueue = qSettings.jobQueue jobQueue = qSettings.jobQueue

View File

@ -225,7 +225,7 @@ trait QFunction extends Logging {
/** /**
* Deletes the output files and all the status files for this function. * Deletes the output files and all the status files for this function.
*/ */
def deleteOutputs() = { def deleteOutputs() {
commandOutputs.foreach(file => IOUtils.tryDelete(file)) commandOutputs.foreach(file => IOUtils.tryDelete(file))
doneOutputs.foreach(file => IOUtils.tryDelete(file)) doneOutputs.foreach(file => IOUtils.tryDelete(file))
failOutputs.foreach(file => IOUtils.tryDelete(file)) failOutputs.foreach(file => IOUtils.tryDelete(file))
@ -234,7 +234,7 @@ trait QFunction extends Logging {
/** /**
* Creates the output directories for this function if it doesn't exist. * Creates the output directories for this function if it doesn't exist.
*/ */
def mkOutputDirectories() = { def mkOutputDirectories() {
outputDirectories.foreach(dir => { outputDirectories.foreach(dir => {
if (!dir.exists && !dir.mkdirs) if (!dir.exists && !dir.mkdirs)
throw new QException("Unable to create directory: " + dir) throw new QException("Unable to create directory: " + dir)
@ -322,15 +322,15 @@ trait QFunction extends Logging {
* The function is allow to make necessary updates internally to make sure * The function is allow to make necessary updates internally to make sure
* the inputs and outputs will be equal to other inputs and outputs. * the inputs and outputs will be equal to other inputs and outputs.
*/ */
final def freeze = { final def freeze() {
freezeFieldValues freezeFieldValues()
canonFieldValues canonFieldValues()
} }
/** /**
* Sets all field values. * Sets all field values.
*/ */
def freezeFieldValues = { def freezeFieldValues() {
if (jobNamePrefix == null) if (jobNamePrefix == null)
jobNamePrefix = qSettings.jobNamePrefix jobNamePrefix = qSettings.jobNamePrefix
@ -355,14 +355,15 @@ trait QFunction extends Logging {
/** /**
* If the command directory is relative, insert the run directory ahead of it. * If the command directory is relative, insert the run directory ahead of it.
*/ */
def absoluteCommandDirectory() = def absoluteCommandDirectory() {
commandDirectory = IOUtils.absolute(qSettings.runDirectory, commandDirectory) commandDirectory = IOUtils.absolute(qSettings.runDirectory, commandDirectory)
}
/** /**
* Makes all field values canonical so that the graph can match the * Makes all field values canonical so that the graph can match the
* inputs of one function to the output of another using equals(). * inputs of one function to the output of another using equals().
*/ */
def canonFieldValues = { def canonFieldValues() {
for (field <- this.functionFields) { for (field <- this.functionFields) {
var fieldValue = this.getFieldValue(field) var fieldValue = this.getFieldValue(field)
fieldValue = CollectionUtils.updated(fieldValue, canon).asInstanceOf[AnyRef] fieldValue = CollectionUtils.updated(fieldValue, canon).asInstanceOf[AnyRef]
@ -413,7 +414,7 @@ trait QFunction extends Logging {
* @return the isRequired value from the field annotation. * @return the isRequired value from the field annotation.
*/ */
private def isRequired(field: ArgumentSource, annotation: Class[_ <: Annotation]) = private def isRequired(field: ArgumentSource, annotation: Class[_ <: Annotation]) =
ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].required ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].required()
/** /**
* Returns an array of ArgumentSources from functionFields listed in the exclusiveOf of the original field * Returns an array of ArgumentSources from functionFields listed in the exclusiveOf of the original field
@ -422,7 +423,7 @@ trait QFunction extends Logging {
* @return the Array[ArgumentSource] that may be set instead of the field. * @return the Array[ArgumentSource] that may be set instead of the field.
*/ */
private def exclusiveOf(field: ArgumentSource, annotation: Class[_ <: Annotation]) = private def exclusiveOf(field: ArgumentSource, annotation: Class[_ <: Annotation]) =
ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].exclusiveOf ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].exclusiveOf()
.split(",").map(_.trim).filter(_.length > 0) .split(",").map(_.trim).filter(_.length > 0)
.map(fieldName => functionFields.find(fieldName == _.field.getName) match { .map(fieldName => functionFields.find(fieldName == _.field.getName) match {
case Some(x) => x case Some(x) => x
@ -436,7 +437,7 @@ trait QFunction extends Logging {
* @return the doc value from the field annotation. * @return the doc value from the field annotation.
*/ */
private def doc(field: ArgumentSource, annotation: Class[_ <: Annotation]) = private def doc(field: ArgumentSource, annotation: Class[_ <: Annotation]) =
ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].doc ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].doc()
/** /**
* Returns true if the field has a value. * Returns true if the field has a value.