From dee130ad1b042d604a3ff9e6453203402f1dfd3f Mon Sep 17 00:00:00 2001 From: kshakir Date: Mon, 21 Feb 2011 22:04:35 +0000 Subject: [PATCH] Gather the log files before the actual outputs and mark the log files gatherers as intermediates. Since the outputs will only be gathered iff the logs were gathered this allows the job name to change without causing SG to re-run. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5285 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/queue/engine/QGraph.scala | 4 ++-- .../sting/queue/function/QFunction.scala | 7 +++++++ .../scattergather/GatherFunction.scala | 3 +++ .../ScatterGatherableFunction.scala | 19 +++++++++++++++++-- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 07c03d289..ed71b586c 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -484,7 +484,7 @@ class QGraph extends Logging { } private def failedDescription(failed: FunctionEdge) = { - var description = new StringBuilder + val description = new StringBuilder if (settings.retries > 0) description.append("Attempt %d of %d.%n".format(failed.retries + 1, settings.retries + 1)) description.append(failed.function.description) @@ -749,7 +749,7 @@ class QGraph extends Logging { if (running && !settings.keepIntermediates && success) { logger.info("Deleting intermediate files.") traverseFunctions(edge => { - if (edge.function.isIntermediate) { + if (edge.function.isIntermediate && edge.function.deleteIntermediateOutputs) { logger.debug("Deleting intermediates:" + edge.function.description) edge.function.deleteOutputs() } diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 5c77f53bb..707ae8f13 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -55,6 +55,12 @@ trait QFunction extends Logging { */ var isIntermediate = false + /** + * If true and isIntermediate is true, the files listed + * via outputs will deleted after the command completes. + */ + var deleteIntermediateOutputs = true + /** * Copies settings from this function to another function. * @param function QFunction to copy values to. @@ -70,6 +76,7 @@ trait QFunction extends Logging { function.jobRestartable = this.jobRestartable function.updateJobRun = this.updateJobRun function.isIntermediate = this.isIntermediate + function.deleteIntermediateOutputs = this.deleteIntermediateOutputs } /** File to redirect any output. Defaults to .out */ diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index 3d09c6061..28c3e89a2 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -13,6 +13,9 @@ trait GatherFunction extends QFunction { @Input(doc="Parts to gather back into the original output") var gatherParts: List[File] = Nil + @Input(doc="Other log files that will be gathered before this output", required=false) + var originalLogFiles: List[File] = Nil + @Output(doc="The original output of the scattered function") var originalOutput: File = _ diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 18f60a28f..91561609b 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -82,6 +82,11 @@ trait ScatterGatherableFunction extends CommandLineFunction { // Ask the scatter function how many clones to create. val numClones = scatterFunction.scatterCount + // List of the log files that are output by this function. + var logFiles = List(this.jobOutputFile) + if (this.jobErrorFile != null) + logFiles :+= this.jobErrorFile + // Create the gather functions for each output field var gatherFunctions = Map.empty[ArgumentSource, GatherFunction] var gatherOutputs = Map.empty[ArgumentSource, File] @@ -92,12 +97,23 @@ trait ScatterGatherableFunction extends CommandLineFunction { this.copySettingsTo(gatherFunction) gatherFunction.addOrder = this.addOrder :+ gatherAddOrder gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) - gatherFunction.originalOutput = this.getFieldFile(gatherField) + gatherFunction.originalOutput = gatherOutput initGatherFunction(gatherFunction, gatherField) functions :+= gatherFunction gatherFunctions += gatherField -> gatherFunction gatherOutputs += gatherField -> gatherOutput gatherAddOrder += 1 + + // If this is a gather for a log file, make the gather intermediate just in case the log file name changes + // Otherwise have the regular output function wait on the log files to gather + if (isLogFile(gatherOutput)) { + gatherFunction.isIntermediate = true + // Only delete the log files if the original function is an intermediate + // and the intermediate files are supposed to be deleted + gatherFunction.deleteIntermediateOutputs = this.isIntermediate && this.deleteIntermediateOutputs + } else { + gatherFunction.originalLogFiles = logFiles + } } // Create the clone functions for running the parallel jobs @@ -122,7 +138,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { // Allow the script writer to change the paths to the files. initCloneFunction(cloneFunction, i) - // If the command directory is relative, insert the run directory ahead of it. cloneFunction.absoluteCommandDirectory()