From 8855f080c2ff685a00db957797915bb0227f18f0 Mon Sep 17 00:00:00 2001 From: kshakir Date: Thu, 20 Jan 2011 22:34:43 +0000 Subject: [PATCH] For the fullCallingPipeline.q: - Reading the refseq table from the YAML if not specified on the command line. - Removed obsolete -bigMemQueue now that CombineVariants runs in 4g. - Added a -mountDir /broad/software option to work around adpr automount issues. - Merged the LSF preexec used for automount into the shell script used to execute tasks. - Using the LSF C Library to determine when jobs are complete instead of postexec. - Updated queue.sh to match the changes above. - Updated the FCPTest to match the changes above. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5036 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/jna/clibrary/LibC.java | 17 +++ .../org/broadinstitute/sting/utils/Utils.java | 10 +- .../sting/jna/clibrary/LibCUnitTest.java | 26 ++++ .../jna/lsf/v7_0_6/LibBatIntegrationTest.java | 9 +- .../qscript/playground/fullCallingPipeline.q | 14 +- .../sting/queue/QSettings.scala | 3 + .../queue/engine/CommandLineJobRunner.scala | 48 +++++-- .../sting/queue/engine/Lsf706JobRunner.scala | 119 +++++++++++++++-- .../sting/queue/engine/LsfJobRunner.scala | 126 +----------------- .../queue/function/CommandLineFunction.scala | 3 - .../sting/queue/function/QFunction.scala | 8 ++ .../pipeline/FullCallingPipelineTest.scala | 27 ++-- shell/queue.sh | 3 +- 13 files changed, 231 insertions(+), 182 deletions(-) diff --git a/java/src/org/broadinstitute/sting/jna/clibrary/LibC.java b/java/src/org/broadinstitute/sting/jna/clibrary/LibC.java index 17a15cd94..b01533ee6 100644 --- a/java/src/org/broadinstitute/sting/jna/clibrary/LibC.java +++ b/java/src/org/broadinstitute/sting/jna/clibrary/LibC.java @@ -25,6 +25,7 @@ package org.broadinstitute.sting.jna.clibrary; import com.sun.jna.*; +import com.sun.jna.ptr.NativeLongByReference; /** * Sparse port of the Standard C Library libc -lc. @@ -176,4 +177,20 @@ public class LibC { public NativeLong tv_sec; public NativeLong tv_usec; } + + /** + * The time() function returns the value of time in seconds since 0 hours, 0 minutes, 0 seconds, January 1, 1970, Coordinated Universal Time, without including leap seconds. If an error occurs, time() returns the value (time_t)-1. + * The return value is also stored in *tloc, provided that t is non-null. + * @param t the value of time in seconds, provided that t is non-null. + * @return the value of time in seconds + */ + public static native NativeLong time(NativeLongByReference t); + + /** + * Returns the difference between two calendar times, (time1 - time0), expressed in seconds. + * @param time1 Time 1 + * @param time0 Time 0 + * @return the difference between two calendar times, (time1 - time0), expressed in seconds. + */ + public static native double difftime(NativeLong time1, NativeLong time0); } diff --git a/java/src/org/broadinstitute/sting/utils/Utils.java b/java/src/org/broadinstitute/sting/utils/Utils.java index e18986afb..34418541b 100755 --- a/java/src/org/broadinstitute/sting/utils/Utils.java +++ b/java/src/org/broadinstitute/sting/utils/Utils.java @@ -620,9 +620,9 @@ public class Utils { seqOut[i] = (byte)seqIn[i]; } return seqOut; - } + } + + public static boolean isFlagSet(int value, int flag) { + return ((value & flag) == flag); + } } - - - - diff --git a/java/test/org/broadinstitute/sting/jna/clibrary/LibCUnitTest.java b/java/test/org/broadinstitute/sting/jna/clibrary/LibCUnitTest.java index 1d88dd6af..6120bd211 100644 --- a/java/test/org/broadinstitute/sting/jna/clibrary/LibCUnitTest.java +++ b/java/test/org/broadinstitute/sting/jna/clibrary/LibCUnitTest.java @@ -24,6 +24,8 @@ package org.broadinstitute.sting.jna.clibrary; +import com.sun.jna.NativeLong; +import com.sun.jna.ptr.NativeLongByReference; import org.broadinstitute.sting.BaseTest; import org.testng.Assert; import org.testng.annotations.Test; @@ -40,4 +42,28 @@ public class LibCUnitTest extends BaseTest { Assert.assertEquals(LibC.unsetenv(testProperty), 0); Assert.assertEquals(LibC.getenv(testProperty), null); } + + @Test + public void testDifftime() throws Exception { + // Pointer to hold the times + NativeLongByReference ref = new NativeLongByReference(); + + // time() returns -1 on error. + NativeLong err = new NativeLong(-1L); + + LibC.time(ref); + NativeLong time0 = ref.getValue(); + Assert.assertNotSame(time0, err, "Time 0 returned an error (-1)."); + + Thread.sleep(5000L); + + LibC.time(ref); + NativeLong time1 = ref.getValue(); + Assert.assertNotSame(time1, err, "Time 1 returned an error (-1)."); + + Assert.assertNotSame(time1, time0, "Time 1 returned same time as Time 0."); + + double diff = LibC.difftime(time1, time0); + Assert.assertTrue(diff >= 5, "Time difference was not greater than 5 seconds: " + diff); + } } diff --git a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java index b20fc7ada..5c79bb140 100644 --- a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java +++ b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java @@ -26,6 +26,7 @@ package org.broadinstitute.sting.jna.lsf.v7_0_6; import com.sun.jna.ptr.IntByReference; import org.apache.commons.io.FileUtils; +import org.broadinstitute.sting.utils.Utils; import org.testng.Assert; import org.testng.annotations.Test; import org.broadinstitute.sting.BaseTest; @@ -74,7 +75,7 @@ public class LibBatIntegrationTest extends BaseTest { System.out.println("Waiting for job to run: " + jobId); int jobStatus = LibBat.JOB_STAT_PEND; - while (isSet(jobStatus, LibBat.JOB_STAT_PEND) || isSet(jobStatus, LibBat.JOB_STAT_RUN)) { + while (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_PEND) || Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_RUN)) { Thread.sleep(30 * 1000L); int numJobs = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB); @@ -92,15 +93,11 @@ public class LibBatIntegrationTest extends BaseTest { LibBat.lsb_closejobinfo(); } } - Assert.assertTrue(isSet(jobStatus, LibBat.JOB_STAT_DONE), String.format("Unexpected job status: 0x%02x", jobStatus)); + Assert.assertTrue(Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE), String.format("Unexpected job status: 0x%02x", jobStatus)); Assert.assertTrue(FileUtils.waitFor(outFile, 120), "File not found: " + outFile.getAbsolutePath()); Assert.assertTrue(outFile.delete(), "Unable to delete " + outFile.getAbsolutePath()); Assert.assertEquals(reply.queue, req.queue, "LSF reply queue does not match requested queue."); System.out.println("Validating that we reached the end of the test without exit."); } - - private static boolean isSet(int value, int flag) { - return ((value & flag) == flag); - } } diff --git a/scala/qscript/playground/fullCallingPipeline.q b/scala/qscript/playground/fullCallingPipeline.q index df76a8f46..8d27254b5 100755 --- a/scala/qscript/playground/fullCallingPipeline.q +++ b/scala/qscript/playground/fullCallingPipeline.q @@ -10,6 +10,7 @@ import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, Cl import org.broadinstitute.sting.queue.util.IOUtils import org.broadinstitute.sting.queue.QScript import collection.JavaConversions._ +import org.broadinstitute.sting.utils.exceptions.UserException import org.broadinstitute.sting.utils.interval.IntervalUtils import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType @@ -23,7 +24,8 @@ class fullCallingPipeline extends QScript { @Input(doc="path to trigger track (for UnifiedGenotyper)", shortName="trigger", required=false) var trigger: File = _ - @Input(doc="path to refseqTable (for GenomicAnnotator)", shortName="refseqTable") + // TODO: Fix command lines that pass -refseqTable + @Input(doc="path to refseqTable (for GenomicAnnotator) if not present in the YAML", shortName="refseqTable", required=false) var refseqTable: File = _ @Input(doc="path to Picard FixMateInformation.jar. See http://picard.sourceforge.net/ .", required=false) @@ -56,7 +58,8 @@ class fullCallingPipeline extends QScript { //@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection") //var protocol: String = _ - @Argument(doc="Job queue for large memory jobs (>4 to 16GB)", shortName="bigMemQueue", required=false) + // TODO: Fix command lines that pass -bigMemQueue + @Argument(doc="Unused", shortName="bigMemQueue", required=false) var big_mem_queue: String = _ @Argument(doc="Job queue for short run jobs (<1hr)", shortName="shortJobQueue", required=false) @@ -86,8 +89,10 @@ class fullCallingPipeline extends QScript { dbsnpType = "VCF" } - val projectBase: String = qscript.pipeline.getProject.getName + // TODO: Fix command lines that pass -refseqTable + if (qscript.refseqTable != null) + qscript.pipeline.getProject.setRefseqTable(qscript.refseqTable) if (qscript.skip_cleaning) { //endToEnd(projectBase + ".uncleaned", "recalibrated", adprRscript, seq, expKind) @@ -288,13 +293,12 @@ class fullCallingPipeline extends QScript { mergeIndels.rodBind = indelCallFiles mergeIndels.analysisName = base+"_MergeIndels" mergeIndels.memoryLimit = Some(4) - mergeIndels.jobQueue = qscript.big_mem_queue // 1b. genomically annotate SNPs -- no longer slow val annotated = new GenomicAnnotator with CommandLineGATKArgs annotated.jobOutputFile = new File(".queue/logs/SNPCalling/GenomicAnnotator.out") annotated.rodBind :+= RodBind("variant", "VCF", snps.out) - annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.refseqTable) + annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.pipeline.getProject.getRefseqTable) //annotated.rodBind :+= RodBind("dbsnp", "AnnotatorInputTable", qscript.dbsnpTable) annotated.out = swapExt("SnpCalls",snps.out,".vcf",".annotated.vcf") //annotated.select :+= "dbsnp.name,dbsnp.refUCSC,dbsnp.strand,dbsnp.observed,dbsnp.avHet" diff --git a/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/scala/src/org/broadinstitute/sting/queue/QSettings.scala index b3dd6fb3c..3b420bf93 100644 --- a/scala/src/org/broadinstitute/sting/queue/QSettings.scala +++ b/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -29,6 +29,9 @@ class QSettings { @Argument(fullName="temp_directory", shortName="tempDir", doc="Temp directory to pass to functions.", required=false) var tempDirectory = new File(System.getProperty("java.io.tmpdir")) + @Argument(fullName="mount_directory", shortName="mountDir", doc="Extra directory to automount via 'cd ' before running functions.", required=false) + var mountDirectories: Set[File] = Set.empty[File] + @ArgumentCollection val emailSettings = new EmailSettings } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala index dffe89a1f..eda4ed92a 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala @@ -1,24 +1,45 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.CommandLineFunction -import org.broadinstitute.sting.queue.util.IOUtils import java.io.File +import org.broadinstitute.sting.queue.util.{Logging, IOUtils} /** * Runs a command line function. */ -trait CommandLineJobRunner extends JobRunner[CommandLineFunction] { +trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { /** A generated exec shell script. */ protected var exec: File = _ /** Which directory to use for the job status files. */ protected def jobStatusDir = function.jobTempDir + /** The last time the status returned unknown. */ + protected var firstUnknownTime: Option[Long] = None + + /** Amount of time the status can return unknown before giving up. */ + protected val unknownStatusMaxSeconds = 5 * 60 + + /** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. */ + protected val retryExpiredSeconds = 5 * 60 + /** * Writes the function command line to an exec file. */ protected def writeExec() { - this.exec = IOUtils.writeTempFile(function.commandLine, ".exec", "", jobStatusDir) + var exec = new StringBuilder + + var dirs = function.mountDirectories + for (dir <- function.jobDirectories) + dirs += IOUtils.dirLevel(dir, 2) + if (dirs.size > 0) { + // prepend "cd '' [&& cd '']" to automount the directories. + exec.append(dirs.mkString("cd '", "' && cd '", "'")) + exec.append(" && cd '%s' && \\%n".format(function.commandDirectory)) + } + exec.append(function.commandLine) + + this.exec = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir) } /** @@ -29,17 +50,16 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] { } /** - * Builds a command line that can be run to force an automount of the directories. - * @param function Function to look jobDirectories. - * @return A "cd '' [&& cd '']" command. + * Outputs the last lines of the error logs. */ - protected def mountCommand(function: CommandLineFunction) = { - var dirs = Set.empty[File] - for (dir <- function.jobDirectories) - dirs += IOUtils.dirLevel(dir, 2) - if (dirs.size > 0) - Some(dirs.mkString("cd '", "' && cd '", "'")) - else - None + protected def tailError() = { + val errorFile = functionErrorFile + if (IOUtils.waitFor(errorFile, 120)) { + val tailLines = IOUtils.tail(errorFile, 100) + val nl = "%n".format() + logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) + } else { + logger.error("Unable to access log file: %s".format(errorFile)) + } } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala index 7cc529cce..ddee2eba2 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala @@ -1,12 +1,15 @@ package org.broadinstitute.sting.queue.engine import java.io.File -import com.sun.jna.Memory import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.queue.QException import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit} +import org.broadinstitute.sting.utils.Utils +import com.sun.jna.{NativeLong, Memory} +import org.broadinstitute.sting.jna.clibrary.LibC +import java.util.Date /** * Runs jobs on an LSF compute cluster. @@ -70,14 +73,6 @@ class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(functi writeExec() request.command = "sh " + exec - writePreExec() - request.preExecCmd = "sh " + preExec - request.options |= LibBat.SUB_PRE_EXEC - - writePostExec() - request.postExecCmd = "sh " + postExec - request.options3 |= LibBat.SUB3_POST_EXEC - // Allow advanced users to update the request. updateJobRun(request) @@ -98,7 +93,6 @@ class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(functi if (jobId < 0) throw new QException(LibBat.lsb_sperror("Unable to submit job")) }, 1, 5, 10) - jobStatusPath = IOUtils.absolute(new File(jobStatusDir, "." + jobId)).toString logger.info("Submitted LSF job id: " + jobId) } catch { case e => @@ -113,6 +107,111 @@ class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(functi logger.error("Error: " + bsubCommand, e) } } + + /** + * Updates and returns the status. + */ + def status = { + try { + var jobStatus = LibBat.JOB_STAT_NULL + var exitStatus = 0 + var exitInfo = 0 + var endTime: NativeLong = null + + LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) + try { + val jobInfo = LibBat.lsb_readjobinfo(null) + if (jobInfo == null) { + jobStatus = LibBat.JOB_STAT_UNKWN + exitStatus = 0 + exitInfo = 0 + endTime = null + } else { + jobStatus = jobInfo.status + exitStatus = jobInfo.exitStatus + exitInfo = jobInfo.exitInfo + endTime = jobInfo.endTime + } + } finally { + LibBat.lsb_closejobinfo() + } + + logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo)) + + if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) { + val now = new Date().getTime + + if (firstUnknownTime.isEmpty) { + firstUnknownTime = Some(now) + logger.debug("First unknown status for job id: " + jobId) + } + + if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) { + // Unknown status has been returned for a while now. + runStatus = RunnerStatus.FAILED + try { + removeTemporaryFiles() + function.failOutputs.foreach(_.createNewFile()) + } catch { + case _ => /* ignore errors in the error handler */ + } + logger.error("Error: " + bsubCommand + ", unknown status for " + unknownStatusMaxSeconds + " seconds.") + } + } else { + // Reset the last time an unknown status was seen. + firstUnknownTime = None + + if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { + // Exited function that (probably) won't be retried. + runStatus = RunnerStatus.FAILED + try { + removeTemporaryFiles() + function.failOutputs.foreach(_.createNewFile()) + } catch { + case _ => /* ignore errors in the error handler */ + } + logger.error("Error: " + bsubCommand) + tailError() + } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { + // Done successfully. + removeTemporaryFiles() + function.doneOutputs.foreach(_.createNewFile()) + runStatus = RunnerStatus.DONE + logger.info("Done: " + bsubCommand) + } + } + } catch { + case e => + runStatus = RunnerStatus.FAILED + try { + removeTemporaryFiles() + function.failOutputs.foreach(_.createNewFile()) + writeStackTrace(e) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error: " + bsubCommand, e) + } + + runStatus + } + + /** + * Returns true if LSF is expected to retry running the function. + * @param exitInfo The reason the job exited. + * @param endTime THe time the job exited. + * @return true if LSF is expected to retry running the function. + */ + private def willRetry(exitInfo: Int, endTime: NativeLong) = { + exitInfo match { + case LibBat.EXIT_NORMAL => false + case _ => { + val seconds = LibC.difftime(LibC.time(null), endTime) + (seconds <= retryExpiredSeconds) + } + } + } + } object Lsf706JobRunner extends Logging { diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index 8275ac65d..fb937f2ab 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -1,6 +1,5 @@ package org.broadinstitute.sting.queue.engine -import java.io.File import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util._ @@ -10,132 +9,9 @@ import org.broadinstitute.sting.queue.util._ abstract class LsfJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging { protected var runStatus: RunnerStatus.Value = _ + /** Job Id of the currently executing job. */ var jobId = -1L - /** A file to look for to validate that the function ran to completion. */ - protected var jobStatusPath: String = _ - - /** A temporary job done file to let Queue know that the process ran successfully. */ - private lazy val jobDoneFile = new File(jobStatusPath + ".done") - - /** A temporary job done file to let Queue know that the process exited with an error. */ - private lazy val jobFailFile = new File(jobStatusPath + ".fail") - - /** A generated pre-exec shell script. */ - protected var preExec: File = _ - - /** A generated post-exec shell script. */ - protected var postExec: File = _ - // TODO: Full bsub command for debugging. protected def bsubCommand = "bsub " + function.commandLine - - /** - * Updates and returns the status by looking for job status files. - * After the job status files are detected they are cleaned up from - * the file system and the status is cached. - * - * Note, these temporary job status files are currently different from the - * .done files used to determine if a file has been created successfully. - */ - def status = { - try { - if (logger.isDebugEnabled) { - logger.debug("Done %s exists = %s".format(jobDoneFile, jobDoneFile.exists)) - logger.debug("Fail %s exists = %s".format(jobFailFile, jobFailFile.exists)) - } - - if (jobFailFile.exists) { - removeTemporaryFiles() - runStatus = RunnerStatus.FAILED - logger.info("Error: " + bsubCommand) - tailError() - } else if (jobDoneFile.exists) { - removeTemporaryFiles() - runStatus = RunnerStatus.DONE - logger.info("Done: " + bsubCommand) - } - } catch { - case e => - runStatus = RunnerStatus.FAILED - try { - removeTemporaryFiles() - function.failOutputs.foreach(_.createNewFile()) - writeStackTrace(e) - } catch { - case _ => /* ignore errors in the exception handler */ - } - logger.error("Error: " + bsubCommand, e) - } - - runStatus - } - - /** - * Removes all temporary files used for this LSF job. - */ - override def removeTemporaryFiles() = { - super.removeTemporaryFiles() - IOUtils.tryDelete(preExec) - IOUtils.tryDelete(postExec) - IOUtils.tryDelete(jobDoneFile) - IOUtils.tryDelete(jobFailFile) - } - - /** - * Outputs the last lines of the error logs. - */ - protected def tailError() = { - val errorFile = functionErrorFile - if (IOUtils.waitFor(errorFile, 120)) { - val tailLines = IOUtils.tail(errorFile, 100) - val nl = "%n".format() - logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) - } else { - logger.error("Unable to access log file: %s".format(errorFile)) - } - } - - /** - * Writes a pre-exec file to cleanup any status files and - * optionally mount any automount directories on the node. - */ - protected def writePreExec() { - val preExec = new StringBuilder - - preExec.append("rm -f '%s/'.$LSB_JOBID.done%n".format(jobStatusDir)) - function.doneOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file))) - preExec.append("rm -f '%s/'.$LSB_JOBID.fail%n".format(jobStatusDir)) - function.failOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file))) - - mountCommand(function).foreach(command => - preExec.append("%s%n".format(command))) - - this.preExec = IOUtils.writeTempFile(preExec.toString, ".preExec", "", jobStatusDir) - } - - /** - * Writes a post-exec file to create the status files. - */ - protected def writePostExec() { - val postExec = new StringBuilder - - val touchDone = function.doneOutputs.map("touch '%s'%n".format(_)).mkString - val touchFail = function.failOutputs.map("touch '%s'%n".format(_)).mkString - - postExec.append("""| - |if [ "${LSB_JOBPEND:-unset}" != "unset" ]; then - | exit 0 - |fi - | - |JOB_STAT_ROOT='%s/'.$LSB_JOBID - |if [ "$LSB_JOBEXIT_STAT" == "0" ]; then - |%stouch "$JOB_STAT_ROOT".done - |else - |%stouch "$JOB_STAT_ROOT".fail - |fi - |""".stripMargin.format(jobStatusDir, touchDone, touchFail)) - - this.postExec = IOUtils.writeTempFile(postExec.toString, ".postExec", "", jobStatusDir) - } } diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 4d6d2734d..1e8ba68d0 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -14,9 +14,6 @@ trait CommandLineFunction extends QFunction with Logging { /** Upper memory limit */ var memoryLimit: Option[Int] = None - /** Whether a job is restartable */ - var jobRestartable = true - /** Job project to run the command */ var jobProject: String = _ diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index ba7eb247f..b177353ab 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -34,6 +34,9 @@ trait QFunction extends Logging { /** Temporary directory to write any files */ var jobTempDir: File = null + /** Directories to mount via 'cd ' before running the command. */ + var mountDirectories: Set[File] = Set.empty[File] + /** Order the function was added to the graph. */ var addOrder: List[Int] = Nil @@ -44,6 +47,9 @@ trait QFunction extends Logging { */ var jobLimitSeconds: Option[Int] = None + /** Whether a job is restartable */ + var jobRestartable = true + /** * A callback for modifying the run. * NOTE: This function is for ADVANCED use only and is unsupported. @@ -302,6 +308,8 @@ trait QFunction extends Logging { // Do not set the temp dir relative to the command directory jobTempDir = IOUtils.absolute(jobTempDir) + mountDirectories ++= qSettings.mountDirectories + absoluteCommandDirectory() } diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala index 6d3e4d63d..d334c44e9 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala @@ -14,6 +14,14 @@ class FullCallingPipelineTest extends BaseTest { private final val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" + // Explicit directories known to cause problems due to automount failures, and not detected by @Input / @Outputs + private final val mountDirectories = Set("/broad/software") + + // In fullCallingPipeline.q VariantEval is always compared against 129. + // Until the newvarianteval is finalized which will allow java import of the prior results, + // we re-run VariantEval to validate the run, and replicate that behavior here. + private final val variantEvalDbsnpFile = new File(BaseTest.b37dbSNP129) + val k1gChr20Dataset = { val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20") dataset.pipeline.getProject.setIntervalList(new File(BaseTest.GATKDataLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.chr20.interval_list")) @@ -42,7 +50,6 @@ class FullCallingPipelineTest extends BaseTest { dataset.validations :+= new DoubleValidation("eval.dbsnp.all.called.novel.titv.tiTvRatio", 3.0340) dataset.jobQueue = "gsa" - dataset.bigMemQueue = "gsa" dataset } @@ -51,7 +58,8 @@ class FullCallingPipelineTest extends BaseTest { val project = new PipelineProject project.setName(projectName) project.setReferenceFile(new File(BaseTest.hg19Reference)) - project.setDbsnpFile(new File(BaseTest.b37dbSNP129)) + project.setDbsnpFile(new File(BaseTest.b37dbSNP132)) + project.setRefseqTable(new File(BaseTest.hg19Refseq)) val squid = "C426" val ids = List( @@ -72,8 +80,6 @@ class FullCallingPipelineTest extends BaseTest { val dataset = new PipelineDataset dataset.pipeline = pipeline - dataset.refseq = BaseTest.hg19Refseq - dataset.targetTiTv = "3.0" dataset } @@ -92,10 +98,10 @@ class FullCallingPipelineTest extends BaseTest { // Run the pipeline with the expected inputs. val currentDir = new File(".").getAbsolutePath var pipelineCommand = ("-retry 1 -S scala/qscript/playground/fullCallingPipeline.q" + - " -jobProject %s -Y %s -refseqTable %s" + + " -jobProject %s -Y %s" + " -tearScript %s/R/DataProcessingReport/GetTearsheetStats.R" + " --gatkjar %s/dist/GenomeAnalysisTK.jar") - .format(projectName, yamlFile, dataset.refseq, currentDir, currentDir) + .format(projectName, yamlFile, currentDir, currentDir) if (!dataset.runIndelRealigner) { pipelineCommand += " -skipCleaning" @@ -105,8 +111,8 @@ class FullCallingPipelineTest extends BaseTest { if (dataset.jobQueue != null) pipelineCommand += " -jobQueue " + dataset.jobQueue - if (dataset.bigMemQueue != null) - pipelineCommand += " -bigMemQueue " + dataset.bigMemQueue + for (dir <- mountDirectories) + pipelineCommand += " -mountDir " + dir // Run the test, at least checking if the command compiles PipelineTest.executeTest(testName, pipelineCommand, null) @@ -128,7 +134,7 @@ class FullCallingPipelineTest extends BaseTest { var walkerCommand = ("-T VariantEval -R %s -D %s -B:eval,VCF %s" + " -E %s -reportType R -reportLocation %s -L %s") .format( - dataset.pipeline.getProject.getReferenceFile, dataset.pipeline.getProject.getDbsnpFile, handFilteredVcf, + dataset.pipeline.getProject.getReferenceFile, variantEvalDbsnpFile, handFilteredVcf, evalModules.mkString(" -E "), reportLocation, dataset.pipeline.getProject.getIntervalList) for (validation <- dataset.validations) { @@ -143,11 +149,8 @@ class FullCallingPipelineTest extends BaseTest { class PipelineDataset( var pipeline: Pipeline = null, - var refseq: String = null, - var targetTiTv: String = null, var validations: List[PipelineValidation] = Nil, var jobQueue: String = null, - var bigMemQueue: String = null, var runIndelRealigner: Boolean = false) { override def toString = pipeline.getProject.getName } diff --git a/shell/queue.sh b/shell/queue.sh index 278137387..c34b920bb 100755 --- a/shell/queue.sh +++ b/shell/queue.sh @@ -5,7 +5,6 @@ STING_HOME=/humgen/gsa-firehose2/pipeline/repositories/StingProduction REFSEQ_TABLE=/humgen/gsa-hpprojects/GATK/data/Annotations/refseq/refGene-big-table-hg19.txt JOB_QUEUE=gsa SHORT_QUEUE=gsa -BIG_QUEUE=gsa TMP_DIR=$PWD/tmp mkdir $PWD/tmp @@ -15,4 +14,4 @@ use R-2.10 use Oracle-full-client use .cx-oracle-5.0.2-python-2.6.5-oracle-full-client-11.1 -java -Djava.io.tmpdir="$TMP_DIR" -jar "$STING_HOME"/dist/Queue.jar -jobQueue "$JOB_QUEUE" -shortJobQueue "$SHORT_QUEUE" -bigMemQueue "$BIG_QUEUE" -jobProject "$SAMPLE_SET" -jobPrefix "$SAMPLE_SET" -tearScript ~/Sting/R/DataProcessingReport/GetTearsheetStats.R -S ~/Sting/scala/qscript/fullCallingPipeline.q -Y "$SAMPLE_SET".yaml -refseqTable "$REFSEQ_TABLE" --gatkjar "$STING_HOME"/dist/GenomeAnalysisTK.jar -log queue_log.txt -statusTo corin -bsub $2 +java -Djava.io.tmpdir="$TMP_DIR" -jar "$STING_HOME"/dist/Queue.jar -jobQueue "$JOB_QUEUE" -shortJobQueue "$SHORT_QUEUE" -jobProject "$SAMPLE_SET" -jobPrefix "$SAMPLE_SET" -tearScript ~/Sting/R/DataProcessingReport/GetTearsheetStats.R -S ~/Sting/scala/qscript/fullCallingPipeline.q -Y "$SAMPLE_SET".yaml -refseqTable "$REFSEQ_TABLE" --gatkjar "$STING_HOME"/dist/GenomeAnalysisTK.jar -mountDir /broad/software -log queue_log.txt -statusTo corin -bsub $2