From c355afc3207a40f4ae69052d4e8aa79350a700af Mon Sep 17 00:00:00 2001 From: chartl Date: Fri, 24 Sep 2010 00:59:09 +0000 Subject: [PATCH] Queue now does job tracking (replace -run with -status in the command line). Produces output that looks like: INFO 20:58:17,827 QCommandLine - Checking pipeline status INFO 20:58:23,234 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_MergeIndels [DONE] INFO 20:58:23,236 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_158.bam [DONE] 5t/5d/0r/0p/0f INFO 20:58:23,237 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_929.bam [DONE] 5t/5d/0r/0p/0f INFO 20:58:23,238 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_SNP_calls [NOT DONE] 5t/0d/0r/5p/0f INFO 20:58:23,239 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_HandFilter [NOT DONE] INFO 20:58:23,240 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_1122.bam [DONE] 5t/5d/0r/0p/0f INFO 20:58:23,240 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_VariantRecalibrator [NOT DONE] INFO 20:58:23,241 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_913.bam [DONE] 5t/5d/0r/0p/0f INFO 20:58:23,242 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_2037.bam [DONE] 5t/5d/0r/0p/0f INFO 20:58:23,243 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_VariantEval [NOT DONE] INFO 20:58:23,244 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_Cluster [NOT DONE] INFO 20:58:23,245 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_106.bam [DONE] 5t/5d/0r/0p/0f INFO 20:58:23,246 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_Cluster_and_Indel_filter [NOT DONE] INFO 20:58:23,247 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_ApplyVariantCuts [NOT DONE] INFO 20:58:23,248 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_GenomicAnnotator [NOT DONE] INFO 20:58:23,248 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_1713.bam [DONE] 5t/5d/0r/0p/0f git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4340 348d0f76-0448-11de-a6fe-93d51630548a --- scala/qscript/fullCallingPipeline.q | 14 +++ .../sting/queue/QCommandLine.scala | 12 ++- .../sting/queue/engine/QGraph.scala | 96 ++++++++++++++++++- .../queue/function/CommandLineFunction.scala | 10 ++ .../ScatterGatherableFunction.scala | 2 + 5 files changed, 129 insertions(+), 5 deletions(-) diff --git a/scala/qscript/fullCallingPipeline.q b/scala/qscript/fullCallingPipeline.q index 05fccf16c..c269a7af7 100755 --- a/scala/qscript/fullCallingPipeline.q +++ b/scala/qscript/fullCallingPipeline.q @@ -93,10 +93,12 @@ class fullCallingPipeline extends QScript { // create the cleaning commands val targetCreator = new RealignerTargetCreator with CommandLineGATKArgs + targetCreator.analysisName = "CreateTargets_"+bam.getName targetCreator.input_file :+= bam targetCreator.out = indel_targets val realigner = new IndelRealigner with CommandLineGATKArgs + realigner.analysisName = "RealignBam_"+bam.getName realigner.input_file = targetCreator.input_file realigner.intervals = qscript.contigIntervals realigner.targetIntervals = new java.io.File(targetCreator.out.getAbsolutePath) @@ -139,11 +141,13 @@ class fullCallingPipeline extends QScript { fixMates.jarFile = qscript.picardFixMatesJar fixMates.unfixed = realigner.out fixMates.fixed = cleaned_bam + fixMates.analysisName = "FixMates_"+bam.getName // Add the fix mates explicitly } var samtoolsindex = new SamtoolsIndexFunction samtoolsindex.bamFile = cleaned_bam + samtoolsindex.analysisName = "index_"+cleaned_bam.getName // COMMENT THIS NEXT BLOCK TO SKIP CLEANING if ( realigner.scatterCount > 1 ) @@ -172,6 +176,7 @@ class fullCallingPipeline extends QScript { // step through the un-indel-cleaned graph: // 1a. call snps and indels val snps = new UnifiedGenotyper with CommandLineGATKArgs + snps.analysisName = base+"_SNP_calls" snps.input_file = bamFiles snps.group :+= "Standard" snps.out = new File(base+".vcf") @@ -203,6 +208,7 @@ class fullCallingPipeline extends QScript { var priority = "" for ( bam <- bamFiles ) { var indel = new IndelGenotyperV2 with CommandLineGATKArgs + indel.analysisName = "IndelGenotyper_"+bam.getName indel.input_file :+= bam indel.out = swapExt(bam,".bam",".indels.vcf") indel.downsample_to_coverage = Some(500) @@ -224,6 +230,7 @@ class fullCallingPipeline extends QScript { mergeIndels.priority = priority mergeIndels.variantmergeoption = Some(org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION) mergeIndels.rodBind = indelCallFiles + mergeIndels.analysisName = base+"_MergeIndels" // 1b. genomically annotate SNPs -- no longer slow @@ -234,6 +241,7 @@ class fullCallingPipeline extends QScript { annotated.out = swapExt(snps.out,".vcf",".annotated.vcf") annotated.select :+= "dbsnp.name,dbsnp.refUCSC,dbsnp.strand,dbsnp.observed,dbsnp.avHet" annotated.rodToIntervalTrackName = "variant" + annotated.analysisName = base+"_GenomicAnnotator" // 2.a filter on cluster and near indels @@ -244,6 +252,7 @@ class fullCallingPipeline extends QScript { masker.clusterWindowSize = Some(qscript.snpClusterWindow) masker.clusterSize = Some(qscript.snpsInCluster) masker.out = swapExt(annotated.out,".vcf",".indel.masked.vcf") + masker.analysisName = base+"_Cluster_and_Indel_filter" // 2.b hand filter with standard filter @@ -253,6 +262,7 @@ class fullCallingPipeline extends QScript { handFilter.filterName ++= List("StrandBias","AlleleBalance","QualByDepth","HomopolymerRun") handFilter.filterExpression ++= List("\"SB>=0.10\"","\"AB>=0.75\"","\"QD<5.0\"","\"HRun>=4\"") handFilter.out = swapExt(annotated.out,".vcf",".handfiltered.vcf") + handFilter.analysisName = base+"_HandFilter" // 3.i generate gaussian clusters on the masked vcf @@ -267,6 +277,7 @@ class fullCallingPipeline extends QScript { clusters.jobQueue = "gsa" clusters.use_annotation ++= List("QD", "SB", "HaplotypeScore", "HRun") + clusters.analysisName = base+"_Cluster" // 3.ii apply gaussian clusters to the masked vcf @@ -278,6 +289,7 @@ class fullCallingPipeline extends QScript { recalibrate.target_titv = qscript.target_titv recalibrate.report_dat_file = swapExt(masker.out,".vcf",".recalibrate.dat") recalibrate.tranches_file = swapExt(masker.out,".vcf",".recalibrate.tranches") + recalibrate.analysisName = base+"_VariantRecalibrator" // 3.iii apply variant cuts to the clusters val cut = new ApplyVariantCuts with CommandLineGATKArgs @@ -286,6 +298,7 @@ class fullCallingPipeline extends QScript { cut.tranches_file = recalibrate.tranches_file // todo -- fdr inputs, etc cut.fdr_filter_level = Some(1) + cut.analysisName = base+"_ApplyVariantCuts" // 4. Variant eval the cut and the hand-filtered vcf files @@ -294,6 +307,7 @@ class fullCallingPipeline extends QScript { eval.rodBind :+= RodBind("evalHandFiltered", "VCF", handFilter.out) eval.evalModule ++= List("CountFunctionalClasses", "CompOverlap", "CountVariants", "TiTvVariantEvaluator") eval.out = new File(base+".eval") + eval.analysisName = base+"_VariantEval" add(snps) diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index 78956e8bc..a18a8ef69 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -35,6 +35,9 @@ class QCommandLine extends CommandLineProgram with Logging { @Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden private var runScripts = false + @Argument(fullName="status",shortName="status",doc="Get status of jobs for the qscript",required=false) + private var getStatus = false + @ArgumentCollection private val qSettings = new QSettings @@ -62,8 +65,13 @@ class QCommandLine extends CommandLineProgram with Logging { logger.info("Added " + script.functions.size + " functions") } - logger.info("Running generated graph") - qGraph.run + if ( ! getStatus ) { + logger.info("Running generated graph") + qGraph.run + } else { + logger.info("Checking pipeline status") + qGraph.checkStatus + } logger.info("Done") 0 diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index f33831163..d78b990b4 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -13,11 +13,13 @@ import java.io.File import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} import org.broadinstitute.sting.queue.{QSettings, QException} import org.broadinstitute.sting.queue.function.{DispatchWaitFunction, MappingFunction, CommandLineFunction, QFunction} +import collection.mutable.HashMap /** * The internal dependency tracker between sets of function input and output files. */ class QGraph extends Logging { + var status = false var dryRun = true var bsubAllJobs = false var bsubWaitJobs = false @@ -74,19 +76,26 @@ class QGraph extends Logging { val isReady = numMissingValues == 0 - if (isReady || this.dryRun) + if ( (isReady || this.dryRun) && ! this.status ) runJobs if (numMissingValues > 0) { logger.error("Total missing values: " + numMissingValues) } - if (isReady && this.dryRun) { + if (isReady && this.dryRun && ! this.status) { logger.info("Dry run completed successfully!") logger.info("Re-run with \"-run\" to execute the functions.") } } + def checkStatus = { + // build up the full DAG with scatter-gather jobs + this.status = true + run + runStatus + } + /** * Walks up the graph looking for the previous LsfJobs. * @param function Function to examine for a previous command line job. @@ -241,7 +250,88 @@ class QGraph extends Logging { } /** - * Creates a new graph where if new edges are needed (for cyclic dependency checking) they can be automatically created using a generic MappingFunction. + * Gets job statuses by traversing the graph and looking for status-related files + */ + private def runStatus = { + var statuses: HashMap[String,HashMap[String,Int]] = new HashMap[String,HashMap[String,Int]] + loop( + edgeFunction = { case edgeCLF => { + if ( edgeCLF.analysisName != null && ! edgeCLF.outputs.forall(file => file.getName.endsWith(".out") || file.getName.endsWith(".err") )) { + if ( ! statuses.keySet.contains(edgeCLF.analysisName) ) { + statuses.put(edgeCLF.analysisName,emptyStatusMap) + } + updateMap(statuses(edgeCLF.analysisName),edgeCLF) + } + } + }) + formatStatus(statuses) + } + + /** + * Creates an empty map with keys for status updates, todo -- make this nicer somehow + */ + private def emptyStatusMap = { + var sMap = new HashMap[String,Int] + sMap.put("status",-1) + sMap.put("sgTotal",0) + sMap.put("sgDone",0) + sMap.put("sgRunning",0) + sMap.put("sgFailed",0) + // note -- pending = total - done - run - failed + sMap + } + + /** + * Updates a status map with scatter/gather status information (e.g. counts) + */ + private def updateMap(stats: HashMap[String,Int], clf: CommandLineFunction) = { + if ( clf.isGather ) { + logger.debug(clf.analysisName+": "+clf.doneOutputs.mkString(", ")) + if ( clf.isDone ) { + stats("status") = 1 + } else { + stats("status") = 0 + } + } else { + stats("sgTotal") = (stats("sgTotal") + 1) + if ( clf.isDone ) { + stats("sgDone") = (stats("sgDone") + 1) + } + } + } + + /** + * Formats a complete status map (analysis name --> map {string, int}) into nice strings + */ + private def formatStatus(stats: HashMap[String,HashMap[String,Int]]) = { + stats.foreach{ case(analysisName, status) => { + var infoStr = analysisName + val doneInt = status("status") + if ( doneInt == 1 ) { + infoStr += " [DONE]" + } else if ( doneInt == 0 ) { + infoStr += " [NOT DONE]" + } else { + infoStr += " [UNKNOWN]" + } + + if ( status("sgTotal") > 0 ) { + val sgTot = status("sgTotal") + val sgDone = status("sgDone") + val sgRun = status("sgRunning") + val sgFailed = status("sgFailed") + val sgPend = (sgTot - sgDone - sgRun - sgFailed) + infoStr += " %dt/%dd/%dr/%dp/%df".format(sgTot,sgDone,sgRun,sgPend,sgFailed) + } + + logger.info(infoStr) + } + + } + } + + /** + * Creates a new graph where if new edges are needed (for cyclic dependency checking) they can be automatically created using a generic MappingFunction. * @return A new graph */ private def newGraph = new SimpleDirectedGraph[QNode, QFunction](new EdgeFactory[QNode, QFunction] { diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 92a5485c3..31d0d5541 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -14,6 +14,12 @@ import org.broadinstitute.sting.queue.{QSettings, QException} trait CommandLineFunction extends QFunction with Logging { def commandLine: String + /** Analysis function name */ + var analysisName: String = _ + + /** Set to true if this is a standalone or gather function */ + var isGather: Boolean = true + /** Default settings */ var qSettings: QSettings = _ @@ -167,6 +173,10 @@ trait CommandLineFunction extends QFunction with Logging { //doneFiles } + def isDone = { + doneOutputs.size == 0 || doneOutputs.forall(_.exists) + } + /** * Silly utility function which compresses if statement in getDoneFiles; returns true if two files are different * @return boolean -- if files are different diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index b7c787476..883e6f95e 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -262,6 +262,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) gatherFunction.originalOutput = this.getFieldFile(gatherField) gatherFunction.setOriginalFunction(this, gatherField) + gatherFunction.analysisName = this.analysisName if (this.setupGatherFunction != null) if (this.setupGatherFunction.isDefinedAt(gatherFunction, gatherField)) this.setupGatherFunction(gatherFunction, gatherField) @@ -289,6 +290,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { if (this.setupCloneFunction != null) if (this.setupCloneFunction.isDefinedAt(cloneFunction, index)) this.setupCloneFunction(cloneFunction, index) + cloneFunction.isGather = false } /**