For the fullCallingPipeline.q:

- Reading the refseq table from the YAML if not specified on the command line.
 - Removed obsolete -bigMemQueue now that CombineVariants runs in 4g.
 - Added a -mountDir /broad/software option to work around adpr automount issues.
 - Merged the LSF preexec used for automount into the shell script used to execute tasks.
 - Using the LSF C Library to determine when jobs are complete instead of postexec.
 - Updated queue.sh to match the changes above.
 - Updated the FCPTest to match the changes above.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5036 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-01-20 22:34:43 +00:00
parent 8831ec3dce
commit 8855f080c2
13 changed files with 231 additions and 182 deletions

View File

@ -25,6 +25,7 @@
package org.broadinstitute.sting.jna.clibrary; package org.broadinstitute.sting.jna.clibrary;
import com.sun.jna.*; import com.sun.jna.*;
import com.sun.jna.ptr.NativeLongByReference;
/** /**
* Sparse port of the Standard C Library libc -lc. * Sparse port of the Standard C Library libc -lc.
@ -176,4 +177,20 @@ public class LibC {
public NativeLong tv_sec; public NativeLong tv_sec;
public NativeLong tv_usec; public NativeLong tv_usec;
} }
/**
* The time() function returns the value of time in seconds since 0 hours, 0 minutes, 0 seconds, January 1, 1970, Coordinated Universal Time, without including leap seconds. If an error occurs, time() returns the value (time_t)-1.
* The return value is also stored in *t, provided that t is non-null.
* @param t reference that receives the value of time in seconds, provided that t is non-null.
* @return the value of time in seconds, or (time_t)-1 if an error occurs.
*/
public static native NativeLong time(NativeLongByReference t);
/**
* Returns the difference between two calendar times, (time1 - time0), expressed in seconds.
* @param time1 Time 1, the calendar time being subtracted from.
* @param time0 Time 0, the calendar time that is subtracted.
* @return the difference between two calendar times, (time1 - time0), expressed in seconds.
*/
public static native double difftime(NativeLong time1, NativeLong time0);
} }

View File

@ -620,9 +620,9 @@ public class Utils {
seqOut[i] = (byte)seqIn[i]; seqOut[i] = (byte)seqIn[i];
} }
return seqOut; return seqOut;
} }
/**
 * Tests whether every bit of a flag mask is present in a value.
 * @param value the bit field to inspect.
 * @param flag the bit mask to look for.
 * @return true when all of the bits in flag are also set in value.
 */
public static boolean isFlagSet(int value, int flag) {
    final int matchedBits = value & flag;
    return matchedBits == flag;
}
} }

View File

@ -24,6 +24,8 @@
package org.broadinstitute.sting.jna.clibrary; package org.broadinstitute.sting.jna.clibrary;
import com.sun.jna.NativeLong;
import com.sun.jna.ptr.NativeLongByReference;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@ -40,4 +42,28 @@ public class LibCUnitTest extends BaseTest {
Assert.assertEquals(LibC.unsetenv(testProperty), 0); Assert.assertEquals(LibC.unsetenv(testProperty), 0);
Assert.assertEquals(LibC.getenv(testProperty), null); Assert.assertEquals(LibC.getenv(testProperty), null);
} }
/**
 * Reads the C library clock twice, five seconds apart, and checks that
 * difftime() reports at least that many seconds between the two readings.
 * @throws Exception if the sleep between the two readings is interrupted.
 */
@Test
public void testDifftime() throws Exception {
    // Pointer to hold the times
    NativeLongByReference ref = new NativeLongByReference();

    // time() returns -1 on error.
    final long err = -1L;

    // Compare by value: assertNotSame only checks object identity, which is
    // always different for separately created NativeLong instances.
    LibC.time(ref);
    NativeLong time0 = ref.getValue();
    Assert.assertTrue(time0.longValue() != err, "Time 0 returned an error (-1).");

    Thread.sleep(5000L);

    LibC.time(ref);
    NativeLong time1 = ref.getValue();
    Assert.assertTrue(time1.longValue() != err, "Time 1 returned an error (-1).");

    Assert.assertTrue(time1.longValue() != time0.longValue(), "Time 1 returned same time as Time 0.");

    double diff = LibC.difftime(time1, time0);
    Assert.assertTrue(diff >= 5, "Time difference was not greater than 5 seconds: " + diff);
}
} }

View File

@ -26,6 +26,7 @@ package org.broadinstitute.sting.jna.lsf.v7_0_6;
import com.sun.jna.ptr.IntByReference; import com.sun.jna.ptr.IntByReference;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.broadinstitute.sting.utils.Utils;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
@ -74,7 +75,7 @@ public class LibBatIntegrationTest extends BaseTest {
System.out.println("Waiting for job to run: " + jobId); System.out.println("Waiting for job to run: " + jobId);
int jobStatus = LibBat.JOB_STAT_PEND; int jobStatus = LibBat.JOB_STAT_PEND;
while (isSet(jobStatus, LibBat.JOB_STAT_PEND) || isSet(jobStatus, LibBat.JOB_STAT_RUN)) { while (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_PEND) || Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_RUN)) {
Thread.sleep(30 * 1000L); Thread.sleep(30 * 1000L);
int numJobs = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB); int numJobs = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB);
@ -92,15 +93,11 @@ public class LibBatIntegrationTest extends BaseTest {
LibBat.lsb_closejobinfo(); LibBat.lsb_closejobinfo();
} }
} }
Assert.assertTrue(isSet(jobStatus, LibBat.JOB_STAT_DONE), String.format("Unexpected job status: 0x%02x", jobStatus)); Assert.assertTrue(Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE), String.format("Unexpected job status: 0x%02x", jobStatus));
Assert.assertTrue(FileUtils.waitFor(outFile, 120), "File not found: " + outFile.getAbsolutePath()); Assert.assertTrue(FileUtils.waitFor(outFile, 120), "File not found: " + outFile.getAbsolutePath());
Assert.assertTrue(outFile.delete(), "Unable to delete " + outFile.getAbsolutePath()); Assert.assertTrue(outFile.delete(), "Unable to delete " + outFile.getAbsolutePath());
Assert.assertEquals(reply.queue, req.queue, "LSF reply queue does not match requested queue."); Assert.assertEquals(reply.queue, req.queue, "LSF reply queue does not match requested queue.");
System.out.println("Validating that we reached the end of the test without exit."); System.out.println("Validating that we reached the end of the test without exit.");
} }
private static boolean isSet(int value, int flag) {
return ((value & flag) == flag);
}
} }

View File

@ -10,6 +10,7 @@ import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, Cl
import org.broadinstitute.sting.queue.util.IOUtils import org.broadinstitute.sting.queue.util.IOUtils
import org.broadinstitute.sting.queue.QScript import org.broadinstitute.sting.queue.QScript
import collection.JavaConversions._ import collection.JavaConversions._
import org.broadinstitute.sting.utils.exceptions.UserException
import org.broadinstitute.sting.utils.interval.IntervalUtils import org.broadinstitute.sting.utils.interval.IntervalUtils
import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType import org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType
@ -23,7 +24,8 @@ class fullCallingPipeline extends QScript {
@Input(doc="path to trigger track (for UnifiedGenotyper)", shortName="trigger", required=false) @Input(doc="path to trigger track (for UnifiedGenotyper)", shortName="trigger", required=false)
var trigger: File = _ var trigger: File = _
@Input(doc="path to refseqTable (for GenomicAnnotator)", shortName="refseqTable") // TODO: Fix command lines that pass -refseqTable
@Input(doc="path to refseqTable (for GenomicAnnotator) if not present in the YAML", shortName="refseqTable", required=false)
var refseqTable: File = _ var refseqTable: File = _
@Input(doc="path to Picard FixMateInformation.jar. See http://picard.sourceforge.net/ .", required=false) @Input(doc="path to Picard FixMateInformation.jar. See http://picard.sourceforge.net/ .", required=false)
@ -56,7 +58,8 @@ class fullCallingPipeline extends QScript {
//@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection") //@Input(doc="Sequencing experiement type (for use by adpr)--Whole_Exome, Whole_Genome, or Hybrid_Selection")
//var protocol: String = _ //var protocol: String = _
@Argument(doc="Job queue for large memory jobs (>4 to 16GB)", shortName="bigMemQueue", required=false) // TODO: Fix command lines that pass -bigMemQueue
@Argument(doc="Unused", shortName="bigMemQueue", required=false)
var big_mem_queue: String = _ var big_mem_queue: String = _
@Argument(doc="Job queue for short run jobs (<1hr)", shortName="shortJobQueue", required=false) @Argument(doc="Job queue for short run jobs (<1hr)", shortName="shortJobQueue", required=false)
@ -86,8 +89,10 @@ class fullCallingPipeline extends QScript {
dbsnpType = "VCF" dbsnpType = "VCF"
} }
val projectBase: String = qscript.pipeline.getProject.getName val projectBase: String = qscript.pipeline.getProject.getName
// TODO: Fix command lines that pass -refseqTable
if (qscript.refseqTable != null)
qscript.pipeline.getProject.setRefseqTable(qscript.refseqTable)
if (qscript.skip_cleaning) { if (qscript.skip_cleaning) {
//endToEnd(projectBase + ".uncleaned", "recalibrated", adprRscript, seq, expKind) //endToEnd(projectBase + ".uncleaned", "recalibrated", adprRscript, seq, expKind)
@ -288,13 +293,12 @@ class fullCallingPipeline extends QScript {
mergeIndels.rodBind = indelCallFiles mergeIndels.rodBind = indelCallFiles
mergeIndels.analysisName = base+"_MergeIndels" mergeIndels.analysisName = base+"_MergeIndels"
mergeIndels.memoryLimit = Some(4) mergeIndels.memoryLimit = Some(4)
mergeIndels.jobQueue = qscript.big_mem_queue
// 1b. genomically annotate SNPs -- no longer slow // 1b. genomically annotate SNPs -- no longer slow
val annotated = new GenomicAnnotator with CommandLineGATKArgs val annotated = new GenomicAnnotator with CommandLineGATKArgs
annotated.jobOutputFile = new File(".queue/logs/SNPCalling/GenomicAnnotator.out") annotated.jobOutputFile = new File(".queue/logs/SNPCalling/GenomicAnnotator.out")
annotated.rodBind :+= RodBind("variant", "VCF", snps.out) annotated.rodBind :+= RodBind("variant", "VCF", snps.out)
annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.refseqTable) annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.pipeline.getProject.getRefseqTable)
//annotated.rodBind :+= RodBind("dbsnp", "AnnotatorInputTable", qscript.dbsnpTable) //annotated.rodBind :+= RodBind("dbsnp", "AnnotatorInputTable", qscript.dbsnpTable)
annotated.out = swapExt("SnpCalls",snps.out,".vcf",".annotated.vcf") annotated.out = swapExt("SnpCalls",snps.out,".vcf",".annotated.vcf")
//annotated.select :+= "dbsnp.name,dbsnp.refUCSC,dbsnp.strand,dbsnp.observed,dbsnp.avHet" //annotated.select :+= "dbsnp.name,dbsnp.refUCSC,dbsnp.strand,dbsnp.observed,dbsnp.avHet"

View File

@ -29,6 +29,9 @@ class QSettings {
@Argument(fullName="temp_directory", shortName="tempDir", doc="Temp directory to pass to functions.", required=false) @Argument(fullName="temp_directory", shortName="tempDir", doc="Temp directory to pass to functions.", required=false)
var tempDirectory = new File(System.getProperty("java.io.tmpdir")) var tempDirectory = new File(System.getProperty("java.io.tmpdir"))
@Argument(fullName="mount_directory", shortName="mountDir", doc="Extra directory to automount via 'cd <dir>' before running functions.", required=false)
var mountDirectories: Set[File] = Set.empty[File]
@ArgumentCollection @ArgumentCollection
val emailSettings = new EmailSettings val emailSettings = new EmailSettings
} }

View File

@ -1,24 +1,45 @@
package org.broadinstitute.sting.queue.engine package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util.IOUtils
import java.io.File import java.io.File
import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
/** /**
* Runs a command line function. * Runs a command line function.
*/ */
trait CommandLineJobRunner extends JobRunner[CommandLineFunction] { trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
/** A generated exec shell script. */ /** A generated exec shell script. */
protected var exec: File = _ protected var exec: File = _
/** Which directory to use for the job status files. */ /** Which directory to use for the job status files. */
protected def jobStatusDir = function.jobTempDir protected def jobStatusDir = function.jobTempDir
/** The first time the status returned unknown, or None if the most recent status was known. */
protected var firstUnknownTime: Option[Long] = None
/** Amount of time the status can return unknown before giving up. */
protected val unknownStatusMaxSeconds = 5 * 60
/** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. */
protected val retryExpiredSeconds = 5 * 60
/** /**
* Writes the function command line to an exec file. * Writes the function command line to an exec file.
*/ */
protected def writeExec() { protected def writeExec() {
this.exec = IOUtils.writeTempFile(function.commandLine, ".exec", "", jobStatusDir) var exec = new StringBuilder
var dirs = function.mountDirectories
for (dir <- function.jobDirectories)
dirs += IOUtils.dirLevel(dir, 2)
if (dirs.size > 0) {
// prepend "cd '<dir_1>' [&& cd '<dir_n>']" to automount the directories.
exec.append(dirs.mkString("cd '", "' && cd '", "'"))
exec.append(" && cd '%s' && \\%n".format(function.commandDirectory))
}
exec.append(function.commandLine)
this.exec = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir)
} }
/** /**
@ -29,17 +50,16 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] {
} }
/** /**
* Builds a command line that can be run to force an automount of the directories. * Outputs the last lines of the error logs.
* @param function Function to look jobDirectories.
* @return A "cd '<dir_1>' [&& cd '<dir_n>']" command.
*/ */
protected def mountCommand(function: CommandLineFunction) = { protected def tailError() = {
var dirs = Set.empty[File] val errorFile = functionErrorFile
for (dir <- function.jobDirectories) if (IOUtils.waitFor(errorFile, 120)) {
dirs += IOUtils.dirLevel(dir, 2) val tailLines = IOUtils.tail(errorFile, 100)
if (dirs.size > 0) val nl = "%n".format()
Some(dirs.mkString("cd '", "' && cd '", "'")) logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl)))
else } else {
None logger.error("Unable to access log file: %s".format(errorFile))
}
} }
} }

View File

@ -1,12 +1,15 @@
package org.broadinstitute.sting.queue.engine package org.broadinstitute.sting.queue.engine
import java.io.File import java.io.File
import com.sun.jna.Memory
import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.queue.QException import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat}
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit} import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit}
import org.broadinstitute.sting.utils.Utils
import com.sun.jna.{NativeLong, Memory}
import org.broadinstitute.sting.jna.clibrary.LibC
import java.util.Date
/** /**
* Runs jobs on an LSF compute cluster. * Runs jobs on an LSF compute cluster.
@ -70,14 +73,6 @@ class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(functi
writeExec() writeExec()
request.command = "sh " + exec request.command = "sh " + exec
writePreExec()
request.preExecCmd = "sh " + preExec
request.options |= LibBat.SUB_PRE_EXEC
writePostExec()
request.postExecCmd = "sh " + postExec
request.options3 |= LibBat.SUB3_POST_EXEC
// Allow advanced users to update the request. // Allow advanced users to update the request.
updateJobRun(request) updateJobRun(request)
@ -98,7 +93,6 @@ class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(functi
if (jobId < 0) if (jobId < 0)
throw new QException(LibBat.lsb_sperror("Unable to submit job")) throw new QException(LibBat.lsb_sperror("Unable to submit job"))
}, 1, 5, 10) }, 1, 5, 10)
jobStatusPath = IOUtils.absolute(new File(jobStatusDir, "." + jobId)).toString
logger.info("Submitted LSF job id: " + jobId) logger.info("Submitted LSF job id: " + jobId)
} catch { } catch {
case e => case e =>
@ -113,6 +107,111 @@ class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(functi
logger.error("Error: " + bsubCommand, e) logger.error("Error: " + bsubCommand, e)
} }
} }
/**
 * Updates and returns the status.
 *
 * Reads the job info for jobId from LSF and maps it onto runStatus:
 *  - an unknown status is tolerated until it has been reported continuously
 *    for unknownStatusMaxSeconds, after which the job is marked FAILED;
 *  - an exited job that is not expected to be retried is marked FAILED and
 *    the tail of its error log is written to the logger;
 *  - a done job is marked DONE.
 * Any other state leaves runStatus untouched.
 * @return the current runner status.
 */
def status = {
  try {
    var jobStatus = LibBat.JOB_STAT_NULL
    var exitStatus = 0
    var exitInfo = 0
    var endTime: NativeLong = null

    LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB)
    try {
      val jobInfo = LibBat.lsb_readjobinfo(null)
      if (jobInfo == null) {
        // No record was returned; exitStatus, exitInfo and endTime keep their defaults.
        jobStatus = LibBat.JOB_STAT_UNKWN
      } else {
        jobStatus = jobInfo.status
        exitStatus = jobInfo.exitStatus
        exitInfo = jobInfo.exitInfo
        endTime = jobInfo.endTime
      }
    } finally {
      // Always release the job info connection opened above.
      LibBat.lsb_closejobinfo()
    }

    logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo))

    if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) {
      val now = new Date().getTime

      if (firstUnknownTime.isEmpty) {
        firstUnknownTime = Some(now)
        logger.debug("First unknown status for job id: " + jobId)
      }

      // Elapsed time is (now - first); the reverse order is never positive
      // and would prevent the timeout from ever triggering.
      if ((now - firstUnknownTime.get) >= (unknownStatusMaxSeconds * 1000L)) {
        // Unknown status has been returned for a while now.
        runStatus = RunnerStatus.FAILED
        try {
          removeTemporaryFiles()
          function.failOutputs.foreach(_.createNewFile())
        } catch {
          case _ => /* ignore errors in the error handler */
        }
        logger.error("Error: " + bsubCommand + ", unknown status for " + unknownStatusMaxSeconds + " seconds.")
      }
    } else {
      // Reset the first time an unknown status was seen.
      firstUnknownTime = None

      if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
        // Exited function that (probably) won't be retried.
        runStatus = RunnerStatus.FAILED
        try {
          removeTemporaryFiles()
          function.failOutputs.foreach(_.createNewFile())
        } catch {
          case _ => /* ignore errors in the error handler */
        }
        logger.error("Error: " + bsubCommand)
        tailError()
      } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
        // Done successfully.
        removeTemporaryFiles()
        function.doneOutputs.foreach(_.createNewFile())
        runStatus = RunnerStatus.DONE
        logger.info("Done: " + bsubCommand)
      }
    }
  } catch {
    case e =>
      runStatus = RunnerStatus.FAILED
      try {
        removeTemporaryFiles()
        function.failOutputs.foreach(_.createNewFile())
        writeStackTrace(e)
      } catch {
        case _ => /* ignore errors in the exception handler */
      }
      logger.error("Error: " + bsubCommand, e)
  }

  runStatus
}
/**
 * Decides whether LSF is still expected to retry running the function.
 * A normal exit is never retried; for any other exit reason a retry is
 * expected while no more than retryExpiredSeconds have passed since endTime.
 * @param exitInfo The reason the job exited.
 * @param endTime The time the job exited.
 * @return true if LSF is expected to retry running the function.
 */
private def willRetry(exitInfo: Int, endTime: NativeLong) = {
  if (exitInfo == LibBat.EXIT_NORMAL) {
    false
  } else {
    val elapsedSeconds = LibC.difftime(LibC.time(null), endTime)
    elapsedSeconds <= retryExpiredSeconds
  }
}
} }
object Lsf706JobRunner extends Logging { object Lsf706JobRunner extends Logging {

View File

@ -1,6 +1,5 @@
package org.broadinstitute.sting.queue.engine package org.broadinstitute.sting.queue.engine
import java.io.File
import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.queue.util._
@ -10,132 +9,9 @@ import org.broadinstitute.sting.queue.util._
abstract class LsfJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging { abstract class LsfJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging {
protected var runStatus: RunnerStatus.Value = _ protected var runStatus: RunnerStatus.Value = _
/** Job Id of the currently executing job. */
var jobId = -1L var jobId = -1L
/** A file to look for to validate that the function ran to completion. */
protected var jobStatusPath: String = _
/** A temporary job done file to let Queue know that the process ran successfully. */
private lazy val jobDoneFile = new File(jobStatusPath + ".done")
/** A temporary job done file to let Queue know that the process exited with an error. */
private lazy val jobFailFile = new File(jobStatusPath + ".fail")
/** A generated pre-exec shell script. */
protected var preExec: File = _
/** A generated post-exec shell script. */
protected var postExec: File = _
// TODO: Full bsub command for debugging. // TODO: Full bsub command for debugging.
protected def bsubCommand = "bsub " + function.commandLine protected def bsubCommand = "bsub " + function.commandLine
/**
* Updates and returns the status by looking for job status files.
* After the job status files are detected they are cleaned up from
* the file system and the status is cached.
*
* Note, these temporary job status files are currently different from the
* .done files used to determine if a file has been created successfully.
*/
def status = {
try {
if (logger.isDebugEnabled) {
logger.debug("Done %s exists = %s".format(jobDoneFile, jobDoneFile.exists))
logger.debug("Fail %s exists = %s".format(jobFailFile, jobFailFile.exists))
}
if (jobFailFile.exists) {
removeTemporaryFiles()
runStatus = RunnerStatus.FAILED
logger.info("Error: " + bsubCommand)
tailError()
} else if (jobDoneFile.exists) {
removeTemporaryFiles()
runStatus = RunnerStatus.DONE
logger.info("Done: " + bsubCommand)
}
} catch {
case e =>
runStatus = RunnerStatus.FAILED
try {
removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + bsubCommand, e)
}
runStatus
}
/**
* Removes all temporary files used for this LSF job.
*/
override def removeTemporaryFiles() = {
super.removeTemporaryFiles()
IOUtils.tryDelete(preExec)
IOUtils.tryDelete(postExec)
IOUtils.tryDelete(jobDoneFile)
IOUtils.tryDelete(jobFailFile)
}
/**
* Outputs the last lines of the error logs.
*/
protected def tailError() = {
val errorFile = functionErrorFile
if (IOUtils.waitFor(errorFile, 120)) {
val tailLines = IOUtils.tail(errorFile, 100)
val nl = "%n".format()
logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl)))
} else {
logger.error("Unable to access log file: %s".format(errorFile))
}
}
/**
* Writes a pre-exec file to cleanup any status files and
* optionally mount any automount directories on the node.
*/
protected def writePreExec() {
val preExec = new StringBuilder
preExec.append("rm -f '%s/'.$LSB_JOBID.done%n".format(jobStatusDir))
function.doneOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file)))
preExec.append("rm -f '%s/'.$LSB_JOBID.fail%n".format(jobStatusDir))
function.failOutputs.foreach(file => preExec.append("rm -f '%s'%n".format(file)))
mountCommand(function).foreach(command =>
preExec.append("%s%n".format(command)))
this.preExec = IOUtils.writeTempFile(preExec.toString, ".preExec", "", jobStatusDir)
}
/**
* Writes a post-exec file to create the status files.
*/
protected def writePostExec() {
val postExec = new StringBuilder
val touchDone = function.doneOutputs.map("touch '%s'%n".format(_)).mkString
val touchFail = function.failOutputs.map("touch '%s'%n".format(_)).mkString
postExec.append("""|
|if [ "${LSB_JOBPEND:-unset}" != "unset" ]; then
| exit 0
|fi
|
|JOB_STAT_ROOT='%s/'.$LSB_JOBID
|if [ "$LSB_JOBEXIT_STAT" == "0" ]; then
|%stouch "$JOB_STAT_ROOT".done
|else
|%stouch "$JOB_STAT_ROOT".fail
|fi
|""".stripMargin.format(jobStatusDir, touchDone, touchFail))
this.postExec = IOUtils.writeTempFile(postExec.toString, ".postExec", "", jobStatusDir)
}
} }

View File

@ -14,9 +14,6 @@ trait CommandLineFunction extends QFunction with Logging {
/** Upper memory limit */ /** Upper memory limit */
var memoryLimit: Option[Int] = None var memoryLimit: Option[Int] = None
/** Whether a job is restartable */
var jobRestartable = true
/** Job project to run the command */ /** Job project to run the command */
var jobProject: String = _ var jobProject: String = _

View File

@ -34,6 +34,9 @@ trait QFunction extends Logging {
/** Temporary directory to write any files */ /** Temporary directory to write any files */
var jobTempDir: File = null var jobTempDir: File = null
/** Directories to mount via 'cd <dir>' before running the command. */
var mountDirectories: Set[File] = Set.empty[File]
/** Order the function was added to the graph. */ /** Order the function was added to the graph. */
var addOrder: List[Int] = Nil var addOrder: List[Int] = Nil
@ -44,6 +47,9 @@ trait QFunction extends Logging {
*/ */
var jobLimitSeconds: Option[Int] = None var jobLimitSeconds: Option[Int] = None
/** Whether a job is restartable */
var jobRestartable = true
/** /**
* A callback for modifying the run. * A callback for modifying the run.
* NOTE: This function is for ADVANCED use only and is unsupported. * NOTE: This function is for ADVANCED use only and is unsupported.
@ -302,6 +308,8 @@ trait QFunction extends Logging {
// Do not set the temp dir relative to the command directory // Do not set the temp dir relative to the command directory
jobTempDir = IOUtils.absolute(jobTempDir) jobTempDir = IOUtils.absolute(jobTempDir)
mountDirectories ++= qSettings.mountDirectories
absoluteCommandDirectory() absoluteCommandDirectory()
} }

View File

@ -14,6 +14,14 @@ class FullCallingPipelineTest extends BaseTest {
private final val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" private final val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/"
// Explicit directories known to cause problems due to automount failures, and not detected by @Input / @Outputs
private final val mountDirectories = Set("/broad/software")
// In fullCallingPipeline.q VariantEval is always compared against 129.
// Until the newvarianteval is finalized which will allow java import of the prior results,
// we re-run VariantEval to validate the run, and replicate that behavior here.
private final val variantEvalDbsnpFile = new File(BaseTest.b37dbSNP129)
val k1gChr20Dataset = { val k1gChr20Dataset = {
val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20") val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20")
dataset.pipeline.getProject.setIntervalList(new File(BaseTest.GATKDataLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.chr20.interval_list")) dataset.pipeline.getProject.setIntervalList(new File(BaseTest.GATKDataLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.chr20.interval_list"))
@ -42,7 +50,6 @@ class FullCallingPipelineTest extends BaseTest {
dataset.validations :+= new DoubleValidation("eval.dbsnp.all.called.novel.titv.tiTvRatio", 3.0340) dataset.validations :+= new DoubleValidation("eval.dbsnp.all.called.novel.titv.tiTvRatio", 3.0340)
dataset.jobQueue = "gsa" dataset.jobQueue = "gsa"
dataset.bigMemQueue = "gsa"
dataset dataset
} }
@ -51,7 +58,8 @@ class FullCallingPipelineTest extends BaseTest {
val project = new PipelineProject val project = new PipelineProject
project.setName(projectName) project.setName(projectName)
project.setReferenceFile(new File(BaseTest.hg19Reference)) project.setReferenceFile(new File(BaseTest.hg19Reference))
project.setDbsnpFile(new File(BaseTest.b37dbSNP129)) project.setDbsnpFile(new File(BaseTest.b37dbSNP132))
project.setRefseqTable(new File(BaseTest.hg19Refseq))
val squid = "C426" val squid = "C426"
val ids = List( val ids = List(
@ -72,8 +80,6 @@ class FullCallingPipelineTest extends BaseTest {
val dataset = new PipelineDataset val dataset = new PipelineDataset
dataset.pipeline = pipeline dataset.pipeline = pipeline
dataset.refseq = BaseTest.hg19Refseq
dataset.targetTiTv = "3.0"
dataset dataset
} }
@ -92,10 +98,10 @@ class FullCallingPipelineTest extends BaseTest {
// Run the pipeline with the expected inputs. // Run the pipeline with the expected inputs.
val currentDir = new File(".").getAbsolutePath val currentDir = new File(".").getAbsolutePath
var pipelineCommand = ("-retry 1 -S scala/qscript/playground/fullCallingPipeline.q" + var pipelineCommand = ("-retry 1 -S scala/qscript/playground/fullCallingPipeline.q" +
" -jobProject %s -Y %s -refseqTable %s" + " -jobProject %s -Y %s" +
" -tearScript %s/R/DataProcessingReport/GetTearsheetStats.R" + " -tearScript %s/R/DataProcessingReport/GetTearsheetStats.R" +
" --gatkjar %s/dist/GenomeAnalysisTK.jar") " --gatkjar %s/dist/GenomeAnalysisTK.jar")
.format(projectName, yamlFile, dataset.refseq, currentDir, currentDir) .format(projectName, yamlFile, currentDir, currentDir)
if (!dataset.runIndelRealigner) { if (!dataset.runIndelRealigner) {
pipelineCommand += " -skipCleaning" pipelineCommand += " -skipCleaning"
@ -105,8 +111,8 @@ class FullCallingPipelineTest extends BaseTest {
if (dataset.jobQueue != null) if (dataset.jobQueue != null)
pipelineCommand += " -jobQueue " + dataset.jobQueue pipelineCommand += " -jobQueue " + dataset.jobQueue
if (dataset.bigMemQueue != null) for (dir <- mountDirectories)
pipelineCommand += " -bigMemQueue " + dataset.bigMemQueue pipelineCommand += " -mountDir " + dir
// Run the test, at least checking if the command compiles // Run the test, at least checking if the command compiles
PipelineTest.executeTest(testName, pipelineCommand, null) PipelineTest.executeTest(testName, pipelineCommand, null)
@ -128,7 +134,7 @@ class FullCallingPipelineTest extends BaseTest {
var walkerCommand = ("-T VariantEval -R %s -D %s -B:eval,VCF %s" + var walkerCommand = ("-T VariantEval -R %s -D %s -B:eval,VCF %s" +
" -E %s -reportType R -reportLocation %s -L %s") " -E %s -reportType R -reportLocation %s -L %s")
.format( .format(
dataset.pipeline.getProject.getReferenceFile, dataset.pipeline.getProject.getDbsnpFile, handFilteredVcf, dataset.pipeline.getProject.getReferenceFile, variantEvalDbsnpFile, handFilteredVcf,
evalModules.mkString(" -E "), reportLocation, dataset.pipeline.getProject.getIntervalList) evalModules.mkString(" -E "), reportLocation, dataset.pipeline.getProject.getIntervalList)
for (validation <- dataset.validations) { for (validation <- dataset.validations) {
@ -143,11 +149,8 @@ class FullCallingPipelineTest extends BaseTest {
class PipelineDataset( class PipelineDataset(
var pipeline: Pipeline = null, var pipeline: Pipeline = null,
var refseq: String = null,
var targetTiTv: String = null,
var validations: List[PipelineValidation] = Nil, var validations: List[PipelineValidation] = Nil,
var jobQueue: String = null, var jobQueue: String = null,
var bigMemQueue: String = null,
var runIndelRealigner: Boolean = false) { var runIndelRealigner: Boolean = false) {
override def toString = pipeline.getProject.getName override def toString = pipeline.getProject.getName
} }

View File

@ -5,7 +5,6 @@ STING_HOME=/humgen/gsa-firehose2/pipeline/repositories/StingProduction
REFSEQ_TABLE=/humgen/gsa-hpprojects/GATK/data/Annotations/refseq/refGene-big-table-hg19.txt REFSEQ_TABLE=/humgen/gsa-hpprojects/GATK/data/Annotations/refseq/refGene-big-table-hg19.txt
JOB_QUEUE=gsa JOB_QUEUE=gsa
SHORT_QUEUE=gsa SHORT_QUEUE=gsa
BIG_QUEUE=gsa
TMP_DIR=$PWD/tmp TMP_DIR=$PWD/tmp
mkdir $PWD/tmp mkdir $PWD/tmp
@ -15,4 +14,4 @@ use R-2.10
use Oracle-full-client use Oracle-full-client
use .cx-oracle-5.0.2-python-2.6.5-oracle-full-client-11.1 use .cx-oracle-5.0.2-python-2.6.5-oracle-full-client-11.1
java -Djava.io.tmpdir="$TMP_DIR" -jar "$STING_HOME"/dist/Queue.jar -jobQueue "$JOB_QUEUE" -shortJobQueue "$SHORT_QUEUE" -bigMemQueue "$BIG_QUEUE" -jobProject "$SAMPLE_SET" -jobPrefix "$SAMPLE_SET" -tearScript ~/Sting/R/DataProcessingReport/GetTearsheetStats.R -S ~/Sting/scala/qscript/fullCallingPipeline.q -Y "$SAMPLE_SET".yaml -refseqTable "$REFSEQ_TABLE" --gatkjar "$STING_HOME"/dist/GenomeAnalysisTK.jar -log queue_log.txt -statusTo corin -bsub $2 java -Djava.io.tmpdir="$TMP_DIR" -jar "$STING_HOME"/dist/Queue.jar -jobQueue "$JOB_QUEUE" -shortJobQueue "$SHORT_QUEUE" -jobProject "$SAMPLE_SET" -jobPrefix "$SAMPLE_SET" -tearScript ~/Sting/R/DataProcessingReport/GetTearsheetStats.R -S ~/Sting/scala/qscript/fullCallingPipeline.q -Y "$SAMPLE_SET".yaml -refseqTable "$REFSEQ_TABLE" --gatkjar "$STING_HOME"/dist/GenomeAnalysisTK.jar -mountDir /broad/software -log queue_log.txt -statusTo corin -bsub $2