Added a QFunction.jobLocalDir for optionally tracking a node local directory that may have faster intermediate storage, with SGF ensuring that if the directory happens to be on the same machine that it get's a clone specific sub-directory to avoid collisions.
This commit is contained in:
parent
fc22a5c71c
commit
c58e02a3bd
|
|
@ -59,9 +59,17 @@ trait QFunction extends Logging with QJobReport {
|
||||||
/** Directory to run the command in. */
|
/** Directory to run the command in. */
|
||||||
var commandDirectory: File = new File(".")
|
var commandDirectory: File = new File(".")
|
||||||
|
|
||||||
/** Temporary directory to write any files */
|
/** Temporary directory to write any files. Must be network accessible. */
|
||||||
var jobTempDir: File = null
|
var jobTempDir: File = null
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Local path available on all machines to store LOCAL temporary files. Not an @Input,
|
||||||
|
* nor an @Output. Currently only used for local intermediate files for composite jobs.
|
||||||
|
* Needs to be an annotated field so that it's mutated during cloning.
|
||||||
|
*/
|
||||||
|
@Argument(doc="Local path available on all machines to store LOCAL temporary files.")
|
||||||
|
var jobLocalDir: File = _
|
||||||
|
|
||||||
/** Order the function was added to the graph. */
|
/** Order the function was added to the graph. */
|
||||||
var addOrder: Seq[Int] = Nil
|
var addOrder: Seq[Int] = Nil
|
||||||
|
|
||||||
|
|
@ -97,6 +105,7 @@ trait QFunction extends Logging with QJobReport {
|
||||||
function.qSettings = this.qSettings
|
function.qSettings = this.qSettings
|
||||||
function.commandDirectory = this.commandDirectory
|
function.commandDirectory = this.commandDirectory
|
||||||
function.jobTempDir = this.jobTempDir
|
function.jobTempDir = this.jobTempDir
|
||||||
|
function.jobLocalDir = this.jobLocalDir
|
||||||
function.addOrder = this.addOrder
|
function.addOrder = this.addOrder
|
||||||
function.jobPriority = this.jobPriority
|
function.jobPriority = this.jobPriority
|
||||||
function.jobRestartable = this.jobRestartable
|
function.jobRestartable = this.jobRestartable
|
||||||
|
|
@ -232,6 +241,7 @@ trait QFunction extends Logging with QJobReport {
|
||||||
var dirs = Set.empty[File]
|
var dirs = Set.empty[File]
|
||||||
dirs += commandDirectory
|
dirs += commandDirectory
|
||||||
dirs += jobTempDir
|
dirs += jobTempDir
|
||||||
|
dirs += jobLocalDir
|
||||||
dirs += jobOutputFile.getParentFile
|
dirs += jobOutputFile.getParentFile
|
||||||
if (jobErrorFile != null)
|
if (jobErrorFile != null)
|
||||||
dirs += jobErrorFile.getParentFile
|
dirs += jobErrorFile.getParentFile
|
||||||
|
|
@ -370,11 +380,15 @@ trait QFunction extends Logging with QJobReport {
|
||||||
if (jobTempDir == null)
|
if (jobTempDir == null)
|
||||||
jobTempDir = qSettings.tempDirectory
|
jobTempDir = qSettings.tempDirectory
|
||||||
|
|
||||||
|
if (jobLocalDir == null)
|
||||||
|
jobLocalDir = jobTempDir
|
||||||
|
|
||||||
if (jobPriority.isEmpty)
|
if (jobPriority.isEmpty)
|
||||||
jobPriority = qSettings.jobPriority
|
jobPriority = qSettings.jobPriority
|
||||||
|
|
||||||
// Do not set the temp dir relative to the command directory
|
// Do not set the temp and local dir relative to the command directory
|
||||||
jobTempDir = IOUtils.absolute(jobTempDir)
|
jobTempDir = IOUtils.absolute(jobTempDir)
|
||||||
|
jobLocalDir = IOUtils.absolute(jobLocalDir)
|
||||||
|
|
||||||
absoluteCommandDirectory()
|
absoluteCommandDirectory()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -109,7 +109,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
|
||||||
this.copySettingsTo(scatterFunction)
|
this.copySettingsTo(scatterFunction)
|
||||||
scatterFunction.originalFunction = this
|
scatterFunction.originalFunction = this
|
||||||
scatterFunction.originalInputs = inputFiles
|
scatterFunction.originalInputs = inputFiles
|
||||||
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter")
|
scatterFunction.commandDirectory = this.scatterGatherCommandDir("scatter")
|
||||||
scatterFunction.jobOutputFile = new File("scatter.out")
|
scatterFunction.jobOutputFile = new File("scatter.out")
|
||||||
scatterFunction.addOrder = this.addOrder :+ 1
|
scatterFunction.addOrder = this.addOrder :+ 1
|
||||||
scatterFunction.isIntermediate = true
|
scatterFunction.isIntermediate = true
|
||||||
|
|
@ -154,7 +154,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
|
||||||
this.copySettingsTo(gatherFunction)
|
this.copySettingsTo(gatherFunction)
|
||||||
gatherFunction.originalFunction = this
|
gatherFunction.originalFunction = this
|
||||||
gatherFunction.originalOutput = gatherOutput
|
gatherFunction.originalOutput = gatherOutput
|
||||||
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
|
gatherFunction.commandDirectory = this.scatterGatherCommandDir("gather-" + gatherField.field.getName)
|
||||||
gatherFunction.jobOutputFile = new File("gather-" + gatherOutput.getName + ".out")
|
gatherFunction.jobOutputFile = new File("gather-" + gatherOutput.getName + ".out")
|
||||||
gatherFunction.addOrder = this.addOrder :+ gatherAddOrder
|
gatherFunction.addOrder = this.addOrder :+ gatherAddOrder
|
||||||
|
|
||||||
|
|
@ -178,10 +178,14 @@ trait ScatterGatherableFunction extends CommandLineFunction {
|
||||||
cloneFunction.analysisName = this.analysisName
|
cloneFunction.analysisName = this.analysisName
|
||||||
cloneFunction.cloneIndex = i
|
cloneFunction.cloneIndex = i
|
||||||
cloneFunction.cloneCount = numClones
|
cloneFunction.cloneCount = numClones
|
||||||
cloneFunction.commandDirectory = this.scatterGatherTempDir(dirFormat.format(i))
|
cloneFunction.commandDirectory = this.scatterGatherCommandDir(dirFormat.format(i))
|
||||||
cloneFunction.jobOutputFile = if (IOUtils.isSpecialFile(this.jobOutputFile)) this.jobOutputFile else new File(this.jobOutputFile.getName)
|
cloneFunction.jobOutputFile = if (IOUtils.isSpecialFile(this.jobOutputFile)) this.jobOutputFile else new File(this.jobOutputFile.getName)
|
||||||
if (this.jobErrorFile != null)
|
if (this.jobErrorFile != null)
|
||||||
cloneFunction.jobErrorFile = if (IOUtils.isSpecialFile(this.jobErrorFile)) this.jobErrorFile else new File(this.jobErrorFile.getName)
|
cloneFunction.jobErrorFile = if (IOUtils.isSpecialFile(this.jobErrorFile)) this.jobErrorFile else new File(this.jobErrorFile.getName)
|
||||||
|
// jic the "local" dir is actually on the network, create different sub local directories for each clone.
|
||||||
|
// This might be better handled with a hook that allows clones to create unique file names. Right now no hook
|
||||||
|
// like freezeFieldValues exists for specifying per cloneFunction fields.
|
||||||
|
cloneFunction.jobLocalDir = this.scatterGatherLocalDir(dirFormat.format(i))
|
||||||
cloneFunction.addOrder = this.addOrder :+ (i+1)
|
cloneFunction.addOrder = this.addOrder :+ (i+1)
|
||||||
cloneFunction.isIntermediate = true
|
cloneFunction.isIntermediate = true
|
||||||
|
|
||||||
|
|
@ -350,7 +354,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
|
||||||
this.copySettingsTo(gatherLogFunction)
|
this.copySettingsTo(gatherLogFunction)
|
||||||
gatherLogFunction.logs = functions.map(logFile).filter(_ != null)
|
gatherLogFunction.logs = functions.map(logFile).filter(_ != null)
|
||||||
gatherLogFunction.jobOutputFile = logFile(this)
|
gatherLogFunction.jobOutputFile = logFile(this)
|
||||||
gatherLogFunction.commandDirectory = this.scatterGatherTempDir()
|
gatherLogFunction.commandDirectory = this.scatterGatherCommandDir()
|
||||||
gatherLogFunction.addOrder = this.addOrder :+ addOrder
|
gatherLogFunction.addOrder = this.addOrder :+ addOrder
|
||||||
gatherLogFunction.isIntermediate = false
|
gatherLogFunction.isIntermediate = false
|
||||||
gatherLogFunction
|
gatherLogFunction
|
||||||
|
|
@ -361,5 +365,12 @@ trait ScatterGatherableFunction extends CommandLineFunction {
|
||||||
* @param subDir directory under the scatter gather directory.
|
* @param subDir directory under the scatter gather directory.
|
||||||
* @return temporary directory under this scatter gather directory.
|
* @return temporary directory under this scatter gather directory.
|
||||||
*/
|
*/
|
||||||
private def scatterGatherTempDir(subDir: String = "") = IOUtils.absolute(this.scatterGatherDirectory, this.jobName + "-sg/" + subDir)
|
private def scatterGatherCommandDir(subDir: String = "") = IOUtils.absolute(this.scatterGatherDirectory, this.jobName + "-sg/" + subDir)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a sub directory under this job local directory.
|
||||||
|
* @param subDir directory under the job local directory.
|
||||||
|
* @return absolute path to a directory under the original job local directory.
|
||||||
|
*/
|
||||||
|
private def scatterGatherLocalDir(subDir: String = "") = IOUtils.absolute(this.jobLocalDir, this.jobName + "-sg/" + subDir)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue