Queue now does job tracking (replace -run with -status on the command line). It produces output that looks like:

INFO  20:58:17,827 QCommandLine - Checking pipeline status 
INFO  20:58:23,234 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_MergeIndels [DONE] 
INFO  20:58:23,236 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_158.bam [DONE] 5t/5d/0r/0p/0f 
INFO  20:58:23,237 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_929.bam [DONE] 5t/5d/0r/0p/0f 
INFO  20:58:23,238 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_SNP_calls [NOT DONE] 5t/0d/0r/5p/0f 
INFO  20:58:23,239 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_HandFilter [NOT DONE] 
INFO  20:58:23,240 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_1122.bam [DONE] 5t/5d/0r/0p/0f 
INFO  20:58:23,240 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_VariantRecalibrator [NOT DONE] 
INFO  20:58:23,241 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_913.bam [DONE] 5t/5d/0r/0p/0f 
INFO  20:58:23,242 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_2037.bam [DONE] 5t/5d/0r/0p/0f 
INFO  20:58:23,243 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_VariantEval [NOT DONE] 
INFO  20:58:23,244 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_Cluster [NOT DONE] 
INFO  20:58:23,245 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_106.bam [DONE] 5t/5d/0r/0p/0f 
INFO  20:58:23,246 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_Cluster_and_Indel_filter [NOT DONE] 
INFO  20:58:23,247 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_ApplyVariantCuts [NOT DONE] 
INFO  20:58:23,248 QGraph$$anonfun$formatStatus$1 - Height_Hirschhorn_NHGRI.uncleaned_GenomicAnnotator [NOT DONE] 
INFO  20:58:23,248 QGraph$$anonfun$formatStatus$1 - IndelGenotyper_1713.bam [DONE] 5t/5d/0r/0p/0f 




git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4340 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
chartl 2010-09-24 00:59:09 +00:00
parent 4ed9f437e9
commit c355afc320
5 changed files with 129 additions and 5 deletions

View File

@ -93,10 +93,12 @@ class fullCallingPipeline extends QScript {
// create the cleaning commands
val targetCreator = new RealignerTargetCreator with CommandLineGATKArgs
targetCreator.analysisName = "CreateTargets_"+bam.getName
targetCreator.input_file :+= bam
targetCreator.out = indel_targets
val realigner = new IndelRealigner with CommandLineGATKArgs
realigner.analysisName = "RealignBam_"+bam.getName
realigner.input_file = targetCreator.input_file
realigner.intervals = qscript.contigIntervals
realigner.targetIntervals = new java.io.File(targetCreator.out.getAbsolutePath)
@ -139,11 +141,13 @@ class fullCallingPipeline extends QScript {
fixMates.jarFile = qscript.picardFixMatesJar
fixMates.unfixed = realigner.out
fixMates.fixed = cleaned_bam
fixMates.analysisName = "FixMates_"+bam.getName
// Add the fix mates explicitly
}
var samtoolsindex = new SamtoolsIndexFunction
samtoolsindex.bamFile = cleaned_bam
samtoolsindex.analysisName = "index_"+cleaned_bam.getName
// COMMENT THIS NEXT BLOCK TO SKIP CLEANING
if ( realigner.scatterCount > 1 )
@ -172,6 +176,7 @@ class fullCallingPipeline extends QScript {
// step through the un-indel-cleaned graph:
// 1a. call snps and indels
val snps = new UnifiedGenotyper with CommandLineGATKArgs
snps.analysisName = base+"_SNP_calls"
snps.input_file = bamFiles
snps.group :+= "Standard"
snps.out = new File(base+".vcf")
@ -203,6 +208,7 @@ class fullCallingPipeline extends QScript {
var priority = ""
for ( bam <- bamFiles ) {
var indel = new IndelGenotyperV2 with CommandLineGATKArgs
indel.analysisName = "IndelGenotyper_"+bam.getName
indel.input_file :+= bam
indel.out = swapExt(bam,".bam",".indels.vcf")
indel.downsample_to_coverage = Some(500)
@ -224,6 +230,7 @@ class fullCallingPipeline extends QScript {
mergeIndels.priority = priority
mergeIndels.variantmergeoption = Some(org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION)
mergeIndels.rodBind = indelCallFiles
mergeIndels.analysisName = base+"_MergeIndels"
// 1b. genomically annotate SNPs -- no longer slow
@ -234,6 +241,7 @@ class fullCallingPipeline extends QScript {
annotated.out = swapExt(snps.out,".vcf",".annotated.vcf")
annotated.select :+= "dbsnp.name,dbsnp.refUCSC,dbsnp.strand,dbsnp.observed,dbsnp.avHet"
annotated.rodToIntervalTrackName = "variant"
annotated.analysisName = base+"_GenomicAnnotator"
// 2.a filter on cluster and near indels
@ -244,6 +252,7 @@ class fullCallingPipeline extends QScript {
masker.clusterWindowSize = Some(qscript.snpClusterWindow)
masker.clusterSize = Some(qscript.snpsInCluster)
masker.out = swapExt(annotated.out,".vcf",".indel.masked.vcf")
masker.analysisName = base+"_Cluster_and_Indel_filter"
// 2.b hand filter with standard filter
@ -253,6 +262,7 @@ class fullCallingPipeline extends QScript {
handFilter.filterName ++= List("StrandBias","AlleleBalance","QualByDepth","HomopolymerRun")
handFilter.filterExpression ++= List("\"SB>=0.10\"","\"AB>=0.75\"","\"QD<5.0\"","\"HRun>=4\"")
handFilter.out = swapExt(annotated.out,".vcf",".handfiltered.vcf")
handFilter.analysisName = base+"_HandFilter"
// 3.i generate gaussian clusters on the masked vcf
@ -267,6 +277,7 @@ class fullCallingPipeline extends QScript {
clusters.jobQueue = "gsa"
clusters.use_annotation ++= List("QD", "SB", "HaplotypeScore", "HRun")
clusters.analysisName = base+"_Cluster"
// 3.ii apply gaussian clusters to the masked vcf
@ -278,6 +289,7 @@ class fullCallingPipeline extends QScript {
recalibrate.target_titv = qscript.target_titv
recalibrate.report_dat_file = swapExt(masker.out,".vcf",".recalibrate.dat")
recalibrate.tranches_file = swapExt(masker.out,".vcf",".recalibrate.tranches")
recalibrate.analysisName = base+"_VariantRecalibrator"
// 3.iii apply variant cuts to the clusters
val cut = new ApplyVariantCuts with CommandLineGATKArgs
@ -286,6 +298,7 @@ class fullCallingPipeline extends QScript {
cut.tranches_file = recalibrate.tranches_file
// todo -- fdr inputs, etc
cut.fdr_filter_level = Some(1)
cut.analysisName = base+"_ApplyVariantCuts"
// 4. Variant eval the cut and the hand-filtered vcf files
@ -294,6 +307,7 @@ class fullCallingPipeline extends QScript {
eval.rodBind :+= RodBind("evalHandFiltered", "VCF", handFilter.out)
eval.evalModule ++= List("CountFunctionalClasses", "CompOverlap", "CountVariants", "TiTvVariantEvaluator")
eval.out = new File(base+".eval")
eval.analysisName = base+"_VariantEval"
add(snps)

View File

@ -35,6 +35,9 @@ class QCommandLine extends CommandLineProgram with Logging {
@Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden
private var runScripts = false
@Argument(fullName="status",shortName="status",doc="Get status of jobs for the qscript",required=false)
private var getStatus = false
@ArgumentCollection
private val qSettings = new QSettings
@ -62,8 +65,13 @@ class QCommandLine extends CommandLineProgram with Logging {
logger.info("Added " + script.functions.size + " functions")
}
logger.info("Running generated graph")
qGraph.run
if ( ! getStatus ) {
logger.info("Running generated graph")
qGraph.run
} else {
logger.info("Checking pipeline status")
qGraph.checkStatus
}
logger.info("Done")
0

View File

@ -13,11 +13,13 @@ import java.io.File
import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent}
import org.broadinstitute.sting.queue.{QSettings, QException}
import org.broadinstitute.sting.queue.function.{DispatchWaitFunction, MappingFunction, CommandLineFunction, QFunction}
import collection.mutable.HashMap
/**
* The internal dependency tracker between sets of function input and output files.
*/
class QGraph extends Logging {
var status = false
var dryRun = true
var bsubAllJobs = false
var bsubWaitJobs = false
@ -74,19 +76,26 @@ class QGraph extends Logging {
val isReady = numMissingValues == 0
if (isReady || this.dryRun)
if ( (isReady || this.dryRun) && ! this.status )
runJobs
if (numMissingValues > 0) {
logger.error("Total missing values: " + numMissingValues)
}
if (isReady && this.dryRun) {
if (isReady && this.dryRun && ! this.status) {
logger.info("Dry run completed successfully!")
logger.info("Re-run with \"-run\" to execute the functions.")
}
}
def checkStatus = {
// build up the full DAG with scatter-gather jobs
this.status = true
run
runStatus
}
/**
* Walks up the graph looking for the previous LsfJobs.
* @param function Function to examine for a previous command line job.
@ -241,7 +250,88 @@ class QGraph extends Logging {
}
/**
* Creates a new graph where if new edges are needed (for cyclic dependency checking) they can be automatically created using a generic MappingFunction.
* Gets job statuses by traversing the graph and looking for status-related files
*/
private def runStatus = {
var statuses: HashMap[String,HashMap[String,Int]] = new HashMap[String,HashMap[String,Int]]
loop(
edgeFunction = { case edgeCLF => {
if ( edgeCLF.analysisName != null && ! edgeCLF.outputs.forall(file => file.getName.endsWith(".out") || file.getName.endsWith(".err") )) {
if ( ! statuses.keySet.contains(edgeCLF.analysisName) ) {
statuses.put(edgeCLF.analysisName,emptyStatusMap)
}
updateMap(statuses(edgeCLF.analysisName),edgeCLF)
}
}
})
formatStatus(statuses)
}
/**
* Creates an empty map with keys for status updates, todo -- make this nicer somehow
*/
private def emptyStatusMap = {
var sMap = new HashMap[String,Int]
sMap.put("status",-1)
sMap.put("sgTotal",0)
sMap.put("sgDone",0)
sMap.put("sgRunning",0)
sMap.put("sgFailed",0)
// note -- pending = total - done - run - failed
sMap
}
/**
* Updates a status map with scatter/gather status information (e.g. counts)
*/
private def updateMap(stats: HashMap[String,Int], clf: CommandLineFunction) = {
if ( clf.isGather ) {
logger.debug(clf.analysisName+": "+clf.doneOutputs.mkString(", "))
if ( clf.isDone ) {
stats("status") = 1
} else {
stats("status") = 0
}
} else {
stats("sgTotal") = (stats("sgTotal") + 1)
if ( clf.isDone ) {
stats("sgDone") = (stats("sgDone") + 1)
}
}
}
/**
* Formats a complete status map (analysis name --> map {string, int}) into nice strings
*/
private def formatStatus(stats: HashMap[String,HashMap[String,Int]]) = {
stats.foreach{ case(analysisName, status) => {
var infoStr = analysisName
val doneInt = status("status")
if ( doneInt == 1 ) {
infoStr += " [DONE]"
} else if ( doneInt == 0 ) {
infoStr += " [NOT DONE]"
} else {
infoStr += " [UNKNOWN]"
}
if ( status("sgTotal") > 0 ) {
val sgTot = status("sgTotal")
val sgDone = status("sgDone")
val sgRun = status("sgRunning")
val sgFailed = status("sgFailed")
val sgPend = (sgTot - sgDone - sgRun - sgFailed)
infoStr += " %dt/%dd/%dr/%dp/%df".format(sgTot,sgDone,sgRun,sgPend,sgFailed)
}
logger.info(infoStr)
}
}
}
/**
* Creates a new graph where if new edges are needed (for cyclic dependency checking) they can be automatically created using a generic MappingFunction.
* @return A new graph
*/
private def newGraph = new SimpleDirectedGraph[QNode, QFunction](new EdgeFactory[QNode, QFunction] {

View File

@ -14,6 +14,12 @@ import org.broadinstitute.sting.queue.{QSettings, QException}
trait CommandLineFunction extends QFunction with Logging {
def commandLine: String
/** Analysis function name */
var analysisName: String = _
/** Set to true if this is a standalone or gather function */
var isGather: Boolean = true
/** Default settings */
var qSettings: QSettings = _
@ -167,6 +173,10 @@ trait CommandLineFunction extends QFunction with Logging {
//doneFiles
}
def isDone = {
doneOutputs.size == 0 || doneOutputs.forall(_.exists)
}
/**
* Silly utility function which compresses if statement in getDoneFiles; returns true if two files are different
* @return boolean -- if files are different

View File

@ -262,6 +262,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
gatherFunction.originalOutput = this.getFieldFile(gatherField)
gatherFunction.setOriginalFunction(this, gatherField)
gatherFunction.analysisName = this.analysisName
if (this.setupGatherFunction != null)
if (this.setupGatherFunction.isDefinedAt(gatherFunction, gatherField))
this.setupGatherFunction(gatherFunction, gatherField)
@ -289,6 +290,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
if (this.setupCloneFunction != null)
if (this.setupCloneFunction.isDefinedAt(cloneFunction, index))
this.setupCloneFunction(cloneFunction, index)
cloneFunction.isGather = false
}
/**