From 787e5d85e99267fb1cf20ba635b0bbdd9506e360 Mon Sep 17 00:00:00 2001 From: kshakir Date: Mon, 22 Nov 2010 22:59:42 +0000 Subject: [PATCH] Added the ability to test pipelines in dry or live mode via 'ant pipelinetest' and 'ant pipelinetest -Dpipeline.run=run'. Added an initial test for genotyping chr20 on ten 1000G bams. Since tribble needs logging support too, for now setting the logging level and appending the console logger to the root logger, not just to "org.broadinstitute.sting". Updated IntervalUtilsUnitTest to output to a temp directory and not the SVN controlled testdata directory. Added refseq tables and dbsnps to validation data in BaseTest. Now waiting up to two minutes for gather parts to propagate over NFS before attempting to merge the files. Setting scatter/gather directories relative to the -run directory instead of the current directory that queue is running. Fixed a bug where escaping test expressions didn't handle delimiters at the beginning or end of the String. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4717 348d0f76-0448-11de-a6fe-93d51630548a --- build.xml | 13 ++ .../sting/commandline/CommandLineProgram.java | 2 +- .../sting/commandline/CommandLineUtils.java | 12 +- .../org/broadinstitute/sting/utils/Utils.java | 11 +- .../org/broadinstitute/sting/BaseTest.java | 12 ++ .../org/broadinstitute/sting/WalkerTest.java | 27 +++- .../sting/utils/UtilsUnitTest.java | 34 +++++ .../utils/interval/IntervalUtilsUnitTest.java | 22 ++- .../sting/queue/engine/LsfJobRunner.scala | 3 +- .../extensions/gatk/VcfGatherFunction.scala | 1 + .../sting/queue/function/QFunction.scala | 16 ++- .../scattergather/GatherFunction.scala | 11 ++ .../ScatterGatherableFunction.scala | 4 + .../SimpleTextGatherFunction.scala | 1 + .../sting/queue/util/IOUtils.scala | 55 +++++++- .../FullCallingPipelineIntegrationTest.scala | 19 --- .../pipeline/FullCallingPipelineTest.scala | 132 ++++++++++++++++++ .../PipelineTest.scala} | 20 +-- 18 files changed, 340 insertions(+), 55 deletions(-) delete mode 100644 scala/test/org/broadinstitute/sting/queue/FullCallingPipelineIntegrationTest.scala create mode 100644 scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala rename scala/test/org/broadinstitute/sting/queue/{QScriptTest.scala => pipeline/PipelineTest.scala} (84%) diff --git a/build.xml b/build.xml index 98d5d5526..af943bc37 100644 --- a/build.xml +++ b/build.xml @@ -162,6 +162,11 @@ + + + + + @@ -507,6 +512,7 @@ listeners="org.testng.reporters.FailedReporter,org.testng.reporters.JUnitXMLReporter,org.broadinstitute.sting.StingTextReporter"> + @@ -686,6 +698,7 @@ + diff --git a/java/src/org/broadinstitute/sting/commandline/CommandLineProgram.java b/java/src/org/broadinstitute/sting/commandline/CommandLineProgram.java index 7714ebe28..2cd5d36b6 100644 --- a/java/src/org/broadinstitute/sting/commandline/CommandLineProgram.java +++ b/java/src/org/broadinstitute/sting/commandline/CommandLineProgram.java @@ -288,7 +288,7 @@ public abstract class CommandLineProgram { throw new ArgumentException("Unable to match: " + logging_level + " to a logging level, make sure it's a valid level (INFO, DEBUG, ERROR, FATAL, OFF)"); } - CommandLineUtils.getStingLogger().setLevel(par); + Logger.getRootLogger().setLevel(par); } /** diff --git a/java/src/org/broadinstitute/sting/commandline/CommandLineUtils.java b/java/src/org/broadinstitute/sting/commandline/CommandLineUtils.java index 6eb772dde..99608f167 100644 --- a/java/src/org/broadinstitute/sting/commandline/CommandLineUtils.java +++ b/java/src/org/broadinstitute/sting/commandline/CommandLineUtils.java @@ -166,8 +166,8 @@ public class CommandLineUtils { } } // Extracted from BasicConfigurator.configure(), but only applied to the Sting logger. - getStingLogger().addAppender(new ConsoleAppender( - new PatternLayout(PatternLayout.TTCC_CONVERSION_PATTERN))); + Logger.getRootLogger().addAppender(new ConsoleAppender( + new PatternLayout(PatternLayout.TTCC_CONVERSION_PATTERN))); } /** @@ -177,8 +177,10 @@ public class CommandLineUtils { */ @SuppressWarnings("unchecked") public static void setLayout(Logger logger, PatternLayout layout) { - Enumeration e = (Enumeration) logger.getAllAppenders(); - for (Appender appender: Collections.list(e)) - appender.setLayout(layout); + for (; logger != null; logger = (Logger)logger.getParent()) { + Enumeration e = (Enumeration) logger.getAllAppenders(); + for (Appender appender: Collections.list(e)) + appender.setLayout(layout); + } } } diff --git a/java/src/org/broadinstitute/sting/utils/Utils.java b/java/src/org/broadinstitute/sting/utils/Utils.java index 31fe6252d..e18986afb 100755 --- a/java/src/org/broadinstitute/sting/utils/Utils.java +++ b/java/src/org/broadinstitute/sting/utils/Utils.java @@ -337,11 +337,18 @@ public class Utils { private static String[] escapeExpressions(String args, String delimiter) { String[] command = {}; String[] split = args.split(delimiter); + String arg; for (int i = 0; i < split.length - 1; i += 2) { - command = Utils.concatArrays(command, split[i].trim().split(" ")); + arg = split[i].trim(); + if (arg.length() > 0) // if the unescaped arg has a size + command = Utils.concatArrays(command, arg.split(" ")); command = Utils.concatArrays(command, new String[]{split[i + 1]}); } - return Utils.concatArrays(command, split[split.length - 1].trim().split(" ")); + arg = split[split.length - 1].trim(); + if (split.length % 2 == 1) // if the command ends with a delimiter + if (arg.length() > 0) // if the last unescaped arg has a size + command = Utils.concatArrays(command, arg.split(" ")); + return command; } /** diff --git a/java/test/org/broadinstitute/sting/BaseTest.java b/java/test/org/broadinstitute/sting/BaseTest.java index 99e132f60..2753c1b37 100755 --- a/java/test/org/broadinstitute/sting/BaseTest.java +++ b/java/test/org/broadinstitute/sting/BaseTest.java @@ -50,6 +50,18 @@ public abstract class BaseTest { public static final String validationDataLocation = GATKDataLocation + "Validation_Data/"; public static final String evaluationDataLocation = GATKDataLocation + "Evaluation_Data/"; public static final String comparisonDataLocation = GATKDataLocation + "Comparisons/"; + public static final String annotationDataLocation = GATKDataLocation + "Annotations/"; + + public static final String refseqAnnotationLocation = annotationDataLocation + "refseq/"; + public static final String hg18Refseq = refseqAnnotationLocation + "refGene-big-table-hg18.txt"; + public static final String hg19Refseq = refseqAnnotationLocation + "refGene-big-table-hg19.txt"; + public static final String b36Refseq = refseqAnnotationLocation + "refGene-big-table-b36.txt"; + public static final String b37Refseq = refseqAnnotationLocation + "refGene-big-table-b37.txt"; + + public static final String dbsnpDataLocation = GATKDataLocation; + public static final String hg18dbSNP129 = dbsnpDataLocation + "dbsnp_129_hg18.rod"; + public static final String b36dbSNP129 = dbsnpDataLocation + "dbsnp_129_b36.rod"; + public static final String b37dbSNP129 = dbsnpDataLocation + "dbsnp_129_b37.rod"; public final String testDir = "testdata/"; diff --git a/java/test/org/broadinstitute/sting/WalkerTest.java b/java/test/org/broadinstitute/sting/WalkerTest.java index 9349fe0b4..4e9f48cb7 100755 --- a/java/test/org/broadinstitute/sting/WalkerTest.java +++ b/java/test/org/broadinstitute/sting/WalkerTest.java @@ -315,15 +315,32 @@ public class WalkerTest extends BaseTest { * @param md5s the list of md5s * @param tmpFiles the temp file corresponding to the md5 list * @param args the argument list + * @param expectedException the expected exception or null * @return a pair of file and string lists */ private Pair, List> executeTest(String name, List md5s, List tmpFiles, String args, Class expectedException) { + if (outputFileLocation != null) + args += " -o " + this.outputFileLocation.getAbsolutePath(); + executeTest(name, args, expectedException); + + if ( expectedException != null ) { + return null; + } else { + // we need to check MD5s + return new Pair, List>(tmpFiles, assertMatchingMD5s(name, tmpFiles, md5s)); + } + } + + /** + * execute the test, given the following: + * @param name the name of the test + * @param args the argument list + * @param expectedException the expected exception or null + */ + public static void executeTest(String name, String args, Class expectedException) { CommandLineGATK instance = new CommandLineGATK(); String[] command = Utils.escapeExpressions(args); - if (outputFileLocation != null) - command = Utils.appendArray(command, "-o", this.outputFileLocation.getAbsolutePath()); - // add the logging level to each of the integration test commands command = Utils.appendArray(command, "-l", "WARN", "-et", ENABLE_REPORTING ? "STANDARD" : "NO_ET"); @@ -356,14 +373,10 @@ public class WalkerTest extends BaseTest { if ( ! gotAnException ) // we expected an exception but didn't see it Assert.fail(String.format("Test %s expected exception %s but none was thrown", name, expectedException.toString())); - return null; } else { if ( CommandLineExecutable.result != 0) { throw new RuntimeException("Error running the GATK with arguments: " + args); } - - // we need to check MD5s - return new Pair, List>(tmpFiles, assertMatchingMD5s(name, tmpFiles, md5s)); } } diff --git a/java/test/org/broadinstitute/sting/utils/UtilsUnitTest.java b/java/test/org/broadinstitute/sting/utils/UtilsUnitTest.java index d6bec2d71..7a800e965 100644 --- a/java/test/org/broadinstitute/sting/utils/UtilsUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/UtilsUnitTest.java @@ -82,4 +82,38 @@ public class UtilsUnitTest extends BaseTest { Assert.assertTrue("one-1;two-2;three-1;four-2;five-1;six-2".equals(joined)); } + @Test + public void testEscapeExpressions() { + String[] expected, actual; + + expected = new String[] {"one", "two", "three four", "five", "six"}; + actual = Utils.escapeExpressions("one two 'three four' five six"); + Assert.assertEquals(actual, expected); + actual = Utils.escapeExpressions(" one two 'three four' five six"); + Assert.assertEquals(actual, expected); + actual = Utils.escapeExpressions("one two 'three four' five six "); + Assert.assertEquals(actual, expected); + actual = Utils.escapeExpressions(" one two 'three four' five six "); + Assert.assertEquals(actual, expected); + + expected = new String[] {"one two", "three", "four"}; + actual = Utils.escapeExpressions("'one two' three four"); + Assert.assertEquals(actual, expected); + actual = Utils.escapeExpressions(" 'one two' three four"); + Assert.assertEquals(actual, expected); + actual = Utils.escapeExpressions("'one two' three four "); + Assert.assertEquals(actual, expected); + actual = Utils.escapeExpressions(" 'one two' three four "); + Assert.assertEquals(actual, expected); + + expected = new String[] {"one", "two", "three four"}; + actual = Utils.escapeExpressions("one two 'three four'"); + Assert.assertEquals(actual, expected); + actual = Utils.escapeExpressions(" one two 'three four'"); + Assert.assertEquals(actual, expected); + actual = Utils.escapeExpressions("one two 'three four' "); + Assert.assertEquals(actual, expected); + actual = Utils.escapeExpressions(" one two 'three four' "); + Assert.assertEquals(actual, expected); + } } diff --git a/java/test/org/broadinstitute/sting/utils/interval/IntervalUtilsUnitTest.java b/java/test/org/broadinstitute/sting/utils/interval/IntervalUtilsUnitTest.java index 9bd541866..637b4571c 100644 --- a/java/test/org/broadinstitute/sting/utils/interval/IntervalUtilsUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/interval/IntervalUtilsUnitTest.java @@ -12,6 +12,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -191,6 +192,12 @@ public class IntervalUtilsUnitTest extends BaseTest { Assert.assertEquals(locs3.get(0), chr3); } + @Test(enabled=false) // disabled, GenomeLoc.compareTo() returns 0 for two locs with the same start, causing an exception in GLSS.add(). + public void testScatterIntervalsWithTheSameStart() { + List files = testFiles("sg.", 20, ".intervals"); + IntervalUtils.scatterIntervalArguments(new File(hg18Reference), Arrays.asList(BaseTest.GATKDataLocation + "whole_exome_agilent_designed_120.targets.hg18.chr20.interval_list"), files, false); + } + @Test public void testScatterOrder() { List intervals = Arrays.asList("chr2:1-1", "chr1:1-1", "chr3:2-2"); @@ -348,9 +355,16 @@ public class IntervalUtilsUnitTest extends BaseTest { } private List testFiles(String prefix, int count, String suffix) { - ArrayList files = new ArrayList(); - for (int i = 1; i <= count; i++) - files.add(new File(testDir + prefix + i + suffix)); - return files; + try { + ArrayList files = new ArrayList(); + for (int i = 1; i <= count; i++) { + File tmpFile = File.createTempFile(prefix + i, suffix); + tmpFile.deleteOnExit(); + files.add(tmpFile); + } + return files; + } catch (IOException e) { + throw new UserException.BadTmpDir("Unable to create temp file: " + e); + } } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index 536598ba9..72c6724e7 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -3,7 +3,6 @@ package org.broadinstitute.sting.queue.engine import java.io.File import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util._ -import org.apache.commons.io.FileUtils /** * Runs jobs on an LSF compute cluster. @@ -154,7 +153,7 @@ class LsfJobRunner(val function: CommandLineFunction) extends DispatchJobRunner */ private def tailError() = { val errorFile = if (job.errorFile != null) job.errorFile else job.outputFile - if (FileUtils.waitFor(errorFile, 120)) { + if (IOUtils.waitFor(errorFile, 120)) { val tailLines = IOUtils.tail(errorFile, 100) val nl = "%n".format() logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala index c271d5eca..7dc15ba44 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala @@ -11,6 +11,7 @@ import org.apache.commons.io.{LineIterator, IOUtils, FileUtils} */ class VcfGatherFunction extends GatherFunction with InProcessFunction { def run() = { + waitForGatherParts if (gatherParts.size < 1) { throw new QException("No files to gather to output: " + originalOutput) } else { diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index ab9e6ac12..1405d9dc4 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -151,8 +151,7 @@ trait QFunction extends Logging { def outputDirectories = { var dirs = Set.empty[File] dirs += commandDirectory - if (jobTempDir != null) - dirs += jobTempDir + dirs += jobTempDir dirs ++= outputs.map(_.getParentFile) dirs } @@ -287,10 +286,18 @@ trait QFunction extends Logging { if (jobTempDir == null) jobTempDir = qSettings.tempDirectory - // If the command directory is relative, insert the run directory ahead of it. - commandDirectory = IOUtils.absolute(new File(qSettings.runDirectory, commandDirectory.getPath)) + // Do not set the temp dir relative to the command directory + jobTempDir = IOUtils.absolute(jobTempDir) + + absoluteCommandDirectory() } + /** + * If the command directory is relative, insert the run directory ahead of it. + */ + def absoluteCommandDirectory() = + commandDirectory = IOUtils.absolute(qSettings.runDirectory, commandDirectory) + /** * Makes all field values canonical so that the graph can match the * inputs of one function to the output of another using equals(). @@ -330,7 +337,6 @@ trait QFunction extends Logging { */ private def absolute(file: File) = IOUtils.absolute(commandDirectory, file) - /** * Scala sugar type for checking annotation required and exclusiveOf. */ diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index 8bf5d7082..3d09c6061 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -3,6 +3,8 @@ package org.broadinstitute.sting.queue.function.scattergather import java.io.File import org.broadinstitute.sting.commandline.{Input, Output} import org.broadinstitute.sting.queue.function.QFunction +import org.broadinstitute.sting.queue.QException +import org.broadinstitute.sting.queue.util.IOUtils /** * Base class for Gather command line functions. @@ -13,4 +15,13 @@ trait GatherFunction extends QFunction { @Output(doc="The original output of the scattered function") var originalOutput: File = _ + + /** + * Waits for gather parts to propagate over NFS or throws an exception. + */ + protected def waitForGatherParts = { + val missing = IOUtils.waitFor(gatherParts, 120) + if (!missing.isEmpty) + throw new QException("Unable to find gather inputs: " + missing.mkString(", ")) + } } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index c8dd4d694..4609f6f18 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -121,6 +121,10 @@ trait ScatterGatherableFunction extends CommandLineFunction { // Allow the script writer to change the paths to the files. initCloneFunction(cloneFunction, i) + + // If the command directory is relative, insert the run directory ahead of it. + cloneFunction.absoluteCommandDirectory() + // Get absolute paths to the files and bind the sg functions to the clone function via the absolute paths. scatterFunction.bindCloneInputs(cloneFunction, i) for (gatherField <- outputFieldsWithValues) { diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala index cdd3c2ab2..03cd59102 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala @@ -10,6 +10,7 @@ import org.apache.commons.io.{LineIterator, IOUtils, FileUtils} */ class SimpleTextGatherFunction extends GatherFunction with InProcessFunction { def run() = { + waitForGatherParts if (gatherParts.size < 1) { throw new QException("No files to gather to output: " + originalOutput) } else if (gatherParts.size == 1) { diff --git a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala index 1bb08a5cf..608d5a3b0 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala @@ -41,14 +41,67 @@ object IOUtils extends Logging { absolute(temp) } + /** + * Writes content into a file. + * @param file File to write to. + * @param content Content to write. + */ def writeContents(file: File, content: String) = FileUtils.writeStringToFile(file, content) - def writeTempFile(content: String, prefix: String, suffix: String = "", directory: File) = { + /** + * Writes content to a temp file and returns the path to the temporary file. + * @param content to write. + * @param prefix Prefix for the temp file. + * @parm suffix Suffix for the temp file. + * @param directory Directory for the temp file. + * @return the path to the temp file. + */ + def writeTempFile(content: String, prefix: String, suffix: String, directory: File) = { val tempFile = absolute(File.createTempFile(prefix, suffix, directory)) writeContents(tempFile, content) tempFile } + /** + * Waits for NFS to propagate a file creation, imposing a timeout. + * + * Based on Apache Commons IO FileUtils.waitFor() + * + * @param file The file to wait for. + * @param seconds The maximum time in seconds to wait. + * @return true if the file exists + */ + def waitFor(file: File, seconds: Int): Boolean = waitFor(List(file), seconds).isEmpty + + /** + * Waits for NFS to propagate a file creation, imposing a timeout. + * + * Based on Apache Commons IO FileUtils.waitFor() + * + * @param files The list of files to wait for. + * @param seconds The maximum time in seconds to wait. + * @return Files that still do not exists at the end of the timeout, or a empty list if all files exists. + */ + def waitFor[T <: Traversable[File]](files: T, seconds: Int): Traversable[File] = { + var timeout = 0; + var tick = 0; + var missingFiles = files.filterNot(_.exists) + while (!missingFiles.isEmpty && timeout <= seconds) { + if (tick >= 10) { + tick = 0; + timeout += 1 + } + tick += 1 + try { + Thread.sleep(100) + } catch { + case ignore: InterruptedException => + } + missingFiles = missingFiles.filterNot(_.exists) + } + missingFiles + } + /** * Returns the directory at the number of levels deep. * For example 2 levels of /path/to/dir will return /path/to diff --git a/scala/test/org/broadinstitute/sting/queue/FullCallingPipelineIntegrationTest.scala b/scala/test/org/broadinstitute/sting/queue/FullCallingPipelineIntegrationTest.scala deleted file mode 100644 index 0c5f5bace..000000000 --- a/scala/test/org/broadinstitute/sting/queue/FullCallingPipelineIntegrationTest.scala +++ /dev/null @@ -1,19 +0,0 @@ -package org.broadinstitute.sting.queue - -import org.testng.annotations.Test -import org.broadinstitute.sting.BaseTest - -/** - * A temporary integration test to ensure that the full calling pipeline compiles. To be enhanced... - */ -class FullCallingPipelineIntegrationTest extends QScriptTest { - @Test - def testCompileFullCallingPipeline = { - val command = ("-jobProject QueuePipelineTest -S %1$sscala/qscript/fullCallingPipeline.q -Y %2$s " + - "-refseqTable /humgen/gsa-hpprojects/GATK/data/Annotations/refseq/refGene-big-table-hg18.txt " + - "--gatkjar %1$sdist/GenomeAnalysisTK.jar -titv 3.0 -skipCleaning").format( - stingDir, BaseTest.validationDataLocation + "QueuePipelineTestData/QueuePipelineTestData.yaml" - ) - executeTest("fullCallingPipeline", command, null) - } -} diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala new file mode 100644 index 000000000..77ebb3d08 --- /dev/null +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala @@ -0,0 +1,132 @@ +package org.broadinstitute.sting.queue.pipeline + +import org.testng.annotations.{DataProvider, Test} +import collection.JavaConversions._ +import java.io.File +import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, PipelineProject, Pipeline} +import org.broadinstitute.sting.utils.yaml.YamlUtils +import org.broadinstitute.sting.queue.PipelineTest +import org.broadinstitute.sting.{WalkerTest, BaseTest} +import java.text.SimpleDateFormat +import java.util.Date + +class FullCallingPipelineTest extends BaseTest { + def datasets = List(k1gChr20Dataset) + + private final val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" + + val k1gChr20Dataset = { + val project = new PipelineProject + project.setName("Barcoded_1000G_WEx_chr20") + project.setReferenceFile(new File(BaseTest.hg19Reference)) + project.setDbsnpFile(new File(BaseTest.b37dbSNP129)) + project.setIntervalList(new File(BaseTest.GATKDataLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.chr20.interval_list")) + + val squid = "C426" + val ids = List( + "NA19651","NA19655","NA19669","NA19834","HG01440", + "NA12342","NA12748","NA19649","NA19652","NA19654") + var samples = List.empty[PipelineSample] + for (id <- ids) { + val sample = new PipelineSample + sample.setId(project.getName + "_" + id) + sample.setBamFiles(Map("recalibrated" -> new File("/seq/picard_aggregation/%1$s/%2$s/v6/%2$s.bam".format(squid,id)))) + sample.setTags(Map("SQUIDProject" -> squid, "CollaboratorID" -> id)) + samples :+= sample + } + + val pipeline = new Pipeline + pipeline.setProject(project) + pipeline.setSamples(samples) + + val dataset = new PipelineDataset + dataset.pipeline = pipeline + dataset.refseq = BaseTest.hg19Refseq + dataset.targetTiTv = "3.0" + dataset.jobQueue = "hour" + + dataset.validations :+= new PipelineValidation("evalHandFiltered.dbsnp.all.called.all.counter.nCalledLoci", "1390", "1420") + dataset.validations :+= new PipelineValidation("evalHandFiltered.dbsnp.all.called.all.titv.tiTvRatio", "3.52", "3.60") + dataset.validations :+= new PipelineValidation("evalHandFiltered.dbsnp.all.called.known.titv.tiTvRatio", "3.71", "3.80") + dataset.validations :+= new PipelineValidation("evalHandFiltered.dbsnp.all.called.novel.titv.tiTvRatio", "2.79", "2.86") + + dataset + } + + @DataProvider(name="datasets") + final def convertDatasets: Array[Array[AnyRef]] = + datasets.map(dataset => Array(dataset.asInstanceOf[AnyRef])).toArray + + @Test(dataProvider="datasets") + def testFullCallingPipeline(dataset: PipelineDataset) = { + val projectName = dataset.pipeline.getProject.getName + val testName = "fullCallingPipeline-" + projectName + val yamlFile = writeTempYaml(dataset.pipeline) + + // Run the pipeline with the expected inputs. + var pipelineCommand = ("-jobProject %s -S scala/qscript/fullCallingPipeline.q -Y %s" + + " -refseqTable %s" + + " --gatkjar %s/dist/GenomeAnalysisTK.jar -titv %s -skipCleaning") + .format(projectName, yamlFile, dataset.refseq, new File(".").getCanonicalPath, dataset.targetTiTv) + + if (dataset.jobQueue != null) + pipelineCommand += " -jobQueue " + dataset.jobQueue + + // Run the test, at least checking if the command compiles + PipelineTest.executeTest(testName, pipelineCommand, null) + + // If actually running, evaluate the output validating the expressions. + if (PipelineTest.run) { + // path where the pipeline should have output the uncleaned handfiltered vcf + val handFilteredVcf = PipelineTest.runDir(testName) + "SnpCalls/%s.uncleaned.annotated.handfiltered.vcf".format(projectName) + + // path where the pipeline should have outout the indel masked vcf + val optimizedVcf = PipelineTest.runDir(testName) + "SnpCalls/%s.uncleaned.annotated.indel.masked.recalibrated.tranched.vcf".format(projectName) + + // eval modules to record in the validation directory + val evalModules = List("CountFunctionalClasses", "CompOverlap", "CountVariants", "TiTvVariantEvaluator") + + // write the report to the shared validation data location + val formatter = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss") + val reportLocation = "%s/%s/validation.%s.eval".format(validationReportsDataLocation, testName, formatter.format(new Date)) + new File(reportLocation).getParentFile.mkdirs + + // Run variant eval generating the report and validating the pipeline vcfs. + var walkerCommand = ("-T VariantEval -R %s -D %s -B:evalOptimized,VCF %s -B:evalHandFiltered,VCF %s" + + " -E %s -reportType R -reportLocation %s -L %s") + .format( + dataset.pipeline.getProject.getReferenceFile, dataset.pipeline.getProject.getDbsnpFile, optimizedVcf, handFilteredVcf, + evalModules.mkString(" -E "), reportLocation, dataset.pipeline.getProject.getIntervalList) + + for (validation <- dataset.validations) { + walkerCommand += " -summary %s".format(validation.metric) + walkerCommand += " -validate '%1$s >= %2$s' -validate '%1$s <= %3$s'".format( + validation.metric, validation.min, validation.max) + } + + WalkerTest.executeTest("fullCallingPipelineValidate-" + projectName, walkerCommand, null) + } + } + + class PipelineDataset( + var pipeline: Pipeline = null, + var refseq: String = null, + var targetTiTv: String = null, + var validations: List[PipelineValidation] = Nil, + var jobQueue: String = null) { + override def toString = pipeline.getProject.getName + } + + class PipelineValidation( + var metric: String = null, + var min: String = null, + var max: String = null) { + } + + private def writeTempYaml(pipeline: Pipeline) = { + val tempFile = File.createTempFile(pipeline.getProject.getName + "-", ".yaml") + tempFile.deleteOnExit + YamlUtils.dump(pipeline, tempFile) + tempFile + } +} diff --git a/scala/test/org/broadinstitute/sting/queue/QScriptTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala similarity index 84% rename from scala/test/org/broadinstitute/sting/queue/QScriptTest.scala rename to scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala index 3014f10b9..d7e285d89 100644 --- a/scala/test/org/broadinstitute/sting/queue/QScriptTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala @@ -3,12 +3,12 @@ package org.broadinstitute.sting.queue import org.broadinstitute.sting.utils.Utils import org.testng.Assert import org.broadinstitute.sting.commandline.CommandLineProgram -import org.broadinstitute.sting.BaseTest import org.broadinstitute.sting.queue.util.ProcessController -class QScriptTest extends BaseTest { +object PipelineTest { + private var runningCommandLines = Set.empty[QCommandLine] - protected val stingDir = "./" + val run = System.getProperty("pipeline.run") == "run" /** * execute the test @@ -21,13 +21,15 @@ class QScriptTest extends BaseTest { // add the logging level to each of the integration test commands - command = Utils.appendArray(command, "-l", "WARN", "-startFromScratch", "-tempDir", "integrationtests") + command = Utils.appendArray(command, "-l", "WARN", "-startFromScratch", "-tempDir", tempDir(name), "-runDir", runDir(name)) + if (run) + command = Utils.appendArray(command, "-run", "-bsub") // run the executable var gotAnException = false val instance = new QCommandLine - QScriptTest.runningCommandLines += instance + runningCommandLines += instance try { println("Executing test %s with Queue arguments: %s".format(name, Utils.join(" ",command))) CommandLineProgram.start(instance, command) @@ -51,7 +53,7 @@ class QScriptTest extends BaseTest { } } finally { instance.shutdown() - QScriptTest.runningCommandLines -= instance + runningCommandLines -= instance } // catch failures from the integration test @@ -64,10 +66,10 @@ class QScriptTest extends BaseTest { throw new RuntimeException("Error running the GATK with arguments: " + args) } } -} -object QScriptTest { - private var runningCommandLines = Set.empty[QCommandLine] + def testDir(testName: String) = "pipelinetests/%s/".format(testName) + def runDir(testName: String) = testDir(testName) + "run/" + def tempDir(testName: String) = testDir(testName) + "temp/" Runtime.getRuntime.addShutdownHook(new Thread { /** Cleanup as the JVM shuts down. */