QScript authors can now tag functions as intermediate. Functions tagged as intermediate will be skipped unless another function in the graph needs their output.

Re-logging the failed jobs and the path to their log files at the end of a run.
Added a parameter -bigMemQueue for the fullCallingPipeline.q instead of hardcoding gsa (gsa was backed up and it was actually faster to run on week).


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4520 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-10-18 22:11:14 +00:00
parent 524cb8257c
commit 81479229e1
8 changed files with 131 additions and 55 deletions

View File

@ -58,6 +58,9 @@ class fullCallingPipeline extends QScript {
//@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection")
//var protocol: String = _
@Argument(doc="Job queue for large memory jobs (>4 to 16GB)", shortName="bigMemQueue", required=false)
var big_mem_queue: String = _
private var pipeline: Pipeline = _
trait CommandLineGATKArgs extends CommandLineGATK {
@ -109,6 +112,7 @@ class fullCallingPipeline extends QScript {
targetCreator.input_file :+= bam
targetCreator.out = indel_targets
targetCreator.memoryLimit = Some(2)
targetCreator.isIntermediate = true
val realigner = new IndelRealigner with CommandLineGATKArgs
realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId))
@ -117,6 +121,7 @@ class fullCallingPipeline extends QScript {
realigner.intervals = qscript.contigIntervals
realigner.targetIntervals = targetCreator.out
realigner.scatterCount = contigCount
realigner.isIntermediate = true
// may need to explicitly run fix mates
var fixMates = new PicardBamJarFunction {
@ -170,6 +175,7 @@ class fullCallingPipeline extends QScript {
fixMates.unfixed = realigner.out
fixMates.fixed = cleaned_bam
fixMates.analysisName = "FixMates_"+sampleId
fixMates.isIntermediate = true
// Add the fix mates explicitly
}
@ -177,6 +183,7 @@ class fullCallingPipeline extends QScript {
samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId))
samtoolsindex.bamFile = cleaned_bam
samtoolsindex.analysisName = "index_cleaned_"+sampleId
samtoolsindex.isIntermediate = true
if (!qscript.skip_cleaning) {
if ( realigner.scatterCount > 1 ) {
@ -282,7 +289,7 @@ class fullCallingPipeline extends QScript {
mergeIndels.rodBind = indelCallFiles
mergeIndels.analysisName = base+"_MergeIndels"
mergeIndels.memoryLimit = Some(16)
mergeIndels.jobQueue = "gsa"
mergeIndels.jobQueue = qscript.big_mem_queue
// 1b. genomically annotate SNPs -- no longer slow
val annotated = new GenomicAnnotator with CommandLineGATKArgs
@ -326,7 +333,7 @@ class fullCallingPipeline extends QScript {
val clusters_clusterFile = swapExt("SnpCalls/IntermediateFiles",snps.out,".vcf",".cluster")
clusters.clusterFile = clusters_clusterFile
clusters.memoryLimit = Some(6)
clusters.jobQueue = "gsa"
clusters.jobQueue = qscript.big_mem_queue
clusters.use_annotation ++= List("QD", "SB", "HaplotypeScore", "HRun")
clusters.analysisName = base+"_Cluster"

View File

@ -86,6 +86,7 @@ class QCommandLine extends CommandLineProgram with Logging {
if (qGraph.hasFailed) {
logger.info("Done with errors")
qGraph.logFailed
1
} else {
logger.info("Done")

View File

@ -29,6 +29,10 @@ class FunctionEdge(var function: QFunction) extends QEdge {
currentStatus
}
def markAsSkipped() = {
currentStatus = RunnerStatus.SKIPPED
}
def resetToPending() = {
currentStatus = RunnerStatus.PENDING
function.doneOutputs.foreach(_.delete())

View File

@ -13,6 +13,7 @@ import org.broadinstitute.sting.queue.{QSettings, QException}
import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction}
import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction}
import org.broadinstitute.sting.queue.util.{EmailMessage, JobExitException, LsfKillJob, Logging}
import org.apache.commons.lang.StringUtils
/**
* The internal dependency tracker between sets of function input and output files.
@ -135,14 +136,14 @@ class QGraph extends Logging {
* @param qGraph The graph that contains the jobs.
* @return A list of prior jobs.
*/
def previousFunctions(edge: QEdge) : List[FunctionEdge] = {
private def previousFunctions(edge: QEdge): List[FunctionEdge] = {
var previous = List.empty[FunctionEdge]
val source = this.jobGraph.getEdgeSource(edge)
for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) {
incomingEdge match {
// Stop recursing when we find a job along the edge and return its job id
// Stop recursing when we find a job along the edge and return its job id
case functionEdge: FunctionEdge => previous :+= functionEdge
// For any other type of edge find the jobs preceding the edge
@ -266,11 +267,10 @@ class QGraph extends Logging {
private def runJobs() = {
try {
traverseFunctions(edge => {
val isDone = !this.startClean &&
edge.status == RunnerStatus.DONE &&
this.previousFunctions(edge).forall(_.status == RunnerStatus.DONE)
if (!isDone)
if (startClean)
edge.resetToPending()
else
checkDone(edge)
})
var readyJobs = getReadyJobs
@ -312,6 +312,43 @@ class QGraph extends Logging {
}
}
/**
* Checks if an edge is done or if it's an intermediate edge if it can be skipped.
* This function may modify previous edges if it discovers that the edge passed in
* is dependent jobs that were previously marked as skipped.
* @param edge Edge to check to see if it's done or can be skipped.
*/
private def checkDone(edge: FunctionEdge) = {
if (edge.function.isIntermediate) {
// By default we do not need to run intermediate edges.
// Mark any intermediate edges as skipped, if they're not already done.
if (edge.status != RunnerStatus.DONE)
edge.markAsSkipped()
} else {
val previous = this.previousFunctions(edge)
val isDone = edge.status == RunnerStatus.DONE &&
previous.forall(edge => edge.status == RunnerStatus.DONE || edge.status == RunnerStatus.SKIPPED)
if (!isDone) {
edge.resetToPending()
resetPreviousSkipped(edge, previous)
}
}
}
/**
* From the previous edges, resets any that are marked as skipped to pending.
* If those that are reset have skipped edges, those skipped edges are recursively also set
* to pending.
* @param edge Dependent edge.
* @param previous Previous edges that provide inputs to edge.
*/
private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge]): Unit = {
for (previousEdge <- previous.filter(_.status == RunnerStatus.SKIPPED)) {
previousEdge.resetToPending()
resetPreviousSkipped(previousEdge, this.previousFunctions(previousEdge))
}
}
private def newRunner(f: QFunction) = {
f match {
case cmd: CommandLineFunction =>
@ -403,6 +440,7 @@ class QGraph extends Logging {
var total = 0
var done = 0
var failed = 0
var skipped = 0
}
/**
@ -442,18 +480,25 @@ class QGraph extends Logging {
})
statuses.foreach(status => {
if (status.scatter.total + status.gather.total > 0) {
val sgTotal = status.scatter.total + status.gather.total
val sgDone = status.scatter.done + status.gather.done
val sgFailed = status.scatter.failed + status.gather.failed
val sgSkipped = status.scatter.skipped + status.gather.skipped
if (sgTotal > 0) {
var sgStatus = RunnerStatus.PENDING
if (status.scatter.failed + status.gather.failed > 0)
if (sgFailed > 0)
sgStatus = RunnerStatus.FAILED
else if (status.scatter.done + status.gather.done == status.scatter.total + status.gather.total)
else if (sgDone == sgTotal)
sgStatus = RunnerStatus.DONE
else if (status.scatter.done + status.gather.done > 0)
else if (sgDone + sgSkipped == sgTotal)
sgStatus = RunnerStatus.SKIPPED
else if (sgDone > 0)
sgStatus = RunnerStatus.RUNNING
status.status = sgStatus
}
var info = ("%-" + maxWidth + "s [%#7s]").format(status.analysisName, status.status)
var info = ("%-" + maxWidth + "s [%s]")
.format(status.analysisName, StringUtils.center(status.status.toString, 7))
if (status.scatter.total + status.gather.total > 1) {
info += formatSGStatus(status.scatter, "s")
info += formatSGStatus(status.gather, "g")
@ -478,12 +523,9 @@ class QGraph extends Logging {
private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) = {
stats.total += 1
edge.status match {
case RunnerStatus.DONE => {
stats.done += 1
}
case RunnerStatus.FAILED => {
stats.failed += 1
}
case RunnerStatus.DONE => stats.done += 1
case RunnerStatus.FAILED => stats.failed += 1
case RunnerStatus.SKIPPED => stats.skipped += 1
/* can't tell the difference between pending and running right now! */
case RunnerStatus.PENDING =>
case RunnerStatus.RUNNING =>
@ -661,6 +703,18 @@ class QGraph extends Logging {
})
}
def logFailed = {
foreachFunction(edge => {
if (edge.status == RunnerStatus.FAILED) {
logger.error("-----")
logger.error("Failed: " + edge.function.description)
logger.error("Log: " + edge.function.jobOutputFile.getAbsolutePath)
if (edge.function.jobErrorFile != null)
logger.error("Error: " + edge.function.jobErrorFile.getAbsolutePath)
}
})
}
/**
* Kills any forked jobs still running.
*/

View File

@ -5,4 +5,5 @@ object RunnerStatus extends Enumeration {
val RUNNING = Value("running")
val FAILED = Value("failed")
val DONE = Value("done")
val SKIPPED = Value("skipped")
}

View File

@ -116,6 +116,12 @@ trait QFunction {
def outputFields = QFunction.classFields(this.functionFieldClass).outputFields
/** The @Argument fields on this CommandLineFunction. */
def argumentFields = QFunction.classFields(this.functionFieldClass).argumentFields
/**
* If true, unless another unfinished function is dependent on this function,
* this function will NOT be run even if the outputs have not been created.
*/
var isIntermediate = false
/**
* Returns the class that should be used for looking up fields.

View File

@ -12,49 +12,38 @@ class CloneFunction extends CommandLineFunction {
var index: Int = _
private var overriddenFields = Map.empty[ArgumentSource, Any]
private var withScatterPartCount = 0
private def withScatterPart[A](f: () => A): A = {
var originalValues = Map.empty[ArgumentSource, Any]
overriddenFields.foreach{
case (field, overrideValue) => {
originalValues += field -> originalFunction.getFieldValue(field)
originalFunction.setFieldValue(field, overrideValue)
withScatterPartCount += 1
if (withScatterPartCount == 1) {
overriddenFields.foreach{
case (field, overrideValue) => {
originalValues += field -> originalFunction.getFieldValue(field)
originalFunction.setFieldValue(field, overrideValue)
}
}
}
try {
f()
} finally {
originalValues.foreach{
case (name, value) =>
originalFunction.setFieldValue(name, value)
if (withScatterPartCount == 1) {
originalValues.foreach{
case (name, value) =>
originalFunction.setFieldValue(name, value)
}
}
withScatterPartCount -= 1
}
}
override def dotString = originalFunction.dotString
override def description = originalFunction.description
override def dotString = withScatterPart(() => originalFunction.dotString)
override def description = withScatterPart(() => originalFunction.description)
override protected def functionFieldClass = originalFunction.getClass
override def useStatusOutput(file: File) =
file != jobOutputFile && file != jobErrorFile && originalFunction.useStatusOutput(file)
override def freezeFieldValues = {
if (this.analysisName == null)
this.analysisName = originalFunction.analysisName
if (this.qSettings == null)
this.qSettings = originalFunction.qSettings
if (this.memoryLimit.isEmpty && originalFunction.memoryLimit.isDefined)
this.memoryLimit = originalFunction.memoryLimit
if (this.jobTempDir == null)
this.jobTempDir = originalFunction.jobTempDir
if (this.jobQueue == null)
this.jobQueue = originalFunction.jobQueue
if (this.jobProject == null)
this.jobProject = originalFunction.jobProject
if (this.jobName == null)
this.jobName = originalFunction.jobName
super.freezeFieldValues
}
def commandLine = withScatterPart(() => originalFunction.commandLine)
override def getFieldValue(source: ArgumentSource) = {

View File

@ -63,7 +63,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
*/
def generateFunctions() = {
var functions = List.empty[QFunction]
var tempDirectories = List.empty[File]
// Only depend on input fields that have a value
val inputFieldsWithValues = this.inputFields.filter(hasFieldValue(_))
@ -74,14 +73,12 @@ trait ScatterGatherableFunction extends CommandLineFunction {
// Create the scatter function based on @Scatter
val scatterFunction = this.newScatterFunction(this.scatterField)
syncFunction(scatterFunction)
scatterFunction.addOrder = this.addOrder :+ 1
scatterFunction.analysisName = this.analysisName
scatterFunction.qSettings = this.qSettings
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName)
scatterFunction.originalInput = originalInput
scatterFunction.setOriginalFunction(this, scatterField)
initScatterFunction(scatterFunction, this.scatterField)
tempDirectories :+= scatterFunction.commandDirectory
functions :+= scatterFunction
// Create the gather functions for each output field
@ -91,14 +88,12 @@ trait ScatterGatherableFunction extends CommandLineFunction {
for (gatherField <- outputFieldsWithValues) {
val gatherFunction = this.newGatherFunction(gatherField)
val gatherOutput = getFieldFile(gatherField)
syncFunction(gatherFunction)
gatherFunction.addOrder = this.addOrder :+ gatherAddOrder
gatherFunction.analysisName = this.analysisName
gatherFunction.qSettings = this.qSettings
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
gatherFunction.originalOutput = this.getFieldFile(gatherField)
gatherFunction.setOriginalFunction(this, gatherField)
initGatherFunction(gatherFunction, gatherField)
tempDirectories :+= gatherFunction.commandDirectory
functions :+= gatherFunction
gatherFunctions += gatherField -> gatherFunction
gatherOutputs += gatherField -> gatherOutput
@ -110,11 +105,13 @@ trait ScatterGatherableFunction extends CommandLineFunction {
for (i <- 1 to this.scatterCount) {
val cloneFunction = this.newCloneFunction()
syncFunction(cloneFunction)
cloneFunction.originalFunction = this
cloneFunction.index = i
cloneFunction.addOrder = this.addOrder :+ (i+1)
cloneFunction.memoryLimit = this.memoryLimit
// Setup the fields on the clone function, outputing each as a relative file in the sg directory.
// Setup the fields on the clone function, outputting each as a relative file in the sg directory.
cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i)
var scatterPart = new File(originalInput.getName)
cloneFunction.setFieldValue(scatterField, scatterPart)
@ -137,7 +134,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
}
cloneFunctions :+= cloneFunction
tempDirectories :+= cloneFunction.commandDirectory
}
functions ++= cloneFunctions
@ -235,6 +231,24 @@ trait ScatterGatherableFunction extends CommandLineFunction {
this.setupCloneFunction(cloneFunction, index)
}
/**
* Copies standard values from this function to the just created function.
* @param newFunction newly created function.
*/
protected def syncFunction(newFunction: QFunction) = {
newFunction.isIntermediate = this.isIntermediate
newFunction.analysisName = this.analysisName
newFunction.qSettings = this.qSettings
newFunction.jobTempDir = this.jobTempDir
newFunction.jobName = this.jobName
newFunction match {
case newCLFFunction: CommandLineFunction =>
newCLFFunction.jobQueue = this.jobQueue
newCLFFunction.jobProject = this.jobProject
case _ => /* ignore */
}
}
/**
* Returns a temporary directory under this scatter gather directory.
* @param Sub directory under the scatter gather directory.