From ccdc0212071b1c0f1a01e112a5bc32d30dee4dc5 Mon Sep 17 00:00:00 2001 From: carneiro Date: Mon, 28 Mar 2011 20:17:57 +0000 Subject: [PATCH] Added BWA (option) to the data processing pipeline. Lots of testing still happening... little fix to the calling pipeline. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5528 348d0f76-0448-11de-a6fe-93d51630548a --- .../MethodsDevelopmentCallingPipeline.scala | 4 +- .../oneoffs/carneiro/dataProcessingV2.scala | 276 ++++++++++++++---- 2 files changed, 222 insertions(+), 58 deletions(-) diff --git a/scala/qscript/core/MethodsDevelopmentCallingPipeline.scala b/scala/qscript/core/MethodsDevelopmentCallingPipeline.scala index adcf48c31..84b7ba8ea 100755 --- a/scala/qscript/core/MethodsDevelopmentCallingPipeline.scala +++ b/scala/qscript/core/MethodsDevelopmentCallingPipeline.scala @@ -120,7 +120,7 @@ class MethodsDevelopmentCallingPipeline extends QScript { "HiSeq19" -> new Target("NA12878.HiSeq19", hg19, dbSNP_b37_129, hapmap_b37, indelMask_b37, new File("/humgen/gsa-hpprojects/NA12878Collection/bams/NA12878.HiSeq.WGS.bwa.cleaned.recal.hg19.bam"), new File("/humgen/gsa-hpprojects/dev/carneiro/hiseq19/analysis/snps/NA12878.HiSeq19.filtered.vcf"), - "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.hg19.intervals", 2.3, 0.5, !lowPass), + "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.hg19.intervals", 2.3, 3.0, !lowPass), "GA2hg19" -> new Target("NA12878.GA2.hg19", hg19, dbSNP_b37_129, hapmap_b37, indelMask_b37, new File("/humgen/gsa-hpprojects/NA12878Collection/bams/NA12878.GA2.WGS.bwa.cleaned.hg19.bam"), new File("/humgen/gsa-hpprojects/dev/carneiro/hiseq19/analysis/snps/NA12878.GA2.hg19.filtered.vcf"), @@ -278,7 +278,7 @@ class MethodsDevelopmentCallingPipeline extends QScript { this.rodBind :+= RodBind("input", "VCF", if ( goldStandard ) { t.goldStandard_VCF } else { t.rawVCF } ) this.tranches_file = if ( goldStandard ) { t.goldStandardTranchesFile } else { t.tranchesFile} this.recal_file = if ( goldStandard ) { t.goldStandardRecalFile } else { t.recalFile } - this.fdr_filter_level = 2.0 + this.fdr_filter_level = t.trancheTarget this.out = t.recalibratedVCF this.analysisName = t.name + "_AVQSR" this.jobName = queueLogDir + t.name + ".applyVQSR" diff --git a/scala/qscript/oneoffs/carneiro/dataProcessingV2.scala b/scala/qscript/oneoffs/carneiro/dataProcessingV2.scala index 2ee47ee60..783fddcce 100755 --- a/scala/qscript/oneoffs/carneiro/dataProcessingV2.scala +++ b/scala/qscript/oneoffs/carneiro/dataProcessingV2.scala @@ -5,38 +5,60 @@ import org.broadinstitute.sting.queue.extensions.picard.PicardBamFunction import org.broadinstitute.sting.queue.QScript import org.broadinstitute.sting.queue.function.ListWriterFunction -import net.sf.samtools.{SAMFileReader,SAMFileHeader,SAMReadGroupRecord} +import net.sf.samtools.{SAMFileReader,SAMReadGroupRecord} + +import scala.io.Source._ import collection.JavaConversions._ -import org.broadinstitute.sting.commandline.ArgumentSource class dataProcessingV2 extends QScript { qscript => + /**************************************************************************** + * Required Parameters (if default values are not good for you) + ****************************************************************************/ + + + @Input(doc="input BAM file - or list of BAM files", fullName="input", shortName="i", required=true) + var input: File = _ + @Input(doc="path to GenomeAnalysisTK.jar", fullName="path_to_gatk_jar", shortName="gatk", required=true) var GATKjar: File = _ @Input(doc="path to AnalyzeCovariates.jar", fullName="path_to_ac_jar", shortName="ac", required=true) var ACJar: File = _ - @Input(doc="path to Picard's MarkDuplicates.jar", fullName="path_to_dedup_jar", shortName="dedup", required=true) - var dedupJar: File = _ - - @Input(doc="path to Picard's MergeSamFiles.jar", fullName="path_to_merge_jar", shortName="merge", required=true) - var mergeBamJar: File = _ - @Input(doc="path to R resources folder inside the Sting repository", fullName="path_to_r", shortName="r", required=true) var R: String = _ - @Input(doc="The path to the binary of bwa (usually BAM files have already been mapped - but if you want to remap this is the option)", fullName="path_to_bwa", shortName="bwa", required=false) - var bwaPath: File = _ + @Input(doc="path to Picard's MarkDuplicates.jar", fullName="path_to_dedup_jar", shortName="dedup", required=false) + var dedupJar: File = new File("/seq/software/picard/current/bin/MarkDuplicates.jar") - @Input(doc="input BAM file - or list of BAM files", fullName="input", shortName="i", required=true) - var input: File = _ + @Input(doc="path to Picard's MergeSamFiles.jar", fullName="path_to_merge_jar", shortName="merge", required=false) + var mergeBamJar: File = new File("/seq/software/picard/current/bin/MergeSamFiles.jar") + + @Input(doc="path to Picard's ValidateSam.jar", fullName="path_to_validate_jar", shortName="validate", required=false) + var validateSamJar: File = new File("/seq/software/picard/current/bin/ValidateSamFile.jar") @Input(doc="Reference fasta file", fullName="reference", shortName="R", required=false) var reference: File = new File("/seq/references/Homo_sapiens_assembly19/v1/Homo_sapiens_assembly19.fasta") + + + /**************************************************************************** + * Optional Parameters + ****************************************************************************/ + + + @Input(doc="path to Picard's RevertSam.jar", fullName="path_to_revert_jar", shortName="revert", required=false) + var revertSamJar: File = _ + + @Input(doc="path to Picard's SortSam.jar", fullName="path_to_sort_jar", shortName="sort", required=false) + var sortSamJar: File = _ + + @Input(doc="The path to the binary of bwa (usually BAM files have already been mapped - but if you want to remap this is the option)", fullName="path_to_bwa", shortName="bwa", required=false) + var bwaPath: File = _ + @Input(doc="dbsnp ROD to use (VCF)", fullName="dbsnp", shortName="D", required=false) var dbSNP: File = new File("/humgen/gsa-hpprojects/GATK/data/dbsnp_132_b37.leftAligned.vcf") @@ -46,12 +68,6 @@ class dataProcessingV2 extends QScript { @Input(doc="the project name determines the final output (BAM file) base name. Example NA12878 yields NA12878.processed.bam", fullName="project", shortName="p", required=false) var projectName: String = "project" - @Input(doc="Perform cleaning on knowns only", fullName="knowns_only", shortName="knowns", required=false) - var knownsOnly: Boolean = false - - @Input(doc="Perform cleaning using Smith Waterman", fullName="use_smith_waterman", shortName="sw", required=false) - var useSW: Boolean = false - @Input(doc="Output path for the processed BAM files.", fullName="output_directory", shortName="outputDir", required=false) var outputDir: String = "" @@ -61,23 +77,35 @@ class dataProcessingV2 extends QScript { @Input(doc="an intervals file to be used by GATK - output bams at intervals only", fullName="gatk_interval_file", shortName="intervals", required=false) var intervals: File = _ - // Gracefully hide Queue's output - val queueLogDir: String = ".qlog/" + @Input(doc="Perform cleaning on knowns only", fullName="knowns_only", shortName="knowns", required=false) + var knownsOnly: Boolean = false - // Use the number of contigs for scatter gathering jobs - var nContigs: Int = -1 + @Input(doc="Perform cleaning using Smith Waterman", fullName="use_smith_waterman", shortName="sw", required=false) + var useSW: Boolean = false + + @Input(doc="Decompose input BAM file and fully realign it using BWA and assume Single Ended reads", fullName="use_bwa_single_ended", shortName="bwase", required=false) + var useBWAse: Boolean = false + + @Input(doc="Decompose input BAM file and fully realign it using BWA and assume Pair Ended reads", fullName="use_bwa_pair_ended", shortName="bwape", required=false) + var useBWApe: Boolean = false - // Updates and checks that all input files have the same number of contigs - // we use the number of contigs for scatter gather. - def updateNumberOfContigs(n: Int): Boolean = { - if (nContigs < 0) { - nContigs = n - return true - } - return nContigs == n - } + /**************************************************************************** + * Global Variables + ****************************************************************************/ + + val queueLogDir: String = ".qlog/" // Gracefully hide Queue's output + var nContigs: Int = 0 // Use the number of contigs for scatter gathering jobs + + + + /**************************************************************************** + * Helper functions + ****************************************************************************/ + + + // Utility function to check if there are multiple samples in a BAM file (currently we can't deal with that) def hasMultipleSamples(readGroups: java.util.List[SAMReadGroupRecord]): Boolean = { var sample: String = "" for (r <- readGroups) { @@ -89,18 +117,15 @@ class dataProcessingV2 extends QScript { return false } - - def createSampleFiles(): Map[String, File] = { - val outName: String = qscript.outputDir + qscript.projectName + // Utility function to merge all bam files of similar samples. Generates on BAM file per sample. + // It uses the sample information on the header of the input BAM files. + def createSampleFiles(bamFiles: List[File]): Map[String, File] = { // Creating a table with SAMPLE information from each input BAM file val sampleTable = scala.collection.mutable.Map.empty[String, List[File]] - for (bam <- scala.io.Source.fromFile(input).getLines) { - val bamFile = new File(bam) - val samReader = new SAMFileReader(bamFile) + for (bam <- bamFiles) { + val samReader = new SAMFileReader(bam) val header = samReader.getFileHeader() - // keep a record of the number of contigs in this bam file (they should all be the same - assert(updateNumberOfContigs(header.getSequenceDictionary.getSequences.size()), "Input BAMS should all have the same number of contigs. " + bam + " has " + header.getSequenceDictionary.getSequences.size()) val readGroups = header.getReadGroups() // only allow one sample per file. Bam files with multiple samples would require pre-processing of the file @@ -111,30 +136,65 @@ class dataProcessingV2 extends QScript { for (rg <- readGroups) { val sample = rg.getSample() if (!sampleTable.contains(sample)) - sampleTable(sample) = List(bamFile) - else if ( !sampleTable(sample).contains(bamFile)) - sampleTable(sample) :+= bamFile + sampleTable(sample) = List(bam) + else if ( !sampleTable(sample).contains(bam)) + sampleTable(sample) :+= bam } } // Creating one file for each sample in the dataset val sampleBamFiles = scala.collection.mutable.Map.empty[String, File] for ((sample, flist) <- sampleTable) { - val sampleFileName = new File(outName + "." + sample + ".bam") + val sampleFileName = new File(qscript.outputDir + qscript.projectName + "." + sample + ".bam") sampleBamFiles(sample) = sampleFileName add(joinBams(flist, sampleFileName)) } return sampleBamFiles.toMap } + // Checks how many contigs are in the dataset. Uses the BAM file header information. + def getNumberOfContigs(): Int = { + val bam = fromFile(input).getLines.next + val samReader = new SAMFileReader(new File(bam)) + return samReader.getFileHeader.getSequenceDictionary.getSequences.size() + } + + + // Takes a list of processed BAM files, revert them to unprocessed and realigns each lane, producing a list of + // per-lane aligned bam files, ready to be processed. + def performAlignment(bamFiles: List[File]): List[File] = { + return List() + } + + def createListFromFile(in: File):List[File] = { + if (in.toString.endsWith("bam")) + return List(in) + val l: List[File] = List() + for (bam <- fromFile(in).getLines) + l :+= bam + return l + } + + + + /**************************************************************************** + * Main script + ****************************************************************************/ + def script = { //todo -- (option - BWA) run BWA on each bam file (per lane bam file) before performing per sample processing + val perLaneAlignedBamFiles: List[File] = if (useBWApe || useBWAse) {performAlignment(input)} else {createListFromFile(input)} + // Generate a BAM file per sample joining all per lane files if necessary + val sampleBamFiles = createSampleFiles(perLaneAlignedBamFiles) + // keep a record of the number of contigs in this bam file (they should all be the same + nContigs = getNumberOfContigs() + + // Final output list of processed bam files var cohortList: List[File] = List() - val sampleBamFiles = createSampleFiles() // Simple progress report println("\nFound the following samples: ") @@ -161,6 +221,10 @@ class dataProcessingV2 extends QScript { val postRecalFile = swapExt(bam, ".bam", ".post_recal.csv") val preOutPath = swapExt(bam, ".bam", ".pre") val postOutPath = swapExt(bam, ".bam", ".post") + val preValidateLog = swapExt(bam, ".bam", ".pre.validation") + val postValidateLog = swapExt(bam, ".bam", ".post.validation") + + add(validate(bam, preValidateLog)) if (!knownsOnly) add(target(bam, targetIntervals)) @@ -171,7 +235,8 @@ class dataProcessingV2 extends QScript { recal(dedupedBam, preRecalFile, recalBam), cov(recalBam, postRecalFile), analyzeCovariates(preRecalFile, preOutPath), - analyzeCovariates(postRecalFile, postOutPath)) + analyzeCovariates(postRecalFile, postOutPath), + validate(recalBam, postValidateLog)) cohortList :+= recalBam } @@ -181,11 +246,18 @@ class dataProcessingV2 extends QScript { add(writeList(cohortList, cohortFile)) } + + + /**************************************************************************** + * Classes (Walkers and non-GATK programs) + ****************************************************************************/ + + // General arguments to GATK walkers trait CommandLineGATKArgs extends CommandLineGATK { this.jarFile = qscript.GATKjar this.reference_sequence = qscript.reference - this.memoryLimit = Some(4) + this.memoryLimit = 4 this.isIntermediate = true } @@ -193,7 +265,7 @@ class dataProcessingV2 extends QScript { if (!knownsOnly) this.input_file :+= inBams this.out = outIntervals - this.mismatchFraction = Some(0.0) + this.mismatchFraction = 0.0 this.rodBind :+= RodBind("dbsnp", "VCF", dbSNP) this.rodBind :+= RodBind("indels", "VCF", indels) this.scatterCount = nContigs @@ -209,8 +281,8 @@ class dataProcessingV2 extends QScript { this.rodBind :+= RodBind("indels", "VCF", qscript.indels) this.useOnlyKnownIndels = knownsOnly this.doNotUseSW = useSW - this.compress = Some(0) - this.U = Some(org.broadinstitute.sting.gatk.arguments.ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION) // todo -- update this with the last consensus between Tim, Matt and Eric. This is ugly! + this.compress = 0 + this.U = org.broadinstitute.sting.gatk.arguments.ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION // todo -- update this with the last consensus between Tim, Matt and Eric. This is ugly! this.scatterCount = nContigs this.analysisName = queueLogDir + outBam + ".clean" this.jobName = queueLogDir + outBam + ".clean" @@ -222,26 +294,31 @@ class dataProcessingV2 extends QScript { this.covariate ++= List("ReadGroupCovariate", "QualityScoreCovariate", "CycleCovariate", "DinucCovariate") this.input_file :+= inBam this.recal_file = outRecalFile + this.jobQueue = "gsa" // should take this out once scatter gather is available. this.analysisName = queueLogDir + outRecalFile + ".covariates" this.jobName = queueLogDir + outRecalFile + ".covariates" } + //todo -- add scatter gather capability (waiting for khalid's modifications to the queue base case class recal (inBam: File, inRecalFile: File, outBam: File) extends TableRecalibration with CommandLineGATKArgs { @Output(doc="recalibrated bam index") var recalIndex = new File(outBam + ".bai") this.input_file :+= inBam this.recal_file = inRecalFile - this.baq = Some(org.broadinstitute.sting.utils.baq.BAQ.CalculationMode.CALCULATE_AS_NECESSARY) + this.baq = org.broadinstitute.sting.utils.baq.BAQ.CalculationMode.CALCULATE_AS_NECESSARY this.out = outBam if (!qscript.intervalString.isEmpty()) this.intervalsString ++= List(qscript.intervalString) else if (qscript.intervals != null) this.intervals :+= qscript.intervals - this.U = Some(org.broadinstitute.sting.gatk.arguments.ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION) // todo -- update this with the last consensus between Tim, Matt and Eric. This is ugly! - this.index_output_bam_on_the_fly = Some(true) + this.U = org.broadinstitute.sting.gatk.arguments.ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION // todo -- update this with the last consensus between Tim, Matt and Eric. This is ugly! + this.index_output_bam_on_the_fly = true this.isIntermediate = false + this.jobQueue = "gsa" // should take this out once scatter gather is available. this.analysisName = queueLogDir + outBam + ".recalibration" this.jobName = queueLogDir + outBam + ".recalibration" } + + // Outside tools (not GATK walkers) case class analyzeCovariates (inRecalFile: File, outPath: File) extends AnalyzeCovariates { @@ -256,13 +333,13 @@ class dataProcessingV2 extends QScript { case class dedup (inBam: File, outBam: File, metricsFile: File) extends PicardBamFunction { @Input(doc="fixed bam") var clean = inBam @Output(doc="deduped bam") var deduped = outBam - @Output(doc="deduped bam index") var dedupedIndex = new File(outBam + ".bai") @Output(doc="metrics file") var metrics = metricsFile override def inputBams = List(clean) override def outputBam = deduped - override def commandLine = super.commandLine + " M=" + metricsFile + " CREATE_INDEX=true" - sortOrder = null - this.memoryLimit = Some(6) + override def commandLine = super.commandLine + " M=" + metricsFile + this.sortOrder = null + this.createIndex = true + this.memoryLimit = 6 this.isIntermediate = true this.jarFile = qscript.dedupJar this.analysisName = queueLogDir + outBam + ".dedup" @@ -282,6 +359,93 @@ class dataProcessingV2 extends QScript { this.jobName = queueLogDir + outBam + ".joinBams" } + case class sortSam (inSam: File, outBam: File) extends PicardBamFunction { + @Input(doc="input unsorted sam file") var sam = inBams + @Output(doc="sorted bam") var bam = outBam + @Output(doc="sorted bam index") var bamIndex = new File(outBam + "bai") + override def inputBams = List(sam) + override def outputBam = bam + override def commandLine = super.commandLine + " CREATE_INDEX=true" + this.jarFile = qscript.sortSamJar + this.isIntermediate = true + this.analysisName = queueLogDir + outBam + ".sortSam" + this.jobName = queueLogDir + outBam + ".sortSam" + } + + case class validate (inBam: File, outLog: File) extends PicardBamFunction { + @Input(doc="input bam list") var toValidate = inBam + @Output(doc="validation log") var validate = outLog + override def inputBams = List(inBam) + override def outputBam = outLog + override def commandLine = super.commandLine + " VALIDATE_INDEX=true MODE=SUMMARY REFERENCE_SEQUENCE=" + qscript.reference + sortOrder = null + this.jarFile = qscript.validateSamJar + this.isIntermediate = false + this.analysisName = queueLogDir + outLog + ".validate" + this.jobName = queueLogDir + outLog + ".validate" + } + + case class revert (inBam: File, outBam: File) extends PicardBamFunction { + @Input(doc="old annotated bam") var oldBam = inBam + @Output(doc="reverted bam") var revertedBam = outBam + @Output(doc="reverted bam index") var revertedBamIndex = new File(outBam + ".bai") + override def inputBams = List(oldBam) + override def outputBam = revertedBam + override def commandLine = super.commandLine + " CREATE_INDEX=true" + this.isIntermediate = true + this.jarFile = qscript.dedupJar + this.analysisName = queueLogDir + outBam + ".dedup" + this.jobName = queueLogDir + outBam + ".dedup" + } + + case class bwa_aln_se (inBam: File, outSai: File) extends CommandLineFunction { + @Input(doc="bam file to be aligned") var bam = inBam + @Output(doc="output sai file") var sai = outSai + def commandLine = bwaPath + " aln -q 5 " + reference + " -b " + bam + " > " + sai + this.isIntermediate = true + this.analysisName = queueLogDir + outBam + ".bwa_aln_se" + this.jobName = queueLogDir + outBam + ".bwa_aln_se" + } + + case class bwa_aln_pe1 (inBam: File, outSai1: File) extends CommandLineFunction { + @Input(doc="bam file to be aligned") var bam = inBam + @Output(doc="output sai file for 1st mating pair") var sai = outSai1 + def commandLine = bwaPath + " aln -q 5 " + reference + " -b1 " + bam + " > " + sai + this.isIntermediate = true + this.analysisName = queueLogDir + outBam + ".bwa_aln_pe1" + this.jobName = queueLogDir + outBam + ".bwa_aln_pe1" + } + + case class bwa_aln_pe2 (inBam: File, outSai2: File) extends CommandLineFunction { + @Input(doc="bam file to be aligned") var bam = inBam + @Output(doc="output sai file for 2nd mating pair") var sai = outSai2 + def commandLine = bwaPath + " aln -q 5 " + reference + " -b2 " + bam + " > " + sai + this.isIntermediate = true + this.analysisName = queueLogDir + outBam + ".bwa_aln_pe2" + this.jobName = queueLogDir + outBam + ".bwa_aln_pe2" + } + + case class bwa_sam_se (inBam: File, inSai: File, outBam: File, readGroup: String) extends CommandLineFunction { + @Input(doc="bam file to be aligned") var bam = inBam + @Input(doc="bwa alignment index file") var sai = inSai + @Output(doc="output aligned bam file") var alignedBam = outBam + def commandLine = bwaPath + " samse " + reference + " " + sai + " " + bam + " -r " + readGroup + " > " + alignedBam + this.isIntermediate = true + this.analysisName = queueLogDir + outBam + ".bwa_sam_se" + this.jobName = queueLogDir + outBam + ".bwa_sam_se" + } + + case class bwa_sam_pe (inBam: File, inSai1: File, inSai2:File, outBam: File, readGroup: String) extends CommandLineFunction { + @Input(doc="bam file to be aligned") var bam = inBam + @Input(doc="bwa alignment index file for 1st mating pair") var sai1 = inSai1 + @Input(doc="bwa alignment index file for 2nd mating pair") var sai2 = inSai2 + @Output(doc="output aligned bam file") var alignedBam = outBam + def commandLine = bwaPath + " samse " + reference + " " + sai1 + " " + sai2 + " " + bam + " -r " + readGroup + " > " + alignedBam + this.isIntermediate = true + this.analysisName = queueLogDir + outBam + ".bwa_sam_pe" + this.jobName = queueLogDir + outBam + ".bwa_sam_pe" + } + case class writeList(inBams: List[File], outBamList: File) extends ListWriterFunction { this.inputFiles = inBams this.listFile = outBamList