From b24172c80fb2285c188775e5a54bbbda1fead575 Mon Sep 17 00:00:00 2001 From: chartl Date: Mon, 20 Sep 2010 00:16:53 +0000 Subject: [PATCH] Queue now utilizes .[file].done to allow skipping of previous jobs, if they have been completed. This is, unfortunately, reliant on a python script to do the post-execution touching of .done files. That is to say, proper resumability is live (but not extensively tested) git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4312 348d0f76-0448-11de-a6fe-93d51630548a --- scala/qscript/fullCallingPipeline.q | 2 +- .../sting/queue/engine/LsfJobRunner.scala | 6 +- .../queue/function/CommandLineFunction.scala | 65 ++++++++++++++++--- 3 files changed, 63 insertions(+), 10 deletions(-) diff --git a/scala/qscript/fullCallingPipeline.q b/scala/qscript/fullCallingPipeline.q index 824481b40..c115b74ee 100755 --- a/scala/qscript/fullCallingPipeline.q +++ b/scala/qscript/fullCallingPipeline.q @@ -220,7 +220,7 @@ class fullCallingPipeline extends QScript { clusters.DBSNP = qscript.dbSNP val clusters_clusterFile = swapExt(new File(snps.out.getAbsolutePath),".vcf",".cluster") clusters.clusterFile = clusters_clusterFile - clusters.memoryLimit = Some(8) + clusters.memoryLimit = Some(4) clusters.jobQueue = "gsa" clusters.use_annotation ++= List("QD", "SB", "HaplotypeScore", "HRun") diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index bedb147c3..61761e82c 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -32,6 +32,10 @@ class LsfJobRunner extends DispatchJobRunner with Logging { if (function.memoryLimit.isDefined) job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]") + if ( ! function.commandLine.contains("mkdir")) // wild hack -- ignore mkdirs ?? 
+ job.postExecCommand = function.doneOutputs.foldLeft("python /humgen/gsa-scr1/chartl/sting/python/lsf_post_touch.py ")((b,a) => b + a.getAbsolutePath+" ") + // hacky trailing space, so sue me -- CH + val previous: Iterable[LsfJob] = if (function.isInstanceOf[DispatchWaitFunction]) { job.waitForCompletion = true @@ -39,7 +43,7 @@ class LsfJobRunner extends DispatchJobRunner with Logging { } else { previousJobs(function, qGraph) } - + mountCommand(function) match { case Some(command) => job.preExecCommand = command case None => /* ignore */ diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index e2baeb7f6..92a5485c3 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -93,8 +93,10 @@ trait CommandLineFunction extends QFunction with Logging { */ def outputs = getFieldFiles(outputFields) + def doneOutputs = getDoneFiles(outputFields) + /** - * Gets the files from the fields. The fields must be a File, a FileExtension, or a List or Set of either. + * Gets the files from the fields. The fields must be a File, a FileExtension, or a List or Set of either. * @param fields Fields to get files. * @return Set[File] for the fields. 
*/
@@ -112,12 +114,14 @@ trait CommandLineFunction extends QFunction with Logging {
    */
   def upToDate = {
     val inputFiles = inputs
-    val outputFiles = outputs.filterNot(file => (file == jobOutputFile || file == jobErrorFile))
-    if (outputFiles.size > 0 && outputFiles.forall(_.exists)) {
-      val maxInput = inputFiles.foldLeft(Long.MinValue)((date, file) => date.max(file.lastModified))
-      val minOutput = outputFiles.foldLeft(Long.MaxValue)((date, file) => date.min(file.lastModified))
-      maxInput < minOutput
-    } else false
+    // doneOutputs is a def that rebuilds its set on every reference, so resolve it once.
+    val doneFiles = doneOutputs
+    if (doneFiles.nonEmpty && doneFiles.forall(_.exists)) {
+      // Up to date only when the oldest .done marker is newer than the newest input.
+      val maxInput = inputFiles.foldLeft(Long.MinValue)((date, file) => date.max(file.lastModified))
+      val minDone = doneFiles.foldLeft(Long.MaxValue)((date, file) => date.min(file.lastModified))
+      maxInput < minDone
+    } else false
   }
 
   /**
@@ -136,7 +140,47 @@ trait CommandLineFunction extends QFunction with Logging {
   }
 
   /**
-   * Gets the file from the field. The field must be a File or a FileExtension and not a List or Set.
+   * Gets the hidden ".done" marker files for the fields. The fields must be a File, a FileExtension, or a List or Set of either.
+   * Each qualifying output "dir/name" maps to the marker "dir/.name.done". Files equal to jobOutputFile or
+   * jobErrorFile, directories, and files whose names end in ".out" are skipped.
+   * @param fields Fields to get files.
+   * @return Set[File] set of done files for the fields.
+   */
+  private def getDoneFiles(fields: List[ArgumentSource]): Set[File] = {
+    var doneFiles = Set.empty[File]
+    for (field <- fields) {
+      CollectionUtils.foreach(getFieldValue(field), (fieldValue) => {
+        val outFile = fieldValueToFile(field, fieldValue)
+        val isTracked = outFile != null &&
+          filesAreDifferent(outFile, jobOutputFile) &&
+          filesAreDifferent(outFile, jobErrorFile) &&
+          !outFile.isDirectory &&
+          !outFile.getName.endsWith(".out")
+        if (isTracked) {
+          // Build the marker from the parent File rather than the parent string: getParent is null for a
+          // bare file name, and concatenating it would yield the bogus path "null/.name.done".
+          doneFiles += new File(outFile.getParentFile, "." + outFile.getName + ".done")
+        }
+      })
+    }
+    doneFiles
+  }
+
+  /**
+   * Null-safe comparison of two files by absolute path.
+   * @param a First file, may be null.
+   * @param b Second file, may be null.
+   * @return true if exactly one file is null or the absolute paths differ; false if both are null or the paths match.
+   */
+  private def filesAreDifferent(a: File, b: File): Boolean = {
+    if (a == null || b == null)
+      !(a == null && b == null)
+    else
+      a.getAbsolutePath != b.getAbsolutePath
+  }
+
+  /**
+   * Gets the file from the field. The field must be a File or a FileExtension and not a List or Set.
    * @param field Field to get the file.
    * @return File for the field.
    */