General improvements to Queue

-- Support for collecting resources info from DRMAA runners
-- Disabled the non-standard mem_free argument so that we can actually use our own SGE cluster gsa4
-- NCoresRequest is a testing queue script for this.
-- Added two command line arguments:
  -- multiCoreJerk: don't request multiple cores for jobs with nt > 1.  This was the old behavior but it's really not the best way to run parallel jobs.  Now with Queue, if you run with nt = 4, the system requests 4 cores on your host.  If this flag is thrown, though, it will only request 1 and you'll just use 4, like a jerk
  -- job_parallel_env: parallel environment name used with SGE to request multicore jobs.  Equivalent to -pe job_parallel_env NT for NT > 1 jobs
This commit is contained in:
Mark DePristo 2011-12-19 15:46:32 -05:00
parent 7204fcc2c3
commit 0cc5c3d799
6 changed files with 80 additions and 11 deletions

View File

@@ -147,7 +147,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
String clpConstructor = String.format("analysisName = \"%s\"%njavaMainClass = \"%s\"%n", clpClassName, clp.getName());
writeClass("org.broadinstitute.sting.queue.function.JavaCommandLineFunction", clpClassName,
false, clpConstructor, ArgumentDefinitionField.getArgumentFields(parser,clp), dependents);
false, clpConstructor, ArgumentDefinitionField.getArgumentFields(parser,clp), dependents, false);
if (clp == CommandLineGATK.class) {
for (Entry<String, Collection<Class<? extends Walker>>> walkersByPackage: walkerManager.getWalkerNamesByPackage(false).entrySet()) {
@@ -169,7 +169,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
}
writeClass(GATK_EXTENSIONS_PACKAGE_NAME + "." + clpClassName, walkerName,
isScatter, constructor, argumentFields, dependents);
isScatter, constructor, argumentFields, dependents, true);
} catch (Exception e) {
throw new ReviewedStingException("Error generating wrappers for walker " + walkerType, e);
}
@@ -241,8 +241,9 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
* @throws IOException If the file cannot be written.
*/
private void writeClass(String baseClass, String className, boolean isScatter,
String constructor, List<? extends ArgumentField> argumentFields, Set<Class<?>> dependents) throws IOException {
String content = getContent(CLASS_TEMPLATE, baseClass, className, constructor, isScatter, "", argumentFields, dependents);
String constructor, List<? extends ArgumentField> argumentFields,
Set<Class<?>> dependents, boolean isGATKWalker) throws IOException {
String content = getContent(CLASS_TEMPLATE, baseClass, className, constructor, isScatter, "", argumentFields, dependents, isGATKWalker);
writeFile(GATK_EXTENSIONS_PACKAGE_NAME + "." + className, content);
}
@@ -256,7 +257,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
*/
private void writeFilter(String className, List<? extends ArgumentField> argumentFields, Set<Class<?>> dependents) throws IOException {
String content = getContent(TRAIT_TEMPLATE, "org.broadinstitute.sting.queue.function.CommandLineFunction",
className, "", false, String.format(" + \" -read_filter %s\"", className), argumentFields, dependents);
className, "", false, String.format(" + \" -read_filter %s\"", className), argumentFields, dependents, false);
writeFile(GATK_EXTENSIONS_PACKAGE_NAME + "." + className, content);
}
@@ -350,7 +351,8 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
*/
private static String getContent(String scalaTemplate, String baseClass, String className,
String constructor, boolean isScatter, String commandLinePrefix,
List<? extends ArgumentField> argumentFields, Set<Class<?>> dependents) {
List<? extends ArgumentField> argumentFields, Set<Class<?>> dependents,
boolean isGATKWalker) {
StringBuilder arguments = new StringBuilder();
StringBuilder commandLine = new StringBuilder(commandLinePrefix);
@ -384,8 +386,10 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
StringBuffer freezeFieldOverride = new StringBuffer();
for (String freezeField: freezeFields)
freezeFieldOverride.append(freezeField);
if (freezeFieldOverride.length() > 0) {
if (freezeFieldOverride.length() > 0 || isGATKWalker) {
freezeFieldOverride.insert(0, String.format("override def freezeFieldValues = {%nsuper.freezeFieldValues%n"));
if ( isGATKWalker )
freezeFieldOverride.append(String.format("if ( num_threads.isDefined ) nCoresRequest = num_threads%n"));
freezeFieldOverride.append(String.format("}%n%n"));
}

View File

@@ -62,6 +62,13 @@ class QSettings {
@Argument(fullName="resident_memory_request", shortName="resMemReq", doc="Default resident memory request for jobs, in gigabytes.", required=false)
var residentRequest: Option[Double] = None
/** The name of the parallel environment (required for SGE, for example) */
@Argument(fullName="job_parallel_env", shortName="jobParaEnv", doc="An SGE style parallel environment to use for jobs requesting more than 1 core. Equivalent to submitting jobs with -pe ARG nt for jobs with nt > 1", required=false)
var parallelEnvironmentName: String = "smp_pe" // Broad default
@Argument(fullName="dontRequestMultipleCores", shortName="multiCoreJerk", doc="If provided, Queue will not request multiple processors for jobs using multiple processors. Sometimes you eat the bear, sometimes the bear eats you.", required=false)
var dontRequestMultipleCores: Boolean = false
@Argument(fullName="run_directory", shortName="runDir", doc="Root directory to run functions from.", required=false)
var runDirectory = new File(".")

View File

@@ -28,8 +28,8 @@ import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.util.{Logging,Retry}
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
import java.util.Collections
import org.ggf.drmaa._
import java.util.{Date, Collections}
/**
* Runs jobs using DRMAA.
@@ -103,6 +103,18 @@ class DrmaaJobRunner(val session: Session, val function: CommandLineFunction) ex
case Session.QUEUED_ACTIVE => returnStatus = RunnerStatus.RUNNING
case Session.DONE =>
val jobInfo: JobInfo = session.wait(jobId, Session.TIMEOUT_NO_WAIT)
// Update jobInfo
def convertDRMAATime(key: String): Date = {
val v = jobInfo.getResourceUsage.get(key)
if ( v != null ) new Date(v.toString.toDouble.toLong * 1000) else null;
}
if ( jobInfo.getResourceUsage != null ) {
getRunInfo.startTime = convertDRMAATime("start_time")
getRunInfo.doneTime = convertDRMAATime("end_time")
getRunInfo.exechosts = "unknown"
}
if ((jobInfo.hasExited && jobInfo.getExitStatus != 0)
|| jobInfo.hasSignaled
|| jobInfo.wasAborted)

View File

@@ -52,13 +52,28 @@ class GridEngineJobRunner(session: Session, function: CommandLineFunction) exten
nativeSpec += " -q " + function.jobQueue
// If the resident set size is requested pass on the memory request
if (function.residentRequest.isDefined)
nativeSpec += " -l mem_free=%dM".format(function.residentRequest.map(_ * 1024).get.ceil.toInt)
// NOTE: 12/20/11: depristo commented this out because mem_free isn't
// such a standard feature in SGE (gsa-engineering queue doesn't support it)
// requiring it can make SGE not so usable. It's dangerous to not enforce
// that we have enough memory to run our jobs, but I'd rather be dangerous
// than not be able to run my jobs at all.
// if (function.residentRequest.isDefined)
// nativeSpec += " -l mem_free=%dM".format(function.residentRequest.map(_ * 1024).get.ceil.toInt)
// If the resident set size limit is defined specify the memory limit
if (function.residentLimit.isDefined)
nativeSpec += " -l h_rss=%dM".format(function.residentLimit.map(_ * 1024).get.ceil.toInt)
// If more than 1 core is requested, set the proper request
// if we aren't being jerks and just stealing cores (previous behavior)
if ( function.nCoresRequest.getOrElse(1) > 1 ) {
if ( function.qSettings.dontRequestMultipleCores )
logger.warn("Sending multicore job %s to farm without requesting appropriate number of cores (%d)".format(
function.jobName, function.nCoresRequest.get))
else
nativeSpec += " -pe %s %d".format(function.qSettings.parallelEnvironmentName, function.nCoresRequest.get)
}
// Pass on any job resource requests
nativeSpec += function.jobResourceRequests.map(" -l " + _).mkString
@@ -70,6 +85,7 @@ class GridEngineJobRunner(session: Session, function: CommandLineFunction) exten
if (priority.isDefined)
nativeSpec += " -p " + priority.get
logger.debug("Native spec is: %s".format(nativeSpec))
(nativeSpec + " " + super.functionNativeSpec).trim()
}
}

View File

@@ -56,6 +56,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
private val selectString = new StringBuffer()
private val usageString = new StringBuffer()
private val requestString = new StringBuffer()
private val spanString = new StringBuffer()
/**
* Dispatches the function on the LSF cluster.
@@ -100,6 +101,23 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
appendRequest("rusage", usageString, ",", "mem=%d".format(memInUnits))
}
//
// Request multiple cores on the same host. If nCoresRequest > 1, and we
// aren't being jerks and stealing cores, set numProcessors and maxNumProcessors
// and the span[host=1] parameters to get us exactly the right number of
// cores on a single host
//
if ( function.nCoresRequest.getOrElse(1) > 1 ) {
if ( function.qSettings.dontRequestMultipleCores )
logger.warn("Sending multicore job %s to farm without requesting appropriate number of cores (%d)".format(
function.jobName, function.nCoresRequest.get))
else {
request.numProcessors = function.nCoresRequest.get
request.maxNumProcessors = request.numProcessors
appendRequest("span", spanString, ",", "hosts=1")
}
}
val resReq = getResourceRequest
if (resReq.length > 0) {
request.resReq = resReq
@@ -167,10 +185,12 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
requestString.setLength(0)
selectString.setLength(0)
usageString.setLength(0)
spanString.setLength(0)
requestString.append(function.jobResourceRequests.mkString(" "))
extractSection(requestString, "select", selectString)
extractSection(requestString, "rusage", usageString)
extractSection(requestString, "span", spanString)
}
private def extractSection(requestString: StringBuffer, section: String, sectionString: StringBuffer) {
@@ -196,7 +216,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
sectionString.insert(sectionString.length() - 1, separator + request)
}
private def getResourceRequest = "%s %s %s".format(selectString, usageString, requestString).trim()
private def getResourceRequest = "%s %s %s %s".format(selectString, usageString, spanString, requestString).trim()
}
object Lsf706JobRunner extends Logging {

View File

@@ -17,6 +17,9 @@ trait CommandLineFunction extends QFunction with Logging {
/** Resident memory request */
var residentRequest: Option[Double] = None
/** the number of SMP cores this job wants */
var nCoresRequest: Option[Int] = None
/** Job project to run the command */
var jobProject: String = _
@@ -45,6 +48,9 @@ trait CommandLineFunction extends QFunction with Logging {
if (commandLineFunction.residentRequest.isEmpty)
commandLineFunction.residentRequest = this.residentRequest
if (commandLineFunction.nCoresRequest.isEmpty)
commandLineFunction.nCoresRequest = this.nCoresRequest
if (commandLineFunction.jobProject == null)
commandLineFunction.jobProject = this.jobProject
@@ -100,6 +106,10 @@ trait CommandLineFunction extends QFunction with Logging {
if (residentRequest.isEmpty)
residentRequest = qSettings.residentRequest
// the default value is 1 core
if (nCoresRequest.isEmpty)
nCoresRequest = Some(1)
if (residentRequest.isEmpty)
residentRequest = memoryLimit