diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 07c03d289..ed71b586c 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -484,7 +484,7 @@ class QGraph extends Logging { } private def failedDescription(failed: FunctionEdge) = { - var description = new StringBuilder + val description = new StringBuilder if (settings.retries > 0) description.append("Attempt %d of %d.%n".format(failed.retries + 1, settings.retries + 1)) description.append(failed.function.description) @@ -749,7 +749,7 @@ class QGraph extends Logging { if (running && !settings.keepIntermediates && success) { logger.info("Deleting intermediate files.") traverseFunctions(edge => { - if (edge.function.isIntermediate) { + if (edge.function.isIntermediate && edge.function.deleteIntermediateOutputs) { logger.debug("Deleting intermediates:" + edge.function.description) edge.function.deleteOutputs() } diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 5c77f53bb..707ae8f13 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -55,6 +55,12 @@ trait QFunction extends Logging { */ var isIntermediate = false + /** + * If true and isIntermediate is true, the files listed + * via outputs will deleted after the command completes. + */ + var deleteIntermediateOutputs = true + /** * Copies settings from this function to another function. * @param function QFunction to copy values to. @@ -70,6 +76,7 @@ trait QFunction extends Logging { function.jobRestartable = this.jobRestartable function.updateJobRun = this.updateJobRun function.isIntermediate = this.isIntermediate + function.deleteIntermediateOutputs = this.deleteIntermediateOutputs } /** File to redirect any output. Defaults to .out */ diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index 3d09c6061..28c3e89a2 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -13,6 +13,9 @@ trait GatherFunction extends QFunction { @Input(doc="Parts to gather back into the original output") var gatherParts: List[File] = Nil + @Input(doc="Other log files that will be gathered before this output", required=false) + var originalLogFiles: List[File] = Nil + @Output(doc="The original output of the scattered function") var originalOutput: File = _ diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 18f60a28f..91561609b 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -82,6 +82,11 @@ trait ScatterGatherableFunction extends CommandLineFunction { // Ask the scatter function how many clones to create. val numClones = scatterFunction.scatterCount + // List of the log files that are output by this function. + var logFiles = List(this.jobOutputFile) + if (this.jobErrorFile != null) + logFiles :+= this.jobErrorFile + // Create the gather functions for each output field var gatherFunctions = Map.empty[ArgumentSource, GatherFunction] var gatherOutputs = Map.empty[ArgumentSource, File] @@ -92,12 +97,23 @@ trait ScatterGatherableFunction extends CommandLineFunction { this.copySettingsTo(gatherFunction) gatherFunction.addOrder = this.addOrder :+ gatherAddOrder gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) - gatherFunction.originalOutput = this.getFieldFile(gatherField) + gatherFunction.originalOutput = gatherOutput initGatherFunction(gatherFunction, gatherField) functions :+= gatherFunction gatherFunctions += gatherField -> gatherFunction gatherOutputs += gatherField -> gatherOutput gatherAddOrder += 1 + + // If this is a gather for a log file, make the gather intermediate just in case the log file name changes + // Otherwise have the regular output function wait on the log files to gather + if (isLogFile(gatherOutput)) { + gatherFunction.isIntermediate = true + // Only delete the log files if the original function is an intermediate + // and the intermediate files are supposed to be deleted + gatherFunction.deleteIntermediateOutputs = this.isIntermediate && this.deleteIntermediateOutputs + } else { + gatherFunction.originalLogFiles = logFiles + } } // Create the clone functions for running the parallel jobs @@ -122,7 +138,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { // Allow the script writer to change the paths to the files. initCloneFunction(cloneFunction, i) - // If the command directory is relative, insert the run directory ahead of it. cloneFunction.absoluteCommandDirectory()