diff --git a/java/test/org/broadinstitute/sting/BaseTest.java b/java/test/org/broadinstitute/sting/BaseTest.java index 6008759df..c30c736a3 100755 --- a/java/test/org/broadinstitute/sting/BaseTest.java +++ b/java/test/org/broadinstitute/sting/BaseTest.java @@ -66,6 +66,9 @@ public abstract class BaseTest { public static final String b37dbSNP129 = dbsnpDataLocation + "dbsnp_129_b37.rod"; public static final String b37dbSNP132 = dbsnpDataLocation + "dbsnp_132_b37.vcf"; + public static final String networkTempDir = "/broad/shptmp/"; + public static final File networkTempDirFile = new File(networkTempDir); + /** * Subdirectory under the ant build directory where we store integration test md5 results */ @@ -339,4 +342,20 @@ public abstract class BaseTest { throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex); } } + + /** + * Creates a temp file that will be deleted on exit after tests are complete. + * @param name Prefix of the file. + * @param extension Extension to concat to the end of the file. + * @return A file in the network temporary directory starting with name, ending with extension, which will be deleted after the program exits. + */ + public static File createNetworkTempFile(String name, String extension) { + try { + File file = File.createTempFile(name, extension, networkTempDirFile); + file.deleteOnExit(); + return file; + } catch (IOException ex) { + throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex); + } + } } diff --git a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java index 5c79bb140..d0e82ed90 100644 --- a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java +++ b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java @@ -24,10 +24,12 @@ package org.broadinstitute.sting.jna.lsf.v7_0_6; +import com.sun.jna.StringArray; import com.sun.jna.ptr.IntByReference; import org.apache.commons.io.FileUtils; import org.broadinstitute.sting.utils.Utils; import org.testng.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*; @@ -35,9 +37,14 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*; import java.io.File; /** - * Really a unit test, but this test will only run on systems with LSF setup. + * Really unit tests, but these test will only run on systems with LSF setup. */ public class LibBatIntegrationTest extends BaseTest { + @BeforeClass + public void initLibBat() { + Assert.assertFalse(LibBat.lsb_init("LibBatIntegrationTest") < 0, LibBat.lsb_sperror("lsb_init() failed")); + } + @Test public void testClusterName() { String clusterName = LibLsf.ls_getclustername(); @@ -45,12 +52,25 @@ public class LibBatIntegrationTest extends BaseTest { Assert.assertNotNull(clusterName); } + @Test + public void testReadQueueLimits() { + String queue = "hour"; + StringArray queues = new StringArray(new String[] {queue}); + IntByReference numQueues = new IntByReference(1); + queueInfoEnt queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0); + + Assert.assertEquals(numQueues.getValue(), 1); + Assert.assertNotNull(queueInfo); + Assert.assertEquals(queueInfo.queue, queue); + + int runLimit = queueInfo.rLimits[LibLsf.LSF_RLIMIT_RUN]; + Assert.assertTrue(runLimit > 0, "LSF run limit is not greater than zero: " + runLimit); + } + @Test public void testSubmitEcho() throws InterruptedException { String queue = "hour"; - File outFile = new File("LibBatIntegrationTest.out"); - - Assert.assertFalse(LibBat.lsb_init("LibBatIntegrationTest") < 0, LibBat.lsb_sperror("lsb_init() failed")); + File outFile = createNetworkTempFile("LibBatIntegrationTest-", ".out"); submit req = new submit(); diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala index 61dc04ae7..55680081a 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala @@ -5,11 +5,12 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.queue.QException import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} -import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit} import org.broadinstitute.sting.utils.Utils -import com.sun.jna.{NativeLong, Memory} import org.broadinstitute.sting.jna.clibrary.LibC import java.util.Date +import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit} +import com.sun.jna.ptr.IntByReference +import com.sun.jna.{StringArray, NativeLong, Memory} /** * Runs jobs on an LSF compute cluster. @@ -73,6 +74,8 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR if (function.jobLimitSeconds.isDefined) { request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.jobLimitSeconds.get + } else { + request.rLimits(LibLsf.LSF_RLIMIT_RUN) = Lsf706JobRunner.getRlimitRun(function.jobQueue) } writeExec() @@ -170,6 +173,12 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR object Lsf706JobRunner extends Logging { init() + /** The name of the default queue. */ + private var defaultQueue: String = _ + + /** The run limits for each queue. */ + private var queueRlimitRun = Map.empty[String,Int] + /** * Initialize the Lsf library. */ @@ -178,6 +187,44 @@ object Lsf706JobRunner extends Logging { throw new QException(LibBat.lsb_sperror("lsb_init() failed")) } + /** + * Returns the run limit in seconds for the queue. + * If the queue name is null returns the length of the default queue. + * @param queue Name of the queue or null for the default queue. + * @return the run limit in seconds for the queue. + */ + def getRlimitRun(queue: String) = { + if (queue == null) { + if (defaultQueue != null) { + queueRlimitRun(defaultQueue) + } else { + // Get the info on the default queue. + val numQueues = new IntByReference(1) + val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0) + if (queueInfo == null) + throw new QException("Unable to get LSF queue info for the default queue.") + defaultQueue = queueInfo.queue + val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) + queueRlimitRun += defaultQueue -> limit + limit + } + } else { + queueRlimitRun.get(queue) match { + case Some(limit) => limit + case None => + // Cache miss. Go get the run limits from LSF. + val queues = new StringArray(Array[String](queue)) + val numQueues = new IntByReference(1) + val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0) + if (queueInfo == null) + throw new QException("Unable to get LSF queue info for queue: " + queue) + val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) + queueRlimitRun += queue -> limit + limit + } + } + } + /** * Tries to stop any running jobs. * @param runners Runners to stop. diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala index 0b7cd2414..6e2177d02 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala @@ -91,7 +91,7 @@ class FullCallingPipelineTest { final def convertDatasets: Array[Array[AnyRef]] = datasets.map(dataset => Array(dataset.asInstanceOf[AnyRef])).toArray - @Test(dataProvider="datasets") + @Test(dataProvider="datasets", enabled=false) def testFullCallingPipeline(dataset: PipelineDataset) = { val projectName = dataset.pipeline.getProject.getName val testName = "fullCallingPipeline-" + projectName