From 81eef0d9939cc72a989914cbf1ffb819b362f9e8 Mon Sep 17 00:00:00 2001 From: depristo Date: Thu, 15 Jul 2010 22:32:48 +0000 Subject: [PATCH] DOT visualization with Queue. More sophisticated recalibation queue script with scatter/gather git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3799 348d0f76-0448-11de-a6fe-93d51630548a --- scala/qscript/recalibrate.scala | 42 ++++++++++--------- .../broadinstitute/sting/queue/QScript.scala | 1 + .../sting/queue/engine/QGraph.scala | 22 ++++++++++ 3 files changed, 46 insertions(+), 19 deletions(-) diff --git a/scala/qscript/recalibrate.scala b/scala/qscript/recalibrate.scala index c150bc8d7..4954a2417 100755 --- a/scala/qscript/recalibrate.scala +++ b/scala/qscript/recalibrate.scala @@ -4,26 +4,30 @@ import org.apache.commons.io.FilenameUtils; val unusedArgs = setArgs(args) -def runPipeline() = { +def runPipeline(arg: String) = { + val scatter = arg == "scatter" + for (bamIn <- inputs(".bam")) { val root = bamIn.getPath() val bamRoot = FilenameUtils.removeExtension(root); - val recalData = bamRoot + ".recal_data.csv" - val recalBam = bamRoot + ".recal.bam" - val recalRecalData = bamRoot + ".recal.recal_data.csv" + val recalData = new File(bamRoot + ".recal_data.csv") + val recalBam = new File(bamRoot + ".recal.bam") + val recalRecalData = new File(bamRoot + ".recal.recal_data.csv") //add(new CountCovariates(root, recalData, "-OQ")) - val tableRecal = new TableRecalibrate(root, recalData, recalBam) - tableRecal.intervals = new File("/humgen/gsa-hpprojects/GATK/data/chromosomes.hg18.interval_list") - tableRecal.scatterCount = 25 + val tableRecal = new TableRecalibrate(bamIn, recalData, recalBam, "-OQ") + if ( scatter ) { + tableRecal.intervals = new File("/humgen/gsa-hpprojects/GATK/data/chromosomes.hg18.interval_list") + tableRecal.scatterCount = 25 + } add(tableRecal) add(new Index(recalBam)) - add(new CountCovariates(recalBam, recalRecalData)) - add(new AnalyzeCovariates(recalData, recalData + ".analyzeCovariates")) - add(new AnalyzeCovariates(recalRecalData, recalRecalData + ".analyzeCovariates")) + add(new CountCovariates(recalBam, recalRecalData, "-nt 4")) + add(new AnalyzeCovariates(recalData, new File(recalData.getPath() + ".analyzeCovariates"))) + add(new AnalyzeCovariates(recalRecalData, new File(recalRecalData.getPath() + ".analyzeCovariates"))) } } -runPipeline() +runPipeline(unusedArgs(0)) // Populate parameters passed in via -P setParams @@ -31,30 +35,30 @@ setParams // Run the pipeline run -class Index(bamIn: String) extends GatkFunction { +class Index(bamIn: File) extends GatkFunction { @Input(doc="foo") var bam = bamIn memoryLimit = Some(1) def commandLine = "samtools index %s".format(bam) } -class CountCovariates(bamIn: String, recalDataIn: String, args: String = "") extends GatkFunction { +class CountCovariates(bamIn: File, recalDataIn: File, args: String = "") extends GatkFunction { @Input(doc="foo") var bam = bamIn @Output(doc="foo") var recalData = recalDataIn memoryLimit = Some(4) def commandLine = gatkCommandLine("CountCovariates") + args + " -l INFO -D /humgen/gsa-hpprojects/GATK/data/dbsnp_129_hg18.rod -I %s --max_reads_at_locus 20000 -cov ReadGroupCovariate -cov QualityScoreCovariate -cov CycleCovariate -cov DinucCovariate -recalFile %s".format(bam, recalData) } -class TableRecalibrate(bamInArg: String, recalDataIn: String, bamOutArg: String) extends GatkFunction { +class TableRecalibrate(bamInArg: File, recalDataIn: File, bamOutArg: File, args: String = "") extends GatkFunction { @Input(doc="foo") var bamIn = bamInArg @Input(doc="foo") var recalData = recalDataIn @Gather(classOf[BamGatherFunction]) - @Output(doc="foo") var bamOut = new File(bamOutArg) + @Output(doc="foo") var bamOut = bamOutArg memoryLimit = Some(2) - def commandLine = gatkCommandLine("TableRecalibration") + "-l INFO -I %s -recalFile %s -outputBam %s".format(bamIn, recalData, bamOut.getPath()) + def commandLine = gatkCommandLine("TableRecalibration") + args + " -l INFO -I %s -recalFile %s -outputBam %s".format(bamIn, recalData, bamOut) // bamOut.getPath()) } -class AnalyzeCovariates(recalDataIn: String, outputDir: String) extends GatkFunction { +class AnalyzeCovariates(recalDataIn: File, outputDir: File) extends GatkFunction { @Input(doc="foo") var recalData = recalDataIn memoryLimit = Some(4) - def commandLine = "java -Xmx4g -jar /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/dist/AnalyzeCovariates.jar -recalFile %s -outputDir %s -resources /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/R/ -ignoreQ 5".format(recalData, outputDir) -} \ No newline at end of file + def commandLine = "java -Xmx4g -jar /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/dist/AnalyzeCovariates.jar -recalFile %s -outputDir %s -resources /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/R/ -ignoreQ 5 -Rscript /broad/tools/apps/R-2.6.0/bin/Rscript".format(recalData, outputDir) +} diff --git a/scala/src/org/broadinstitute/sting/queue/QScript.scala b/scala/src/org/broadinstitute/sting/queue/QScript.scala index aa2861e17..7fa24e9ee 100755 --- a/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -103,6 +103,7 @@ object QScript { qGraph.add(function) qGraph.fillIn qGraph.run + qGraph.renderToDot(new File("queue.dot")) } } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 5ebaba6d9..9cade0c84 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -1,5 +1,6 @@ package org.broadinstitute.sting.queue.engine +import org.jgrapht.traverse.TopologicalOrderIterator import org.jgrapht.graph.SimpleDirectedGraph import scala.collection.JavaConversions import scala.collection.JavaConversions._ @@ -9,6 +10,9 @@ import org.broadinstitute.sting.queue.util.{CollectionUtils, Logging} import org.broadinstitute.sting.queue.QException import org.jgrapht.alg.CycleDetector import org.jgrapht.EdgeFactory +import org.jgrapht.ext.DOTExporter +import org.broadinstitute.sting.queue.function.DispatchFunction +import org.broadinstitute.sting.queue.function.gatk.GatkFunction class QGraph extends Logging { var dryRun = true @@ -146,4 +150,22 @@ class QGraph extends Logging { private def isOrphan(node: QNode) = (jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0 + + def renderToDot(file: java.io.File) = { + val out = new java.io.FileWriter(file) + + // todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a + // todo -- visualizeString() command, or something that shows inputs / outputs + val ve = new org.jgrapht.ext.EdgeNameProvider[QFunction] { + def getEdgeName( function: QFunction ) = function match { + case f: DispatchFunction => f.jobName + " => " + f.commandLine + case _ => "" + } + } + + //val iterator = new TopologicalOrderIterator(qGraph.jobGraph) + (new DOTExporter(new org.jgrapht.ext.IntegerNameProvider[QNode](), null, ve)).export(out, jobGraph) + + out.close + } }