Due to recent LSF hiccups, added a very brief (0.5–2 min) retry around getting job status. We can't wait too long because statuses are archived an hour after the job exits.
TODO: Switch to bulk status checks and add status-archive lookups. Send SIGTERM (15) instead of SIGKILL (9) to allow graceful termination of the child process. Print the names of the QScripts in the compile-error text. Added a pipeline-retry (-PR) pass-through for the MFCP and MFCPTest. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5295 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
07d381ec51
commit
f1f9bd6dcc
|
|
@ -28,6 +28,9 @@ class MultiFullCallingPipeline extends QScript {
|
|||
@Argument(doc="pipeline priority", shortName="PP", required = false)
|
||||
var pipelinePriority: Option[Int] = None
|
||||
|
||||
@Argument(doc="pipeline retry", shortName="PR", required = false)
|
||||
var pipelineRetry: Option[Int] = None
|
||||
|
||||
@Argument(doc="run with -tearScript", shortName="TS")
|
||||
var runWithTearScript = false
|
||||
|
||||
|
|
@ -85,6 +88,7 @@ class MultiFullCallingPipeline extends QScript {
|
|||
optional(" -jobQueue ", qscript.pipelineJobQueue) +
|
||||
optional(" -shortJobQueue ", qscript.pipelineShortQueue) +
|
||||
optional(" -jobPriority ", qscript.pipelinePriority) +
|
||||
optional(" -retry ", qscript.pipelineRetry) +
|
||||
optional(" -tearScript ", tearScript) +
|
||||
" -S %s --gatkjar %s -jobProject %s -jobPrefix %s -Y %s -bsub -run"
|
||||
.format(pipelineScript, gatkJar, yamlName, yamlName, yamlFile)
|
||||
|
|
|
|||
|
|
@ -40,8 +40,8 @@ class QScriptManager() extends Logging {
|
|||
|
||||
reporter.printSummary()
|
||||
if (reporter.hasErrors) {
|
||||
val msg = "Compile failed with %d error%s".format(
|
||||
reporter.ERROR.count, plural(reporter.ERROR.count))
|
||||
val msg = "Compile of %s failed with %d error%s".format(
|
||||
scripts.mkString(", "), reporter.ERROR.count, plural(reporter.ERROR.count))
|
||||
throw new QException(msg)
|
||||
}
|
||||
else if (reporter.WARNING.count > 0)
|
||||
|
|
|
|||
|
|
@ -8,9 +8,9 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat}
|
|||
import org.broadinstitute.sting.utils.Utils
|
||||
import org.broadinstitute.sting.jna.clibrary.LibC
|
||||
import java.util.Date
|
||||
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit}
|
||||
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit}
|
||||
import com.sun.jna.ptr.IntByReference
|
||||
import com.sun.jna.{StringArray, NativeLong, Memory}
|
||||
import com.sun.jna.{StringArray, NativeLong}
|
||||
|
||||
/**
|
||||
* Runs jobs on an LSF compute cluster.
|
||||
|
|
@ -107,9 +107,12 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
|
|||
var exitInfo = 0
|
||||
var endTime: NativeLong = null
|
||||
|
||||
val result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB)
|
||||
if (result < 0)
|
||||
throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId))
|
||||
var result = 0
|
||||
Retry.attempt(() => {
|
||||
result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB)
|
||||
if (result < 0)
|
||||
throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId))
|
||||
}, 0.5, 1, 2)
|
||||
try {
|
||||
if (result > 1)
|
||||
throw new QException(LibBat.lsb_sperror("Recieved " + result + " LSF results for job id: " + jobId))
|
||||
|
|
@ -179,6 +182,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
|
|||
|
||||
object Lsf706JobRunner extends Logging {
|
||||
private val lsfLibLock = new Object
|
||||
private val SIGTERM = 15
|
||||
|
||||
init()
|
||||
|
||||
|
|
@ -244,23 +248,15 @@ object Lsf706JobRunner extends Logging {
|
|||
*/
|
||||
def tryStop(runners: List[Lsf706JobRunner]) {
|
||||
lsfLibLock.synchronized {
|
||||
for (jobRunners <- runners.filterNot(_.jobId < 0).grouped(10)) {
|
||||
// lsb_killbulkjobs does not seem to forward SIGTERM,
|
||||
// only SIGKILL, so send the Ctrl-C (SIGTERM) one by one.
|
||||
for (jobRunner <- runners.filterNot(_.jobId < 0)) {
|
||||
try {
|
||||
val njobs = jobRunners.size
|
||||
val signalJobs = new signalBulkJobs
|
||||
signalJobs.jobs = {
|
||||
val jobIds = new Memory(8 * njobs)
|
||||
jobIds.write(0, jobRunners.map(_.jobId).toArray, 0, njobs)
|
||||
jobIds
|
||||
}
|
||||
signalJobs.njobs = njobs
|
||||
signalJobs.signal = 9
|
||||
|
||||
if (LibBat.lsb_killbulkjobs(signalJobs) < 0)
|
||||
throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed"))
|
||||
if (LibBat.lsb_signaljob(jobRunner.jobId, SIGTERM) < 0)
|
||||
logger.error(LibBat.lsb_sperror("Unable to kill job " + jobRunner.jobId))
|
||||
} catch {
|
||||
case e =>
|
||||
logger.error("Unable to kill all jobs.", e)
|
||||
logger.error("Unable to kill job " + jobRunner.jobId, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ class MultiFullCallingPipelineTest {
|
|||
|
||||
// Run the pipeline with the expected inputs.
|
||||
val pipelineCommand = ("-retry 1 -BS 3 -PP 100 -S scala/qscript/playground/MultiFullCallingPipeline.scala" +
|
||||
" -jobProject %s -YL %s -PJQ %s -stingHome %s")
|
||||
" -jobProject %s -YL %s -PJQ %s -PR 2 -stingHome %s")
|
||||
.format(projectName, yamlList, dataset.pipelineJobQueue, PipelineTest.currentStingDir)
|
||||
|
||||
val pipelineSpec = new PipelineTestSpec
|
||||
|
|
|
|||
Loading…
Reference in New Issue