diff --git a/java/src/org/broadinstitute/sting/jna/clibrary/LibC.java b/java/src/org/broadinstitute/sting/jna/clibrary/LibC.java
index 17a15cd94..b01533ee6 100644
--- a/java/src/org/broadinstitute/sting/jna/clibrary/LibC.java
+++ b/java/src/org/broadinstitute/sting/jna/clibrary/LibC.java
@@ -25,6 +25,7 @@
package org.broadinstitute.sting.jna.clibrary;
import com.sun.jna.*;
+import com.sun.jna.ptr.NativeLongByReference;
/**
* Sparse port of the Standard C Library libc -lc.
@@ -176,4 +177,20 @@ public class LibC {
public NativeLong tv_sec;
public NativeLong tv_usec;
}
+
+ /**
+ * The time() function returns the value of time in seconds since 0 hours, 0 minutes, 0 seconds, January 1, 1970, Coordinated Universal Time, without including leap seconds. If an error occurs, time() returns the value (time_t)-1.
+ * The return value is also stored in *t, provided that t is non-null.
+ * @param t if non-null, receives the value of time in seconds.
+ * @return the value of time in seconds, or (time_t)-1 if an error occurs
+ */
+ public static native NativeLong time(NativeLongByReference t);
+
+ /**
+ * Returns the difference between two calendar times, (time1 - time0), expressed in seconds.
+ * Calendar times are passed as NativeLong values, the same type returned by {@link #time(NativeLongByReference)}.
+ * @param time1 Time 1, the calendar time that time0 is subtracted from
+ * @param time0 Time 0, the calendar time that is subtracted from time1
+ * @return the difference between two calendar times, (time1 - time0), expressed in seconds.
+ */
+ public static native double difftime(NativeLong time1, NativeLong time0);
}
diff --git a/java/src/org/broadinstitute/sting/utils/Utils.java b/java/src/org/broadinstitute/sting/utils/Utils.java
index e18986afb..34418541b 100755
--- a/java/src/org/broadinstitute/sting/utils/Utils.java
+++ b/java/src/org/broadinstitute/sting/utils/Utils.java
@@ -620,9 +620,9 @@ public class Utils {
seqOut[i] = (byte)seqIn[i];
}
return seqOut;
- }
+ }
+
+ /**
+ * Checks whether every bit that is set in flag is also set in value.
+ * A flag of 0 is always reported as set, since (value & 0) == 0.
+ * @param value the bit field to inspect
+ * @param flag the bit mask to look for
+ * @return true if (value & flag) == flag
+ */
+ public static boolean isFlagSet(int value, int flag) {
+ return ((value & flag) == flag);
+ }
}
-
-
-
-
diff --git a/java/test/org/broadinstitute/sting/jna/clibrary/LibCUnitTest.java b/java/test/org/broadinstitute/sting/jna/clibrary/LibCUnitTest.java
index 1d88dd6af..6120bd211 100644
--- a/java/test/org/broadinstitute/sting/jna/clibrary/LibCUnitTest.java
+++ b/java/test/org/broadinstitute/sting/jna/clibrary/LibCUnitTest.java
@@ -24,6 +24,8 @@
package org.broadinstitute.sting.jna.clibrary;
+import com.sun.jna.NativeLong;
+import com.sun.jna.ptr.NativeLongByReference;
import org.broadinstitute.sting.BaseTest;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -40,4 +42,28 @@ public class LibCUnitTest extends BaseTest {
Assert.assertEquals(LibC.unsetenv(testProperty), 0);
Assert.assertEquals(LibC.getenv(testProperty), null);
}
+
+ @Test
+ public void testDifftime() throws Exception {
+ // Pointer to hold the times
+ NativeLongByReference ref = new NativeLongByReference();
+
+ // time() returns -1 on error.
+ NativeLong err = new NativeLong(-1L);
+
+ LibC.time(ref);
+ NativeLong time0 = ref.getValue();
+ Assert.assertNotSame(time0, err, "Time 0 returned an error (-1).");
+
+ Thread.sleep(5000L);
+
+ LibC.time(ref);
+ NativeLong time1 = ref.getValue();
+ Assert.assertNotSame(time1, err, "Time 1 returned an error (-1).");
+
+ Assert.assertNotSame(time1, time0, "Time 1 returned same time as Time 0.");
+
+ double diff = LibC.difftime(time1, time0);
+ Assert.assertTrue(diff >= 5, "Time difference was not greater than 5 seconds: " + diff);
+ }
}
diff --git a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java
index b20fc7ada..5c79bb140 100644
--- a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java
+++ b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java
@@ -26,6 +26,7 @@ package org.broadinstitute.sting.jna.lsf.v7_0_6;
import com.sun.jna.ptr.IntByReference;
import org.apache.commons.io.FileUtils;
+import org.broadinstitute.sting.utils.Utils;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.broadinstitute.sting.BaseTest;
@@ -74,7 +75,7 @@ public class LibBatIntegrationTest extends BaseTest {
System.out.println("Waiting for job to run: " + jobId);
int jobStatus = LibBat.JOB_STAT_PEND;
- while (isSet(jobStatus, LibBat.JOB_STAT_PEND) || isSet(jobStatus, LibBat.JOB_STAT_RUN)) {
+ while (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_PEND) || Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_RUN)) {
Thread.sleep(30 * 1000L);
int numJobs = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB);
@@ -92,15 +93,11 @@ public class LibBatIntegrationTest extends BaseTest {
LibBat.lsb_closejobinfo();
}
}
- Assert.assertTrue(isSet(jobStatus, LibBat.JOB_STAT_DONE), String.format("Unexpected job status: 0x%02x", jobStatus));
+ Assert.assertTrue(Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE), String.format("Unexpected job status: 0x%02x", jobStatus));
Assert.assertTrue(FileUtils.waitFor(outFile, 120), "File not found: " + outFile.getAbsolutePath());
Assert.assertTrue(outFile.delete(), "Unable to delete " + outFile.getAbsolutePath());
Assert.assertEquals(reply.queue, req.queue, "LSF reply queue does not match requested queue.");
System.out.println("Validating that we reached the end of the test without exit.");
}
-
- private static boolean isSet(int value, int flag) {
- return ((value & flag) == flag);
- }
}
diff --git a/scala/qscript/playground/fullCallingPipeline.q b/scala/qscript/playground/fullCallingPipeline.q
index df76a8f46..8d27254b5 100755
--- a/scala/qscript/playground/fullCallingPipeline.q
+++ b/scala/qscript/playground/fullCallingPipeline.q
@@ -10,6 +10,7 @@ import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, Cl
import org.broadinstitute.sting.queue.util.IOUtils
import org.broadinstitute.sting.queue.QScript
import collection.JavaConversions._
+import org.broadinstitute.sting.utils.exceptions.UserException
import org.broadinstitute.sting.utils.interval.IntervalUtils
import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType
@@ -23,7 +24,8 @@ class fullCallingPipeline extends QScript {
@Input(doc="path to trigger track (for UnifiedGenotyper)", shortName="trigger", required=false)
var trigger: File = _
- @Input(doc="path to refseqTable (for GenomicAnnotator)", shortName="refseqTable")
+ // TODO: Fix command lines that pass -refseqTable
+ @Input(doc="path to refseqTable (for GenomicAnnotator) if not present in the YAML", shortName="refseqTable", required=false)
var refseqTable: File = _
@Input(doc="path to Picard FixMateInformation.jar. See http://picard.sourceforge.net/ .", required=false)
@@ -56,7 +58,8 @@ class fullCallingPipeline extends QScript {
//@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection")
//var protocol: String = _
- @Argument(doc="Job queue for large memory jobs (>4 to 16GB)", shortName="bigMemQueue", required=false)
+ // TODO: Fix command lines that pass -bigMemQueue
+ @Argument(doc="Unused", shortName="bigMemQueue", required=false)
var big_mem_queue: String = _
@Argument(doc="Job queue for short run jobs (<1hr)", shortName="shortJobQueue", required=false)
@@ -86,8 +89,10 @@ class fullCallingPipeline extends QScript {
dbsnpType = "VCF"
}
-
val projectBase: String = qscript.pipeline.getProject.getName
+ // TODO: Fix command lines that pass -refseqTable
+ if (qscript.refseqTable != null)
+ qscript.pipeline.getProject.setRefseqTable(qscript.refseqTable)
if (qscript.skip_cleaning) {
//endToEnd(projectBase + ".uncleaned", "recalibrated", adprRscript, seq, expKind)
@@ -288,13 +293,12 @@ class fullCallingPipeline extends QScript {
mergeIndels.rodBind = indelCallFiles
mergeIndels.analysisName = base+"_MergeIndels"
mergeIndels.memoryLimit = Some(4)
- mergeIndels.jobQueue = qscript.big_mem_queue
// 1b. genomically annotate SNPs -- no longer slow
val annotated = new GenomicAnnotator with CommandLineGATKArgs
annotated.jobOutputFile = new File(".queue/logs/SNPCalling/GenomicAnnotator.out")
annotated.rodBind :+= RodBind("variant", "VCF", snps.out)
- annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.refseqTable)
+ annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.pipeline.getProject.getRefseqTable)
//annotated.rodBind :+= RodBind("dbsnp", "AnnotatorInputTable", qscript.dbsnpTable)
annotated.out = swapExt("SnpCalls",snps.out,".vcf",".annotated.vcf")
//annotated.select :+= "dbsnp.name,dbsnp.refUCSC,dbsnp.strand,dbsnp.observed,dbsnp.avHet"
diff --git a/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/scala/src/org/broadinstitute/sting/queue/QSettings.scala
index b3dd6fb3c..3b420bf93 100644
--- a/scala/src/org/broadinstitute/sting/queue/QSettings.scala
+++ b/scala/src/org/broadinstitute/sting/queue/QSettings.scala
@@ -29,6 +29,9 @@ class QSettings {
@Argument(fullName="temp_directory", shortName="tempDir", doc="Temp directory to pass to functions.", required=false)
var tempDirectory = new File(System.getProperty("java.io.tmpdir"))
+ // Extra directories to automount; added to each QFunction's mountDirectories (see QFunction).
+ @Argument(fullName="mount_directory", shortName="mountDir", doc="Extra directory to automount via 'cd <dir>' before running functions.", required=false)
+ var mountDirectories: Set[File] = Set.empty[File]
+
@ArgumentCollection
val emailSettings = new EmailSettings
}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala
index dffe89a1f..eda4ed92a 100755
--- a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala
@@ -1,24 +1,45 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.CommandLineFunction
-import org.broadinstitute.sting.queue.util.IOUtils
import java.io.File
+import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
/**
* Runs a command line function.
*/
-trait CommandLineJobRunner extends JobRunner[CommandLineFunction] {
+trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
/** A generated exec shell script. */
protected var exec: File = _
/** Which directory to use for the job status files. */
protected def jobStatusDir = function.jobTempDir
+ /** The first time the status returned unknown, or None if the most recent status was not unknown. */
+ protected var firstUnknownTime: Option[Long] = None
+
+ /** Amount of time the status can return unknown before giving up. */
+ protected val unknownStatusMaxSeconds = 5 * 60
+
+ /** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. */
+ protected val retryExpiredSeconds = 5 * 60
+
/**
* Writes the function command line to an exec file.
*/
protected def writeExec() {
- this.exec = IOUtils.writeTempFile(function.commandLine, ".exec", "", jobStatusDir)
+ // NOTE: this local StringBuilder shadows the exec field; the field is assigned via this.exec below.
+ var exec = new StringBuilder
+
+ // Start from the explicitly requested mount directories, then add IOUtils.dirLevel(dir, 2) for each job directory.
+ var dirs = function.mountDirectories
+ for (dir <- function.jobDirectories)
+ dirs += IOUtils.dirLevel(dir, 2)
+ if (dirs.size > 0) {
+ // prepend "cd '<dir_1>' [&& cd '<dir_n>']" to automount the directories,
+ // followed by a cd into the command directory and a line continuation before the command line.
+ exec.append(dirs.mkString("cd '", "' && cd '", "'"))
+ exec.append(" && cd '%s' && \\%n".format(function.commandDirectory))
+ }
+ exec.append(function.commandLine)
+
+ this.exec = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir)
}
/**
@@ -29,17 +50,16 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] {
}
/**
- * Builds a command line that can be run to force an automount of the directories.
- * @param function Function to look jobDirectories.
- * @return A "cd '' [&& cd '']" command.
+ * Outputs the last lines of the error logs.
*/
- protected def mountCommand(function: CommandLineFunction) = {
- var dirs = Set.empty[File]
- for (dir <- function.jobDirectories)
- dirs += IOUtils.dirLevel(dir, 2)
- if (dirs.size > 0)
- Some(dirs.mkString("cd '", "' && cd '", "'"))
- else
- None
+ protected def tailError() = {
+ val errorFile = functionErrorFile
+ // NOTE(review): 120 is presumably a timeout in seconds for the file to become visible -- confirm against IOUtils.waitFor.
+ if (IOUtils.waitFor(errorFile, 120)) {
+ // NOTE(review): presumably returns up to the last 100 lines of the file -- confirm against IOUtils.tail.
+ val tailLines = IOUtils.tail(errorFile, 100)
+ // "%n".format() yields the platform line separator used to join the lines.
+ val nl = "%n".format()
+ logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl)))
+ } else {
+ logger.error("Unable to access log file: %s".format(errorFile))
+ }
}
}
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala
index 7cc529cce..ddee2eba2 100644
--- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala
@@ -1,12 +1,15 @@
package org.broadinstitute.sting.queue.engine
import java.io.File
-import com.sun.jna.Memory
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat}
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit}
+import org.broadinstitute.sting.utils.Utils
+import com.sun.jna.{NativeLong, Memory}
+import org.broadinstitute.sting.jna.clibrary.LibC
+import java.util.Date
/**
* Runs jobs on an LSF compute cluster.
@@ -70,14 +73,6 @@ class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(functi
writeExec()
request.command = "sh " + exec
- writePreExec()
- request.preExecCmd = "sh " + preExec
- request.options |= LibBat.SUB_PRE_EXEC
-
- writePostExec()
- request.postExecCmd = "sh " + postExec
- request.options3 |= LibBat.SUB3_POST_EXEC
-
// Allow advanced users to update the request.
updateJobRun(request)
@@ -98,7 +93,6 @@ class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(functi
if (jobId < 0)
throw new QException(LibBat.lsb_sperror("Unable to submit job"))
}, 1, 5, 10)
- jobStatusPath = IOUtils.absolute(new File(jobStatusDir, "." + jobId)).toString
logger.info("Submitted LSF job id: " + jobId)
} catch {
case e =>
@@ -113,6 +107,111 @@ class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(functi
logger.error("Error: " + bsubCommand, e)
}
}
+
+ /**
+ * Updates and returns the status.
+ * Reads the job's status, exit status, exit info and end time from LSF, then:
+ * - an unknown status is tolerated until it has persisted for unknownStatusMaxSeconds, after which the job is marked FAILED;
+ * - an exited job that is not expected to be retried (see willRetry) is marked FAILED and its error log is tailed;
+ * - a done job is marked DONE.
+ * Any other status leaves runStatus unchanged. Exceptions are caught and also mark the job FAILED.
+ * @return the current runner status.
+ */
+ def status = {
+ try {
+ var jobStatus = LibBat.JOB_STAT_NULL
+ var exitStatus = 0
+ var exitInfo = 0
+ var endTime: NativeLong = null
+
+ // NOTE(review): the return value of lsb_openjobinfo is not checked here -- confirm that is intended.
+ LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB)
+ try {
+ val jobInfo = LibBat.lsb_readjobinfo(null)
+ if (jobInfo == null) {
+ // No job info available: treat as an unknown status.
+ jobStatus = LibBat.JOB_STAT_UNKWN
+ exitStatus = 0
+ exitInfo = 0
+ endTime = null
+ } else {
+ jobStatus = jobInfo.status
+ exitStatus = jobInfo.exitStatus
+ exitInfo = jobInfo.exitInfo
+ endTime = jobInfo.endTime
+ }
+ } finally {
+ LibBat.lsb_closejobinfo()
+ }
+
+ logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo))
+
+ if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) {
+ val now = new Date().getTime
+
+ if (firstUnknownTime.isEmpty) {
+ firstUnknownTime = Some(now)
+ logger.debug("First unknown status for job id: " + jobId)
+ }
+
+ // Elapsed milliseconds since the first unknown status. now is the later timestamp,
+ // so it must be the minuend; the reverse order is never positive and the timeout would never fire.
+ if ((now - firstUnknownTime.get) >= (unknownStatusMaxSeconds * 1000L)) {
+ // Unknown status has been returned for a while now.
+ runStatus = RunnerStatus.FAILED
+ try {
+ removeTemporaryFiles()
+ function.failOutputs.foreach(_.createNewFile())
+ } catch {
+ case _ => /* ignore errors in the error handler */
+ }
+ logger.error("Error: " + bsubCommand + ", unknown status for " + unknownStatusMaxSeconds + " seconds.")
+ }
+ } else {
+ // Reset the last time an unknown status was seen.
+ firstUnknownTime = None
+
+ if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
+ // Exited function that (probably) won't be retried.
+ runStatus = RunnerStatus.FAILED
+ try {
+ removeTemporaryFiles()
+ function.failOutputs.foreach(_.createNewFile())
+ } catch {
+ case _ => /* ignore errors in the error handler */
+ }
+ logger.error("Error: " + bsubCommand)
+ tailError()
+ } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
+ // Done successfully.
+ removeTemporaryFiles()
+ function.doneOutputs.foreach(_.createNewFile())
+ runStatus = RunnerStatus.DONE
+ logger.info("Done: " + bsubCommand)
+ }
+ }
+ } catch {
+ case e =>
+ runStatus = RunnerStatus.FAILED
+ try {
+ removeTemporaryFiles()
+ function.failOutputs.foreach(_.createNewFile())
+ writeStackTrace(e)
+ } catch {
+ case _ => /* ignore errors in the exception handler */
+ }
+ logger.error("Error: " + bsubCommand, e)
+ }
+
+ runStatus
+ }
+
+ /**
+ * Returns true if LSF is expected to retry running the function.
+ * A job that exited with LibBat.EXIT_NORMAL is never expected to be retried; for any other exit reason
+ * a retry is expected as long as no more than retryExpiredSeconds have passed since endTime.
+ * @param exitInfo The reason the job exited.
+ * @param endTime The time the job exited.
+ * @return true if LSF is expected to retry running the function.
+ */
+ private def willRetry(exitInfo: Int, endTime: NativeLong) = {
+ exitInfo match {
+ case LibBat.EXIT_NORMAL => false
+ case _ => {
+ // Seconds between endTime and the current time reported by libc time().
+ val seconds = LibC.difftime(LibC.time(null), endTime)
+ (seconds <= retryExpiredSeconds)
+ }
+ }
+ }
+
}
object Lsf706JobRunner extends Logging {
diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala
index 8275ac65d..fb937f2ab 100644
--- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala
+++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala
@@ -1,6 +1,5 @@
package org.broadinstitute.sting.queue.engine
-import java.io.File
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util._
@@ -10,132 +9,9 @@ import org.broadinstitute.sting.queue.util._
abstract class LsfJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging {
protected var runStatus: RunnerStatus.Value = _
+ /** Job Id of the currently executing job. */
var jobId = -1L
- /** A file to look for to validate that the function ran to completion. */
- protected var jobStatusPath: String = _
-
- /** A temporary job done file to let Queue know that the process ran successfully. */
- private lazy val jobDoneFile = new File(jobStatusPath + ".done")
-
- /** A temporary job done file to let Queue know that the process exited with an error. */
- private lazy val jobFailFile = new File(jobStatusPath + ".fail")
-
- /** A generated pre-exec shell script. */
- protected var preExec: File = _
-
- /** A generated post-exec shell script. */
- protected var postExec: File = _
-
// TODO: Full bsub command for debugging.
protected def bsubCommand = "bsub " + function.commandLine
-
- /**
- * Updates and returns the status by looking for job status files.
- * After the job status files are detected they are cleaned up from
- * the file system and the status is cached.
- *
- * Note, these temporary job status files are currently different from the
- * .done files used to determine if a file has been created successfully.
- */
- def status = {
- try {
- if (logger.isDebugEnabled) {
- logger.debug("Done %s exists = %s".format(jobDoneFile, jobDoneFile.exists))
- logger.debug("Fail %s exists = %s".format(jobFailFile, jobFailFile.exists))
- }
-
- if (jobFailFile.exists) {
- removeTemporaryFiles()
- runStatus = RunnerStatus.FAILED
- logger.info("Error: " + bsubCommand)
- tailError()
- } else if (jobDoneFile.exists) {
- removeTemporaryFiles()
- runStatus = RunnerStatus.DONE
- logger.info("Done: " + bsubCommand)
- }
- } catch {
- case e =>
- runStatus = RunnerStatus.FAILED
- try {
- removeTemporaryFiles()
- function.failOutputs.foreach(_.createNewFile())
- writeStackTrace(e)
- } catch {
- case _ => /* ignore errors in the exception handler */
- }
- logger.error("Error: " + bsubCommand, e)
- }
-
- runStatus
- }
-
- /**
- * Removes all temporary files used for this LSF job.
- */
- override def removeTemporaryFiles() = {
- super.removeTemporaryFiles()
- IOUtils.tryDelete(preExec)
- IOUtils.tryDelete(postExec)
- IOUtils.tryDelete(jobDoneFile)
- IOUtils.tryDelete(jobFailFile)
- }
-
- /**
- * Outputs the last lines of the error logs.
- */
- protected def tailError() = {
- val errorFile = functionErrorFile
- if (IOUtils.waitFor(errorFile, 120)) {
- val tailLines = IOUtils.tail(errorFile, 100)
- val nl = "%n".format()
- logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl)))
- } else {
- logger.error("Unable to access log file: %s".format(errorFile))
- }
- }
-
- /**
- * Writes a pre-exec file to cleanup any status files and
- * optionally mount any automount directories on the node.
- */
- protected def writePreExec() {
- val preExec = new StringBuilder
-
- preExec.append("rm -f '%s/'.$LSB_JOBID.done%n".format(jobStatusDir))
- function.doneOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file)))
- preExec.append("rm -f '%s/'.$LSB_JOBID.fail%n".format(jobStatusDir))
- function.failOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file)))
-
- mountCommand(function).foreach(command =>
- preExec.append("%s%n".format(command)))
-
- this.preExec = IOUtils.writeTempFile(preExec.toString, ".preExec", "", jobStatusDir)
- }
-
- /**
- * Writes a post-exec file to create the status files.
- */
- protected def writePostExec() {
- val postExec = new StringBuilder
-
- val touchDone = function.doneOutputs.map("touch '%s'%n".format(_)).mkString
- val touchFail = function.failOutputs.map("touch '%s'%n".format(_)).mkString
-
- postExec.append("""|
- |if [ "${LSB_JOBPEND:-unset}" != "unset" ]; then
- | exit 0
- |fi
- |
- |JOB_STAT_ROOT='%s/'.$LSB_JOBID
- |if [ "$LSB_JOBEXIT_STAT" == "0" ]; then
- |%stouch "$JOB_STAT_ROOT".done
- |else
- |%stouch "$JOB_STAT_ROOT".fail
- |fi
- |""".stripMargin.format(jobStatusDir, touchDone, touchFail))
-
- this.postExec = IOUtils.writeTempFile(postExec.toString, ".postExec", "", jobStatusDir)
- }
}
diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala
index 4d6d2734d..1e8ba68d0 100644
--- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala
+++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala
@@ -14,9 +14,6 @@ trait CommandLineFunction extends QFunction with Logging {
/** Upper memory limit */
var memoryLimit: Option[Int] = None
- /** Whether a job is restartable */
- var jobRestartable = true
-
/** Job project to run the command */
var jobProject: String = _
diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala
index ba7eb247f..b177353ab 100644
--- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala
+++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala
@@ -34,6 +34,9 @@ trait QFunction extends Logging {
/** Temporary directory to write any files */
var jobTempDir: File = null
+ /** Directories to mount via 'cd <dir>' before running the command. */
+ var mountDirectories: Set[File] = Set.empty[File]
+
/** Order the function was added to the graph. */
var addOrder: List[Int] = Nil
@@ -44,6 +47,9 @@ trait QFunction extends Logging {
*/
var jobLimitSeconds: Option[Int] = None
+ /** Whether a job is restartable */
+ var jobRestartable = true
+
/**
* A callback for modifying the run.
* NOTE: This function is for ADVANCED use only and is unsupported.
@@ -302,6 +308,8 @@ trait QFunction extends Logging {
// Do not set the temp dir relative to the command directory
jobTempDir = IOUtils.absolute(jobTempDir)
+ mountDirectories ++= qSettings.mountDirectories
+
absoluteCommandDirectory()
}
diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala
index 6d3e4d63d..d334c44e9 100644
--- a/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala
+++ b/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala
@@ -14,6 +14,14 @@ class FullCallingPipelineTest extends BaseTest {
private final val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/"
+ // Explicit directories known to cause problems due to automount failures, and not detected by @Input / @Outputs
+ private final val mountDirectories = Set("/broad/software")
+
+ // In fullCallingPipeline.q VariantEval is always compared against 129.
+ // Until the newvarianteval is finalized which will allow java import of the prior results,
+ // we re-run VariantEval to validate the run, and replicate that behavior here.
+ private final val variantEvalDbsnpFile = new File(BaseTest.b37dbSNP129)
+
val k1gChr20Dataset = {
val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20")
dataset.pipeline.getProject.setIntervalList(new File(BaseTest.GATKDataLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.chr20.interval_list"))
@@ -42,7 +50,6 @@ class FullCallingPipelineTest extends BaseTest {
dataset.validations :+= new DoubleValidation("eval.dbsnp.all.called.novel.titv.tiTvRatio", 3.0340)
dataset.jobQueue = "gsa"
- dataset.bigMemQueue = "gsa"
dataset
}
@@ -51,7 +58,8 @@ class FullCallingPipelineTest extends BaseTest {
val project = new PipelineProject
project.setName(projectName)
project.setReferenceFile(new File(BaseTest.hg19Reference))
- project.setDbsnpFile(new File(BaseTest.b37dbSNP129))
+ project.setDbsnpFile(new File(BaseTest.b37dbSNP132))
+ project.setRefseqTable(new File(BaseTest.hg19Refseq))
val squid = "C426"
val ids = List(
@@ -72,8 +80,6 @@ class FullCallingPipelineTest extends BaseTest {
val dataset = new PipelineDataset
dataset.pipeline = pipeline
- dataset.refseq = BaseTest.hg19Refseq
- dataset.targetTiTv = "3.0"
dataset
}
@@ -92,10 +98,10 @@ class FullCallingPipelineTest extends BaseTest {
// Run the pipeline with the expected inputs.
val currentDir = new File(".").getAbsolutePath
var pipelineCommand = ("-retry 1 -S scala/qscript/playground/fullCallingPipeline.q" +
- " -jobProject %s -Y %s -refseqTable %s" +
+ " -jobProject %s -Y %s" +
" -tearScript %s/R/DataProcessingReport/GetTearsheetStats.R" +
" --gatkjar %s/dist/GenomeAnalysisTK.jar")
- .format(projectName, yamlFile, dataset.refseq, currentDir, currentDir)
+ .format(projectName, yamlFile, currentDir, currentDir)
if (!dataset.runIndelRealigner) {
pipelineCommand += " -skipCleaning"
@@ -105,8 +111,8 @@ class FullCallingPipelineTest extends BaseTest {
if (dataset.jobQueue != null)
pipelineCommand += " -jobQueue " + dataset.jobQueue
- if (dataset.bigMemQueue != null)
- pipelineCommand += " -bigMemQueue " + dataset.bigMemQueue
+ for (dir <- mountDirectories)
+ pipelineCommand += " -mountDir " + dir
// Run the test, at least checking if the command compiles
PipelineTest.executeTest(testName, pipelineCommand, null)
@@ -128,7 +134,7 @@ class FullCallingPipelineTest extends BaseTest {
var walkerCommand = ("-T VariantEval -R %s -D %s -B:eval,VCF %s" +
" -E %s -reportType R -reportLocation %s -L %s")
.format(
- dataset.pipeline.getProject.getReferenceFile, dataset.pipeline.getProject.getDbsnpFile, handFilteredVcf,
+ dataset.pipeline.getProject.getReferenceFile, variantEvalDbsnpFile, handFilteredVcf,
evalModules.mkString(" -E "), reportLocation, dataset.pipeline.getProject.getIntervalList)
for (validation <- dataset.validations) {
@@ -143,11 +149,8 @@ class FullCallingPipelineTest extends BaseTest {
+ /**
+ * Describes one pipeline test run: the pipeline to execute, the validations to apply,
+ * the job queue passed via -jobQueue (when non-null), and whether the indel realigner runs
+ * (when false, -skipCleaning is added to the pipeline command).
+ * toString returns the pipeline's project name.
+ */
class PipelineDataset(
var pipeline: Pipeline = null,
- var refseq: String = null,
- var targetTiTv: String = null,
var validations: List[PipelineValidation] = Nil,
var jobQueue: String = null,
- var bigMemQueue: String = null,
var runIndelRealigner: Boolean = false) {
override def toString = pipeline.getProject.getName
}
diff --git a/shell/queue.sh b/shell/queue.sh
index 278137387..c34b920bb 100755
--- a/shell/queue.sh
+++ b/shell/queue.sh
@@ -5,7 +5,6 @@ STING_HOME=/humgen/gsa-firehose2/pipeline/repositories/StingProduction
REFSEQ_TABLE=/humgen/gsa-hpprojects/GATK/data/Annotations/refseq/refGene-big-table-hg19.txt
JOB_QUEUE=gsa
SHORT_QUEUE=gsa
-BIG_QUEUE=gsa
TMP_DIR=$PWD/tmp
mkdir $PWD/tmp
@@ -15,4 +14,4 @@ use R-2.10
use Oracle-full-client
use .cx-oracle-5.0.2-python-2.6.5-oracle-full-client-11.1
-java -Djava.io.tmpdir="$TMP_DIR" -jar "$STING_HOME"/dist/Queue.jar -jobQueue "$JOB_QUEUE" -shortJobQueue "$SHORT_QUEUE" -bigMemQueue "$BIG_QUEUE" -jobProject "$SAMPLE_SET" -jobPrefix "$SAMPLE_SET" -tearScript ~/Sting/R/DataProcessingReport/GetTearsheetStats.R -S ~/Sting/scala/qscript/fullCallingPipeline.q -Y "$SAMPLE_SET".yaml -refseqTable "$REFSEQ_TABLE" --gatkjar "$STING_HOME"/dist/GenomeAnalysisTK.jar -log queue_log.txt -statusTo corin -bsub $2
+java -Djava.io.tmpdir="$TMP_DIR" -jar "$STING_HOME"/dist/Queue.jar -jobQueue "$JOB_QUEUE" -shortJobQueue "$SHORT_QUEUE" -jobProject "$SAMPLE_SET" -jobPrefix "$SAMPLE_SET" -tearScript ~/Sting/R/DataProcessingReport/GetTearsheetStats.R -S ~/Sting/scala/qscript/fullCallingPipeline.q -Y "$SAMPLE_SET".yaml -refseqTable "$REFSEQ_TABLE" --gatkjar "$STING_HOME"/dist/GenomeAnalysisTK.jar -mountDir /broad/software -log queue_log.txt -statusTo corin -bsub $2