Updated GridEngineJobRunner to return status RUNNING instead of PENDING when a job has been sent to GridEngine, even if it hasn't started.
Added GridEngine to pipeline tests. Removed passing -jobProject since GridEngine projects must be predefined. Writing the HybridSelectionPipelineTest yaml into the temp directory. Disabled job priority as it needs to be refactored for use by GridEngine and LSF. Fixed WholeGenomePipeline variantmergeoption rename to filteredRecordsMergeType. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5872 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
3565eca2dd
commit
6ec3dd0f8c
|
|
@ -190,7 +190,7 @@ class WholeGenomePipeline extends QScript {
|
||||||
combineSNPsIndels.rodBind :+= RodBind("indels", "VCF", selectIndels.out)
|
combineSNPsIndels.rodBind :+= RodBind("indels", "VCF", selectIndels.out)
|
||||||
combineSNPsIndels.rodBind :+= RodBind("all", "VCF", combineChunks.out)
|
combineSNPsIndels.rodBind :+= RodBind("all", "VCF", combineChunks.out)
|
||||||
combineSNPsIndels.rod_priority_list = "indels,all"
|
combineSNPsIndels.rod_priority_list = "indels,all"
|
||||||
combineSNPsIndels.variantmergeoption = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION
|
combineSNPsIndels.filteredRecordsMergeType = org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.FilteredRecordMergeType.KEEP_IF_ANY_UNFILTERED
|
||||||
combineSNPsIndels.assumeIdenticalSamples = true
|
combineSNPsIndels.assumeIdenticalSamples = true
|
||||||
combineSNPsIndels.out = projectBase + ".unrecalibrated.vcf"
|
combineSNPsIndels.out = projectBase + ".unrecalibrated.vcf"
|
||||||
combineSNPsIndels.jobOutputFile = combineSNPsIndels.out + ".out"
|
combineSNPsIndels.jobOutputFile = combineSNPsIndels.out + ".out"
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,7 @@ class GridEngineJobRunner(val function: CommandLineFunction) extends CommandLine
|
||||||
// Allow advanced users to update the request via QFunction.updateJobRun()
|
// Allow advanced users to update the request via QFunction.updateJobRun()
|
||||||
updateJobRun(gridEngineJob)
|
updateJobRun(gridEngineJob)
|
||||||
|
|
||||||
updateStatus(RunnerStatus.PENDING)
|
updateStatus(RunnerStatus.RUNNING)
|
||||||
|
|
||||||
// Start the job and store the id so it can be killed in tryStop
|
// Start the job and store the id so it can be killed in tryStop
|
||||||
try {
|
try {
|
||||||
|
|
@ -195,7 +195,7 @@ object GridEngineJobRunner extends Logging {
|
||||||
try {
|
try {
|
||||||
val jobStatus = gridEngineSession.getJobProgramStatus(runner.jobId);
|
val jobStatus = gridEngineSession.getJobProgramStatus(runner.jobId);
|
||||||
jobStatus match {
|
jobStatus match {
|
||||||
case Session.QUEUED_ACTIVE => returnStatus = RunnerStatus.PENDING
|
case Session.QUEUED_ACTIVE => returnStatus = RunnerStatus.RUNNING
|
||||||
case Session.DONE =>
|
case Session.DONE =>
|
||||||
val jobInfo: JobInfo = gridEngineSession.wait(runner.jobId, Session.TIMEOUT_NO_WAIT)
|
val jobInfo: JobInfo = gridEngineSession.wait(runner.jobId, Session.TIMEOUT_NO_WAIT)
|
||||||
if ((jobInfo.hasExited && jobInfo.getExitStatus > 0)
|
if ((jobInfo.hasExited && jobInfo.getExitStatus > 0)
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ import org.broadinstitute.sting.queue.util.{Logging, ProcessController}
|
||||||
import java.io.{FileNotFoundException, File}
|
import java.io.{FileNotFoundException, File}
|
||||||
import org.broadinstitute.sting.gatk.report.GATKReportParser
|
import org.broadinstitute.sting.gatk.report.GATKReportParser
|
||||||
import org.apache.commons.io.FileUtils
|
import org.apache.commons.io.FileUtils
|
||||||
|
import org.broadinstitute.sting.queue.engine.CommandLinePluginManager
|
||||||
|
|
||||||
object PipelineTest extends BaseTest with Logging {
|
object PipelineTest extends BaseTest with Logging {
|
||||||
|
|
||||||
|
|
@ -58,45 +59,38 @@ object PipelineTest extends BaseTest with Logging {
|
||||||
|
|
||||||
validateK1gBams()
|
validateK1gBams()
|
||||||
|
|
||||||
/** The path to the current Sting directory. Useful when specifying Sting resources. */
|
|
||||||
val currentStingDir = new File(".").getAbsolutePath
|
|
||||||
|
|
||||||
/** The path to the current build of the GATK jar in the currentStingDir. */
|
|
||||||
val currentGATK = new File(currentStingDir, "dist/GenomeAnalysisTK.jar")
|
|
||||||
|
|
||||||
private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/"
|
private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/"
|
||||||
|
|
||||||
val run = System.getProperty("pipeline.run") == "run"
|
val run = System.getProperty("pipeline.run") == "run"
|
||||||
|
|
||||||
|
private val jobRunners = {
|
||||||
|
val commandLinePluginManager = new CommandLinePluginManager
|
||||||
|
commandLinePluginManager.getPlugins.map(commandLinePluginManager.getName(_)).filterNot(_ == "Shell")
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the top level output path to this test.
|
* Returns the top level output path to this test.
|
||||||
* @param testName The name of the test passed to PipelineTest.executeTest()
|
* @param testName The name of the test passed to PipelineTest.executeTest()
|
||||||
|
* @param jobRunner The name of the job manager to run the jobs.
|
||||||
* @return the top level output path to this test.
|
* @return the top level output path to this test.
|
||||||
*/
|
*/
|
||||||
def testDir(testName: String) = "pipelinetests/%s/".format(testName)
|
def testDir(testName: String, jobRunner: String) = "pipelinetests/%s/%s/".format(testName, jobRunner)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the directory where relative output files will be written for this test.
|
* Returns the directory where relative output files will be written for this test.
|
||||||
* @param testName The name of the test passed to PipelineTest.executeTest()
|
* @param testName The name of the test passed to PipelineTest.executeTest()
|
||||||
|
* @param jobRunner The name of the job manager to run the jobs.
|
||||||
* @return the directory where relative output files will be written for this test.
|
* @return the directory where relative output files will be written for this test.
|
||||||
*/
|
*/
|
||||||
def runDir(testName: String) = testDir(testName) + "run/"
|
private def runDir(testName: String, jobRunner: String) = testDir(testName, jobRunner) + "run/"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the directory where temp files will be written for this test.
|
* Returns the directory where temp files will be written for this test.
|
||||||
* @param testName The name of the test passed to PipelineTest.executeTest()
|
* @param testName The name of the test passed to PipelineTest.executeTest()
|
||||||
|
* @param jobRunner The name of the job manager to run the jobs.
|
||||||
* @return the directory where temp files will be written for this test.
|
* @return the directory where temp files will be written for this test.
|
||||||
*/
|
*/
|
||||||
def tempDir(testName: String) = testDir(testName) + "temp/"
|
private def tempDir(testName: String, jobRunner: String) = testDir(testName, jobRunner) + "temp/"
|
||||||
|
|
||||||
/**
|
|
||||||
* Encapsulates a file MD5
|
|
||||||
* @param testName The name of the test also passed to PipelineTest.executeTest().
|
|
||||||
* @param filePath The file path of the output file, relative to the directory the pipeline is run in.
|
|
||||||
* @param md5 The expected MD5
|
|
||||||
* @return a file md5 that can be appended to the PipelineTestSpec.fileMD5s
|
|
||||||
*/
|
|
||||||
def fileMD5(testName: String, filePath: String, md5: String) = (new File(runDir(testName) + filePath), md5)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new pipeline from a project.
|
* Creates a new pipeline from a project.
|
||||||
|
|
@ -146,19 +140,28 @@ object PipelineTest extends BaseTest with Logging {
|
||||||
* @param pipelineTest test to run.
|
* @param pipelineTest test to run.
|
||||||
*/
|
*/
|
||||||
def executeTest(pipelineTest: PipelineTestSpec) {
|
def executeTest(pipelineTest: PipelineTestSpec) {
|
||||||
|
jobRunners.foreach(executeTest(pipelineTest, _))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs the pipelineTest.
|
||||||
|
* @param pipelineTest test to run.
|
||||||
|
* @param jobRunner The name of the job manager to run the jobs.
|
||||||
|
*/
|
||||||
|
def executeTest(pipelineTest: PipelineTestSpec, jobRunner: String) {
|
||||||
val name = pipelineTest.name
|
val name = pipelineTest.name
|
||||||
if (name == null)
|
if (name == null)
|
||||||
Assert.fail("PipelineTestSpec.name is null.")
|
Assert.fail("PipelineTestSpec.name is null")
|
||||||
println(Utils.dupString('-', 80));
|
println(Utils.dupString('-', 80));
|
||||||
executeTest(name, pipelineTest.args, pipelineTest.jobQueue, pipelineTest.expectedException)
|
executeTest(name, pipelineTest.args, pipelineTest.jobQueue, pipelineTest.expectedException, jobRunner)
|
||||||
if (run) {
|
if (run) {
|
||||||
assertMatchingMD5s(name, pipelineTest.fileMD5s.map{case (file, md5) => new File(runDir(name), file) -> md5}, pipelineTest.parameterize)
|
assertMatchingMD5s(name, pipelineTest.fileMD5s.map{case (file, md5) => new File(runDir(name, jobRunner), file) -> md5}, pipelineTest.parameterize)
|
||||||
if (pipelineTest.evalSpec != null)
|
if (pipelineTest.evalSpec != null)
|
||||||
validateEval(name, pipelineTest.evalSpec)
|
validateEval(name, pipelineTest.evalSpec, jobRunner)
|
||||||
println(" => %s PASSED".format(name))
|
println(" => %s PASSED (%s)".format(name, jobRunner))
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
println(" => %s PASSED DRY RUN".format(name))
|
println(" => %s PASSED DRY RUN (%s)".format(name, jobRunner))
|
||||||
}
|
}
|
||||||
|
|
||||||
private def assertMatchingMD5s(name: String, fileMD5s: Traversable[(File, String)], parameterize: Boolean) {
|
private def assertMatchingMD5s(name: String, fileMD5s: Traversable[(File, String)], parameterize: Boolean) {
|
||||||
|
|
@ -169,16 +172,16 @@ object PipelineTest extends BaseTest with Logging {
|
||||||
failed += 1
|
failed += 1
|
||||||
}
|
}
|
||||||
if (failed > 0)
|
if (failed > 0)
|
||||||
Assert.fail("%d of %d MD5s did not match.".format(failed, fileMD5s.size))
|
Assert.fail("%d of %d MD5s did not match".format(failed, fileMD5s.size))
|
||||||
}
|
}
|
||||||
|
|
||||||
private def validateEval(name: String, evalSpec: PipelineTestEvalSpec) {
|
private def validateEval(name: String, evalSpec: PipelineTestEvalSpec, jobRunner: String) {
|
||||||
// write the report to the shared validation data location
|
// write the report to the shared validation data location
|
||||||
val formatter = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss")
|
val formatter = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss")
|
||||||
val reportLocation = "%s%s/validation.%s.eval".format(validationReportsDataLocation, name, formatter.format(new Date))
|
val reportLocation = "%s%s/%s/validation.%s.eval".format(validationReportsDataLocation, jobRunner, name, formatter.format(new Date))
|
||||||
val report = new File(reportLocation)
|
val report = new File(reportLocation)
|
||||||
|
|
||||||
FileUtils.copyFile(new File(runDir(name) + evalSpec.evalReport), report);
|
FileUtils.copyFile(new File(runDir(name, jobRunner) + evalSpec.evalReport), report);
|
||||||
|
|
||||||
val parser = new GATKReportParser
|
val parser = new GATKReportParser
|
||||||
parser.parse(report)
|
parser.parse(report)
|
||||||
|
|
@ -206,17 +209,17 @@ object PipelineTest extends BaseTest with Logging {
|
||||||
* @param args the argument list
|
* @param args the argument list
|
||||||
* @param jobQueue the queue to run the job on. Defaults to hour if jobQueue is null.
|
* @param jobQueue the queue to run the job on. Defaults to hour if jobQueue is null.
|
||||||
* @param expectedException the expected exception or null if no exception is expected.
|
* @param expectedException the expected exception or null if no exception is expected.
|
||||||
|
* @param jobRunner The name of the job manager to run the jobs.
|
||||||
*/
|
*/
|
||||||
private def executeTest(name: String, args: String, jobQueue: String, expectedException: Class[_]) {
|
private def executeTest(name: String, args: String, jobQueue: String, expectedException: Class[_], jobRunner: String) {
|
||||||
var command = Utils.escapeExpressions(args)
|
var command = Utils.escapeExpressions(args)
|
||||||
|
|
||||||
// add the logging level to each of the integration test commands
|
// add the logging level to each of the integration test commands
|
||||||
|
|
||||||
command = Utils.appendArray(command, "-bsub", "-tempDir", tempDir(name), "-runDir", runDir(name))
|
command = Utils.appendArray(command, "-jobRunner", jobRunner,
|
||||||
|
"-tempDir", tempDir(name, jobRunner), "-runDir", runDir(name, jobRunner))
|
||||||
|
|
||||||
if (jobQueue == null)
|
if (jobQueue != null)
|
||||||
command = Utils.appendArray(command, "-jobQueue", "hour")
|
|
||||||
else
|
|
||||||
command = Utils.appendArray(command, "-jobQueue", jobQueue)
|
command = Utils.appendArray(command, "-jobQueue", jobQueue)
|
||||||
|
|
||||||
if (run)
|
if (run)
|
||||||
|
|
@ -238,11 +241,11 @@ object PipelineTest extends BaseTest with Logging {
|
||||||
println("Wanted exception %s, saw %s".format(expectedException, e.getClass))
|
println("Wanted exception %s, saw %s".format(expectedException, e.getClass))
|
||||||
if (expectedException.isInstance(e)) {
|
if (expectedException.isInstance(e)) {
|
||||||
// it's the type we expected
|
// it's the type we expected
|
||||||
println(String.format(" => %s PASSED", name))
|
println(String.format(" => %s PASSED (%s)", name, jobRunner))
|
||||||
} else {
|
} else {
|
||||||
e.printStackTrace()
|
e.printStackTrace()
|
||||||
Assert.fail("Test %s expected exception %s but got %s instead".format(
|
Assert.fail("Test %s expected exception %s but got %s instead (%s)".format(
|
||||||
name, expectedException, e.getClass))
|
name, expectedException, e.getClass, jobRunner))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// we didn't expect an exception but we got one :-(
|
// we didn't expect an exception but we got one :-(
|
||||||
|
|
@ -257,7 +260,7 @@ object PipelineTest extends BaseTest with Logging {
|
||||||
if (expectedException != null) {
|
if (expectedException != null) {
|
||||||
if (!gotAnException)
|
if (!gotAnException)
|
||||||
// we expected an exception but didn't see it
|
// we expected an exception but didn't see it
|
||||||
Assert.fail("Test %s expected exception %s but none was thrown".format(name, expectedException.toString))
|
Assert.fail("Test %s expected exception %s but none was thrown (%s)".format(name, expectedException.toString, jobRunner))
|
||||||
} else {
|
} else {
|
||||||
if (CommandLineProgram.result != 0)
|
if (CommandLineProgram.result != 0)
|
||||||
throw new RuntimeException("Error running Queue with arguments: " + args)
|
throw new RuntimeException("Error running Queue with arguments: " + args)
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ class HelloWorldPipelineTest {
|
||||||
PipelineTest.executeTest(spec)
|
PipelineTest.executeTest(spec)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(enabled=false)
|
||||||
def testHelloWorldWithPriority {
|
def testHelloWorldWithPriority {
|
||||||
val spec = new PipelineTestSpec
|
val spec = new PipelineTestSpec
|
||||||
spec.name = "HelloWorldWithPriority"
|
spec.name = "HelloWorldWithPriority"
|
||||||
|
|
|
||||||
|
|
@ -67,8 +67,8 @@ class HybridSelectionPipelineTest {
|
||||||
|
|
||||||
// Run the pipeline with the expected inputs.
|
// Run the pipeline with the expected inputs.
|
||||||
val pipelineCommand =
|
val pipelineCommand =
|
||||||
"-retry 1 -S scala/qscript/playground/HybridSelectionPipeline.scala -jobProject %s -Y %s"
|
"-retry 1 -S scala/qscript/playground/HybridSelectionPipeline.scala -Y %s"
|
||||||
.format(projectName, yamlFile)
|
.format(yamlFile)
|
||||||
|
|
||||||
val pipelineSpec = new PipelineTestSpec
|
val pipelineSpec = new PipelineTestSpec
|
||||||
pipelineSpec.name = testName
|
pipelineSpec.name = testName
|
||||||
|
|
@ -83,9 +83,7 @@ class HybridSelectionPipelineTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def writeYaml(testName: String, pipeline: Pipeline) = {
|
private def writeYaml(testName: String, pipeline: Pipeline) = {
|
||||||
val runDir = PipelineTest.runDir(testName)
|
val yamlFile = BaseTest.createTempFile(pipeline.getProject.getName, ".yaml")
|
||||||
val yamlFile = new File(runDir, pipeline.getProject.getName + ".yaml")
|
|
||||||
yamlFile.getParentFile.mkdirs
|
|
||||||
YamlUtils.dump(pipeline, yamlFile)
|
YamlUtils.dump(pipeline, yamlFile)
|
||||||
yamlFile
|
yamlFile
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue