From e74f28ad891ebf864a7251cdad5ef0ce192dff74 Mon Sep 17 00:00:00 2001 From: kshakir Date: Mon, 31 Jan 2011 23:13:09 +0000 Subject: [PATCH] If there's an LSF queue maximum time limit set and the user hasn't specified one for this job, pass on the queue defined maximum limit with the job. Updated LibBatIntegrationTest to use proper networked temp directory accessible by local machines and nodes. Disabling the FCPTest until the VE3 is incorporated into the fullCallingPipeline. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5151 348d0f76-0448-11de-a6fe-93d51630548a --- .../org/broadinstitute/sting/BaseTest.java | 19 +++++++ .../jna/lsf/v7_0_6/LibBatIntegrationTest.java | 28 ++++++++-- .../sting/queue/engine/Lsf706JobRunner.scala | 51 ++++++++++++++++++- .../playground/FullCallingPipelineTest.scala | 2 +- 4 files changed, 93 insertions(+), 7 deletions(-) diff --git a/java/test/org/broadinstitute/sting/BaseTest.java b/java/test/org/broadinstitute/sting/BaseTest.java index 6008759df..c30c736a3 100755 --- a/java/test/org/broadinstitute/sting/BaseTest.java +++ b/java/test/org/broadinstitute/sting/BaseTest.java @@ -66,6 +66,9 @@ public abstract class BaseTest { public static final String b37dbSNP129 = dbsnpDataLocation + "dbsnp_129_b37.rod"; public static final String b37dbSNP132 = dbsnpDataLocation + "dbsnp_132_b37.vcf"; + public static final String networkTempDir = "/broad/shptmp/"; + public static final File networkTempDirFile = new File(networkTempDir); + /** * Subdirectory under the ant build directory where we store integration test md5 results */ @@ -339,4 +342,20 @@ public abstract class BaseTest { throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex); } } + + /** + * Creates a temp file that will be deleted on exit after tests are complete. + * @param name Prefix of the file. + * @param extension Extension to concat to the end of the file. 
+ * @return A file in the network temporary directory starting with name, ending with extension, which will be deleted after the program exits. + */ + public static File createNetworkTempFile(String name, String extension) { + try { + File file = File.createTempFile(name, extension, networkTempDirFile); + file.deleteOnExit(); + return file; + } catch (IOException ex) { + throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex); + } + } } diff --git a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java index 5c79bb140..d0e82ed90 100644 --- a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java +++ b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java @@ -24,10 +24,12 @@ package org.broadinstitute.sting.jna.lsf.v7_0_6; +import com.sun.jna.StringArray; import com.sun.jna.ptr.IntByReference; import org.apache.commons.io.FileUtils; import org.broadinstitute.sting.utils.Utils; import org.testng.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*; @@ -35,9 +37,14 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*; import java.io.File; /** - * Really a unit test, but this test will only run on systems with LSF setup. + * Really unit tests, but these tests will only run on systems with LSF setup. 
*/ public class LibBatIntegrationTest extends BaseTest { + @BeforeClass + public void initLibBat() { + Assert.assertFalse(LibBat.lsb_init("LibBatIntegrationTest") < 0, LibBat.lsb_sperror("lsb_init() failed")); + } + @Test public void testClusterName() { String clusterName = LibLsf.ls_getclustername(); @@ -45,12 +52,25 @@ public class LibBatIntegrationTest extends BaseTest { Assert.assertNotNull(clusterName); } + @Test + public void testReadQueueLimits() { + String queue = "hour"; + StringArray queues = new StringArray(new String[] {queue}); + IntByReference numQueues = new IntByReference(1); + queueInfoEnt queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0); + + Assert.assertEquals(numQueues.getValue(), 1); + Assert.assertNotNull(queueInfo); + Assert.assertEquals(queueInfo.queue, queue); + + int runLimit = queueInfo.rLimits[LibLsf.LSF_RLIMIT_RUN]; + Assert.assertTrue(runLimit > 0, "LSF run limit is not greater than zero: " + runLimit); + } + @Test public void testSubmitEcho() throws InterruptedException { String queue = "hour"; - File outFile = new File("LibBatIntegrationTest.out"); - - Assert.assertFalse(LibBat.lsb_init("LibBatIntegrationTest") < 0, LibBat.lsb_sperror("lsb_init() failed")); + File outFile = createNetworkTempFile("LibBatIntegrationTest-", ".out"); submit req = new submit(); diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala index 61dc04ae7..55680081a 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala @@ -5,11 +5,12 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.queue.QException import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} -import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, 
submit} import org.broadinstitute.sting.utils.Utils -import com.sun.jna.{NativeLong, Memory} import org.broadinstitute.sting.jna.clibrary.LibC import java.util.Date +import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit} +import com.sun.jna.ptr.IntByReference +import com.sun.jna.{StringArray, NativeLong, Memory} /** * Runs jobs on an LSF compute cluster. @@ -73,6 +74,8 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR if (function.jobLimitSeconds.isDefined) { request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.jobLimitSeconds.get + } else { + request.rLimits(LibLsf.LSF_RLIMIT_RUN) = Lsf706JobRunner.getRlimitRun(function.jobQueue) } writeExec() @@ -170,6 +173,12 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR object Lsf706JobRunner extends Logging { init() + /** The name of the default queue. */ + private var defaultQueue: String = _ + + /** The run limits for each queue. */ + private var queueRlimitRun = Map.empty[String,Int] + /** * Initialize the Lsf library. */ @@ -178,6 +187,44 @@ object Lsf706JobRunner extends Logging { throw new QException(LibBat.lsb_sperror("lsb_init() failed")) } + /** + * Returns the run limit in seconds for the queue. + * If the queue name is null returns the run limit of the default queue. + * @param queue Name of the queue or null for the default queue. + * @return the run limit in seconds for the queue. + */ + def getRlimitRun(queue: String) = { + if (queue == null) { + if (defaultQueue != null) { + queueRlimitRun(defaultQueue) + } else { + // Get the info on the default queue. 
+ val numQueues = new IntByReference(1) + val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0) + if (queueInfo == null) + throw new QException("Unable to get LSF queue info for the default queue.") + defaultQueue = queueInfo.queue + val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) + queueRlimitRun += defaultQueue -> limit + limit + } + } else { + queueRlimitRun.get(queue) match { + case Some(limit) => limit + case None => + // Cache miss. Go get the run limits from LSF. + val queues = new StringArray(Array[String](queue)) + val numQueues = new IntByReference(1) + val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0) + if (queueInfo == null) + throw new QException("Unable to get LSF queue info for queue: " + queue) + val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) + queueRlimitRun += queue -> limit + limit + } + } + } + /** * Tries to stop any running jobs. * @param runners Runners to stop. diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala index 0b7cd2414..6e2177d02 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala @@ -91,7 +91,7 @@ class FullCallingPipelineTest { final def convertDatasets: Array[Array[AnyRef]] = datasets.map(dataset => Array(dataset.asInstanceOf[AnyRef])).toArray - @Test(dataProvider="datasets") + @Test(dataProvider="datasets", enabled=false) def testFullCallingPipeline(dataset: PipelineDataset) = { val projectName = dataset.pipeline.getProject.getName val testName = "fullCallingPipeline-" + projectName