If there's an LSF queue maximum time limit set and the user hasn't specified one for this job, pass on the queue defined maximum limit with the job.

Updated LibBatIntegrationTest to use proper networked temp directory accessible by local machines and nodes.
Disabling the FCPTest until the VE3 is incorporated into the fullCallingPipeline.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5151 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-01-31 23:13:09 +00:00
parent f258363cfc
commit e74f28ad89
4 changed files with 93 additions and 7 deletions

View File

@ -66,6 +66,9 @@ public abstract class BaseTest {
public static final String b37dbSNP129 = dbsnpDataLocation + "dbsnp_129_b37.rod";
public static final String b37dbSNP132 = dbsnpDataLocation + "dbsnp_132_b37.vcf";
public static final String networkTempDir = "/broad/shptmp/";
public static final File networkTempDirFile = new File(networkTempDir);
/**
* Subdirectory under the ant build directory where we store integration test md5 results
*/
@ -339,4 +342,20 @@ public abstract class BaseTest {
throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex);
}
}
/**
* Creates a temp file that will be deleted on exit after tests are complete.
* @param name Prefix of the file.
* @param extension Extension to concat to the end of the file.
* @return A file in the network temporary directory starting with name, ending with extension, which will be deleted after the program exits.
*/
public static File createNetworkTempFile(String name, String extension) {
try {
File file = File.createTempFile(name, extension, networkTempDirFile);
file.deleteOnExit();
return file;
} catch (IOException ex) {
throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex);
}
}
}

View File

@ -24,10 +24,12 @@
package org.broadinstitute.sting.jna.lsf.v7_0_6;
import com.sun.jna.StringArray;
import com.sun.jna.ptr.IntByReference;
import org.apache.commons.io.FileUtils;
import org.broadinstitute.sting.utils.Utils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*;
@ -35,9 +37,14 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*;
import java.io.File;
/**
* Really a unit test, but this test will only run on systems with LSF setup.
* Really unit tests, but these test will only run on systems with LSF setup.
*/
public class LibBatIntegrationTest extends BaseTest {
@BeforeClass
public void initLibBat() {
Assert.assertFalse(LibBat.lsb_init("LibBatIntegrationTest") < 0, LibBat.lsb_sperror("lsb_init() failed"));
}
@Test
public void testClusterName() {
String clusterName = LibLsf.ls_getclustername();
@ -45,12 +52,25 @@ public class LibBatIntegrationTest extends BaseTest {
Assert.assertNotNull(clusterName);
}
@Test
public void testReadQueueLimits() {
String queue = "hour";
StringArray queues = new StringArray(new String[] {queue});
IntByReference numQueues = new IntByReference(1);
queueInfoEnt queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0);
Assert.assertEquals(numQueues.getValue(), 1);
Assert.assertNotNull(queueInfo);
Assert.assertEquals(queueInfo.queue, queue);
int runLimit = queueInfo.rLimits[LibLsf.LSF_RLIMIT_RUN];
Assert.assertTrue(runLimit > 0, "LSF run limit is not greater than zero: " + runLimit);
}
@Test
public void testSubmitEcho() throws InterruptedException {
String queue = "hour";
File outFile = new File("LibBatIntegrationTest.out");
Assert.assertFalse(LibBat.lsb_init("LibBatIntegrationTest") < 0, LibBat.lsb_sperror("lsb_init() failed"));
File outFile = createNetworkTempFile("LibBatIntegrationTest-", ".out");
submit req = new submit();

View File

@ -5,11 +5,12 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat}
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit}
import org.broadinstitute.sting.utils.Utils
import com.sun.jna.{NativeLong, Memory}
import org.broadinstitute.sting.jna.clibrary.LibC
import java.util.Date
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{signalBulkJobs, submitReply, submit}
import com.sun.jna.ptr.IntByReference
import com.sun.jna.{StringArray, NativeLong, Memory}
/**
* Runs jobs on an LSF compute cluster.
@ -73,6 +74,8 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
if (function.jobLimitSeconds.isDefined) {
request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.jobLimitSeconds.get
} else {
request.rLimits(LibLsf.LSF_RLIMIT_RUN) = Lsf706JobRunner.getRlimitRun(function.jobQueue)
}
writeExec()
@ -170,6 +173,12 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
object Lsf706JobRunner extends Logging {
init()
/** The name of the default queue. */
private var defaultQueue: String = _
/** The run limits for each queue. */
private var queueRlimitRun = Map.empty[String,Int]
/**
* Initialize the Lsf library.
*/
@ -178,6 +187,44 @@ object Lsf706JobRunner extends Logging {
throw new QException(LibBat.lsb_sperror("lsb_init() failed"))
}
/**
* Returns the run limit in seconds for the queue.
* If the queue name is null returns the length of the default queue.
* @param queue Name of the queue or null for the default queue.
* @return the run limit in seconds for the queue.
*/
def getRlimitRun(queue: String) = {
if (queue == null) {
if (defaultQueue != null) {
queueRlimitRun(defaultQueue)
} else {
// Get the info on the default queue.
val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException("Unable to get LSF queue info for the default queue.")
defaultQueue = queueInfo.queue
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += defaultQueue -> limit
limit
}
} else {
queueRlimitRun.get(queue) match {
case Some(limit) => limit
case None =>
// Cache miss. Go get the run limits from LSF.
val queues = new StringArray(Array[String](queue))
val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException("Unable to get LSF queue info for queue: " + queue)
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += queue -> limit
limit
}
}
}
/**
* Tries to stop any running jobs.
* @param runners Runners to stop.

View File

@ -91,7 +91,7 @@ class FullCallingPipelineTest {
final def convertDatasets: Array[Array[AnyRef]] =
datasets.map(dataset => Array(dataset.asInstanceOf[AnyRef])).toArray
@Test(dataProvider="datasets")
@Test(dataProvider="datasets", enabled=false)
def testFullCallingPipeline(dataset: PipelineDataset) = {
val projectName = dataset.pipeline.getProject.getName
val testName = "fullCallingPipeline-" + projectName