diff --git a/build.xml b/build.xml index fe4c7a3f4..874a003eb 100644 --- a/build.xml +++ b/build.xml @@ -957,6 +957,12 @@ + + diff --git a/public/c/bwa/build_linux.sh b/public/c/bwa/build_linux.sh index c713f3963..b3631a28d 100755 --- a/public/c/bwa/build_linux.sh +++ b/public/c/bwa/build_linux.sh @@ -1,5 +1,5 @@ #!/bin/sh -export BWA_HOME="/humgen/gsa-scr1/hanna/src/bwa" +export BWA_HOME="/humgen/gsa-scr1/hanna/src/bwa-trunk/bwa" export JAVA_INCLUDE="/broad/tools/Linux/x86_64/pkgs/jdk_1.6.0_12/include -I/broad/tools/Linux/x86_64/pkgs/jdk_1.6.0_12/include/linux" export TARGET_LIB="libbwa.so" export EXTRA_LIBS="-lc -lz -lstdc++ -lpthread" diff --git a/public/c/bwa/bwa_gateway.cpp b/public/c/bwa/bwa_gateway.cpp index 3f6850e37..00f5aa5bc 100644 --- a/public/c/bwa/bwa_gateway.cpp +++ b/public/c/bwa/bwa_gateway.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "bwase.h" #include "bwa_gateway.h" @@ -27,6 +28,9 @@ BWA::BWA(const char* ann_filename, bwt_restore_sa(reverse_sa_filename, bwts[1]); load_default_options(); + // Always reinitialize the random seed whenever a new set of files are loaded. 
+ initialize_random_seed(); + // initialize the bwase subsystem bwase_initialize(); } @@ -207,6 +211,11 @@ void BWA::load_default_options() options.trim_qual = 0; } +void BWA::initialize_random_seed() +{ + srand48(bns->seed); +} + void BWA::set_max_edit_distance(float edit_distance) { if(edit_distance > 0 && edit_distance < 1) { options.fnr = edit_distance; diff --git a/public/c/bwa/bwa_gateway.h b/public/c/bwa/bwa_gateway.h index 0ef0a129b..2d26ec650 100644 --- a/public/c/bwa/bwa_gateway.h +++ b/public/c/bwa/bwa_gateway.h @@ -37,6 +37,7 @@ class BWA { gap_opt_t options; void load_default_options(); + void initialize_random_seed(); bwa_seq_t* create_sequence(const char* bases, const unsigned read_length); void copy_bases_into_sequence(bwa_seq_t* sequence, const char* bases, const unsigned read_length); Alignment generate_final_alignment_from_sequence(bwa_seq_t* sequence); diff --git a/public/c/bwa/libbwa.so.1 b/public/c/bwa/libbwa.so.1 deleted file mode 100755 index bfa3c2847..000000000 Binary files a/public/c/bwa/libbwa.so.1 and /dev/null differ diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/genotyper/IndelGenotypeLikelihoodsCalculationModel.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/genotyper/IndelGenotypeLikelihoodsCalculationModel.java index be2039780..60ea601d5 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/genotyper/IndelGenotypeLikelihoodsCalculationModel.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/genotyper/IndelGenotypeLikelihoodsCalculationModel.java @@ -391,7 +391,7 @@ public class IndelGenotypeLikelihoodsCalculationModel extends GenotypeLikelihood if (DEBUG) System.out.format("hsize: %d eventLength: %d refSize: %d, locStart: %d numpr: %d\n",hsize,eventLength, (int)ref.getWindow().size(), loc.getStart(), numPrefBases); - + //System.out.println(eventLength); haplotypeMap = Haplotype.makeHaplotypeListFromAlleles( alleleList, loc.getStart(), ref, hsize, numPrefBases); @@ -418,8 
+418,8 @@ public class IndelGenotypeLikelihoodsCalculationModel extends GenotypeLikelihood - // which genotype likelihoods correspond to two most likely alleles? By convention, likelihood vector is lexically ordered, for example - // for 3 alleles it's 00 01 02 11 12 22 + // which genotype likelihoods correspond to two most likely alleles? By convention, likelihood vector is ordered as for example + // for 3 alleles it's 00 01 11 02 12 22 GLs.put(sample.getKey(), new MultiallelicGenotypeLikelihoods(sample.getKey(), alleleList, genotypeLikelihoods, diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/PairHMMIndelErrorModel.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/PairHMMIndelErrorModel.java index 60262d6f4..55450486b 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/PairHMMIndelErrorModel.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/PairHMMIndelErrorModel.java @@ -1042,8 +1042,8 @@ public class PairHMMIndelErrorModel { int k=0; double maxElement = Double.NEGATIVE_INFINITY; - for (int i=0; i < hSize; i++) { - for (int j=i; j < hSize; j++){ + for (int j=0; j < hSize; j++) { + for (int i=0; i <= j; i++){ genotypeLikelihoods[k++] = haplotypeLikehoodMatrix[i][j]; if (haplotypeLikehoodMatrix[i][j] > maxElement) maxElement = haplotypeLikehoodMatrix[i][j]; diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/validation/ValidationAmplicons.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/validation/ValidationAmplicons.java index 14d462518..cb03d4c61 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/validation/ValidationAmplicons.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/validation/ValidationAmplicons.java @@ -233,6 +233,7 @@ public class ValidationAmplicons extends RodWalker { lowerRepeats(); } else { lowerNonUniqueSegments(); + aligner.close(); } print(); } diff --git 
a/public/java/test/org/broadinstitute/sting/alignment/AlignerIntegrationTest.java b/public/java/test/org/broadinstitute/sting/alignment/AlignerIntegrationTest.java index dafaf3ffe..a6af034cb 100644 --- a/public/java/test/org/broadinstitute/sting/alignment/AlignerIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/alignment/AlignerIntegrationTest.java @@ -14,7 +14,7 @@ import java.util.Arrays; public class AlignerIntegrationTest extends WalkerTest { @Test public void testBasicAlignment() { - String md5 = "34eb4323742999d6d250a0aaa803c6d5"; + String md5 = "a2bdf907b18114a86ca47f9fc23791bf"; WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec( "-R " + GATKDataLocation + "bwa/human_b36_both.fasta" + " -T Align" + diff --git a/public/java/test/org/broadinstitute/sting/gatk/walkers/validation/ValidationAmpliconsIntegrationTest.java b/public/java/test/org/broadinstitute/sting/gatk/walkers/validation/ValidationAmpliconsIntegrationTest.java index 6528f5795..95f4ac0ae 100755 --- a/public/java/test/org/broadinstitute/sting/gatk/walkers/validation/ValidationAmpliconsIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/walkers/validation/ValidationAmpliconsIntegrationTest.java @@ -14,7 +14,7 @@ import java.util.Arrays; */ public class ValidationAmpliconsIntegrationTest extends WalkerTest { - @Test + @Test(enabled=true) public void testWikiExample() { String siteVCF = validationDataLocation + "sites_to_validate.vcf"; String maskVCF = validationDataLocation + "amplicon_mask_sites.vcf"; @@ -27,7 +27,7 @@ public class ValidationAmpliconsIntegrationTest extends WalkerTest { executeTest("Test probes", spec); } - @Test + @Test(enabled=true) public void testWikiExampleNoBWA() { String siteVCF = validationDataLocation + "sites_to_validate.vcf"; String maskVCF = validationDataLocation + "amplicon_mask_sites.vcf"; @@ -40,7 +40,7 @@ public class ValidationAmpliconsIntegrationTest extends WalkerTest { executeTest("Test probes", spec); } - 
@Test + @Test(enabled=true) public void testWikiExampleMonoFilter() { String siteVCF = validationDataLocation + "sites_to_validate.vcf"; String maskVCF = validationDataLocation + "amplicon_mask_sites.vcf"; diff --git a/public/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java b/public/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java index aa6303a6f..77db34cbc 100644 --- a/public/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java @@ -34,7 +34,6 @@ import org.testng.annotations.Test; import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.*; -import javax.jws.soap.SOAPBinding; import java.io.File; /** @@ -55,25 +54,25 @@ public class LibBatIntegrationTest extends BaseTest { @Test public void testReadConfEnv() { - LibLsf.config_param[] unitsParam = (LibLsf.config_param[]) new LibLsf.config_param().toArray(4); + LibLsf.config_param[] configParams = (LibLsf.config_param[]) new LibLsf.config_param().toArray(4); - unitsParam[0].paramName = "LSF_UNIT_FOR_LIMITS"; - unitsParam[1].paramName = "LSF_CONFDIR"; - unitsParam[2].paramName = "MADE_UP_PARAMETER"; + configParams[0].paramName = "LSF_UNIT_FOR_LIMITS"; + configParams[1].paramName = "LSF_CONFDIR"; + configParams[2].paramName = "MADE_UP_PARAMETER"; - Structure.autoWrite(unitsParam); + Structure.autoWrite(configParams); - if (LibLsf.ls_readconfenv(unitsParam[0], null) != 0) { + if (LibLsf.ls_readconfenv(configParams[0], null) != 0) { Assert.fail(LibLsf.ls_sysmsg()); } - Structure.autoRead(unitsParam); + Structure.autoRead(configParams); - System.out.println("LSF_UNIT_FOR_LIMITS: " + unitsParam[0].paramValue); - Assert.assertNotNull(unitsParam[1].paramValue); - Assert.assertNull(unitsParam[2].paramValue); - Assert.assertNull(unitsParam[3].paramName); - Assert.assertNull(unitsParam[3].paramValue); + 
System.out.println("LSF_UNIT_FOR_LIMITS: " + configParams[0].paramValue); + Assert.assertNotNull(configParams[1].paramValue); + Assert.assertNull(configParams[2].paramValue); + Assert.assertNull(configParams[3].paramName); + Assert.assertNull(configParams[3].paramValue); } @Test diff --git a/public/packages/Queue.xml b/public/packages/Queue.xml index 58da4398e..589cb45f5 100644 --- a/public/packages/Queue.xml +++ b/public/packages/Queue.xml @@ -41,6 +41,7 @@ + diff --git a/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/GATKResourcesBundle.scala b/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/GATKResourcesBundle.scala index 150d78019..934cf2a3c 100755 --- a/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/GATKResourcesBundle.scala +++ b/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/GATKResourcesBundle.scala @@ -15,8 +15,8 @@ class GATKResourcesBundle extends QScript { @Argument(doc="liftOverPerl", required=false) var liftOverPerl: File = new File("./perl/liftOverVCF.pl") - @Argument(shortName = "svn", doc="The SVN version of this release", required=true) - var SVN_VERSION: String = _ + @Argument(shortName = "ver", doc="The SVN version of this release", required=true) + var VERSION: String = _ @Argument(shortName = "bundleDir", doc="Path to root where resource files will be placed", required=false) val BUNDLE_ROOT = new File("/humgen/gsa-hpprojects/GATK/bundle") @@ -32,8 +32,8 @@ class GATKResourcesBundle extends QScript { val SITES_EXT: String = "sites" - def BUNDLE_DIR: File = BUNDLE_ROOT + "/" + SVN_VERSION - def DOWNLOAD_DIR: File = DOWNLOAD_ROOT + "/" + SVN_VERSION + def BUNDLE_DIR: File = BUNDLE_ROOT + "/" + VERSION + def DOWNLOAD_DIR: File = DOWNLOAD_ROOT + "/" + VERSION // REFERENCES class Reference( val name: String, val file: File ) { } @@ -113,6 +113,12 @@ class GATKResourcesBundle extends QScript { addResource(new Resource(hg19.file, "", hg19, false)) addResource(new Resource(hg18.file, "", hg18, 
false)) + // + // The b37_decoy reference + // + addResource(new Resource("/humgen/1kg/reference/human_g1k_v37_decoy.fasta", + "IGNORE", b37, false, false)) + // // standard VCF files. Will be lifted to each reference // diff --git a/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/RecalibrateBaseQualities.scala b/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/RecalibrateBaseQualities.scala index f8218148e..cbe53db8d 100755 --- a/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/RecalibrateBaseQualities.scala +++ b/public/scala/qscript/org/broadinstitute/sting/queue/qscripts/RecalibrateBaseQualities.scala @@ -42,8 +42,8 @@ class RecalibrateBaseQualities extends QScript { val recalFile1: File = swapExt(bam, ".bam", ".recal1.csv") val recalFile2: File = swapExt(bam, ".bam", ".recal2.csv") val recalBam: File = swapExt(bam, ".bam", ".recal.bam") - val path1: String = bam + ".before" - val path2: String = bam + ".after" + val path1: String = recalBam + ".before" + val path2: String = recalBam + ".after" add(cov(bam, recalFile1), recal(bam, recalFile1, recalBam), diff --git a/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala index 71970a36b..05c1a1775 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -45,7 +45,7 @@ class QSettings { var jobPriority: Option[Int] = None @Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false) - var memoryLimit: Option[Int] = None + var memoryLimit: Option[Double] = None @Argument(fullName="run_directory", shortName="runDir", doc="Root directory to run functions from.", required=false) var runDirectory = new File(".") diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala 
b/public/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala index 2fbfab5ec..2e3108136 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala @@ -33,12 +33,29 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils} */ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { + /** The string representation of the identifier of the running job. */ + def jobIdString: String = null + /** A generated exec shell script. */ protected var jobScript: File = _ /** Which directory to use for the job status files. */ protected def jobStatusDir = function.jobTempDir + /** Amount of time a job can go without status before giving up. */ + private val unknownStatusMaxSeconds = 5 * 60 + + /** Last known status */ + protected var lastStatus: RunnerStatus.Value = _ + + /** The last time the status was updated */ + protected var lastStatusUpdate: Long = _ + + final override def status = this.lastStatus + + def residentRequestMB: Option[Double] = function.memoryLimit.map(_ * 1024) + def residentLimitMB: Option[Double] = residentRequestMB.map( _ * 1.2 ) + override def init() { super.init() var exec = new StringBuilder @@ -53,7 +70,21 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { } exec.append(function.commandLine) - this.jobScript = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir) + this.jobScript = IOUtils.writeTempFile(exec.toString(), ".exec", "", jobStatusDir) + } + + protected def updateStatus(updatedStatus: RunnerStatus.Value) { + this.lastStatus = updatedStatus + this.lastStatusUpdate = System.currentTimeMillis + } + + override def checkUnknownStatus() { + val unknownStatusMillis = (System.currentTimeMillis - lastStatusUpdate) + if (unknownStatusMillis > (unknownStatusMaxSeconds * 1000L)) { + // Unknown status has been returned for a while now. 
+ updateStatus(RunnerStatus.FAILED) + logger.error("Unable to read status for %.2f minutes: job id %s: %s".format(unknownStatusMillis/(60 * 1000D), jobIdString, function.description)) + } } override def cleanup() { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala index d2be4939a..30187f7e2 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobManager.scala @@ -44,9 +44,9 @@ trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] { /** * Updates the status on a list of functions. * @param runners Runners to update. + * @return runners which were updated. */ - def updateStatus(runners: Set[TRunner]) { - } + def updateStatus(runners: Set[TRunner]): Set[TRunner] = Set.empty /** * Stops a list of functions. diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index 4b4d44988..de5fbde05 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -52,6 +52,11 @@ trait JobRunner[TFunction <: QFunction] { */ def status: RunnerStatus.Value + /** + * Checks if the status has been unknown for an extended period of time. + */ + def checkUnknownStatus() {} + /** * Returns the function to be run. 
*/ diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 8ed3f84c1..a52e9c561 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -1005,7 +1005,10 @@ class QGraph extends Logging { .asInstanceOf[Set[JobRunner[QFunction]]] if (managerRunners.size > 0) try { - manager.updateStatus(managerRunners) + val updatedRunners = manager.updateStatus(managerRunners) + for (runner <- managerRunners.diff(updatedRunners)) { + runner.checkUnknownStatus() + } } catch { case e => /* ignore */ } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala index 82edf6221..8c639b5bb 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala @@ -40,12 +40,7 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine /** Job Id of the currently executing job. 
*/ private var jobId: String = _ - - /** Last known status */ - private var lastStatus: RunnerStatus.Value = _ - - /** The last time the status was updated */ - protected var lastStatusUpdate: Long = _ + override def jobIdString = jobId def start() { GridEngineJobRunner.gridEngineSession.synchronized { @@ -82,11 +77,14 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine nativeSpecString += " -q " + function.jobQueue } - // If the memory limit is set (GB) specify the memory limit - if (function.memoryLimit.isDefined) { - val memAvl: String = function.memoryLimit.get + "G" - val memMax: String = (function.memoryLimit.get * 1.2 * 1024).ceil.toInt + "M" - nativeSpecString += " -l mem_free=" + memAvl + ",h_rss=" + memMax + // If the resident set size is requested pass on the memory request + if (residentRequestMB.isDefined) { + nativeSpecString += " -l mem_free=%dM".format(residentRequestMB.get.ceil.toInt) + } + + // If the resident set size limit is defined specify the memory limit + if (residentLimitMB.isDefined) { + nativeSpecString += " -l h_rss=%dM".format(residentLimitMB.get.ceil.toInt) } // If the priority is set (user specified Int) specify the priority @@ -121,21 +119,11 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine logger.info("Submitted Grid Engine job id: " + jobId) } } - - def status = this.lastStatus - - private def updateStatus(updatedStatus: RunnerStatus.Value) { - this.lastStatus = updatedStatus - this.lastStatusUpdate = System.currentTimeMillis - } } object GridEngineJobRunner extends Logging { private val gridEngineSession = SessionFactory.getFactory.getSession - /** Amount of time a job can go without status before giving up. */ - private val unknownStatusMaxSeconds = 5 * 60 - initGridEngine() /** @@ -156,16 +144,14 @@ object GridEngineJobRunner extends Logging { /** * Updates the status of a list of jobs. * @param runners Runners to update. + * @return runners which were updated. 
*/ - def updateStatus(runners: Set[GridEngineJobRunner]) { + def updateStatus(runners: Set[GridEngineJobRunner]) = { var updatedRunners = Set.empty[GridEngineJobRunner] gridEngineSession.synchronized { runners.foreach(runner => if (updateRunnerStatus(runner)) {updatedRunners += runner}) } - - for (runner <- runners.diff(updatedRunners)) { - checkUnknownStatus(runner) - } + updatedRunners } /** @@ -219,20 +205,11 @@ object GridEngineJobRunner extends Logging { logger.warn("Unable to determine status of Grid Engine job id " + runner.jobId, de) } - Option(returnStatus) match { - case Some(returnStatus) => - runner.updateStatus(returnStatus) - return true - case None => return false - } - } - - private def checkUnknownStatus(runner: GridEngineJobRunner) { - val unknownStatusSeconds = (System.currentTimeMillis - runner.lastStatusUpdate) - if (unknownStatusSeconds > (unknownStatusMaxSeconds * 1000L)) { - // Unknown status has been returned for a while now. - runner.updateStatus(RunnerStatus.FAILED) - logger.error("Unable to read Grid Engine status for %d minutes: job id %d: %s".format(unknownStatusSeconds/60, runner.jobId, runner.function.description)) + if (returnStatus != null) { + runner.updateStatus(returnStatus) + true + } else { + false } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala index c0fff9125..23ddab619 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobManager.scala @@ -34,6 +34,6 @@ class Lsf706JobManager extends CommandLineJobManager[Lsf706JobRunner] { def runnerType = classOf[Lsf706JobRunner] def create(function: CommandLineFunction) = new Lsf706JobRunner(function) - override def updateStatus(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) } + override def updateStatus(runners: 
Set[Lsf706JobRunner]) = { Lsf706JobRunner.updateStatus(runners) } override def tryStop(runners: Set[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index ac2f036b4..46dd08332 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -32,8 +32,8 @@ import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.jna.clibrary.LibC import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} import com.sun.jna.ptr.IntByReference -import com.sun.jna.{StringArray, NativeLong} import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} +import com.sun.jna.{Structure, StringArray, NativeLong} /** * Runs jobs on an LSF compute cluster. @@ -45,12 +45,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR /** Job Id of the currently executing job. */ private var jobId = -1L - - /** Last known status */ - private var lastStatus: RunnerStatus.Value = _ - - /** The last time the status was updated */ - protected var lastStatusUpdate: Long = _ + override def jobIdString = jobId.toString /** * Dispatches the function on the LSF cluster. 
@@ -85,12 +80,19 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR request.options |= LibBat.SUB_QUEUE } - // If the memory limit is set (GB) specify the memory limit - if (function.memoryLimit.isDefined) { - request.resReq = "rusage[mem=" + function.memoryLimit.get + "]" + // If the resident set size is requested pass on the memory request + if (residentRequestMB.isDefined) { + val memInUnits = Lsf706JobRunner.convertUnits(residentRequestMB.get) + request.resReq = "select[mem>%1$d] rusage[mem=%1$d]".format(memInUnits) request.options |= LibBat.SUB_RES_REQ } + // If the resident set size limit is defined specify the memory limit + if (residentLimitMB.isDefined) { + val memInUnits = Lsf706JobRunner.convertUnits(residentLimitMB.get) + request.rLimits(LibLsf.LSF_RLIMIT_RSS) = memInUnits + } + // If the priority is set (user specified Int) specify the priority if (function.jobPriority.isDefined) { request.userPriority = function.jobPriority.get @@ -122,11 +124,13 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR } } - def status = this.lastStatus - - private def updateStatus(updatedStatus: RunnerStatus.Value) { - this.lastStatus = updatedStatus - this.lastStatusUpdate = System.currentTimeMillis + override def checkUnknownStatus() { + // TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs. + // Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod + // LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct) + // LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist) + logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(jobId)) + super.checkUnknownStatus() } } @@ -137,17 +141,8 @@ object Lsf706JobRunner extends Logging { /** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. 
*/ private val retryExpiredSeconds = 5 * 60 - /** Amount of time a job can go without status before giving up. */ - private val unknownStatusMaxSeconds = 5 * 60 - initLsf() - /** The name of the default queue. */ - private var defaultQueue: String = _ - - /** The run limits for each queue. */ - private var queueRlimitRun = Map.empty[String,Int] - /** * Initialize the Lsf library. */ @@ -161,8 +156,9 @@ object Lsf706JobRunner extends Logging { /** * Bulk updates job statuses. * @param runners Runners to update. + * @return runners which were updated. */ - def updateStatus(runners: Set[Lsf706JobRunner]) { + def updateStatus(runners: Set[Lsf706JobRunner]) = { var updatedRunners = Set.empty[Lsf706JobRunner] Lsf706JobRunner.lsfLibLock.synchronized { @@ -192,70 +188,7 @@ object Lsf706JobRunner extends Logging { } } - for (runner <- runners.diff(updatedRunners)) { - checkUnknownStatus(runner) - } - } - - /** - * Tries to stop any running jobs. - * @param runners Runners to stop. - */ - def tryStop(runners: Set[Lsf706JobRunner]) { - lsfLibLock.synchronized { - // lsb_killbulkjobs does not seem to forward SIGTERM, - // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. - for (runner <- runners.filterNot(_.jobId < 0)) { - try { - if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0) - logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId)) - } catch { - case e => - logger.error("Unable to kill job " + runner.jobId, e) - } - } - } - } - - - /** - * Returns the run limit in seconds for the queue. - * If the queue name is null returns the length of the default queue. - * @param queue Name of the queue or null for the default queue. - * @return the run limit in seconds for the queue. - */ - private def getRlimitRun(queue: String) = { - lsfLibLock.synchronized { - if (queue == null) { - if (defaultQueue != null) { - queueRlimitRun(defaultQueue) - } else { - // Get the info on the default queue. 
- val numQueues = new IntByReference(1) - val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0) - if (queueInfo == null) - throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue")) - defaultQueue = queueInfo.queue - val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) - queueRlimitRun += defaultQueue -> limit - limit - } - } else { - queueRlimitRun.get(queue) match { - case Some(limit) => limit - case None => - // Cache miss. Go get the run limits from LSF. - val queues = new StringArray(Array[String](queue)) - val numQueues = new IntByReference(1) - val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0) - if (queueInfo == null) - throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue)) - val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) - queueRlimitRun += queue -> limit - limit - } - } - } + updatedRunners } private def updateRunnerStatus(runner: Lsf706JobRunner, jobInfo: LibBat.jobInfoEnt) { @@ -280,20 +213,6 @@ object Lsf706JobRunner extends Logging { ) } - private def checkUnknownStatus(runner: Lsf706JobRunner) { - // TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs. - // Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod - // LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct) - // LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist) - logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(runner.jobId)) - val unknownStatusMillis = (System.currentTimeMillis - runner.lastStatusUpdate) - if (unknownStatusMillis > (unknownStatusMaxSeconds * 1000L)) { - // Unknown status has been returned for a while now. 
- runner.updateStatus(RunnerStatus.FAILED) - logger.error("Unable to read LSF status for %0.2f minutes: job id %d: %s".format(unknownStatusMillis/(60 * 1000D), runner.jobId, runner.function.description)) - } - } - /** * Returns true if LSF is expected to retry running the function. * @param exitInfo The reason the job exited. @@ -309,4 +228,86 @@ object Lsf706JobRunner extends Logging { } } } + + /** + * Tries to stop any running jobs. + * @param runners Runners to stop. + */ + def tryStop(runners: Set[Lsf706JobRunner]) { + lsfLibLock.synchronized { + // lsb_killbulkjobs does not seem to forward SIGTERM, + // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. + for (runner <- runners.filterNot(_.jobId < 0)) { + try { + if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0) + logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId)) + } catch { + case e => + logger.error("Unable to kill job " + runner.jobId, e) + } + } + } + } + + /** The name of the default queue. */ + private lazy val defaultQueue: String = { + lsfLibLock.synchronized { + val numQueues = new IntByReference(1) + val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0) + if (queueInfo == null) + throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue")) + queueInfo.queue + } + } + + /** The run limits for each queue. */ + private var queueRlimitRun = Map.empty[String,Int] + + /** + * Returns the run limit in seconds for the queue. + * If the queue name is null returns the length of the default queue. + * @param queue Name of the queue or null for the default queue. + * @return the run limit in seconds for the queue. + */ + private def getRlimitRun(queueName: String) = { + lsfLibLock.synchronized { + val queue = if (queueName == null) defaultQueue else queueName + queueRlimitRun.get(queue) match { + case Some(limit) => limit + case None => + // Cache miss. Go get the run limits from LSF. 
+ val queues = new StringArray(Array(queue)) + val numQueues = new IntByReference(1) + val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0) + if (queueInfo == null) + throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue)) + val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) + queueRlimitRun += queue -> limit + limit + } + } + } + + private lazy val unitDivisor: Double = { + lsfLibLock.synchronized { + val unitsParam: Array[LibLsf.config_param] = new LibLsf.config_param().toArray(2).asInstanceOf[Array[LibLsf.config_param]] + unitsParam(0).paramName = "LSF_UNIT_FOR_LIMITS" + + Structure.autoWrite(unitsParam.asInstanceOf[Array[Structure]]) + if (LibLsf.ls_readconfenv(unitsParam(0), null) != 0) + throw new QException(LibBat.lsb_sperror("ls_readconfenv() failed")) + Structure.autoRead(unitsParam.asInstanceOf[Array[Structure]]) + + unitsParam(0).paramValue match { + case "MB" => 1D + case "GB" => 1024D + case "TB" => 1024D * 1024 + case "PB" => 1024D * 1024 * 1024 + case "EB" => 1024D * 1024 * 1024 * 1024 + case null => 1D + } + } + } + + private def convertUnits(mb: Double) = (mb / unitDivisor).ceil.toInt } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala index 603511a30..128d8773c 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala @@ -50,10 +50,10 @@ class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRu // Allow advanced users to update the job. 
updateJobRun(job) - runStatus = RunnerStatus.RUNNING + updateStatus(RunnerStatus.RUNNING) job.run() - runStatus = RunnerStatus.DONE + updateStatus(RunnerStatus.DONE) } - def status = runStatus + override def checkUnknownStatus() {} } diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 2b1abb2d0..c62fdcd7c 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -9,7 +9,7 @@ trait CommandLineFunction extends QFunction with Logging { def commandLine: String /** Upper memory limit */ - var memoryLimit: Option[Int] = None + var memoryLimit: Option[Double] = None /** Job project to run the command */ var jobProject: String = _ @@ -56,7 +56,7 @@ trait CommandLineFunction extends QFunction with Logging { if (memoryLimit.isEmpty) memoryLimit = qSettings.memoryLimit - super.freezeFieldValues + super.freezeFieldValues() } /** diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala index 72445442e..e8279f62b 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/JavaCommandLineFunction.scala @@ -47,7 +47,7 @@ trait JavaCommandLineFunction extends CommandLineFunction { /** * Memory limit for the java executable, or if None will use the default memoryLimit. */ - var javaMemoryLimit: Option[Int] = None + var javaMemoryLimit: Option[Double] = None /** * Returns the java executable to run. 
@@ -61,8 +61,8 @@ trait JavaCommandLineFunction extends CommandLineFunction { null } - override def freezeFieldValues = { - super.freezeFieldValues + override def freezeFieldValues() { + super.freezeFieldValues() if (javaMemoryLimit.isEmpty && memoryLimit.isDefined) javaMemoryLimit = memoryLimit @@ -72,7 +72,7 @@ trait JavaCommandLineFunction extends CommandLineFunction { } def javaOpts = "%s -Djava.io.tmpdir=%s" - .format(optional(" -Xmx", javaMemoryLimit, "g"), jobTempDir) + .format(optional(" -Xmx", javaMemoryLimit.map(gb => (gb * 1024).ceil.toInt), "m"), jobTempDir) def commandLine = "java%s %s" .format(javaOpts, javaExecutable) diff --git a/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala b/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala index 0871e769b..7c76823da 100644 --- a/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala +++ b/public/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala @@ -29,7 +29,7 @@ import org.broadinstitute.sting.queue.pipeline.{PipelineTest, PipelineTestSpec} class HelloWorldPipelineTest { @Test - def testHelloWorld { + def testHelloWorld() { val spec = new PipelineTestSpec spec.name = "HelloWorld" spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala" @@ -37,15 +37,23 @@ class HelloWorldPipelineTest { } @Test - def testHelloWorldWithPrefix { + def testHelloWorldWithPrefix() { val spec = new PipelineTestSpec spec.name = "HelloWorldWithPrefix" spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -jobPrefix HelloWorld" PipelineTest.executeTest(spec) } + @Test + def testHelloWorldWithMemoryLimit() { + val spec = new PipelineTestSpec + spec.name = "HelloWorldWithMemoryLimit" + spec.args = "-S 
public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -memLimit 1.25" + PipelineTest.executeTest(spec) + } + @Test(enabled=false) - def testHelloWorldWithPriority { + def testHelloWorldWithPriority() { val spec = new PipelineTestSpec spec.name = "HelloWorldWithPriority" spec.args = "-S public/scala/qscript/org/broadinstitute/sting/queue/qscripts/examples/HelloWorld.scala -jobPriority 100"