From 6ec3dd0f8cc392018f5607c1eccf2841903a0486 Mon Sep 17 00:00:00 2001 From: kshakir Date: Wed, 25 May 2011 17:16:44 +0000 Subject: [PATCH] Updated GridEngineJobRunner to return status RUNNING instead of PENDING when a job has been sent to GridEngine, even if it hasn't started. Added GridEngine to pipeline tests. Removed passing -jobProject since GridEngine projects must be predefined. Writing the HybridSelectionPipelineTest yaml into the temp directory. Disabled job priority as it needs to be refactored for use by GridEngine and LSF. Fixed WholeGenomePipeline variantmergeoption rename to filteredRecordsMergeType. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5872 348d0f76-0448-11de-a6fe-93d51630548a --- .../playground/WholeGenomePipeline.scala | 2 +- .../gridengine/GridEngineJobRunner.scala | 4 +- .../sting/queue/pipeline/PipelineTest.scala | 77 ++++++++++--------- .../examples/HelloWorldPipelineTest.scala | 2 +- .../HybridSelectionPipelineTest.scala | 8 +- 5 files changed, 47 insertions(+), 46 deletions(-) diff --git a/scala/qscript/playground/WholeGenomePipeline.scala b/scala/qscript/playground/WholeGenomePipeline.scala index d8584a800..a2b4a4945 100644 --- a/scala/qscript/playground/WholeGenomePipeline.scala +++ b/scala/qscript/playground/WholeGenomePipeline.scala @@ -190,7 +190,7 @@ class WholeGenomePipeline extends QScript { combineSNPsIndels.rodBind :+= RodBind("indels", "VCF", selectIndels.out) combineSNPsIndels.rodBind :+= RodBind("all", "VCF", combineChunks.out) combineSNPsIndels.rod_priority_list = "indels,all" - combineSNPsIndels.variantmergeoption = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION + combineSNPsIndels.filteredRecordsMergeType = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.FilteredRecordMergeType.KEEP_IF_ANY_UNFILTERED combineSNPsIndels.assumeIdenticalSamples = true combineSNPsIndels.out = projectBase + ".unrecalibrated.vcf" combineSNPsIndels.jobOutputFile = combineSNPsIndels.out + ".out" diff --git a/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala index d0ac2c760..82edf6221 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/gridengine/GridEngineJobRunner.scala @@ -103,7 +103,7 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine // Allow advanced users to update the request via QFunction.updateJobRun() updateJobRun(gridEngineJob) - updateStatus(RunnerStatus.PENDING) + updateStatus(RunnerStatus.RUNNING) // Start the job and store the id so it can be killed in tryStop try { @@ -195,7 +195,7 @@ object GridEngineJobRunner extends Logging { try { val jobStatus = gridEngineSession.getJobProgramStatus(runner.jobId); jobStatus match { - case Session.QUEUED_ACTIVE => returnStatus = RunnerStatus.PENDING + case Session.QUEUED_ACTIVE => returnStatus = RunnerStatus.RUNNING case Session.DONE => val jobInfo: JobInfo = gridEngineSession.wait(runner.jobId, Session.TIMEOUT_NO_WAIT) if ((jobInfo.hasExited && jobInfo.getExitStatus > 0) diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala index ca7581bed..6b87981ff 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala @@ -38,6 +38,7 @@ import org.broadinstitute.sting.queue.util.{Logging, ProcessController} import java.io.{FileNotFoundException, File} import org.broadinstitute.sting.gatk.report.GATKReportParser import org.apache.commons.io.FileUtils +import org.broadinstitute.sting.queue.engine.CommandLinePluginManager object PipelineTest extends BaseTest with Logging { @@ -58,45 +59,38 @@ object PipelineTest extends BaseTest with Logging { validateK1gBams() - /** The path to the current Sting directory. Useful when specifying Sting resources. */ - val currentStingDir = new File(".").getAbsolutePath - - /** The path to the current build of the GATK jar in the currentStingDir. */ - val currentGATK = new File(currentStingDir, "dist/GenomeAnalysisTK.jar") - private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" val run = System.getProperty("pipeline.run") == "run" + private val jobRunners = { + val commandLinePluginManager = new CommandLinePluginManager + commandLinePluginManager.getPlugins.map(commandLinePluginManager.getName(_)).filterNot(_ == "Shell") + } + /** * Returns the top level output path to this test. * @param testName The name of the test passed to PipelineTest.executeTest() + * @param jobRunner The name of the job manager to run the jobs. * @return the top level output path to this test. */ - def testDir(testName: String) = "pipelinetests/%s/".format(testName) + def testDir(testName: String, jobRunner: String) = "pipelinetests/%s/%s/".format(testName, jobRunner) /** * Returns the directory where relative output files will be written for this test. * @param testName The name of the test passed to PipelineTest.executeTest() + * @param jobRunner The name of the job manager to run the jobs. * @return the directory where relative output files will be written for this test. */ - def runDir(testName: String) = testDir(testName) + "run/" + private def runDir(testName: String, jobRunner: String) = testDir(testName, jobRunner) + "run/" /** * Returns the directory where temp files will be written for this test. * @param testName The name of the test passed to PipelineTest.executeTest() + * @param jobRunner The name of the job manager to run the jobs. * @return the directory where temp files will be written for this test. */ - def tempDir(testName: String) = testDir(testName) + "temp/" - - /** - * Encapsulates a file MD5 - * @param testName The name of the test also passed to PipelineTest.executeTest(). - * @param filePath The file path of the output file, relative to the directory the pipeline is run in. - * @param md5 The expected MD5 - * @return a file md5 that can be appended to the PipelineTestSpec.fileMD5s - */ - def fileMD5(testName: String, filePath: String, md5: String) = (new File(runDir(testName) + filePath), md5) + private def tempDir(testName: String, jobRunner: String) = testDir(testName, jobRunner) + "temp/" /** * Creates a new pipeline from a project. @@ -146,19 +140,28 @@ object PipelineTest extends BaseTest with Logging { * @param pipelineTest test to run. */ def executeTest(pipelineTest: PipelineTestSpec) { + jobRunners.foreach(executeTest(pipelineTest, _)) + } + + /** + * Runs the pipelineTest. + * @param pipelineTest test to run. + * @param jobRunner The name of the job manager to run the jobs. + */ + def executeTest(pipelineTest: PipelineTestSpec, jobRunner: String) { val name = pipelineTest.name if (name == null) - Assert.fail("PipelineTestSpec.name is null.") + Assert.fail("PipelineTestSpec.name is null") println(Utils.dupString('-', 80)); - executeTest(name, pipelineTest.args, pipelineTest.jobQueue, pipelineTest.expectedException) + executeTest(name, pipelineTest.args, pipelineTest.jobQueue, pipelineTest.expectedException, jobRunner) if (run) { - assertMatchingMD5s(name, pipelineTest.fileMD5s.map{case (file, md5) => new File(runDir(name), file) -> md5}, pipelineTest.parameterize) + assertMatchingMD5s(name, pipelineTest.fileMD5s.map{case (file, md5) => new File(runDir(name, jobRunner), file) -> md5}, pipelineTest.parameterize) if (pipelineTest.evalSpec != null) - validateEval(name, pipelineTest.evalSpec) - println(" => %s PASSED".format(name)) + validateEval(name, pipelineTest.evalSpec, jobRunner) + println(" => %s PASSED (%s)".format(name, jobRunner)) } else - println(" => %s PASSED DRY RUN".format(name)) + println(" => %s PASSED DRY RUN (%s)".format(name, jobRunner)) } private def assertMatchingMD5s(name: String, fileMD5s: Traversable[(File, String)], parameterize: Boolean) { @@ -169,16 +172,16 @@ object PipelineTest extends BaseTest with Logging { failed += 1 } if (failed > 0) - Assert.fail("%d of %d MD5s did not match.".format(failed, fileMD5s.size)) + Assert.fail("%d of %d MD5s did not match".format(failed, fileMD5s.size)) } - private def validateEval(name: String, evalSpec: PipelineTestEvalSpec) { + private def validateEval(name: String, evalSpec: PipelineTestEvalSpec, jobRunner: String) { // write the report to the shared validation data location val formatter = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss") - val reportLocation = "%s%s/validation.%s.eval".format(validationReportsDataLocation, name, formatter.format(new Date)) + val reportLocation = "%s%s/%s/validation.%s.eval".format(validationReportsDataLocation, jobRunner, name, formatter.format(new Date)) val report = new File(reportLocation) - FileUtils.copyFile(new File(runDir(name) + evalSpec.evalReport), report); + FileUtils.copyFile(new File(runDir(name, jobRunner) + evalSpec.evalReport), report); val parser = new GATKReportParser parser.parse(report) @@ -206,17 +209,17 @@ object PipelineTest extends BaseTest with Logging { * @param args the argument list * @param jobQueue the queue to run the job on. Defaults to hour if jobQueue is null. * @param expectedException the expected exception or null if no exception is expected. + * @param jobRunner The name of the job manager to run the jobs. */ - private def executeTest(name: String, args: String, jobQueue: String, expectedException: Class[_]) { + private def executeTest(name: String, args: String, jobQueue: String, expectedException: Class[_], jobRunner: String) { var command = Utils.escapeExpressions(args) // add the logging level to each of the integration test commands - command = Utils.appendArray(command, "-bsub", "-tempDir", tempDir(name), "-runDir", runDir(name)) + command = Utils.appendArray(command, "-jobRunner", jobRunner, + "-tempDir", tempDir(name, jobRunner), "-runDir", runDir(name, jobRunner)) - if (jobQueue == null) - command = Utils.appendArray(command, "-jobQueue", "hour") - else + if (jobQueue != null) command = Utils.appendArray(command, "-jobQueue", jobQueue) if (run) @@ -238,11 +241,11 @@ object PipelineTest extends BaseTest with Logging { println("Wanted exception %s, saw %s".format(expectedException, e.getClass)) if (expectedException.isInstance(e)) { // it's the type we expected - println(String.format(" => %s PASSED", name)) + println(String.format(" => %s PASSED (%s)", name, jobRunner)) } else { e.printStackTrace() - Assert.fail("Test %s expected exception %s but got %s instead".format( - name, expectedException, e.getClass)) + Assert.fail("Test %s expected exception %s but got %s instead (%s)".format( + name, expectedException, e.getClass, jobRunner)) } } else { // we didn't expect an exception but we got one :-( @@ -257,7 +260,7 @@ object PipelineTest extends BaseTest with Logging { if (expectedException != null) { if (!gotAnException) // we expected an exception but didn't see it - Assert.fail("Test %s expected exception %s but none was thrown".format(name, expectedException.toString)) + Assert.fail("Test %s expected exception %s but none was thrown (%s)".format(name, expectedException.toString, jobRunner)) } else { if (CommandLineProgram.result != 0) throw new RuntimeException("Error running Queue with arguments: " + args) diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala index 6a8c89ab6..8b8fa86a4 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala @@ -44,7 +44,7 @@ class HelloWorldPipelineTest { PipelineTest.executeTest(spec) } - @Test + @Test(enabled=false) def testHelloWorldWithPriority { val spec = new PipelineTestSpec spec.name = "HelloWorldWithPriority" diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/HybridSelectionPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/HybridSelectionPipelineTest.scala index 08c6227a9..5e75c0f89 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/playground/HybridSelectionPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/HybridSelectionPipelineTest.scala @@ -67,8 +67,8 @@ class HybridSelectionPipelineTest { // Run the pipeline with the expected inputs. val pipelineCommand = - "-retry 1 -S scala/qscript/playground/HybridSelectionPipeline.scala -jobProject %s -Y %s" - .format(projectName, yamlFile) + "-retry 1 -S scala/qscript/playground/HybridSelectionPipeline.scala -Y %s" + .format(yamlFile) val pipelineSpec = new PipelineTestSpec pipelineSpec.name = testName @@ -83,9 +83,7 @@ class HybridSelectionPipelineTest { } private def writeYaml(testName: String, pipeline: Pipeline) = { - val runDir = PipelineTest.runDir(testName) - val yamlFile = new File(runDir, pipeline.getProject.getName + ".yaml") - yamlFile.getParentFile.mkdirs + val yamlFile = BaseTest.createTempFile(pipeline.getProject.getName, ".yaml") YamlUtils.dump(pipeline, yamlFile) yamlFile }