Queue now utilizes .[file].done to allow skipping of previous jobs, if they have been completed. This is, unfortunately, reliant on a python script to do the post-execution touching of .done files.
That is to say, proper resumability is live (but not extensively tested).
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4312 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
f64b6fddc1
commit
b24172c80f
|
|
@ -220,7 +220,7 @@ class fullCallingPipeline extends QScript {
|
|||
clusters.DBSNP = qscript.dbSNP
|
||||
val clusters_clusterFile = swapExt(new File(snps.out.getAbsolutePath),".vcf",".cluster")
|
||||
clusters.clusterFile = clusters_clusterFile
|
||||
clusters.memoryLimit = Some(8)
|
||||
clusters.memoryLimit = Some(4)
|
||||
clusters.jobQueue = "gsa"
|
||||
|
||||
clusters.use_annotation ++= List("QD", "SB", "HaplotypeScore", "HRun")
|
||||
|
|
|
|||
|
|
@ -32,6 +32,10 @@ class LsfJobRunner extends DispatchJobRunner with Logging {
|
|||
if (function.memoryLimit.isDefined)
|
||||
job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]")
|
||||
|
||||
if ( ! function.commandLine.contains("mkdir")) // wild hack -- ignore mkdirs ??
|
||||
job.postExecCommand = function.doneOutputs.foldLeft("python /humgen/gsa-scr1/chartl/sting/python/lsf_post_touch.py ")((b,a) => b + a.getAbsolutePath+" ")
|
||||
// hacky trailing space, so sue me -- CH
|
||||
|
||||
val previous: Iterable[LsfJob] =
|
||||
if (function.isInstanceOf[DispatchWaitFunction]) {
|
||||
job.waitForCompletion = true
|
||||
|
|
@ -39,7 +43,7 @@ class LsfJobRunner extends DispatchJobRunner with Logging {
|
|||
} else {
|
||||
previousJobs(function, qGraph)
|
||||
}
|
||||
|
||||
|
||||
mountCommand(function) match {
|
||||
case Some(command) => job.preExecCommand = command
|
||||
case None => /* ignore */
|
||||
|
|
|
|||
|
|
@ -93,8 +93,10 @@ trait CommandLineFunction extends QFunction with Logging {
|
|||
*/
|
||||
def outputs = getFieldFiles(outputFields)
|
||||
|
||||
/**
 * Gets the ".done" marker files that correspond to this function's output fields.
 * Delegates to getDoneFiles, which derives one hidden ".<name>.done" file per
 * eligible output file. Being a def, the set is recomputed on every access.
 * @return Set[File] of marker files for the output fields.
 */
def doneOutputs = getDoneFiles(outputFields)
|
||||
|
||||
/**
|
||||
* Gets the files from the fields. The fields must be a File, a FileExtension, or a List or Set of either.
|
||||
* Gets the files from the fields. The fields must be a File, a FileExtension, or a List or Set of either.
|
||||
* @param fields Fields to get files.
|
||||
* @return Set[File] for the fields.
|
||||
*/
|
||||
|
|
@ -112,12 +114,18 @@ trait CommandLineFunction extends QFunction with Logging {
|
|||
*/
|
||||
def upToDate = {
  // The function is considered up to date only when every expected ".done" marker
  // exists and the oldest marker is strictly newer than the newest input file.
  // A previous freshness check against the output files themselves has been removed:
  // its result was computed and then discarded, so it never influenced the answer.
  val inputFiles = inputs
  // doneOutputs is a def, so resolve the marker set once instead of on every use.
  val doneFiles = doneOutputs
  if (doneFiles.nonEmpty && doneFiles.forall(_.exists)) {
    // With no inputs maxInput stays at Long.MinValue, so any existing marker wins.
    val maxInput = inputFiles.foldLeft(Long.MinValue)((date, file) => date.max(file.lastModified))
    val minDone = doneFiles.foldLeft(Long.MaxValue)((date, file) => date.min(file.lastModified))
    maxInput < minDone
  } else false
}
|
||||
|
||||
/**
|
||||
|
|
@ -136,7 +144,48 @@ trait CommandLineFunction extends QFunction with Logging {
|
|||
}
|
||||
|
||||
/**
|
||||
* Gets the file from the field. The field must be a File or a FileExtension and not a List or Set.
|
||||
* Gets the done files from the field. The field must be a File, a FileExtension, or a List or Set of either.
|
||||
* @param fields Field to get files.
|
||||
* @return Set[File] set of done files for the field.
|
||||
*/
|
||||
private def getDoneFiles(fields: List[ArgumentSource]): Set[File] = {
  // Accumulated through a var because CollectionUtils.foreach is callback based.
  var doneFiles = Set.empty[File]
  for (field <- fields) {
    CollectionUtils.foreach(getFieldValue(field), (fieldValue) => {
      val outFile = fieldValueToFile(field, fieldValue)
      // No marker is produced for: unset fields, the job's own stdout/stderr files,
      // directories, and anything whose name ends in ".out".
      val needsMarker =
        outFile != null &&
          filesAreDifferent(outFile, jobOutputFile) &&
          filesAreDifferent(outFile, jobErrorFile) &&
          !outFile.isDirectory &&
          !outFile.getName.endsWith(".out")
      if (needsMarker) {
        // The marker is a hidden sibling of the output: <dir>/.<name>.done
        // File(parent, child) tolerates a null parent (a bare relative file name),
        // whereas concatenating getParent would yield a literal "null/" prefix.
        doneFiles += new File(outFile.getParent, "." + outFile.getName + ".done")
      }
    })
  }
  doneFiles
}
|
||||
|
||||
/**
|
||||
* Silly utility function which compresses if statement in getDoneFiles; returns true if two files are different
|
||||
* @return boolean -- if files are different
|
||||
*/
|
||||
private def filesAreDifferent(a: File, b: File): Boolean = {
  // Null-safe comparison by absolute path: two nulls count as "the same",
  // exactly one null counts as "different".
  (Option(a), Option(b)) match {
    case (None, None) => false
    case (Some(first), Some(second)) => !second.getAbsolutePath.equals(first.getAbsolutePath)
    case _ => true
  }
}
|
||||
|
||||
/**
|
||||
* Gets the file from the field. The field must be a File or a FileExtension and not a List or Set.
|
||||
* @param field Field to get the file.
|
||||
* @return File for the field.
|
||||
*/
|
||||
|
|
|
|||
Loading…
Reference in New Issue