From f4c43f013fbe635f09f1d35d288e8d7c0295a7f5 Mon Sep 17 00:00:00 2001 From: chartl Date: Mon, 6 Dec 2010 18:23:54 +0000 Subject: [PATCH] Due to the overhead for reading VCF files (>32g for 700 5MB VCF files), batched merging has to generate likelihoods in batches. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4796 348d0f76-0448-11de-a6fe-93d51630548a --- scala/qscript/chartl/batchMergePipeline.q | 2 +- .../queue/pipeline/ProjectManagement.scala | 18 +++++++++++------- .../sting/queue/util/CollectionUtils.scala | 11 +++++++++++ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/scala/qscript/chartl/batchMergePipeline.q b/scala/qscript/chartl/batchMergePipeline.q index b00dfe7c0..f78e19a8c 100755 --- a/scala/qscript/chartl/batchMergePipeline.q +++ b/scala/qscript/chartl/batchMergePipeline.q @@ -22,7 +22,7 @@ class batchMergePipeline extends QScript { var vcfs : List[File] = extractFileEntries(vcfList) var bams : List[File] = extractFileEntries(bamList) var pmLib = new ProjectManagement(stingDir) - addAll(pmLib.MergeBatches(vcfs,bams,batchOut,ref)) + addAll(pmLib.MergeBatches(vcfs,bams,batchOut,ref,25)) } def extractFileEntries(in: File): List[File] = { diff --git a/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala b/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala index c255819b2..cd420452e 100755 --- a/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala +++ b/scala/src/org/broadinstitute/sting/queue/pipeline/ProjectManagement.scala @@ -28,7 +28,7 @@ class ProjectManagement(stingPath: String) { } } - def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File) : List[CommandLineFunction] = { + def MergeBatches( callVCFs: List[File], allBams: List[File], mergedVCF: File, ref: File, size: Int) : List[CommandLineFunction] = { var cmds : List[CommandLineFunction] = Nil var pfSites : PassFilterAlleles = new 
PassFilterAlleles(callVCFs,swapExt(mergedVCF,".vcf",".pf.alleles.vcf")) pfSites.sortByRef = pm.stingDirPath+"perl/sortByRef.pl" @@ -36,7 +36,7 @@ class ProjectManagement(stingPath: String) { cmds :+= pfSites - var calcs: List[UGCalcLikelihoods] = allBams.map( a => LikelihoodCalc(a,ref,pfSites.out_intervals) ) + var calcs: List[UGCalcLikelihoods] = batchLikelihoods(allBams,ref,pfSites.out_intervals,size) cmds ++= calcs @@ -46,18 +46,22 @@ class ProjectManagement(stingPath: String) { } - def LikelihoodCalc( bam: File, ref: File, alleleVCF: File ) : UGCalcLikelihoods = { + def batchLikelihoods(bams: List[File], ref: File, alleleVCF: File, size: Int) : List[UGCalcLikelihoods] = { + return CollectionUtils.segmentBySize(bams,size).zipWithIndex.map( u => LikelihoodCalc(u._1,ref,alleleVCF, new File("batch%d.likelihoods.vcf".format(u._2)))) + } + + def LikelihoodCalc( bams: List[File], ref: File, alleleVCF: File, outVCF: File ) : UGCalcLikelihoods = { var calc = new UGCalcLikelihoods - calc.input_file :+= bam + calc.input_file ++= bams calc.reference_sequence = ref calc.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") calc.downsample_to_coverage = Some(300) - calc.memoryLimit = Some(2) + calc.memoryLimit = if ( bams.size < 5 ) Some(2) else if(bams.size<50) Some(4) else Some(6) calc.min_base_quality_score = Some(22) calc.min_mapping_quality_score = Some(20) calc.genotype = true calc.output_all_callable_bases = true - calc.out = swapExt(bam,".bam",".likelihoods.vcf") + calc.out = outVCF calc.rodBind :+= new RodBind("allele","VCF",alleleVCF) calc.BTI = "allele" @@ -70,7 +74,7 @@ class ProjectManagement(stingPath: String) { call.jarFile = new File(pm.stingDirPath+"dist/GenomeAnalysisTK.jar") call.rodBind :+= new RodBind("allele","vcf",intervalVCF) call.BTI = "allele" - call.memoryLimit = Some(4) + call.memoryLimit = Some(8) call.out = output call.rodBind ++= likelihoods.map( a => new RodBind("variant"+a.getName.replace(".vcf",""),"vcf",a) ) diff --git 
a/scala/src/org/broadinstitute/sting/queue/util/CollectionUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/CollectionUtils.scala index 6871d8f4b..9cc026b6c 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/CollectionUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/CollectionUtils.scala @@ -88,4 +88,15 @@ object CollectionUtils { } result } + + /** + * Splits a List into consecutive sub-lists of a fixed size, preserving element order + * (except perhaps the final sub-list, which can be smaller) + * @param value -- The list to be batched; an empty list yields Nil + * @param size -- the sublist size; must be positive (otherwise grouped throws IllegalArgumentException) + * @return the list batched into smaller lists of at most `size` elements + */ + def segmentBySize[T](value: List[T], size: Int) : List[List[T]] = { + if (value.isEmpty) Nil else value.grouped(size).toList + } }