From 97d29f203e35fd98393f61a28e78134da2b84755 Mon Sep 17 00:00:00 2001 From: Joel Thibault Date: Fri, 12 Oct 2012 17:16:56 -0400 Subject: [PATCH] Add walltime changes to LSF - Check whether the specified attribute is available - Add pipeline test (disabled due to missing attribute) --- .../sting/jna/drmaa/v1_0/JnaSession.java | 18 ++++++++++++++---- .../broadinstitute/sting/queue/QSettings.scala | 8 ++++---- .../queue/engine/drmaa/DrmaaJobRunner.scala | 2 +- .../queue/engine/lsf/Lsf706JobRunner.scala | 7 +++++-- .../queue/function/CommandLineFunction.scala | 2 +- .../examples/HelloWorldPipelineTest.scala | 11 +++++++++++ 6 files changed, 36 insertions(+), 12 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/jna/drmaa/v1_0/JnaSession.java b/public/java/src/org/broadinstitute/sting/jna/drmaa/v1_0/JnaSession.java index 480113e1e..830c6590d 100644 --- a/public/java/src/org/broadinstitute/sting/jna/drmaa/v1_0/JnaSession.java +++ b/public/java/src/org/broadinstitute/sting/jna/drmaa/v1_0/JnaSession.java @@ -210,13 +210,23 @@ public class JnaSession implements Session { } public static void setAttribute(Pointer jt, String name, String value) throws DrmaaException { - checkError(LibDrmaa.drmaa_set_attribute(jt, name, value, getError(), LibDrmaa.DRMAA_ERROR_STRING_BUFFER_LEN)); + if (getAttrNames().contains(name)) { + checkError(LibDrmaa.drmaa_set_attribute(jt, name, value, getError(), LibDrmaa.DRMAA_ERROR_STRING_BUFFER_LEN)); + } + else { + throw new InvalidAttributeValueException("Attribute " + name + " is not supported by this implementation of DRMAA"); + } } public static String getAttribute(Pointer jt, String name) throws DrmaaException { - Memory attrBuffer = new Memory(LibDrmaa.DRMAA_ATTR_BUFFER); - checkError(LibDrmaa.drmaa_get_attribute(jt, name, attrBuffer, LibDrmaa.DRMAA_ATTR_BUFFER_LEN, getError(), LibDrmaa.DRMAA_ERROR_STRING_BUFFER_LEN)); - return attrBuffer.getString(0); + if (getAttrNames().contains(name)) { + Memory attrBuffer = new Memory(LibDrmaa.DRMAA_ATTR_BUFFER); + checkError(LibDrmaa.drmaa_get_attribute(jt, name, attrBuffer, LibDrmaa.DRMAA_ATTR_BUFFER_LEN, getError(), LibDrmaa.DRMAA_ERROR_STRING_BUFFER_LEN)); + return attrBuffer.getString(0); + } + else { + throw new InvalidAttributeValueException("Attribute " + name + " is not supported by this implementation of DRMAA"); + } } public static void setVectorAttribute(Pointer jt, String name, Collection values) throws DrmaaException { diff --git a/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala index fb21700ac..b1e98a0e2 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -31,10 +31,6 @@ import org.broadinstitute.sting.commandline.{ClassType, Argument} * Default settings settable on the command line and passed to CommandLineFunctions. */ class QSettings { - - @Argument(fullName="job_walltime", shortName="wallTime", doc="Setting the required walltime when using the drmaa job runner.", required=false) - var jobWalltime: Option[Long] = None - @Argument(fullName="run_name", shortName="runName", doc="A name for this run used for various status messages.", required=false) var runName: String = _ @@ -76,6 +72,10 @@ class QSettings { @Argument(fullName="resident_memory_request_parameter", shortName="resMemReqParam", doc="Parameter for resident memory requests. By default not requested.", required=false) var residentRequestParameter: String = _ + @Argument(fullName="job_walltime", shortName="wallTime", doc="Setting the required DRMAA walltime or LSF run limit.", required=false) + @ClassType(classOf[Long]) + var jobWalltime: Option[Long] = None + /** The name of the parallel environment (required for SGE, for example) */ @Argument(fullName="job_parallel_env", shortName="jobParaEnv", doc="An SGE style parallel environment to use for jobs requesting more than 1 core. Equivalent to submitting jobs with -pe ARG nt for jobs with nt > 1", required=false) var parallelEnvironmentName: String = "smp_pe" // Broad default diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/drmaa/DrmaaJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/drmaa/DrmaaJobRunner.scala index 31b314c79..1dca22981 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/drmaa/DrmaaJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/drmaa/DrmaaJobRunner.scala @@ -65,7 +65,7 @@ class DrmaaJobRunner(val session: Session, val function: CommandLineFunction) ex drmaaJob.setJoinFiles(true) } - if(function.wallTime != null) + if(!function.wallTime.isEmpty) drmaaJob.setHardWallclockTimeLimit(function.wallTime.get) drmaaJob.setNativeSpecification(functionNativeSpec) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index 2fbea1497..5dc126e49 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -151,8 +151,11 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR throw new QException("setOption_() returned -1 while setting esub"); } - // LSF specific: get the max runtime for the jobQueue and pass it for this job - request.rLimits(LibLsf.LSF_RLIMIT_RUN) = Lsf706JobRunner.getRlimitRun(function.jobQueue) + if(!function.wallTime.isEmpty) + request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.wallTime.get.toInt + else + // LSF specific: get the max runtime for the jobQueue and pass it for this job + request.rLimits(LibLsf.LSF_RLIMIT_RUN) = Lsf706JobRunner.getRlimitRun(function.jobQueue) // Run the command as sh request.command = "sh " + jobScript diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index d5870a6c3..2453cc50a 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -33,7 +33,7 @@ import org.broadinstitute.sting.commandline.Argument trait CommandLineFunction extends QFunction with Logging { def commandLine: String - /** Setting the wall time request for drmaa job*/ + /** Setting the wall time request for DRMAA / run limit for LSF */ var wallTime: Option[Long] = None /** Upper memory limit */ diff --git a/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala b/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala index 50fc529dd..c8085784d 100644 --- a/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala +++ b/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala @@ -126,4 +126,15 @@ class HelloWorldPipelineTest { spec.jobRunners = Seq("GridEngine") PipelineTest.executeTest(spec) } + + // disabled because our DRMAA implementation doesn't support wallTime + @Test(enabled=false, timeOut=36000000) + def testHelloWorldWithWalltime() { + val spec = new PipelineTestSpec + spec.name = "HelloWorldWithWalltime" + spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala" + + " -wallTime 100" + spec.jobRunners = PipelineTest.allJobRunners + PipelineTest.executeTest(spec) + } }