DOT visualization with Queue. More sophisticated recalibation queue script with scatter/gather

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3799 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2010-07-15 22:32:48 +00:00
parent ab84ed8c68
commit 81eef0d993
3 changed files with 46 additions and 19 deletions

View File

@ -4,26 +4,30 @@ import org.apache.commons.io.FilenameUtils;
val unusedArgs = setArgs(args)
def runPipeline() = {
def runPipeline(arg: String) = {
val scatter = arg == "scatter"
for (bamIn <- inputs(".bam")) {
val root = bamIn.getPath()
val bamRoot = FilenameUtils.removeExtension(root);
val recalData = bamRoot + ".recal_data.csv"
val recalBam = bamRoot + ".recal.bam"
val recalRecalData = bamRoot + ".recal.recal_data.csv"
val recalData = new File(bamRoot + ".recal_data.csv")
val recalBam = new File(bamRoot + ".recal.bam")
val recalRecalData = new File(bamRoot + ".recal.recal_data.csv")
//add(new CountCovariates(root, recalData, "-OQ"))
val tableRecal = new TableRecalibrate(root, recalData, recalBam)
tableRecal.intervals = new File("/humgen/gsa-hpprojects/GATK/data/chromosomes.hg18.interval_list")
tableRecal.scatterCount = 25
val tableRecal = new TableRecalibrate(bamIn, recalData, recalBam, "-OQ")
if ( scatter ) {
tableRecal.intervals = new File("/humgen/gsa-hpprojects/GATK/data/chromosomes.hg18.interval_list")
tableRecal.scatterCount = 25
}
add(tableRecal)
add(new Index(recalBam))
add(new CountCovariates(recalBam, recalRecalData))
add(new AnalyzeCovariates(recalData, recalData + ".analyzeCovariates"))
add(new AnalyzeCovariates(recalRecalData, recalRecalData + ".analyzeCovariates"))
add(new CountCovariates(recalBam, recalRecalData, "-nt 4"))
add(new AnalyzeCovariates(recalData, new File(recalData.getPath() + ".analyzeCovariates")))
add(new AnalyzeCovariates(recalRecalData, new File(recalRecalData.getPath() + ".analyzeCovariates")))
}
}
runPipeline()
runPipeline(unusedArgs(0))
// Populate parameters passed in via -P
setParams
@ -31,30 +35,30 @@ setParams
// Run the pipeline
run
class Index(bamIn: String) extends GatkFunction {
class Index(bamIn: File) extends GatkFunction {
@Input(doc="foo") var bam = bamIn
memoryLimit = Some(1)
def commandLine = "samtools index %s".format(bam)
}
class CountCovariates(bamIn: String, recalDataIn: String, args: String = "") extends GatkFunction {
class CountCovariates(bamIn: File, recalDataIn: File, args: String = "") extends GatkFunction {
@Input(doc="foo") var bam = bamIn
@Output(doc="foo") var recalData = recalDataIn
memoryLimit = Some(4)
def commandLine = gatkCommandLine("CountCovariates") + args + " -l INFO -D /humgen/gsa-hpprojects/GATK/data/dbsnp_129_hg18.rod -I %s --max_reads_at_locus 20000 -cov ReadGroupCovariate -cov QualityScoreCovariate -cov CycleCovariate -cov DinucCovariate -recalFile %s".format(bam, recalData)
}
class TableRecalibrate(bamInArg: String, recalDataIn: String, bamOutArg: String) extends GatkFunction {
class TableRecalibrate(bamInArg: File, recalDataIn: File, bamOutArg: File, args: String = "") extends GatkFunction {
@Input(doc="foo") var bamIn = bamInArg
@Input(doc="foo") var recalData = recalDataIn
@Gather(classOf[BamGatherFunction])
@Output(doc="foo") var bamOut = new File(bamOutArg)
@Output(doc="foo") var bamOut = bamOutArg
memoryLimit = Some(2)
def commandLine = gatkCommandLine("TableRecalibration") + "-l INFO -I %s -recalFile %s -outputBam %s".format(bamIn, recalData, bamOut.getPath())
def commandLine = gatkCommandLine("TableRecalibration") + args + " -l INFO -I %s -recalFile %s -outputBam %s".format(bamIn, recalData, bamOut) // bamOut.getPath())
}
class AnalyzeCovariates(recalDataIn: String, outputDir: String) extends GatkFunction {
class AnalyzeCovariates(recalDataIn: File, outputDir: File) extends GatkFunction {
@Input(doc="foo") var recalData = recalDataIn
memoryLimit = Some(4)
def commandLine = "java -Xmx4g -jar /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/dist/AnalyzeCovariates.jar -recalFile %s -outputDir %s -resources /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/R/ -ignoreQ 5".format(recalData, outputDir)
}
def commandLine = "java -Xmx4g -jar /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/dist/AnalyzeCovariates.jar -recalFile %s -outputDir %s -resources /home/radon01/depristo/dev/GenomeAnalysisTK/trunk/R/ -ignoreQ 5 -Rscript /broad/tools/apps/R-2.6.0/bin/Rscript".format(recalData, outputDir)
}

View File

@ -103,6 +103,7 @@ object QScript {
qGraph.add(function)
qGraph.fillIn
qGraph.run
qGraph.renderToDot(new File("queue.dot"))
}
}
}

View File

@ -1,5 +1,6 @@
package org.broadinstitute.sting.queue.engine
import org.jgrapht.traverse.TopologicalOrderIterator
import org.jgrapht.graph.SimpleDirectedGraph
import scala.collection.JavaConversions
import scala.collection.JavaConversions._
@ -9,6 +10,9 @@ import org.broadinstitute.sting.queue.util.{CollectionUtils, Logging}
import org.broadinstitute.sting.queue.QException
import org.jgrapht.alg.CycleDetector
import org.jgrapht.EdgeFactory
import org.jgrapht.ext.DOTExporter
import org.broadinstitute.sting.queue.function.DispatchFunction
import org.broadinstitute.sting.queue.function.gatk.GatkFunction
class QGraph extends Logging {
var dryRun = true
@ -146,4 +150,22 @@ class QGraph extends Logging {
private def isOrphan(node: QNode) =
(jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0
def renderToDot(file: java.io.File) = {
val out = new java.io.FileWriter(file)
// todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a
// todo -- visualizeString() command, or something that shows inputs / outputs
val ve = new org.jgrapht.ext.EdgeNameProvider[QFunction] {
def getEdgeName( function: QFunction ) = function match {
case f: DispatchFunction => f.jobName + " => " + f.commandLine
case _ => ""
}
}
//val iterator = new TopologicalOrderIterator(qGraph.jobGraph)
(new DOTExporter(new org.jgrapht.ext.IntegerNameProvider[QNode](), null, ve)).export(out, jobGraph)
out.close
}
}