From d185c2961f032964445ff9b9e8cbed7174ec28f7 Mon Sep 17 00:00:00 2001 From: kshakir Date: Tue, 15 Feb 2011 18:26:14 +0000 Subject: [PATCH] Added pipeline for calling FCP in batches called MultiFullCallingPipeline. Bug smashes for the MCFP: Synchronized access to LSF library and modifications to the QGraph. If values are missing from the graph with -run make sure to exit with a non-zero. Refactored QGraph to pre-generate a unique Int for each QNode speeding up getHashCode/equals inside the graph. Added jobPriority and removed jobLimitSeconds from QFunction. All scatter gather is by default in a single sub directory queueScatterGather. Moved some FCPTest into BaseTest/PipelineTest for use by MFCPTest. Rev'ed the 1000G bams used for validation from v1 to v2 and added code to look for the bams before running other tests. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5247 348d0f76-0448-11de-a6fe-93d51630548a --- .../org/broadinstitute/sting/BaseTest.java | 4 + .../jna/lsf/v7_0_6/LibBatIntegrationTest.java | 3 + .../playground/MultiFullCallingPipeline.scala | 88 +++++ .../sting/queue/QCommandLine.scala | 6 +- .../sting/queue/QSettings.scala | 3 + .../sting/queue/engine/FunctionEdge.scala | 4 +- .../sting/queue/engine/Lsf706JobRunner.scala | 293 +++++++------- .../sting/queue/engine/MappingEdge.scala | 4 +- .../sting/queue/engine/QEdge.scala | 21 +- .../sting/queue/engine/QGraph.scala | 369 ++++++++++-------- .../sting/queue/engine/QNode.scala | 15 +- .../sting/queue/function/QFunction.scala | 13 +- .../ScatterGatherableFunction.scala | 8 +- .../sting/queue/pipeline/PipelineTest.scala | 145 ++++++- .../examples/HelloWorldPipelineTest.scala | 42 +- .../playground/FullCallingPipelineTest.scala | 104 ++--- .../MultiFullCallingPipelineTest.scala | 108 +++++ 17 files changed, 826 insertions(+), 404 deletions(-) create mode 100644 scala/qscript/playground/MultiFullCallingPipeline.scala create mode 100644 
scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala diff --git a/java/test/org/broadinstitute/sting/BaseTest.java b/java/test/org/broadinstitute/sting/BaseTest.java index c30c736a3..693a593f0 100755 --- a/java/test/org/broadinstitute/sting/BaseTest.java +++ b/java/test/org/broadinstitute/sting/BaseTest.java @@ -66,6 +66,10 @@ public abstract class BaseTest { public static final String b37dbSNP129 = dbsnpDataLocation + "dbsnp_129_b37.rod"; public static final String b37dbSNP132 = dbsnpDataLocation + "dbsnp_132_b37.vcf"; + public static final String intervalsLocation = GATKDataLocation; + public static final String hg19Intervals = intervalsLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list"; + public static final String hg19Chr20Intervals = intervalsLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.chr20.interval_list"; + public static final String networkTempDir = "/broad/shptmp/"; public static final File networkTempDirFile = new File(networkTempDir); diff --git a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java index d0e82ed90..7b15d1a7e 100644 --- a/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java +++ b/java/test/org/broadinstitute/sting/jna/lsf/v7_0_6/LibBatIntegrationTest.java @@ -86,6 +86,9 @@ public class LibBatIntegrationTest extends BaseTest { req.outFile = outFile.getPath(); req.options |= LibBat.SUB_OUT_FILE; + req.userPriority = 100; + req.options2 |= LibBat.SUB2_JOB_PRIORITY; + req.command = "echo \"Hello world.\""; submitReply reply = new submitReply(); diff --git a/scala/qscript/playground/MultiFullCallingPipeline.scala b/scala/qscript/playground/MultiFullCallingPipeline.scala new file mode 100644 index 000000000..d83e0324f --- /dev/null +++ 
b/scala/qscript/playground/MultiFullCallingPipeline.scala @@ -0,0 +1,88 @@ +import collection.JavaConversions +import org.broadinstitute.sting.queue.function.JarCommandLineFunction +import org.broadinstitute.sting.queue.QScript +import org.broadinstitute.sting.queue.util.IOUtils +import org.broadinstitute.sting.utils.text.XReadLines + +class MultiFullCallingPipeline extends QScript { + qscript => + + @Input(doc="Sting home", shortName="stingHome") + var stingHome: File = _ + + @Input(doc="yaml lists to run", shortName="YL") + var yamlList: File = _ + + @Argument(doc="number of jobs per batch", shortName="BS") + var batchSize: Int = _ + + @Argument(doc="pipeline status to", shortName="PS", required = false) + var pipelineStatusTo: String = _ + + @Argument(doc="pipeline job queue", shortName="PJQ", required = false) + var pipelineJobQueue: String = _ + + @Argument(doc="pipeline short queue", shortName="PSQ", required = false) + var pipelineShortQueue: String = _ + + @Argument(doc="pipeline priority", shortName="PP", required = false) + var pipelinePriority: Option[Int] = None + + def script { + // Global arguments for all pipeline runs + stingHome = IOUtils.absolute(stingHome) + val queueJar = new File(stingHome, "dist/Queue.jar") + val pipelineScript = new File(stingHome, "scala/qscript/playground/FullCallingPipeline.q") + val gatkJar = new File(stingHome, "dist/GenomeAnalysisTK.jar") + val tearScript = new File(stingHome, "R/DataProcessingReport/GetTearsheetStats.R") + + // Parse the yaml list + var yamls = List.empty[File] + for (yaml <- JavaConversions.asScalaIterator(new XReadLines(yamlList))) + yamls :+= new File(yaml) + + // The list of previous outputs + val lastOuts = new Array[File](batchSize) + for (yamlGroup <- yamls.grouped(batchSize)) { + for ((yaml, i) <- yamlGroup.zipWithIndex) { + // Get the last output for index(i), which is null for the first job. + val lastOut = lastOuts(i) + + // Run the pipeline on the yaml waiting for the last output. 
+ val runPipeline = new RunPipeline(yaml, lastOut) + + // Add this run to the graph. + add(runPipeline) + + // Have the next job at index(i) wait for this output file. + lastOuts(i) = runPipeline.jobOutputFile + } + } + + /** + * Runs a yaml in a pipeline only after a previous pipeline + * run has produced the passed in output file. + */ + class RunPipeline(yamlFile: File, lastOutput: File) extends JarCommandLineFunction { + @Input(doc="output file to wait for", required=false) + var waitJobOutputFile = lastOutput + + commandDirectory = yamlFile.getParentFile + jobOutputFile = IOUtils.absolute(commandDirectory, "queue.out") + jarFile = queueJar + memoryLimit = Some(1) + + private var yamlName = yamlFile.getName.stripSuffix(".yaml") + + override def commandLine = super.commandLine + + optional(" -statusTo ", qscript.pipelineStatusTo) + + optional(" -jobQueue ", qscript.pipelineJobQueue) + + optional(" -shortJobQueue ", qscript.pipelineShortQueue) + + optional(" -jobPriority ", qscript.pipelinePriority) + + " -S %s --gatkjar %s -tearScript %s -jobProject %s -jobPrefix %s -Y %s -bsub -run" + .format(pipelineScript, gatkJar, tearScript, yamlName, yamlName, yamlFile) + + override def dotString = "Queue: " + yamlName + } + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index 0a2a53a32..c99457d1b 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -48,11 +48,11 @@ class QCommandLine extends CommandLineProgram with Logging { logger.info("Added " + script.functions.size + " functions") } - qGraph.run + qGraph.run() - if (qGraph.hasFailed) { + if (!qGraph.success) { logger.info("Done with errors") - qGraph.logFailed + qGraph.logFailed() 1 } else { logger.info("Done") diff --git a/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/scala/src/org/broadinstitute/sting/queue/QSettings.scala 
index b3dd6fb3c..993932bd5 100644 --- a/scala/src/org/broadinstitute/sting/queue/QSettings.scala +++ b/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -17,6 +17,9 @@ class QSettings { @Argument(fullName="job_project", shortName="jobProject", doc="Default project for compute farm jobs.", required=false) var jobProject: String = "Queue" + @Argument(fullName="job_priority", shortName="jobPriority", doc="Default priority for jobs.", required=false) + var jobPriority: Option[Int] = None + @Argument(fullName="job_scatter_gather_directory", shortName="jobSGDir", doc="Default directory to place scatter gather output for compute farm jobs.", required=false) var jobScatterGatherDirectory: File = _ diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index 23c4661d0..0a165f10e 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -10,7 +10,7 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils} * and then the runner is specified later when the time comes to * execute the function in the edge. 
*/ -class FunctionEdge(var function: QFunction) extends QEdge with Logging { +class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNode) extends QEdge with Logging { var runner: JobRunner[_] =_ /** @@ -131,8 +131,6 @@ class FunctionEdge(var function: QFunction) extends QEdge with Logging { runner = null } - def inputs = function.inputs - def outputs = function.outputs override def dotString = function.dotString /** diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala index 149f4f040..70fdf2d48 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala @@ -31,125 +31,132 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR * @param function Command to run. */ def start() = { - val request = new submit - for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS) + Lsf706JobRunner.lsfLibLock.synchronized { + val request = new submit + for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS) request.rLimits(i) = LibLsf.DEFAULT_RLIMIT; - request.outFile = function.jobOutputFile.getPath - request.options |= LibBat.SUB_OUT_FILE + request.outFile = function.jobOutputFile.getPath + request.options |= LibBat.SUB_OUT_FILE - if (function.jobErrorFile != null) { - request.errFile = function.jobErrorFile.getPath - request.options |= LibBat.SUB_ERR_FILE - } + if (function.jobErrorFile != null) { + request.errFile = function.jobErrorFile.getPath + request.options |= LibBat.SUB_ERR_FILE + } - if (function.jobProject != null) { - request.projectName = function.jobProject - request.options |= LibBat.SUB_PROJECT_NAME - } + if (function.jobProject != null) { + request.projectName = function.jobProject + request.options |= LibBat.SUB_PROJECT_NAME + } - if (function.jobQueue != null) { - request.queue = function.jobQueue - request.options |= LibBat.SUB_QUEUE - } + 
if (function.jobQueue != null) { + request.queue = function.jobQueue + request.options |= LibBat.SUB_QUEUE + } - if (IOUtils.absolute(new File(".")) != function.commandDirectory) { - request.cwd = function.commandDirectory.getPath - request.options3 |= LibBat.SUB3_CWD - } + if (IOUtils.absolute(new File(".")) != function.commandDirectory) { + request.cwd = function.commandDirectory.getPath + request.options3 |= LibBat.SUB3_CWD + } - if (function.jobRestartable) { - request.options |= LibBat.SUB_RERUNNABLE - } + if (function.jobRestartable) { + request.options |= LibBat.SUB_RERUNNABLE + } - if (function.memoryLimit.isDefined) { - request.resReq = "rusage[mem=" + function.memoryLimit.get + "]" - request.options |= LibBat.SUB_RES_REQ - } + if (function.memoryLimit.isDefined) { + request.resReq = "rusage[mem=" + function.memoryLimit.get + "]" + request.options |= LibBat.SUB_RES_REQ + } - if (function.description != null) { - request.jobName = function.description.take(1000) - request.options |= LibBat.SUB_JOB_NAME - } + if (function.description != null) { + request.jobName = function.description.take(1000) + request.options |= LibBat.SUB_JOB_NAME + } + + if (function.jobPriority.isDefined) { + request.userPriority = function.jobPriority.get + request.options2 |= LibBat.SUB2_JOB_PRIORITY + } - if (function.jobLimitSeconds.isDefined) { - request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.jobLimitSeconds.get - } else { request.rLimits(LibLsf.LSF_RLIMIT_RUN) = Lsf706JobRunner.getRlimitRun(function.jobQueue) + + writeExec() + request.command = "sh " + exec + + // Allow advanced users to update the request. 
+ updateJobRun(request) + + runStatus = RunnerStatus.RUNNING + Retry.attempt(() => { + val reply = new submitReply + jobId = LibBat.lsb_submit(request, reply) + if (jobId < 0) + throw new QException(LibBat.lsb_sperror("Unable to submit job")) + }, 1, 5, 10) + logger.info("Submitted LSF job id: " + jobId) } - - writeExec() - request.command = "sh " + exec - - // Allow advanced users to update the request. - updateJobRun(request) - - runStatus = RunnerStatus.RUNNING - Retry.attempt(() => { - val reply = new submitReply - jobId = LibBat.lsb_submit(request, reply) - if (jobId < 0) - throw new QException(LibBat.lsb_sperror("Unable to submit job")) - }, 1, 5, 10) - logger.info("Submitted LSF job id: " + jobId) } /** * Updates and returns the status. */ def status = { - var jobStatus = LibBat.JOB_STAT_UNKWN - var exitStatus = 0 - var exitInfo = 0 - var endTime: NativeLong = null + Lsf706JobRunner.lsfLibLock.synchronized { + var jobStatus = LibBat.JOB_STAT_UNKWN + var exitStatus = 0 + var exitInfo = 0 + var endTime: NativeLong = null - val result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) - if (result < 0) - throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId)) - try { - if (result > 0) { - val more = new IntByReference(result) - val jobInfo = LibBat.lsb_readjobinfo(more) - if (jobInfo == null) - throw new QException(LibBat.lsb_sperror("lsb_readjobinfo returned null for job id: " + jobId)) - jobStatus = jobInfo.status - exitStatus = jobInfo.exitStatus - exitInfo = jobInfo.exitInfo - endTime = jobInfo.endTime + val result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) + if (result < 0) + throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId)) + try { + if (result > 1) + throw new QException(LibBat.lsb_sperror("Received " + result + " LSF results for job id: " + jobId)) + else if (result == 1) { + val more = new IntByReference(result) + val 
jobInfo = LibBat.lsb_readjobinfo(more) + if (jobInfo == null) + throw new QException(LibBat.lsb_sperror("lsb_readjobinfo returned null for job id: " + jobId)) + jobStatus = jobInfo.status + exitStatus = jobInfo.exitStatus + exitInfo = jobInfo.exitInfo + endTime = jobInfo.endTime + } + } finally { + LibBat.lsb_closejobinfo() } - } finally { - LibBat.lsb_closejobinfo() + + logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo)) + + if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) { + val now = new Date().getTime + + if (firstUnknownTime.isEmpty) { + firstUnknownTime = Some(now) + logger.debug("First unknown status for job id: " + jobId) + } + + if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) { + // Unknown status has been returned for a while now. + runStatus = RunnerStatus.FAILED + logger.error("Unknown status for %d seconds: job id %d: %s".format(unknownStatusMaxSeconds, jobId, function.description)) + } + } else { + // Reset the last time an unknown status was seen. + firstUnknownTime = None + + if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { + // Exited function that (probably) won't be retried. + runStatus = RunnerStatus.FAILED + } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { + // Done successfully. + runStatus = RunnerStatus.DONE + } + } + + runStatus } - - logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo)) - - if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) { - val now = new Date().getTime - - if (firstUnknownTime.isEmpty) { - firstUnknownTime = Some(now) - logger.debug("First unknown status for job id: " + jobId) - } - - if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) { - // Unknown status has been returned for a while now. 
- runStatus = RunnerStatus.FAILED - logger.error("Unknown status for %d seconds: job id %d: %s".format(unknownStatusMaxSeconds, jobId, function.description)) - } - } else { - // Reset the last time an unknown status was seen. - firstUnknownTime = None - - if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { - // Exited function that (probably) won't be retried. - runStatus = RunnerStatus.FAILED - } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { - // Done successfully. - runStatus = RunnerStatus.DONE - } - } - - runStatus } /** @@ -171,6 +178,8 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR } object Lsf706JobRunner extends Logging { + private val lsfLibLock = new Object + init() /** The name of the default queue. */ @@ -183,8 +192,10 @@ object Lsf706JobRunner extends Logging { * Initialize the Lsf library. */ private def init() = { - if (LibBat.lsb_init("Queue") < 0) - throw new QException(LibBat.lsb_sperror("lsb_init() failed")) + lsfLibLock.synchronized { + if (LibBat.lsb_init("Queue") < 0) + throw new QException(LibBat.lsb_sperror("lsb_init() failed")) + } } /** @@ -194,33 +205,35 @@ object Lsf706JobRunner extends Logging { * @return the run limit in seconds for the queue. */ def getRlimitRun(queue: String) = { - if (queue == null) { - if (defaultQueue != null) { - queueRlimitRun(defaultQueue) - } else { - // Get the info on the default queue. - val numQueues = new IntByReference(1) - val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0) - if (queueInfo == null) - throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue")) - defaultQueue = queueInfo.queue - val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) - queueRlimitRun += defaultQueue -> limit - limit - } - } else { - queueRlimitRun.get(queue) match { - case Some(limit) => limit - case None => - // Cache miss. Go get the run limits from LSF. 
- val queues = new StringArray(Array[String](queue)) + lsfLibLock.synchronized { + if (queue == null) { + if (defaultQueue != null) { + queueRlimitRun(defaultQueue) + } else { + // Get the info on the default queue. val numQueues = new IntByReference(1) - val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0) + val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0) if (queueInfo == null) - throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue)) + throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue")) + defaultQueue = queueInfo.queue val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) - queueRlimitRun += queue -> limit + queueRlimitRun += defaultQueue -> limit limit + } + } else { + queueRlimitRun.get(queue) match { + case Some(limit) => limit + case None => + // Cache miss. Go get the run limits from LSF. + val queues = new StringArray(Array[String](queue)) + val numQueues = new IntByReference(1) + val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0) + if (queueInfo == null) + throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue)) + val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) + queueRlimitRun += queue -> limit + limit + } } } } @@ -230,23 +243,25 @@ object Lsf706JobRunner extends Logging { * @param runners Runners to stop. 
*/ def tryStop(runners: List[Lsf706JobRunner]) { - for (jobRunners <- runners.filterNot(_.jobId < 0).grouped(10)) { - try { - val njobs = jobRunners.size - val signalJobs = new signalBulkJobs - signalJobs.jobs = { - val jobIds = new Memory(8 * njobs) - jobIds.write(0, jobRunners.map(_.jobId).toArray, 0, njobs) - jobIds - } - signalJobs.njobs = njobs - signalJobs.signal = 9 + lsfLibLock.synchronized { + for (jobRunners <- runners.filterNot(_.jobId < 0).grouped(10)) { + try { + val njobs = jobRunners.size + val signalJobs = new signalBulkJobs + signalJobs.jobs = { + val jobIds = new Memory(8 * njobs) + jobIds.write(0, jobRunners.map(_.jobId).toArray, 0, njobs) + jobIds + } + signalJobs.njobs = njobs + signalJobs.signal = 9 - if (LibBat.lsb_killbulkjobs(signalJobs) < 0) - throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed")) - } catch { - case e => - logger.error("Unable to kill all jobs.", e) + if (LibBat.lsb_killbulkjobs(signalJobs) < 0) + throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed")) + } catch { + case e => + logger.error("Unable to kill all jobs.", e) + } } } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala index 6f42a705b..1d56009f3 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/MappingEdge.scala @@ -1,12 +1,10 @@ package org.broadinstitute.sting.queue.engine -import java.io.File - /** * Utility class to map a set of inputs to set of outputs. * The QGraph uses this function internally to map between user defined functions. */ -class MappingEdge(val inputs: Set[File], val outputs: Set[File]) extends QEdge { +class MappingEdge(val inputs: QNode, val outputs: QNode) extends QEdge { /** * For debugging purposes returns . 
* @return diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala index 265d60a74..1608e3c08 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QEdge.scala @@ -1,23 +1,32 @@ package org.broadinstitute.sting.queue.engine -import java.io.File - /** * An edge in the QGraph */ trait QEdge { /** - * Set of inputs for this function. + * List of inputs for this function sorted by path. */ - def inputs: Set[File] + def inputs: QNode /** - * Set of outputs for this function. + * List of outputs for this function sorted by path. */ - def outputs: Set[File] + def outputs: QNode /** * The function description in .dot files */ def dotString = "" + + override def hashCode = inputs.hashCode + outputs.hashCode + + override def equals(obj: Any) = { + obj match { + case other: QEdge => + this.inputs == other.inputs && + this.outputs == other.outputs + case _ => false + } + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 99b14b49b..f0ddd18c0 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -22,8 +22,10 @@ class QGraph extends Logging { var debugMode = false private def dryRun = !settings.run + private var numMissingValues = 0 private val jobGraph = newGraph - private var shuttingDown = false + private var running = true + private val runningLock = new Object private val nl = "%n".format() private val inProcessManager = new InProcessJobManager @@ -35,9 +37,15 @@ class QGraph extends Logging { */ def add(command: QFunction) { try { - command.qSettings = settings.qSettings - command.freeze - addEdge(new FunctionEdge(command)) + runningLock.synchronized { + if (running) { + command.qSettings = settings.qSettings + command.freeze + val inputs = 
getQNode(command.inputs.toList.sortWith(_.compareTo(_) < 0)) + val outputs = getQNode(command.outputs.toList.sortWith(_.compareTo(_) < 0)) + addEdge(new FunctionEdge(command, inputs, outputs)) + } + } } catch { case e: Exception => throw new QException("Error adding function: " + command, e) @@ -47,42 +55,45 @@ class QGraph extends Logging { /** * Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph. */ - def run = { + def run() { + runningLock.synchronized { + if (running) { + IOUtils.checkTempDir(settings.qSettings.tempDirectory) + fillGraph + val isReady = numMissingValues == 0 - IOUtils.checkTempDir(settings.qSettings.tempDirectory) - val numMissingValues = fillGraph - val isReady = numMissingValues == 0 + if (this.jobGraph.edgeSet.isEmpty) { + logger.warn("Nothing to run! Were any Functions added?"); + } else if (settings.getStatus) { + logger.info("Checking pipeline status.") + logStatus() + } else if (this.dryRun) { + dryRunJobs() + } else if (isReady) { + logger.info("Running jobs.") + runJobs() + } - if (this.jobGraph.edgeSet.isEmpty) { - logger.warn("Nothing to run! 
Were any Functions added?"); - } else if (settings.getStatus) { - logger.info("Checking pipeline status.") - logStatus() - } else if (this.dryRun) { - dryRunJobs() - } else if (isReady) { - logger.info("Running jobs.") - runJobs() - } + if (numMissingValues > 0) { + logger.error("Total missing values: " + numMissingValues) + } - if (numMissingValues > 0) { - logger.error("Total missing values: " + numMissingValues) - } - - if (isReady && this.dryRun) { - logger.info("Dry run completed successfully!") - logger.info("Re-run with \"-run\" to execute the functions.") + if (running && isReady && this.dryRun) { + logger.info("Dry run completed successfully!") + logger.info("Re-run with \"-run\" to execute the functions.") + } + } } } - private def fillGraph = { + private def fillGraph { logger.info("Generating graph.") fill if (settings.dotFile != null) renderToDot(settings.dotFile) - var numMissingValues = validate + validate() - if (numMissingValues == 0 && settings.bsubAllJobs) { + if (running && numMissingValues == 0 && settings.bsubAllJobs) { logger.info("Generating scatter gather jobs.") val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge)) @@ -98,20 +109,18 @@ class QGraph extends Logging { logger.info("Removing original jobs.") this.jobGraph.removeAllEdges(scatterGathers) - prune + prune() logger.info("Adding scatter gather jobs.") - addedFunctions.foreach(this.add(_)) + addedFunctions.foreach(function => if (running) this.add(function)) logger.info("Regenerating graph.") fill val scatterGatherDotFile = if (settings.expandedDotFile != null) settings.expandedDotFile else settings.dotFile if (scatterGatherDotFile != null) renderToDot(scatterGatherDotFile) - numMissingValues = validate + validate() } - - numMissingValues } private def scatterGatherable(edge: QEdge) = { @@ -153,23 +162,25 @@ class QGraph extends Logging { * Fills in the graph using mapping functions, then removes out of date * jobs, then cleans up mapping functions and nodes 
that aren't need. */ - private def fill = { - fillIn - prune + private def fill() { + fillIn() + prune() } /** * Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs. */ - private def fillIn = { + private def fillIn() { // clone since edgeSet is backed by the graph - asScalaSet(jobGraph.edgeSet).clone.foreach { - case cmd: FunctionEdge => { - addCollectionOutputs(cmd.outputs) - addCollectionInputs(cmd.inputs) + asScalaSet(jobGraph.edgeSet).clone.foreach(edge => { + if (running) edge match { + case cmd: FunctionEdge => { + addCollectionOutputs(cmd.outputs) + addCollectionInputs(cmd.inputs) + } + case map: MappingEdge => /* do nothing for mapping edges */ } - case map: MappingEdge => /* do nothing for mapping edges */ - } + }) } private def getReadyJobs = { @@ -190,37 +201,40 @@ class QGraph extends Logging { /** * Removes mapping edges that aren't being used, and nodes that don't belong to anything. */ - private def prune = { + private def prune() { var pruning = true while (pruning) { pruning = false val filler = jobGraph.edgeSet.filter(isFiller(_)) if (filler.size > 0) { jobGraph.removeAllEdges(filler) - pruning = true + pruning = running } } - jobGraph.removeAllVertices(jobGraph.vertexSet.filter(isOrphan(_))) + if (running) + jobGraph.removeAllVertices(jobGraph.vertexSet.filter(isOrphan(_))) } /** * Validates that the functions in the graph have no missing values and that there are no cycles. - * @return Number of missing values. 
*/ - private def validate = { - var numMissingValues = 0 - asScalaSet(jobGraph.edgeSet).foreach { - case cmd: FunctionEdge => - val missingFieldValues = cmd.function.missingFields - if (missingFieldValues.size > 0) { - numMissingValues += missingFieldValues.size - logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.function.description)) - for (missing <- missingFieldValues) - logger.error(" " + missing) + private def validate() { + asScalaSet(jobGraph.edgeSet).foreach( + edge => + if (running) edge match + { + case cmd: FunctionEdge => + val missingFieldValues = cmd.function.missingFields + if (missingFieldValues.size > 0) { + numMissingValues += missingFieldValues.size + logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.function.description)) + for (missing <- missingFieldValues) + logger.error(" " + missing) + } + case map: MappingEdge => /* do nothing for mapping edges */ } - case map: MappingEdge => /* do nothing for mapping edges */ - } + ) val detector = new CycleDetector(jobGraph) if (detector.detectCycles) { @@ -229,27 +243,31 @@ class QGraph extends Logging { logger.error(" " + cycle) throw new QException("Cycles were detected in the graph.") } - - numMissingValues } /** * Dry-runs the jobs by traversing the graph. 
*/ - private def dryRunJobs() = { + private def dryRunJobs() { updateGraphStatus(false) var readyJobs = getReadyJobs - while (!shuttingDown && readyJobs.size > 0) { + while (running && readyJobs.size > 0) { + logger.debug("+++++++") readyJobs.foreach(edge => { - logEdge(edge) - edge.markAsDone + if (running) { + logEdge(edge) + edge.markAsDone + } }) readyJobs = getReadyJobs } } - private def logEdge(edge: FunctionEdge) = { + private def logEdge(edge: FunctionEdge) { logger.info("-------") + if (logger.isDebugEnabled) { + logger.debug("Inputs: " + edge.inputs) + } logger.info(StringUtils.capitalize(edge.status.toString) + ": " + edge.function.description) if (logger.isDebugEnabled) logger.debug(edge.function.commandDirectory + " > " + edge.function.description) @@ -261,7 +279,7 @@ class QGraph extends Logging { /** * Logs job statuses by traversing the graph and looking for status-related files */ - private def logStatus() = { + private def logStatus() { updateGraphStatus(false) doStatus(status => logger.info(status)) } @@ -269,7 +287,7 @@ class QGraph extends Logging { /** * Runs the jobs by traversing the graph. 
*/ - private def runJobs() = { + private def runJobs() { try { if (settings.bsubAllJobs) commandLineManager = new Lsf706JobManager @@ -284,7 +302,7 @@ class QGraph extends Logging { var readyJobs = getReadyJobs var runningJobs = Set.empty[FunctionEdge] - while (!shuttingDown && readyJobs.size + runningJobs.size > 0) { + while (running && readyJobs.size + runningJobs.size > 0) { var exitedJobs = List.empty[FunctionEdge] var failedJobs = List.empty[FunctionEdge] @@ -296,12 +314,14 @@ class QGraph extends Logging { exitedJobs.foreach(runner => runningJobs -= runner) readyJobs.foreach(f => { - f.runner = newRunner(f.function) - f.start() - f.status match { - case RunnerStatus.RUNNING => runningJobs += f - case RunnerStatus.FAILED => failedJobs :+= f - case RunnerStatus.DONE => /* do nothing and move on */ + if (running) { + f.runner = newRunner(f.function) + f.start() + f.status match { + case RunnerStatus.RUNNING => runningJobs += f + case RunnerStatus.FAILED => failedJobs :+= f + case RunnerStatus.DONE => /* do nothing and move on */ + } } }) @@ -329,7 +349,7 @@ class QGraph extends Logging { * Updates the status of edges in the graph. * @param cleanOutputs If true will delete outputs when setting edges to pending. */ - private def updateGraphStatus(cleanOutputs: Boolean) = { + private def updateGraphStatus(cleanOutputs: Boolean) { traverseFunctions(edge => checkDone(edge, cleanOutputs)) } @@ -340,7 +360,7 @@ class QGraph extends Logging { * @param edge Edge to check to see if it's done or can be skipped. * @param cleanOutputs If true will delete outputs when setting edges to pending. */ - private def checkDone(edge: FunctionEdge, cleanOutputs: Boolean) = { + private def checkDone(edge: FunctionEdge, cleanOutputs: Boolean) { if (edge.function.isIntermediate) { // By default we do not need to run intermediate edges. // Mark any intermediate edges as skipped, if they're not already done. 
@@ -365,7 +385,7 @@ class QGraph extends Logging { * @param previous Previous edges that provide inputs to edge. * @param cleanOutputs If true will clean up the output files when resetting skipped jobs to pending. */ - private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge], cleanOutputs: Boolean): Unit = { + private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge], cleanOutputs: Boolean) { for (previousEdge <- previous.filter(_.status == RunnerStatus.SKIPPED)) { previousEdge.resetToPending(cleanOutputs) resetPreviousSkipped(previousEdge, this.previousFunctions(previousEdge), cleanOutputs) @@ -383,8 +403,8 @@ class QGraph extends Logging { } } - private def emailFailedJobs(failed: List[FunctionEdge]) = { - if (settings.statusEmailTo.size > 0) { + private def emailFailedJobs(failed: List[FunctionEdge]) { + if (running && settings.statusEmailTo.size > 0) { val emailMessage = new EmailMessage emailMessage.from = settings.statusEmailFrom emailMessage.to = settings.statusEmailTo @@ -394,7 +414,7 @@ class QGraph extends Logging { } } - private def checkRetryJobs(failed: List[FunctionEdge]) = { + private def checkRetryJobs(failed: List[FunctionEdge]) { if (settings.retries > 0) { for (failedJob <- failed) { if (failedJob.function.jobRestartable && failedJob.retries < settings.retries) { @@ -410,8 +430,8 @@ class QGraph extends Logging { } } - private def emailStatus() = { - if (settings.statusEmailTo.size > 0) { + private def emailStatus() { + if (running && settings.statusEmailTo.size > 0) { var failed = List.empty[FunctionEdge] foreachFunction(edge => { if (edge.status == RunnerStatus.FAILED) { @@ -433,7 +453,7 @@ class QGraph extends Logging { } } - private def addFailedFunctions(emailMessage: EmailMessage, failed: List[FunctionEdge]) = { + private def addFailedFunctions(emailMessage: EmailMessage, failed: List[FunctionEdge]) { val logs = failed.flatMap(edge => logFiles(edge)) if (emailMessage.body == null) @@ -549,7 
+569,7 @@ class QGraph extends Logging { /** * Updates a status map with scatter/gather status information (e.g. counts) */ - private def updateAnalysisStatus(stats: AnalysisStatus, edge: FunctionEdge) = { + private def updateAnalysisStatus(stats: AnalysisStatus, edge: FunctionEdge) { if (edge.function.isInstanceOf[GatherFunction]) { updateSGStatus(stats.gather, edge) } else if (edge.function.isInstanceOf[CloneFunction]) { @@ -559,7 +579,7 @@ class QGraph extends Logging { } } - private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) = { + private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) { stats.total += 1 edge.status match { case RunnerStatus.DONE => stats.done += 1 @@ -584,57 +604,56 @@ class QGraph extends Logging { * @return A new graph */ private def newGraph = new SimpleDirectedGraph[QNode, QEdge](new EdgeFactory[QNode, QEdge] { - def createEdge(input: QNode, output: QNode) = new MappingEdge(input.files, output.files)}) + def createEdge(input: QNode, output: QNode) = new MappingEdge(input, output)}) - private def addEdge(edge: QEdge) = { - val inputs = QNode(edge.inputs) - val outputs = QNode(edge.outputs) - val newSource = jobGraph.addVertex(inputs) - val newTarget = jobGraph.addVertex(outputs) - val removedEdges = jobGraph.removeAllEdges(inputs, outputs) - val added = jobGraph.addEdge(inputs, outputs, edge) - if (this.debugMode) { - logger.debug("Mapped from: " + inputs) - logger.debug("Mapped to: " + outputs) - logger.debug("Mapped via: " + edge) - logger.debug("Removed edges: " + removedEdges) - logger.debug("New source?: " + newSource) - logger.debug("New target?: " + newTarget) - logger.debug("") + private var nextNodeId = 0 + private def getQNode(files: List[File]) = { + jobGraph.vertexSet.find(node => node.files == files) match { + case Some(node) => + node + case None => + if (nextNodeId % 100 == 0) + logger.debug("adding QNode: " + nextNodeId) + val node = new QNode(nextNodeId, files) + nextNodeId += 1 + 
jobGraph.addVertex(node) + node } } - /** - * Checks to see if the set of files has more than one file and if so adds input mappings between the set and the individual files. - * @param files Set to check. - */ - private def addCollectionInputs(files: Set[File]): Unit = { - if (files.size > 1) - for (file <- files) - addMappingEdge(Set(file), files) + private def addEdge(edge: QEdge) { + jobGraph.removeAllEdges(edge.inputs, edge.outputs) + jobGraph.addEdge(edge.inputs, edge.outputs, edge) } /** - * Checks to see if the set of files has more than one file and if so adds output mappings between the individual files and the set. - * @param files Set to check. + * Adds input mappings between the node's files and the individual files. + * @param inputs Input node. */ - private def addCollectionOutputs(files: Set[File]): Unit = { - if (files.size > 1) - for (file <- files) - addMappingEdge(files, Set(file)) + private def addCollectionInputs(inputs: QNode) { + if (inputs.files.size > 1) + for (file <- inputs.files) { + if (running) { + val input = getQNode(List(file)) + if (!jobGraph.containsEdge(input, inputs)) + addEdge(new MappingEdge(input, inputs)) + } + } } /** - * Adds a directed graph edge between the input set and the output set if there isn't a direct relationship between the two nodes already. - * @param input Input set of files. - * @param output Output set of files. + * Adds output mappings between the node's files and the individual files. + * @param outputs Output node. 
*/ - private def addMappingEdge(input: Set[File], output: Set[File]) = { - val hasEdge = input == output || - jobGraph.getEdge(QNode(input), QNode(output)) != null || - jobGraph.getEdge(QNode(output), QNode(input)) != null - if (!hasEdge) - addEdge(new MappingEdge(input, output)) + private def addCollectionOutputs(outputs: QNode) { + if (outputs.files.size > 1) + for (file <- outputs.files) { + if (running) { + val output = getQNode(List(file)) + if (!jobGraph.containsEdge(outputs, output)) + addEdge(new MappingEdge(outputs, output)) + } + } } /** @@ -644,13 +663,12 @@ class QGraph extends Logging { * @return true if the edge is not needed in the graph. */ private def isFiller(edge: QEdge) = { - if (edge.isInstanceOf[MappingEdge]) { - if (jobGraph.outgoingEdgesOf(jobGraph.getEdgeTarget(edge)).size == 0) - true - else if (jobGraph.incomingEdgesOf(jobGraph.getEdgeSource(edge)).size == 0) - true - else false - } else false + edge match { + case mapping: MappingEdge => + jobGraph.outgoingEdgesOf(jobGraph.getEdgeTarget(edge)).size == 0 && + jobGraph.incomingEdgesOf(jobGraph.getEdgeSource(edge)).size == 0 + case _ => false + } } /** @@ -658,8 +676,10 @@ class QGraph extends Logging { * @param node Node (set of files) to check. * @return true if this set of files is not needed in the graph. */ - private def isOrphan(node: QNode) = - (jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0 + private def isOrphan(node: QNode) = { + jobGraph.incomingEdgesOf(node).size == 0 && + jobGraph.outgoingEdgesOf(node).size == 0 + } /** * Utility function for running a method over all function edges. 
@@ -670,7 +690,7 @@ class QGraph extends Logging { .filter(_.isInstanceOf[FunctionEdge]) .asInstanceOf[List[FunctionEdge]] .sortWith(compare(_,_)) - .foreach(f(_)) + .foreach(edge => if (running) f(edge)) } private def compare(f1: FunctionEdge, f2: FunctionEdge): Boolean = @@ -699,21 +719,23 @@ class QGraph extends Logging { * Utility function for running a method over all functions, but traversing the nodes in order of dependency. * @param edgeFunction Function to run for each FunctionEdge. */ - private def traverseFunctions(f: (FunctionEdge) => Unit) = { + private def traverseFunctions(f: (FunctionEdge) => Unit) { val iterator = new TopologicalOrderIterator(this.jobGraph) iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] { override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = { - event.getEdge match { - case functionEdge: FunctionEdge => f(functionEdge) - case map: MappingEdge => /* do nothing for mapping functions */ + if (running) { + event.getEdge match { + case functionEdge: FunctionEdge => f(functionEdge) + case map: MappingEdge => /* do nothing for mapping functions */ + } } } }) iterator.foreach(_ => {}) } - private def deleteIntermediateOutputs() = { - if (!settings.keepIntermediates && !hasFailed) { + private def deleteIntermediateOutputs() { + if (running && !settings.keepIntermediates && success) { logger.info("Deleting intermediate files.") traverseFunctions(edge => { if (edge.function.isIntermediate) { @@ -729,7 +751,7 @@ class QGraph extends Logging { * http://en.wikipedia.org/wiki/DOT_language * @param file Path to output the .dot file. */ - private def renderToDot(file: java.io.File) = { + private def renderToDot(file: java.io.File) { val out = new java.io.FileWriter(file) // todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a @@ -745,46 +767,61 @@ class QGraph extends Logging { } /** - * Returns true if any of the jobs in the graph have a status of failed. 
- * @return true if any of the jobs in the graph have a status of failed. + * Returns true if no functions have missing values nor a status of failed. + * @return true if no functions have missing values nor a status of failed. */ - def hasFailed = { - !this.dryRun && this.jobGraph.edgeSet.exists(edge => { - edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED - }) + def success = { + if (numMissingValues > 0) { + false + } else if (this.dryRun) { + true + } else { + !this.jobGraph.edgeSet.exists(edge => { + edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED + }) + } } - def logFailed = { + def logFailed() { foreachFunction(edge => { if (edge.status == RunnerStatus.FAILED) logEdge(edge) }) } + /** + * Returns true if the graph was shutdown instead of exiting on its own. + */ + def isShutdown = !running + /** * Kills any forked jobs still running. */ def shutdown() { - shuttingDown = true - val runners = getRunningJobs.map(_.runner) - val manager = commandLineManager.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]] - if (manager != null) { - val managerRunners = runners - .filter(runner => manager.runnerType.isAssignableFrom(runner.getClass)) - .asInstanceOf[List[JobRunner[QFunction]]] - if (managerRunners.size > 0) + // Signal the main thread to shutdown. + running = false + // Wait for the thread to finish and exit normally. 
+ runningLock.synchronized { + val runners = getRunningJobs.map(_.runner) + val manager = commandLineManager.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]] + if (manager != null) { + val managerRunners = runners + .filter(runner => manager.runnerType.isAssignableFrom(runner.getClass)) + .asInstanceOf[List[JobRunner[QFunction]]] + if (managerRunners.size > 0) + try { + manager.tryStop(managerRunners) + } catch { + case e => /* ignore */ + } + } + runners.foreach(runner => try { - manager.tryStop(managerRunners) + runner.removeTemporaryFiles() } catch { case e => /* ignore */ } + ) } - runners.foreach(runner => - try { - runner.removeTemporaryFiles() - } catch { - case e => /* ignore */ - } - ) } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala index 480c1c88f..a86c08aae 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QNode.scala @@ -4,6 +4,17 @@ import java.io.File /** * Represents a state between QFunctions the directed acyclic QGraph - * @param files The set of files that represent this node state. + * @param files The list of files that represent this node state ordered by file name. */ -case class QNode (val files: Set[File]) +class QNode (val id: Int, val files: List[File]) { + override def equals(obj: Any) = { + obj match { + case other: QNode => this.id == other.id + case _ => false + } + } + + override def hashCode = id + + override def toString = files.toString +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index c727c4f47..5c77f53bb 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -37,12 +37,8 @@ trait QFunction extends Logging { /** Order the function was added to the graph. 
*/ var addOrder: List[Int] = Nil - /** - * EXPERIMENTAL AND NOT SUPPORTED!! - * Limits the number of seconds that the job will run. - * TODO: Replace with full resource specifications. - */ - var jobLimitSeconds: Option[Int] = None + /** Job priority */ + var jobPriority: Option[Int] = None /** Whether a job is restartable */ var jobRestartable = true @@ -70,7 +66,7 @@ trait QFunction extends Logging { function.commandDirectory = this.commandDirectory function.jobTempDir = this.jobTempDir function.addOrder = this.addOrder - function.jobLimitSeconds = this.jobLimitSeconds + function.jobPriority = this.jobPriority function.jobRestartable = this.jobRestartable function.updateJobRun = this.updateJobRun function.isIntermediate = this.isIntermediate @@ -319,6 +315,9 @@ trait QFunction extends Logging { if (jobTempDir == null) jobTempDir = qSettings.tempDirectory + if (jobPriority.isEmpty) + jobPriority = qSettings.jobPriority + // Do not set the temp dir relative to the command directory jobTempDir = IOUtils.absolute(jobTempDir) diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 3b3ae5442..18f60a28f 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -149,9 +149,11 @@ trait ScatterGatherableFunction extends CommandLineFunction { super.freezeFieldValues if (this.scatterGatherDirectory == null) { - this.scatterGatherDirectory = qSettings.jobScatterGatherDirectory - if (this.scatterGatherDirectory == null) - this.scatterGatherDirectory = this.commandDirectory + if (qSettings.jobScatterGatherDirectory != null) { + this.scatterGatherDirectory = IOUtils.absolute(qSettings.jobScatterGatherDirectory) + } else { + this.scatterGatherDirectory = 
IOUtils.absolute(this.commandDirectory, "queueScatterGather") + } } } diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala index 36556628a..1a96136ee 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala @@ -1,16 +1,59 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + package org.broadinstitute.sting.queue.pipeline +import collection.JavaConversions._ import org.broadinstitute.sting.utils.Utils import org.testng.Assert import org.broadinstitute.sting.commandline.CommandLineProgram -import java.io.File -import org.broadinstitute.sting.queue.util.{TextFormatUtils, ProcessController} import java.util.Date import java.text.SimpleDateFormat import org.broadinstitute.sting.{WalkerTest, BaseTest} import org.broadinstitute.sting.queue.{QException, QCommandLine} +import org.broadinstitute.sting.datasources.pipeline.{Pipeline, PipelineProject, PipelineSample} +import org.broadinstitute.sting.queue.util.{Logging, ProcessController} +import java.io.{FileNotFoundException, File} -object PipelineTest { +object PipelineTest extends BaseTest with Logging { + + case class K1gBam(squidId: String, sampleId: String, version: Int) + + /** 1000G BAMs used for validation */ + val k1gBams = List( + new K1gBam("C474", "NA19651", 2), + new K1gBam("C474", "NA19655", 2), + new K1gBam("C474", "NA19669", 2), + new K1gBam("C454", "NA19834", 2), + new K1gBam("C460", "HG01440", 2), + new K1gBam("C456", "NA12342", 2), + new K1gBam("C456", "NA12748", 2), + new K1gBam("C474", "NA19649", 2), + new K1gBam("C474", "NA19652", 2), + new K1gBam("C474", "NA19654", 2)) + + validateK1gBams() /** The path to the current Sting directory. Useful when specifying Sting resources. */ val currentStingDir = new File(".").getAbsolutePath @@ -18,6 +61,10 @@ object PipelineTest { /** The path to the current build of the GATK jar in the currentStingDir. */ val currentGATK = new File(currentStingDir, "dist/GenomeAnalysisTK.jar") + private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" + + val run = System.getProperty("pipeline.run") == "run" + /** * Returns the top level output path to this test. 
* @param testName The name of the test passed to PipelineTest.executeTest() @@ -48,12 +95,54 @@ object PipelineTest { */ def fileMD5(testName: String, filePath: String, md5: String) = (new File(runDir(testName) + filePath), md5) - private var runningCommandLines = Set.empty[QCommandLine] + /** + * Creates a new pipeline from a project. + * @param project Pipeline project info. + * @param samples List of samples. + * @return a new pipeline project. + */ + def createPipeline(project: PipelineProject, samples: List[PipelineSample]) = { + val pipeline = new Pipeline + pipeline.setProject(project) + pipeline.setSamples(samples) + pipeline + } - private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" + /** + * Creates a new pipeline project for hg19 with b37 132 dbsnp for genotyping, and b37 129 dbsnp for eval. + * @param projectName Name of the project. + * @param chr20 True if only chr20 should be evaluated or the whole exome. + * @return a new pipeline project. + */ + def createHg19Project(projectName: String, chr20: Boolean) = { + val project = new PipelineProject + project.setName(projectName) + project.setReferenceFile(new File(BaseTest.hg19Reference)) + project.setGenotypeDbsnp(new File(BaseTest.b37dbSNP132)) + project.setEvalDbsnp(new File(BaseTest.b37dbSNP129)) + project.setRefseqTable(new File(BaseTest.hg19Refseq)) + project.setIntervalList(new File(if (chr20) BaseTest.hg19Chr20Intervals else BaseTest.hg19Intervals)) + project + } - val run = System.getProperty("pipeline.run") == "run" + /** + * Creates a 1000G pipeline sample from one of the bams. + * @param idPrefix Text to prepend to the sample name. + * @param k1gBam bam to create the sample for. + * @return the created pipeline sample. 
+ */ + def createK1gSample(idPrefix: String, k1gBam: K1gBam) = { + val sample = new PipelineSample + sample.setId(idPrefix + "_" + k1gBam.sampleId) + sample.setBamFiles(Map("cleaned" -> getPicardBam(k1gBam))) + sample.setTags(Map("SQUIDProject" -> k1gBam.squidId, "CollaboratorID" -> k1gBam.sampleId)) + sample + } + /** + * Runs the pipelineTest. + * @param pipelineTest test to run. + */ def executeTest(pipelineTest: PipelineTestSpec) { val name = pipelineTest.name if (name == null) @@ -78,7 +167,7 @@ object PipelineTest { failed += 1 } if (failed > 0) - Assert.fail("%d of %d MD5%s did not match.".format(failed, fileMD5s.size, TextFormatUtils.plural(failed))) + Assert.fail("%d of %d MD5s did not match.".format(failed, fileMD5s.size)) } private def validateEval(name: String, evalSpec: PipelineTestEvalSpec) { @@ -115,7 +204,7 @@ object PipelineTest { * @param jobQueue the queue to run the job on. Defaults to hour if jobQueue is null. * @param expectedException the expected exception or null if no exception is expected. */ - def executeTest(name: String, args: String, jobQueue: String, expectedException: Class[_]) { + private def executeTest(name: String, args: String, jobQueue: String, expectedException: Class[_]) { var command = Utils.escapeExpressions(args) // add the logging level to each of the integration test commands @@ -172,6 +261,46 @@ object PipelineTest { } } + /** + * Throws an exception if any of the 1000G bams do not exist and warns if they are out of date. 
+ */ + private def validateK1gBams() { + var missingBams = List.empty[File] + for (k1gBam <- k1gBams) { + val latest = getLatestVersion(k1gBam) + val bam = getPicardBam(k1gBam) + if (k1gBam.version != latest) + logger.warn("1000G bam is not the latest version %d: %s".format(latest, k1gBam)) + if (!bam.exists) + missingBams :+= bam + } + if (missingBams.size > 0) { + val nl = "%n".format() + throw new FileNotFoundException("The following 1000G bam files are missing.%n%s".format(missingBams.mkString(nl))) + } + } + + private def getPicardBam(k1gBam: K1gBam): File = + getPicardBam(k1gBam.squidId, k1gBam.sampleId, k1gBam.version) + + private def getPicardBam(squidId: String, sampleId: String, version: Int): File = + new File(getPicardDir(squidId, sampleId, version), sampleId + ".bam") + + private def getPicardDir(squidId: String, sampleId: String, version: Int) = + new File("/seq/picard_aggregation/%1$s/%2$s/v%3$s/".format(squidId, sampleId, version)) + + private def getLatestVersion(k1gBam: K1gBam): Int = + getLatestVersion(k1gBam.squidId, k1gBam.sampleId, k1gBam.version) + + private def getLatestVersion(squidId: String, sampleId: String, startVersion: Int): Int = { + var version = startVersion + while (new File(getPicardDir(squidId, sampleId, version + 1), "finished.txt").exists) + version += 1 + version + } + + private var runningCommandLines = Set.empty[QCommandLine] + Runtime.getRuntime.addShutdownHook(new Thread { /** Cleanup as the JVM shuts down. 
*/ override def run { diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala index a648838a7..6a8c89ab6 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala @@ -1,3 +1,27 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + package org.broadinstitute.sting.queue.pipeline.examples import org.testng.annotations.Test @@ -7,8 +31,24 @@ class HelloWorldPipelineTest { @Test def testHelloWorld { val spec = new PipelineTestSpec - spec.name = "helloworld" + spec.name = "HelloWorld" + spec.args = "-S scala/qscript/examples/HelloWorld.scala" + PipelineTest.executeTest(spec) + } + + @Test + def testHelloWorldWithPrefix { + val spec = new PipelineTestSpec + spec.name = "HelloWorldWithPrefix" spec.args = "-S scala/qscript/examples/HelloWorld.scala -jobPrefix HelloWorld" PipelineTest.executeTest(spec) } + + @Test + def testHelloWorldWithPriority { + val spec = new PipelineTestSpec + spec.name = "HelloWorldWithPriority" + spec.args = "-S scala/qscript/examples/HelloWorld.scala -jobPriority 100" + PipelineTest.executeTest(spec) + } } diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala index 96cea668a..7b6ae9446 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala @@ -1,31 +1,41 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. 
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + package org.broadinstitute.sting.queue.pipeline.playground import org.testng.annotations.{DataProvider, Test} import collection.JavaConversions._ import java.io.File -import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, PipelineProject, Pipeline} +import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, Pipeline} import org.broadinstitute.sting.utils.yaml.YamlUtils -import org.broadinstitute.sting.BaseTest import org.broadinstitute.sting.queue.pipeline._ class FullCallingPipelineTest { def datasets = List(k1gChr20Dataset, k1gExomeDataset) - val k1gBams = List( - new K1gBam("C474", "NA19651", 1), - new K1gBam("C474", "NA19655", 1), - new K1gBam("C474", "NA19669", 1), - new K1gBam("C454", "NA19834", 1), - new K1gBam("C460", "HG01440", 1), - new K1gBam("C456", "NA12342", 1), - new K1gBam("C456", "NA12748", 1), - new K1gBam("C474", "NA19649", 1), - new K1gBam("C474", "NA19652", 1), - new K1gBam("C474", "NA19654", 1)) - val k1gChr20Dataset = { - val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20") - dataset.pipeline.getProject.setIntervalList(new File(BaseTest.GATKDataLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.chr20.interval_list")) + val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20", true) dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.all.counter.nCalledLoci", 1348) dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.known.counter.nCalledLoci", 1124) @@ 
-38,8 +48,7 @@ class FullCallingPipelineTest { } val k1gExomeDataset = { - val dataset = newK1gDataset("Barcoded_1000G_WEx") - dataset.pipeline.getProject.setIntervalList(new File(BaseTest.GATKDataLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list")) + val dataset = newK1gDataset("Barcoded_1000G_WEx", false) dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.all.counter.nCalledLoci", 50755) dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.known.counter.nCalledLoci", 40894) @@ -53,34 +62,12 @@ class FullCallingPipelineTest { dataset } - class K1gBam(val squidId: String, val sampleId: String, val version: Int) - - def newK1gDataset(projectName: String) = { - val project = new PipelineProject - project.setName(projectName) - project.setReferenceFile(new File(BaseTest.hg19Reference)) - project.setGenotypeDbsnp(new File(BaseTest.b37dbSNP132)) - project.setEvalDbsnp(new File(BaseTest.b37dbSNP129)) - project.setRefseqTable(new File(BaseTest.hg19Refseq)) - + def newK1gDataset(projectName: String, chr20: Boolean) = { + val project = PipelineTest.createHg19Project(projectName, chr20) var samples = List.empty[PipelineSample] - for (k1gBam <- k1gBams) { - val sample = new PipelineSample - sample.setId(projectName + "_" + k1gBam.sampleId) - sample.setBamFiles(Map("recalibrated" -> new File("/seq/picard_aggregation/%1$s/%2$s/v%3$s/%2$s.bam" - .format(k1gBam.squidId, k1gBam.sampleId, k1gBam.version)))) - sample.setTags(Map("SQUIDProject" -> k1gBam.squidId, "CollaboratorID" -> k1gBam.sampleId)) - samples :+= sample - } - - val pipeline = new Pipeline - pipeline.setProject(project) - pipeline.setSamples(samples) - - val dataset = new PipelineDataset - dataset.pipeline = pipeline - - dataset + for (k1gBam <- PipelineTest.k1gBams) + samples :+= PipelineTest.createK1gSample(projectName, k1gBam) + new PipelineDataset(PipelineTest.createPipeline(project, samples)) } 
@DataProvider(name="datasets")//, parallel=true) @@ -92,49 +79,40 @@ class FullCallingPipelineTest { val projectName = dataset.pipeline.getProject.getName val testName = "FullCallingPipeline-" + projectName val yamlFile = writeYaml(testName, dataset.pipeline) - var cleanType = "cleaned" // Run the pipeline with the expected inputs. - var pipelineCommand = ("-retry 1 -S scala/qscript/playground/FullCallingPipeline.q" + + val pipelineCommand = ("-retry 1 -S scala/qscript/playground/FullCallingPipeline.q" + " -jobProject %s -Y %s" + " -tearScript %s/R/DataProcessingReport/GetTearsheetStats.R" + " --gatkjar %s") .format(projectName, yamlFile, PipelineTest.currentStingDir, PipelineTest.currentGATK) - if (!dataset.runIndelRealigner) { - pipelineCommand += " -skipCleaning" - cleanType = "uncleaned" - } - val pipelineSpec = new PipelineTestSpec pipelineSpec.name = testName pipelineSpec.args = pipelineCommand pipelineSpec.jobQueue = dataset.jobQueue pipelineSpec.evalSpec = new PipelineTestEvalSpec - pipelineSpec.evalSpec.vcf = new File(PipelineTest.runDir(testName) + "SnpCalls/%s.%s.annotated.handfiltered.vcf".format(projectName, cleanType)) + pipelineSpec.evalSpec.vcf = new File(PipelineTest.runDir(testName) + "SnpCalls/%s.cleaned.annotated.handfiltered.vcf".format(projectName)) pipelineSpec.evalSpec.reference = dataset.pipeline.getProject.getReferenceFile pipelineSpec.evalSpec.intervals = dataset.pipeline.getProject.getIntervalList pipelineSpec.evalSpec.dbsnp = dataset.pipeline.getProject.getEvalDbsnp pipelineSpec.evalSpec.validations = dataset.validations - // Run the test, at least checking if the command compiles PipelineTest.executeTest(pipelineSpec) } - class PipelineDataset( - var pipeline: Pipeline = null, - var validations: List[PipelineValidation] = Nil, - var jobQueue: String = null, - var runIndelRealigner: Boolean = false) { - override def toString = pipeline.getProject.getName - } - private def writeYaml(testName: String, pipeline: Pipeline) = { val runDir = 
PipelineTest.runDir(testName) - new File(runDir).mkdirs val yamlFile = new File(runDir, pipeline.getProject.getName + ".yaml") + yamlFile.getParentFile.mkdirs YamlUtils.dump(pipeline, yamlFile) yamlFile } + + class PipelineDataset(var pipeline: Pipeline = null, + var validations: List[PipelineValidation] = Nil, + var jobQueue: String = null) { + override def toString = pipeline.getProject.getName + } } diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala new file mode 100644 index 000000000..ab064b588 --- /dev/null +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/MultiFullCallingPipelineTest.scala @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +package org.broadinstitute.sting.queue.pipeline.playground + +import collection.JavaConversions._ +import org.broadinstitute.sting.datasources.pipeline.Pipeline +import org.broadinstitute.sting.utils.yaml.YamlUtils +import org.testng.annotations.{Test, DataProvider} +import org.broadinstitute.sting.queue.pipeline.{PipelineTestSpec, PipelineTest} +import java.io.{PrintWriter, File} +import org.apache.commons.io.IOUtils + +class MultiFullCallingPipelineTest { + def datasets = List(k1gChr20Dataset) + + val k1gChr20Dataset = newK1gDataset("Barcoded_1000G_WEx_chr20", true, "hour") + val k1gExomeDataset = newK1gDataset("Barcoded_1000G_WEx", false, "gsa") + + def newK1gDataset(datasetName: String, chr20: Boolean, pipelineJobQueue: String) = { + var dataset = new MultiPipelineDataset + dataset.name = datasetName + dataset.pipelineJobQueue = pipelineJobQueue + for (k1gBam <- PipelineTest.k1gBams) { + val project = PipelineTest.createHg19Project("SingleSample_" + k1gBam.sampleId, chr20) + val sample = PipelineTest.createK1gSample("Sample", k1gBam) + dataset.samplePipelines :+= PipelineTest.createPipeline(project, List(sample)) + } + dataset + } + + @DataProvider(name="datasets")//, parallel=true) + final def convertDatasets: Array[Array[AnyRef]] = + datasets.map(dataset => Array(dataset.asInstanceOf[AnyRef])).toArray + + @Test(dataProvider="datasets", enabled=false) + def testMultiFullCallingPipeline(dataset: MultiPipelineDataset) = { + val projectName = dataset.name + val testName = "MultiFullCallingPipeline-" + projectName + + var yamlFiles = List.empty[File] + for (samplePipeline <- dataset.samplePipelines) + yamlFiles :+= writeYaml(testName, samplePipeline) + + val yamlList = writeYamlList(testName, yamlFiles) + + // Run the pipeline with the expected inputs. 
+ val pipelineCommand = ("-retry 1 -BS 3 -PP 100 -S scala/qscript/playground/MultiFullCallingPipeline.scala" + + " -jobProject %s -YL %s -PJQ %s -stingHome %s") + .format(projectName, yamlList, dataset.pipelineJobQueue, PipelineTest.currentStingDir) + + val pipelineSpec = new PipelineTestSpec + pipelineSpec.name = testName + pipelineSpec.args = pipelineCommand + pipelineSpec.jobQueue = "gsa" + + PipelineTest.executeTest(pipelineSpec) + } + + private def writeYaml(testName: String, pipeline: Pipeline) = { + val runDir = PipelineTest.runDir(testName) + val yamlFile = new File(runDir, pipeline.getProject.getName + "/" + pipeline.getProject.getName + ".yaml").getAbsoluteFile + yamlFile.getParentFile.mkdirs + YamlUtils.dump(pipeline, yamlFile) + yamlFile + } + + private def writeYamlList(testName: String, yamlFiles: List[File]) = { + val runDir = PipelineTest.runDir(testName) + val yamlList = new File(runDir, testName + "_yamls.list").getAbsoluteFile + yamlList.getParentFile.mkdirs + val writer = new PrintWriter(yamlList) + try { + for (yamlFile <- yamlFiles) + writer.println(yamlFile.toString) + } finally { + IOUtils.closeQuietly(writer) + } + yamlList + } + + class MultiPipelineDataset (var name: String = null, + var pipelineJobQueue: String = null, + var samplePipelines: List[Pipeline] = Nil) { + override def toString = name + } +}