diff --git a/public/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java b/public/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java index 67010c4d5..9c40fb976 100644 --- a/public/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java +++ b/public/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java @@ -147,7 +147,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram { String clpConstructor = String.format("analysisName = \"%s\"%njavaMainClass = \"%s\"%n", clpClassName, clp.getName()); writeClass("org.broadinstitute.sting.queue.function.JavaCommandLineFunction", clpClassName, - false, clpConstructor, ArgumentDefinitionField.getArgumentFields(parser,clp), dependents); + false, clpConstructor, ArgumentDefinitionField.getArgumentFields(parser,clp), dependents, false); if (clp == CommandLineGATK.class) { for (Entry>> walkersByPackage: walkerManager.getWalkerNamesByPackage(false).entrySet()) { @@ -169,7 +169,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram { } writeClass(GATK_EXTENSIONS_PACKAGE_NAME + "." + clpClassName, walkerName, - isScatter, constructor, argumentFields, dependents); + isScatter, constructor, argumentFields, dependents, true); } catch (Exception e) { throw new ReviewedStingException("Error generating wrappers for walker " + walkerType, e); } @@ -241,8 +241,9 @@ public class GATKExtensionsGenerator extends CommandLineProgram { * @throws IOException If the file cannot be written. */ private void writeClass(String baseClass, String className, boolean isScatter, - String constructor, List argumentFields, Set> dependents) throws IOException { - String content = getContent(CLASS_TEMPLATE, baseClass, className, constructor, isScatter, "", argumentFields, dependents); + String constructor, List argumentFields, + Set> dependents, boolean isGATKWalker) throws IOException { + String content = getContent(CLASS_TEMPLATE, baseClass, className, constructor, isScatter, "", argumentFields, dependents, isGATKWalker); writeFile(GATK_EXTENSIONS_PACKAGE_NAME + "." + className, content); } @@ -256,7 +257,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram { */ private void writeFilter(String className, List argumentFields, Set> dependents) throws IOException { String content = getContent(TRAIT_TEMPLATE, "org.broadinstitute.sting.queue.function.CommandLineFunction", - className, "", false, String.format(" + \" -read_filter %s\"", className), argumentFields, dependents); + className, "", false, String.format(" + \" -read_filter %s\"", className), argumentFields, dependents, false); writeFile(GATK_EXTENSIONS_PACKAGE_NAME + "." + className, content); } @@ -350,7 +351,8 @@ public class GATKExtensionsGenerator extends CommandLineProgram { */ private static String getContent(String scalaTemplate, String baseClass, String className, String constructor, boolean isScatter, String commandLinePrefix, - List argumentFields, Set> dependents) { + List argumentFields, Set> dependents, + boolean isGATKWalker) { StringBuilder arguments = new StringBuilder(); StringBuilder commandLine = new StringBuilder(commandLinePrefix); @@ -384,8 +386,10 @@ public class GATKExtensionsGenerator extends CommandLineProgram { StringBuffer freezeFieldOverride = new StringBuffer(); for (String freezeField: freezeFields) freezeFieldOverride.append(freezeField); - if (freezeFieldOverride.length() > 0) { + if (freezeFieldOverride.length() > 0 || isGATKWalker) { freezeFieldOverride.insert(0, String.format("override def freezeFieldValues = {%nsuper.freezeFieldValues%n")); + if ( isGATKWalker ) + freezeFieldOverride.append(String.format("if ( num_threads.isDefined ) nCoresRequest = num_threads%n")); freezeFieldOverride.append(String.format("}%n%n")); } diff --git a/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala index 648f9ffef..e8ac26a57 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -62,6 +62,13 @@ class QSettings { @Argument(fullName="resident_memory_request", shortName="resMemReq", doc="Default resident memory request for jobs, in gigabytes.", required=false) var residentRequest: Option[Double] = None + /** The name of the parallel environment (required for SGE, for example) */ + @Argument(fullName="job_parallel_env", shortName="jobParaEnv", doc="An SGE style parallel environment to use for jobs requesting more than 1 core. Equivalent to submitting jobs with -pe ARG nt for jobs with nt > 1", required=false) + var parallelEnvironmentName: String = "smp_pe" // Broad default + + @Argument(fullName="dontRequestMultipleCores", shortName="multiCoreJerk", doc="If provided, Queue will not request multiple processors for jobs using multiple processors. Sometimes you eat the bear, sometimes the bear eats you.", required=false) + var dontRequestMultipleCores: Boolean = false + @Argument(fullName="run_directory", shortName="runDir", doc="Root directory to run functions from.", required=false) var runDirectory = new File(".") diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/drmaa/DrmaaJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/drmaa/DrmaaJobRunner.scala index 227261912..2aae2fc6b 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/drmaa/DrmaaJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/drmaa/DrmaaJobRunner.scala @@ -28,8 +28,8 @@ import org.broadinstitute.sting.queue.QException import org.broadinstitute.sting.queue.util.{Logging,Retry} import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} -import java.util.Collections import org.ggf.drmaa._ +import java.util.{Date, Collections} /** * Runs jobs using DRMAA. @@ -103,6 +103,18 @@ class DrmaaJobRunner(val session: Session, val function: CommandLineFunction) ex case Session.QUEUED_ACTIVE => returnStatus = RunnerStatus.RUNNING case Session.DONE => val jobInfo: JobInfo = session.wait(jobId, Session.TIMEOUT_NO_WAIT) + + // Update jobInfo + def convertDRMAATime(key: String): Date = { + val v = jobInfo.getResourceUsage.get(key) + if ( v != null ) new Date(v.toString.toDouble.toLong * 1000) else null; + } + if ( jobInfo.getResourceUsage != null ) { + getRunInfo.startTime = convertDRMAATime("start_time") + getRunInfo.doneTime = convertDRMAATime("end_time") + getRunInfo.exechosts = "unknown" + } + if ((jobInfo.hasExited && jobInfo.getExitStatus != 0) || jobInfo.hasSignaled || jobInfo.wasAborted) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala index 96e3ffd95..fca92a7a1 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala @@ -52,13 +52,28 @@ class GridEngineJobRunner(session: Session, function: CommandLineFunction) exten nativeSpec += " -q " + function.jobQueue // If the resident set size is requested pass on the memory request - if (function.residentRequest.isDefined) - nativeSpec += " -l mem_free=%dM".format(function.residentRequest.map(_ * 1024).get.ceil.toInt) + // NOTE: 12/20/11: depristo commented this out because mem_free isn't + // such a standard feature in SGE (gsa-engineering queue doesn't support it) + // requiring it can make SGE not so usable. It's dangerous to not enforce + // that we have enough memory to run our jobs, but I'd rather be dangerous + // than not be able to run my jobs at all. +// if (function.residentRequest.isDefined) +// nativeSpec += " -l mem_free=%dM".format(function.residentRequest.map(_ * 1024).get.ceil.toInt) // If the resident set size limit is defined specify the memory limit if (function.residentLimit.isDefined) nativeSpec += " -l h_rss=%dM".format(function.residentLimit.map(_ * 1024).get.ceil.toInt) + // If more than 1 core is requested, set the proper request + // if we aren't being jerks and just stealing cores (previous behavior) + if ( function.nCoresRequest.getOrElse(1) > 1 ) { + if ( function.qSettings.dontRequestMultipleCores ) + logger.warn("Sending multicore job %s to farm without requesting appropriate number of cores (%d)".format( + function.jobName, function.nCoresRequest.get)) + else + nativeSpec += " -pe %s %d".format(function.qSettings.parallelEnvironmentName, function.nCoresRequest.get) + } + // Pass on any job resource requests nativeSpec += function.jobResourceRequests.map(" -l " + _).mkString @@ -70,6 +85,7 @@ class GridEngineJobRunner(session: Session, function: CommandLineFunction) exten if (priority.isDefined) nativeSpec += " -p " + priority.get + logger.debug("Native spec is: %s".format(nativeSpec)) (nativeSpec + " " + super.functionNativeSpec).trim() } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index 323cc63ff..5ef78500c 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -56,6 +56,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR private val selectString = new StringBuffer() private val usageString = new StringBuffer() private val requestString = new StringBuffer() + private val spanString = new StringBuffer() /** * Dispatches the function on the LSF cluster. @@ -100,6 +101,23 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR appendRequest("rusage", usageString, ",", "mem=%d".format(memInUnits)) } + // + // Request multiple cores on the same host. If nCoresRequest > 1, and we + // aren't being jerks and stealing cores, set numProcessors and maxNumProcessors + // and the span[host=1] parameters to get us exactly the right number of + // cores on a single host + // + if ( function.nCoresRequest.getOrElse(1) > 1 ) { + if ( function.qSettings.dontRequestMultipleCores ) + logger.warn("Sending multicore job %s to farm without requesting appropriate number of cores (%d)".format( + function.jobName, function.nCoresRequest.get)) + else { + request.numProcessors = function.nCoresRequest.get + request.maxNumProcessors = request.numProcessors + appendRequest("span", spanString, ",", "hosts=1") + } + } + val resReq = getResourceRequest if (resReq.length > 0) { request.resReq = resReq @@ -167,10 +185,12 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR requestString.setLength(0) selectString.setLength(0) usageString.setLength(0) + spanString.setLength(0) requestString.append(function.jobResourceRequests.mkString(" ")) extractSection(requestString, "select", selectString) extractSection(requestString, "rusage", usageString) + extractSection(requestString, "span", spanString) } private def extractSection(requestString: StringBuffer, section: String, sectionString: StringBuffer) { @@ -196,7 +216,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR sectionString.insert(sectionString.length() - 1, separator + request) } - private def getResourceRequest = "%s %s %s".format(selectString, usageString, requestString).trim() + private def getResourceRequest = "%s %s %s %s".format(selectString, usageString, spanString, requestString).trim() } object Lsf706JobRunner extends Logging { diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index b08832f22..167dcb593 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -17,6 +17,9 @@ trait CommandLineFunction extends QFunction with Logging { /** Resident memory request */ var residentRequest: Option[Double] = None + /** the number of SMP cores this job wants */ + var nCoresRequest: Option[Int] = None + /** Job project to run the command */ var jobProject: String = _ @@ -45,6 +48,9 @@ trait CommandLineFunction extends QFunction with Logging { if (commandLineFunction.residentRequest.isEmpty) commandLineFunction.residentRequest = this.residentRequest + if (commandLineFunction.nCoresRequest.isEmpty) + commandLineFunction.nCoresRequest = this.nCoresRequest + if (commandLineFunction.jobProject == null) commandLineFunction.jobProject = this.jobProject @@ -100,6 +106,10 @@ trait CommandLineFunction extends QFunction with Logging { if (residentRequest.isEmpty) residentRequest = qSettings.residentRequest + // the default value is 1 core + if (nCoresRequest.isEmpty) + nCoresRequest = Some(1) + if (residentRequest.isEmpty) residentRequest = memoryLimit