diff --git a/scala/qscript/fullCallingPipeline.q b/scala/qscript/fullCallingPipeline.q index 8a6cbc537..38dd24994 100755 --- a/scala/qscript/fullCallingPipeline.q +++ b/scala/qscript/fullCallingPipeline.q @@ -58,6 +58,9 @@ class fullCallingPipeline extends QScript { //@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection") //var protocol: String = _ + @Argument(doc="Job queue for large memory jobs (>4 to 16GB)", shortName="bigMemQueue", required=false) + var big_mem_queue: String = _ + private var pipeline: Pipeline = _ trait CommandLineGATKArgs extends CommandLineGATK { @@ -109,6 +112,7 @@ class fullCallingPipeline extends QScript { targetCreator.input_file :+= bam targetCreator.out = indel_targets targetCreator.memoryLimit = Some(2) + targetCreator.isIntermediate = true val realigner = new IndelRealigner with CommandLineGATKArgs realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId)) @@ -117,6 +121,7 @@ class fullCallingPipeline extends QScript { realigner.intervals = qscript.contigIntervals realigner.targetIntervals = targetCreator.out realigner.scatterCount = contigCount + realigner.isIntermediate = true // may need to explicitly run fix mates var fixMates = new PicardBamJarFunction { @@ -170,6 +175,7 @@ class fullCallingPipeline extends QScript { fixMates.unfixed = realigner.out fixMates.fixed = cleaned_bam fixMates.analysisName = "FixMates_"+sampleId + fixMates.isIntermediate = true // Add the fix mates explicitly } @@ -177,6 +183,7 @@ class fullCallingPipeline extends QScript { samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId)) samtoolsindex.bamFile = cleaned_bam samtoolsindex.analysisName = "index_cleaned_"+sampleId + samtoolsindex.isIntermediate = true if (!qscript.skip_cleaning) { if ( realigner.scatterCount > 1 ) { @@ -282,7 +289,7 @@ class fullCallingPipeline extends QScript { mergeIndels.rodBind = indelCallFiles mergeIndels.analysisName = base+"_MergeIndels" mergeIndels.memoryLimit = Some(16) - mergeIndels.jobQueue = "gsa" + mergeIndels.jobQueue = qscript.big_mem_queue // 1b. genomically annotate SNPs -- no longer slow val annotated = new GenomicAnnotator with CommandLineGATKArgs @@ -326,7 +333,7 @@ class fullCallingPipeline extends QScript { val clusters_clusterFile = swapExt("SnpCalls/IntermediateFiles",snps.out,".vcf",".cluster") clusters.clusterFile = clusters_clusterFile clusters.memoryLimit = Some(6) - clusters.jobQueue = "gsa" + clusters.jobQueue = qscript.big_mem_queue clusters.use_annotation ++= List("QD", "SB", "HaplotypeScore", "HRun") clusters.analysisName = base+"_Cluster" diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index f560225cd..ac8885f5a 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -86,6 +86,7 @@ class QCommandLine extends CommandLineProgram with Logging { if (qGraph.hasFailed) { logger.info("Done with errors") + qGraph.logFailed 1 } else { logger.info("Done") diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index 632466815..84d9b7e2b 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -29,6 +29,10 @@ class FunctionEdge(var function: QFunction) extends QEdge { currentStatus } + def markAsSkipped() = { + currentStatus = RunnerStatus.SKIPPED + } + def resetToPending() = { currentStatus = RunnerStatus.PENDING function.doneOutputs.foreach(_.delete()) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index ac51fcf03..7aa0920bb 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -13,6 +13,7 @@ import org.broadinstitute.sting.queue.{QSettings, QException} import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction} import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction} import org.broadinstitute.sting.queue.util.{EmailMessage, JobExitException, LsfKillJob, Logging} +import org.apache.commons.lang.StringUtils /** * The internal dependency tracker between sets of function input and output files. @@ -135,14 +136,14 @@ class QGraph extends Logging { * @param qGraph The graph that contains the jobs. * @return A list of prior jobs. */ - def previousFunctions(edge: QEdge) : List[FunctionEdge] = { + private def previousFunctions(edge: QEdge): List[FunctionEdge] = { var previous = List.empty[FunctionEdge] val source = this.jobGraph.getEdgeSource(edge) for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) { incomingEdge match { - // Stop recursing when we find a job along the edge and return its job id + // Stop recursing when we find a job along the edge and return its job id case functionEdge: FunctionEdge => previous :+= functionEdge // For any other type of edge find the jobs preceding the edge @@ -266,11 +267,10 @@ class QGraph extends Logging { private def runJobs() = { try { traverseFunctions(edge => { - val isDone = !this.startClean && - edge.status == RunnerStatus.DONE && - this.previousFunctions(edge).forall(_.status == RunnerStatus.DONE) - if (!isDone) + if (startClean) edge.resetToPending() + else + checkDone(edge) }) var readyJobs = getReadyJobs @@ -312,6 +312,43 @@ class QGraph extends Logging { } } + /** + * Checks if an edge is done or if it's an intermediate edge if it can be skipped. + * This function may modify previous edges if it discovers that the edge passed in + * is dependent jobs that were previously marked as skipped. + * @param edge Edge to check to see if it's done or can be skipped. + */ + private def checkDone(edge: FunctionEdge) = { + if (edge.function.isIntermediate) { + // By default we do not need to run intermediate edges. + // Mark any intermediate edges as skipped, if they're not already done. + if (edge.status != RunnerStatus.DONE) + edge.markAsSkipped() + } else { + val previous = this.previousFunctions(edge) + val isDone = edge.status == RunnerStatus.DONE && + previous.forall(edge => edge.status == RunnerStatus.DONE || edge.status == RunnerStatus.SKIPPED) + if (!isDone) { + edge.resetToPending() + resetPreviousSkipped(edge, previous) + } + } + } + + /** + * From the previous edges, resets any that are marked as skipped to pending. + * If those that are reset have skipped edges, those skipped edges are recursively also set + * to pending. + * @param edge Dependent edge. + * @param previous Previous edges that provide inputs to edge. + */ + private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge]): Unit = { + for (previousEdge <- previous.filter(_.status == RunnerStatus.SKIPPED)) { + previousEdge.resetToPending() + resetPreviousSkipped(previousEdge, this.previousFunctions(previousEdge)) + } + } + private def newRunner(f: QFunction) = { f match { case cmd: CommandLineFunction => @@ -403,6 +440,7 @@ class QGraph extends Logging { var total = 0 var done = 0 var failed = 0 + var skipped = 0 } /** @@ -442,18 +480,25 @@ class QGraph extends Logging { }) statuses.foreach(status => { - if (status.scatter.total + status.gather.total > 0) { + val sgTotal = status.scatter.total + status.gather.total + val sgDone = status.scatter.done + status.gather.done + val sgFailed = status.scatter.failed + status.gather.failed + val sgSkipped = status.scatter.skipped + status.gather.skipped + if (sgTotal > 0) { var sgStatus = RunnerStatus.PENDING - if (status.scatter.failed + status.gather.failed > 0) + if (sgFailed > 0) sgStatus = RunnerStatus.FAILED - else if (status.scatter.done + status.gather.done == status.scatter.total + status.gather.total) + else if (sgDone == sgTotal) sgStatus = RunnerStatus.DONE - else if (status.scatter.done + status.gather.done > 0) + else if (sgDone + sgSkipped == sgTotal) + sgStatus = RunnerStatus.SKIPPED + else if (sgDone > 0) sgStatus = RunnerStatus.RUNNING status.status = sgStatus } - var info = ("%-" + maxWidth + "s [%#7s]").format(status.analysisName, status.status) + var info = ("%-" + maxWidth + "s [%s]") + .format(status.analysisName, StringUtils.center(status.status.toString, 7)) if (status.scatter.total + status.gather.total > 1) { info += formatSGStatus(status.scatter, "s") info += formatSGStatus(status.gather, "g") @@ -478,12 +523,9 @@ class QGraph extends Logging { private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) = { stats.total += 1 edge.status match { - case RunnerStatus.DONE => { - stats.done += 1 - } - case RunnerStatus.FAILED => { - stats.failed += 1 - } + case RunnerStatus.DONE => stats.done += 1 + case RunnerStatus.FAILED => stats.failed += 1 + case RunnerStatus.SKIPPED => stats.skipped += 1 /* can't tell the difference between pending and running right now! */ case RunnerStatus.PENDING => case RunnerStatus.RUNNING => @@ -661,6 +703,18 @@ class QGraph extends Logging { }) } + def logFailed = { + foreachFunction(edge => { + if (edge.status == RunnerStatus.FAILED) { + logger.error("-----") + logger.error("Failed: " + edge.function.description) + logger.error("Log: " + edge.function.jobOutputFile.getAbsolutePath) + if (edge.function.jobErrorFile != null) + logger.error("Error: " + edge.function.jobErrorFile.getAbsolutePath) + } + }) + } + /** * Kills any forked jobs still running. */ diff --git a/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala b/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala index 78c834616..1e5a856ca 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/RunnerStatus.scala @@ -5,4 +5,5 @@ object RunnerStatus extends Enumeration { val RUNNING = Value("running") val FAILED = Value("failed") val DONE = Value("done") + val SKIPPED = Value("skipped") } diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index a0cee7b16..695fe629b 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -116,6 +116,12 @@ trait QFunction { def outputFields = QFunction.classFields(this.functionFieldClass).outputFields /** The @Argument fields on this CommandLineFunction. */ def argumentFields = QFunction.classFields(this.functionFieldClass).argumentFields + + /** + * If true, unless another unfinished function is dependent on this function, + * this function will NOT be run even if the outputs have not been created. + */ + var isIntermediate = false /** * Returns the class that should be used for looking up fields. diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala index 2963484ea..16748bf6a 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala @@ -12,49 +12,38 @@ class CloneFunction extends CommandLineFunction { var index: Int = _ private var overriddenFields = Map.empty[ArgumentSource, Any] + private var withScatterPartCount = 0 private def withScatterPart[A](f: () => A): A = { var originalValues = Map.empty[ArgumentSource, Any] - overriddenFields.foreach{ - case (field, overrideValue) => { - originalValues += field -> originalFunction.getFieldValue(field) - originalFunction.setFieldValue(field, overrideValue) + withScatterPartCount += 1 + if (withScatterPartCount == 1) { + overriddenFields.foreach{ + case (field, overrideValue) => { + originalValues += field -> originalFunction.getFieldValue(field) + originalFunction.setFieldValue(field, overrideValue) + } } } try { f() } finally { - originalValues.foreach{ - case (name, value) => - originalFunction.setFieldValue(name, value) + if (withScatterPartCount == 1) { + originalValues.foreach{ + case (name, value) => + originalFunction.setFieldValue(name, value) + } } + withScatterPartCount -= 1 } } - override def dotString = originalFunction.dotString - override def description = originalFunction.description + override def dotString = withScatterPart(() => originalFunction.dotString) + override def description = withScatterPart(() => originalFunction.description) override protected def functionFieldClass = originalFunction.getClass override def useStatusOutput(file: File) = file != jobOutputFile && file != jobErrorFile && originalFunction.useStatusOutput(file) - override def freezeFieldValues = { - if (this.analysisName == null) - this.analysisName = originalFunction.analysisName - if (this.qSettings == null) - this.qSettings = originalFunction.qSettings - if (this.memoryLimit.isEmpty && originalFunction.memoryLimit.isDefined) - this.memoryLimit = originalFunction.memoryLimit - if (this.jobTempDir == null) - this.jobTempDir = originalFunction.jobTempDir - if (this.jobQueue == null) - this.jobQueue = originalFunction.jobQueue - if (this.jobProject == null) - this.jobProject = originalFunction.jobProject - if (this.jobName == null) - this.jobName = originalFunction.jobName - super.freezeFieldValues - } - def commandLine = withScatterPart(() => originalFunction.commandLine) override def getFieldValue(source: ArgumentSource) = { diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 732c67314..2aebfef3d 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -63,7 +63,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { */ def generateFunctions() = { var functions = List.empty[QFunction] - var tempDirectories = List.empty[File] // Only depend on input fields that have a value val inputFieldsWithValues = this.inputFields.filter(hasFieldValue(_)) @@ -74,14 +73,12 @@ trait ScatterGatherableFunction extends CommandLineFunction { // Create the scatter function based on @Scatter val scatterFunction = this.newScatterFunction(this.scatterField) + syncFunction(scatterFunction) scatterFunction.addOrder = this.addOrder :+ 1 - scatterFunction.analysisName = this.analysisName - scatterFunction.qSettings = this.qSettings scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName) scatterFunction.originalInput = originalInput scatterFunction.setOriginalFunction(this, scatterField) initScatterFunction(scatterFunction, this.scatterField) - tempDirectories :+= scatterFunction.commandDirectory functions :+= scatterFunction // Create the gather functions for each output field @@ -91,14 +88,12 @@ trait ScatterGatherableFunction extends CommandLineFunction { for (gatherField <- outputFieldsWithValues) { val gatherFunction = this.newGatherFunction(gatherField) val gatherOutput = getFieldFile(gatherField) + syncFunction(gatherFunction) gatherFunction.addOrder = this.addOrder :+ gatherAddOrder - gatherFunction.analysisName = this.analysisName - gatherFunction.qSettings = this.qSettings gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) gatherFunction.originalOutput = this.getFieldFile(gatherField) gatherFunction.setOriginalFunction(this, gatherField) initGatherFunction(gatherFunction, gatherField) - tempDirectories :+= gatherFunction.commandDirectory functions :+= gatherFunction gatherFunctions += gatherField -> gatherFunction gatherOutputs += gatherField -> gatherOutput @@ -110,11 +105,13 @@ trait ScatterGatherableFunction extends CommandLineFunction { for (i <- 1 to this.scatterCount) { val cloneFunction = this.newCloneFunction() + syncFunction(cloneFunction) cloneFunction.originalFunction = this cloneFunction.index = i cloneFunction.addOrder = this.addOrder :+ (i+1) + cloneFunction.memoryLimit = this.memoryLimit - // Setup the fields on the clone function, outputing each as a relative file in the sg directory. + // Setup the fields on the clone function, outputting each as a relative file in the sg directory. cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i) var scatterPart = new File(originalInput.getName) cloneFunction.setFieldValue(scatterField, scatterPart) @@ -137,7 +134,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { } cloneFunctions :+= cloneFunction - tempDirectories :+= cloneFunction.commandDirectory } functions ++= cloneFunctions @@ -235,6 +231,24 @@ trait ScatterGatherableFunction extends CommandLineFunction { this.setupCloneFunction(cloneFunction, index) } + /** + * Copies standard values from this function to the just created function. + * @param newFunction newly created function. + */ + protected def syncFunction(newFunction: QFunction) = { + newFunction.isIntermediate = this.isIntermediate + newFunction.analysisName = this.analysisName + newFunction.qSettings = this.qSettings + newFunction.jobTempDir = this.jobTempDir + newFunction.jobName = this.jobName + newFunction match { + case newCLFFunction: CommandLineFunction => + newCLFFunction.jobQueue = this.jobQueue + newCLFFunction.jobProject = this.jobProject + case _ => /* ignore */ + } + } + /** * Returns a temporary directory under this scatter gather directory. * @param Sub directory under the scatter gather directory.