From c58e02a3bd106dd9ee59cee08d60ae8ab647a20d Mon Sep 17 00:00:00 2001 From: Khalid Shakir Date: Fri, 25 Jan 2013 14:16:39 -0500 Subject: [PATCH] Added a QFunction.jobLocalDir for optionally tracking a node local directory that may have faster intermediate storage, with SGF ensuring that if the directory happens to be on the same machine that it gets a clone specific sub-directory to avoid collisions. --- .../sting/queue/function/QFunction.scala | 18 ++++++++++++++-- .../ScatterGatherableFunction.scala | 21 ++++++++++++++----- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 1b54231f3..81c76dd29 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -59,9 +59,17 @@ trait QFunction extends Logging with QJobReport { /** Directory to run the command in. */ var commandDirectory: File = new File(".") - /** Temporary directory to write any files */ + /** Temporary directory to write any files. Must be network accessible. */ var jobTempDir: File = null + /** + * Local path available on all machines to store LOCAL temporary files. Not an @Input, + * nor an @Output. Currently only used for local intermediate files for composite jobs. + * Needs to be an annotated field so that it's mutated during cloning. + */ + @Argument(doc="Local path available on all machines to store LOCAL temporary files.") + var jobLocalDir: File = _ + /** Order the function was added to the graph. 
*/ var addOrder: Seq[Int] = Nil @@ -97,6 +105,7 @@ trait QFunction extends Logging with QJobReport { function.qSettings = this.qSettings function.commandDirectory = this.commandDirectory function.jobTempDir = this.jobTempDir + function.jobLocalDir = this.jobLocalDir function.addOrder = this.addOrder function.jobPriority = this.jobPriority function.jobRestartable = this.jobRestartable @@ -232,6 +241,7 @@ trait QFunction extends Logging with QJobReport { var dirs = Set.empty[File] dirs += commandDirectory dirs += jobTempDir + dirs += jobLocalDir dirs += jobOutputFile.getParentFile if (jobErrorFile != null) dirs += jobErrorFile.getParentFile @@ -370,11 +380,15 @@ trait QFunction extends Logging with QJobReport { if (jobTempDir == null) jobTempDir = qSettings.tempDirectory + if (jobLocalDir == null) + jobLocalDir = jobTempDir + if (jobPriority.isEmpty) jobPriority = qSettings.jobPriority - // Do not set the temp dir relative to the command directory + // Do not set the temp and local dir relative to the command directory jobTempDir = IOUtils.absolute(jobTempDir) + jobLocalDir = IOUtils.absolute(jobLocalDir) absoluteCommandDirectory() } diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 66fdf0f8f..67138eb75 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -109,7 +109,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { this.copySettingsTo(scatterFunction) scatterFunction.originalFunction = this scatterFunction.originalInputs = inputFiles - scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter") + scatterFunction.commandDirectory = this.scatterGatherCommandDir("scatter") 
scatterFunction.jobOutputFile = new File("scatter.out") scatterFunction.addOrder = this.addOrder :+ 1 scatterFunction.isIntermediate = true @@ -154,7 +154,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { this.copySettingsTo(gatherFunction) gatherFunction.originalFunction = this gatherFunction.originalOutput = gatherOutput - gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) + gatherFunction.commandDirectory = this.scatterGatherCommandDir("gather-" + gatherField.field.getName) gatherFunction.jobOutputFile = new File("gather-" + gatherOutput.getName + ".out") gatherFunction.addOrder = this.addOrder :+ gatherAddOrder @@ -178,10 +178,14 @@ trait ScatterGatherableFunction extends CommandLineFunction { cloneFunction.analysisName = this.analysisName cloneFunction.cloneIndex = i cloneFunction.cloneCount = numClones - cloneFunction.commandDirectory = this.scatterGatherTempDir(dirFormat.format(i)) + cloneFunction.commandDirectory = this.scatterGatherCommandDir(dirFormat.format(i)) cloneFunction.jobOutputFile = if (IOUtils.isSpecialFile(this.jobOutputFile)) this.jobOutputFile else new File(this.jobOutputFile.getName) if (this.jobErrorFile != null) cloneFunction.jobErrorFile = if (IOUtils.isSpecialFile(this.jobErrorFile)) this.jobErrorFile else new File(this.jobErrorFile.getName) + // jic the "local" dir is actually on the network, create different sub local directories for each clone. + // This might be better handled with a hook that allows clones to create unique file names. Right now no hook + // like freezeFieldValues exists for specifying per cloneFunction fields. 
+ cloneFunction.jobLocalDir = this.scatterGatherLocalDir(dirFormat.format(i)) cloneFunction.addOrder = this.addOrder :+ (i+1) cloneFunction.isIntermediate = true @@ -350,7 +354,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { this.copySettingsTo(gatherLogFunction) gatherLogFunction.logs = functions.map(logFile).filter(_ != null) gatherLogFunction.jobOutputFile = logFile(this) - gatherLogFunction.commandDirectory = this.scatterGatherTempDir() + gatherLogFunction.commandDirectory = this.scatterGatherCommandDir() gatherLogFunction.addOrder = this.addOrder :+ addOrder gatherLogFunction.isIntermediate = false gatherLogFunction @@ -361,5 +365,12 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param subDir directory under the scatter gather directory. * @return temporary directory under this scatter gather directory. */ - private def scatterGatherTempDir(subDir: String = "") = IOUtils.absolute(this.scatterGatherDirectory, this.jobName + "-sg/" + subDir) + private def scatterGatherCommandDir(subDir: String = "") = IOUtils.absolute(this.scatterGatherDirectory, this.jobName + "-sg/" + subDir) + + /** + * Returns a sub directory under this job local directory. + * @param subDir directory under the job local directory. + * @return absolute path to a directory under the original job local directory. + */ + private def scatterGatherLocalDir(subDir: String = "") = IOUtils.absolute(this.jobLocalDir, this.jobName + "-sg/" + subDir) }