diff --git a/java/test/org/broadinstitute/sting/BaseTest.java b/java/test/org/broadinstitute/sting/BaseTest.java index 7998c5f5a..6008759df 100755 --- a/java/test/org/broadinstitute/sting/BaseTest.java +++ b/java/test/org/broadinstitute/sting/BaseTest.java @@ -1,9 +1,11 @@ package org.broadinstitute.sting; +import org.apache.commons.io.FileUtils; import org.apache.log4j.*; import org.apache.log4j.spi.LoggingEvent; import org.broadinstitute.sting.commandline.CommandLineUtils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.testng.Assert; import java.io.*; import java.math.BigInteger; @@ -64,7 +66,12 @@ public abstract class BaseTest { public static final String b37dbSNP129 = dbsnpDataLocation + "dbsnp_129_b37.rod"; public static final String b37dbSNP132 = dbsnpDataLocation + "dbsnp_132_b37.vcf"; - public final String testDir = "testdata/"; + /** + * Subdirectory under the ant build directory where we store integration test md5 results + */ + public static final String MD5_FILE_DB_SUBDIR = "integrationtests"; + + public static final String testDir = "testdata/"; /** before the class starts up */ static { @@ -177,5 +184,159 @@ public abstract class BaseTest { } } + protected static void ensureMd5DbDirectory() { + // todo -- make path + File dir = new File(MD5_FILE_DB_SUBDIR); + if ( ! dir.exists() ) { + System.out.printf("##### Creating MD5 db %s%n", MD5_FILE_DB_SUBDIR); + if ( ! dir.mkdir() ) { + throw new ReviewedStingException("Infrastructure failure: failed to create md5 directory " + MD5_FILE_DB_SUBDIR); + } + } + } + protected static File getFileForMD5(final String md5) { + final String basename = String.format("%s.integrationtest", md5); + return new File(MD5_FILE_DB_SUBDIR + "/" + basename); + } + + private static void updateMD5Db(final String md5, final File resultsFile) { + // todo -- copy results file to DB dir if needed under filename for md5 + final File dbFile = getFileForMD5(md5); + if ( ! 
dbFile.exists() ) { + // the file isn't already in the db, copy it over + System.out.printf("##### Updating MD5 file: %s%n", dbFile.getPath()); + try { + FileUtils.copyFile(resultsFile, dbFile); + } catch ( IOException e ) { + throw new ReviewedStingException(e.getMessage()); + } + } else { + System.out.printf("##### MD5 file is up to date: %s%n", dbFile.getPath()); + + } + } + + private static String getMD5Path(final String md5, final String valueIfNotFound) { + // todo -- look up the result in the directory and return the path if it exists + final File dbFile = getFileForMD5(md5); + return dbFile.exists() ? dbFile.getPath() : valueIfNotFound; + } + + public static byte[] getBytesFromFile(File file) throws IOException { + InputStream is = new FileInputStream(file); + + // Get the size of the file + long length = file.length(); + + if (length > Integer.MAX_VALUE) { + // File is too large + } + + // Create the byte array to hold the data + byte[] bytes = new byte[(int) length]; + + // Read in the bytes + int offset = 0; + int numRead = 0; + while (offset < bytes.length + && (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) { + offset += numRead; + } + + // Ensure all the bytes have been read in + if (offset < bytes.length) { + throw new IOException("Could not completely read file " + file.getName()); + } + + // Close the input stream and return bytes + is.close(); + return bytes; + } + + /** + * Tests a file MD5 against an expected value, returning the MD5. NOTE: This function WILL throw an exception if the MD5s are different. + * @param name Name of the test. + * @param resultsFile File to MD5. + * @param expectedMD5 Expected MD5 value. + * @param parameterize If true or if expectedMD5 is an empty string, will print out the calculated MD5 instead of error text. + * @return The calculated MD5. 
+ */ + public static String assertMatchingMD5(final String name, final File resultsFile, final String expectedMD5, final boolean parameterize) { + String filemd5sum = testFileMD5(name, resultsFile, expectedMD5, parameterize); + + if (parameterize || expectedMD5.equals("")) { + // Don't assert + } else { + Assert.assertEquals(filemd5sum, expectedMD5, name + " Mismatching MD5s"); + System.out.println(String.format(" => %s PASSED", name)); + } + + return filemd5sum; + } + + + /** + * Tests a file MD5 against an expected value, returning the MD5. NOTE: This function WILL NOT throw an exception if the MD5s are different. + * @param name Name of the test. + * @param resultsFile File to MD5. + * @param expectedMD5 Expected MD5 value. + * @param parameterize If true or if expectedMD5 is an empty string, will print out the calculated MD5 instead of error text. + * @return The calculated MD5. + */ + public static String testFileMD5(final String name, final File resultsFile, final String expectedMD5, final boolean parameterize) { + try { + byte[] bytesOfMessage = getBytesFromFile(resultsFile); + byte[] thedigest = MessageDigest.getInstance("MD5").digest(bytesOfMessage); + BigInteger bigInt = new BigInteger(1, thedigest); + String filemd5sum = bigInt.toString(16); + while (filemd5sum.length() < 32) filemd5sum = "0" + filemd5sum; // pad to length 32 + + // + // copy md5 to integrationtests + // + updateMD5Db(filemd5sum, resultsFile); + + if (parameterize || expectedMD5.equals("")) { + System.out.println(String.format("PARAMETERIZATION[%s]: file %s has md5 = %s, stated expectation is %s, equal? = %b", + name, resultsFile, filemd5sum, expectedMD5, filemd5sum.equals(expectedMD5))); + } else { + System.out.println(String.format("Checking MD5 for %s [calculated=%s, expected=%s]", resultsFile, filemd5sum, expectedMD5)); + System.out.flush(); + + if ( ! expectedMD5.equals(filemd5sum) ) { + // we are going to fail for real in assertEquals (so we are counted by the testing framework). 
+ // prepare ourselves for the comparison + System.out.printf("##### Test %s is going to fail #####%n", name); + String pathToExpectedMD5File = getMD5Path(expectedMD5, "[No DB file found]"); + String pathToFileMD5File = getMD5Path(filemd5sum, "[No DB file found]"); + System.out.printf("##### Path to expected file (MD5=%s): %s%n", expectedMD5, pathToExpectedMD5File); + System.out.printf("##### Path to calculated file (MD5=%s): %s%n", filemd5sum, pathToFileMD5File); + System.out.printf("##### Diff command: diff %s %s%n", pathToExpectedMD5File, pathToFileMD5File); + + // todo -- add support for simple inline display of the first N differences for text file + } + } + + return filemd5sum; + } catch (Exception e) { + throw new RuntimeException("Failed to read bytes from calls file: " + resultsFile, e); + } + } + + /** + * Creates a temp file that will be deleted on exit after tests are complete. + * @param name Prefix of the file. + * @param extension Extension to concat to the end of the file. + * @return A file in the temporary directory starting with name, ending with extension, which will be deleted after the program exits. 
+ */ + public static File createTempFile(String name, String extension) { + try { + File file = File.createTempFile(name, extension); + file.deleteOnExit(); + return file; + } catch (IOException ex) { + throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex); + } + } } diff --git a/java/test/org/broadinstitute/sting/WalkerTest.java b/java/test/org/broadinstitute/sting/WalkerTest.java index a1ca5ef78..70307230f 100755 --- a/java/test/org/broadinstitute/sting/WalkerTest.java +++ b/java/test/org/broadinstitute/sting/WalkerTest.java @@ -35,15 +35,8 @@ import org.broadinstitute.sting.utils.collections.Pair; import org.broadinstitute.sting.utils.Utils; import org.broadinstitute.sting.utils.exceptions.StingException; import org.testng.Assert; -import org.testng.annotations.Test; -import org.apache.commons.io.FileUtils; import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.math.BigInteger; -import java.security.MessageDigest; import java.util.*; public class WalkerTest extends BaseTest { @@ -51,95 +44,12 @@ public class WalkerTest extends BaseTest { private File outputFileLocation = null; private static final boolean ENABLE_REPORTING = false; - /** - * Subdirectory under the ant build directory where we store integration test md5 results - */ - public static final String MD5_FILE_DB_SUBDIR = "integrationtests"; - public void setOutputFileLocation(File outputFileLocation) { this.outputFileLocation = outputFileLocation; } - private static void ensureMd5DbDirectory() { - // todo -- make path - File dir = new File(MD5_FILE_DB_SUBDIR); - if ( ! dir.exists() ) { - System.out.printf("##### Creating MD5 db %s%n", MD5_FILE_DB_SUBDIR); - if ( ! 
dir.mkdir() ) { - throw new ReviewedStingException("Infrastructure failure: failed to create md5 directory " + MD5_FILE_DB_SUBDIR); - } - } - } - - protected static File getFileForMD5(final String md5) { - final String basename = String.format("%s.integrationtest", md5); - return new File(MD5_FILE_DB_SUBDIR + "/" + basename); - } - - private static void updateMD5Db(final String md5, final File resultsFile) { - // todo -- copy results file to DB dir if needed under filename for md5 - final File dbFile = getFileForMD5(md5); - if ( ! dbFile.exists() ) { - // the file isn't already in the db, copy it over - System.out.printf("##### Updating MD5 file: %s%n", dbFile.getPath()); - try { - FileUtils.copyFile(resultsFile, dbFile); - } catch ( IOException e ) { - throw new ReviewedStingException(e.getMessage()); - } - } else { - System.out.printf("##### MD5 file is up to date: %s%n", dbFile.getPath()); - - } - } - - private static String getMD5Path(final String md5, final String valueIfNotFound) { - // todo -- look up the result in the directory and return the path if it exists - final File dbFile = getFileForMD5(md5); - return dbFile.exists() ? dbFile.getPath() : valueIfNotFound; - } - public String assertMatchingMD5(final String name, final File resultsFile, final String expectedMD5) { - try { - byte[] bytesOfMessage = getBytesFromFile(resultsFile); - byte[] thedigest = MessageDigest.getInstance("MD5").digest(bytesOfMessage); - BigInteger bigInt = new BigInteger(1, thedigest); - String filemd5sum = bigInt.toString(16); - while (filemd5sum.length() < 32) filemd5sum = "0" + filemd5sum; // pad to length 32 - - // - // copy md5 to integrationtests - // - updateMD5Db(filemd5sum, resultsFile); - - if (parameterize() || expectedMD5.equals("")) { - System.out.println(String.format("PARAMETERIZATION[%s]: file %s has md5 = %s, stated expectation is %s, equal? 
= %b", - name, resultsFile, filemd5sum, expectedMD5, filemd5sum.equals(expectedMD5))); - } else { - System.out.println(String.format("Checking MD5 for %s [calculated=%s, expected=%s]", resultsFile, filemd5sum, expectedMD5)); - System.out.flush(); - - if ( ! expectedMD5.equals(filemd5sum) ) { - // we are going to fail for real in assertEquals (so we are counted by the testing framework). - // prepare ourselves for the comparison - System.out.printf("##### Test %s is going fail #####%n", name); - String pathToExpectedMD5File = getMD5Path(expectedMD5, "[No DB file found]"); - String pathToFileMD5File = getMD5Path(filemd5sum, "[No DB file found]"); - System.out.printf("##### Path to expected file (MD5=%s): %s%n", expectedMD5, pathToExpectedMD5File); - System.out.printf("##### Path to calculated file (MD5=%s): %s%n", filemd5sum, pathToFileMD5File); - System.out.printf("##### Diff command: diff %s %s%n", pathToExpectedMD5File, pathToFileMD5File); - - // todo -- add support for simple inline display of the first N differences for text file - } - - Assert.assertEquals(filemd5sum,expectedMD5,name + " Mismatching MD5s"); - System.out.println(String.format(" => %s PASSED", name)); - } - - return filemd5sum; - } catch (Exception e) { - throw new RuntimeException("Failed to read bytes from calls file: " + resultsFile, e); - } + return assertMatchingMD5(name, resultsFile, expectedMD5, parameterize()); } public void maybeValidateSupplementaryFile(final String name, final File resultFile) { @@ -167,38 +77,6 @@ public class WalkerTest extends BaseTest { return md5s; } - - public static byte[] getBytesFromFile(File file) throws IOException { - InputStream is = new FileInputStream(file); - - // Get the size of the file - long length = file.length(); - - if (length > Integer.MAX_VALUE) { - // File is too large - } - - // Create the byte array to hold the data - byte[] bytes = new byte[(int) length]; - - // Read in the bytes - int offset = 0; - int numRead = 0; - while (offset < 
bytes.length - && (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) { - offset += numRead; - } - - // Ensure all the bytes have been read in - if (offset < bytes.length) { - throw new IOException("Could not completely read file " + file.getName()); - } - - // Close the input stream and return bytes - is.close(); - return bytes; - } - public class WalkerTestSpec { String args = ""; int nOutputFiles = -1; @@ -299,16 +177,6 @@ public class WalkerTest extends BaseTest { } } - public File createTempFile(String name, String extension) { - try { - File fl = File.createTempFile(name, extension); - fl.deleteOnExit(); - return fl; - } catch (IOException ex) { - throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex); - } - } - /** * execute the test, given the following: * @param name the name of the test diff --git a/java/test/org/broadinstitute/sting/utils/interval/IntervalUtilsUnitTest.java b/java/test/org/broadinstitute/sting/utils/interval/IntervalUtilsUnitTest.java index 8e7eb6c89..d366085d9 100644 --- a/java/test/org/broadinstitute/sting/utils/interval/IntervalUtilsUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/interval/IntervalUtilsUnitTest.java @@ -1,6 +1,5 @@ package org.broadinstitute.sting.utils.interval; -import net.sf.picard.reference.IndexedFastaSequenceFile; import net.sf.picard.reference.ReferenceSequenceFile; import org.broadinstitute.sting.BaseTest; import org.testng.Assert; @@ -14,7 +13,6 @@ import org.testng.annotations.Test; import java.io.File; import java.io.FileNotFoundException; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -362,16 +360,10 @@ public class IntervalUtilsUnitTest extends BaseTest { } private List testFiles(String prefix, int count, String suffix) { - try { - ArrayList files = new ArrayList(); - for (int i = 1; i <= count; i++) { - File tmpFile = File.createTempFile(prefix + i, suffix); - tmpFile.deleteOnExit(); - 
files.add(tmpFile); - } - return files; - } catch (IOException e) { - throw new UserException.BadTmpDir("Unable to create temp file: " + e); + ArrayList files = new ArrayList(); + for (int i = 1; i <= count; i++) { + files.add(createTempFile(prefix + i, suffix)); } + return files; } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala index 39c47291d..863da1d4f 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/CommandLineJobRunner.scala @@ -8,6 +8,7 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils} * Runs a command line function. */ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { + /** A generated exec shell script. */ protected var exec: File = _ @@ -42,24 +43,8 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging { this.exec = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir) } - /** - * Removes all temporary files used for this LSF job. - */ - def removeTemporaryFiles() = { + override def removeTemporaryFiles() { + super.removeTemporaryFiles() IOUtils.tryDelete(exec) } - - /** - * Outputs the last lines of the error logs. 
- */ - protected def tailError() = { - val errorFile = functionErrorFile - if (IOUtils.waitFor(errorFile, 120)) { - val tailLines = IOUtils.tail(errorFile, 100) - val nl = "%n".format() - logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) - } else { - logger.error("Unable to access log file: %s".format(errorFile)) - } - } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index 05eb7cce6..23c4661d0 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -1,6 +1,8 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.QFunction +import java.io.{StringWriter, PrintWriter} +import org.broadinstitute.sting.queue.util.{Logging, IOUtils} /** * An edge in the QGraph that runs a QFunction. @@ -8,7 +10,7 @@ import org.broadinstitute.sting.queue.function.QFunction * and then the runner is specified later when the time comes to * execute the function in the edge. 
*/ -class FunctionEdge(var function: QFunction) extends QEdge { +class FunctionEdge(var function: QFunction) extends QEdge with Logging { var runner: JobRunner[_] =_ /** @@ -30,27 +32,99 @@ class FunctionEdge(var function: QFunction) extends QEdge { RunnerStatus.PENDING } + def start() { + try { + if (logger.isDebugEnabled) { + logger.debug("Starting: " + function.commandDirectory + " > " + function.description) + } else { + logger.info("Starting: " + function.description) + } + logger.info("Output written to " + function.jobOutputFile) + if (function.jobErrorFile != null) + logger.info("Errors written to " + function.jobErrorFile) + + function.deleteLogs() + function.deleteOutputs() + function.mkOutputDirectories() + + runner.start() + } catch { + case e => + currentStatus = RunnerStatus.FAILED + try { + runner.removeTemporaryFiles() + function.failOutputs.foreach(_.createNewFile()) + writeStackTrace(e) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error: " + function.description, e) + } + } + /** * Returns the current status of the edge. 
*/ def status = { - if (currentStatus == RunnerStatus.PENDING || currentStatus == RunnerStatus.RUNNING) - if (runner != null) - currentStatus = runner.status + if (currentStatus == RunnerStatus.PENDING || currentStatus == RunnerStatus.RUNNING) { + if (runner != null) { + try { + currentStatus = runner.status + + if (currentStatus == RunnerStatus.FAILED) { + try { + runner.removeTemporaryFiles() + function.failOutputs.foreach(_.createNewFile()) + } catch { + case _ => /* ignore errors in the error handler */ + } + logger.error("Error: " + function.description) + tailError() + } else if (currentStatus == RunnerStatus.DONE) { + try { + runner.removeTemporaryFiles() + function.doneOutputs.foreach(_.createNewFile()) + } catch { + case _ => /* ignore errors in the done handler */ + } + logger.info("Done: " + function.description) + } + } catch { + case e => + currentStatus = RunnerStatus.FAILED + try { + runner.removeTemporaryFiles() + function.failOutputs.foreach(_.createNewFile()) + writeStackTrace(e) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error retrieving status: " + function.description, e) + } + } + } + + currentStatus } + /** + * Explicitly sets the status of the runner to done. + */ + def markAsDone() { + currentStatus = RunnerStatus.DONE + } + /** * Marks this edge as skipped as it is not needed for the current run. */ - def markAsSkipped() = { + def markAsSkipped() { currentStatus = RunnerStatus.SKIPPED } /** * Resets the edge to pending status. */ - def resetToPending(cleanOutputs: Boolean) = { + def resetToPending(cleanOutputs: Boolean) { currentStatus = RunnerStatus.PENDING if (cleanOutputs) function.deleteOutputs() @@ -60,4 +134,45 @@ class FunctionEdge(var function: QFunction) extends QEdge { def inputs = function.inputs def outputs = function.outputs override def dotString = function.dotString + + /** + * Returns the path to the file to use for logging errors. 
+ * @return the path to the file to use for logging errors. + */ + private def functionErrorFile = if (function.jobErrorFile != null) function.jobErrorFile else function.jobOutputFile + + /** + * Outputs the last lines of the error logs. + */ + private def tailError() = { + val errorFile = functionErrorFile + if (IOUtils.waitFor(errorFile, 120)) { + val maxLines = 100 + val tailLines = IOUtils.tail(errorFile, maxLines) + val nl = "%n".format() + val summary = if (tailLines.size <= maxLines) "Last %d lines".format(maxLines) else "Contents" + logger.error("%s of %s:%n%s".format(summary, errorFile, tailLines.mkString(nl))) + } else { + logger.error("Unable to access log file: %s".format(errorFile)) + } + } + + /** + * Writes the contents of the error to the error file. + */ + private def writeError(content: String) { + IOUtils.writeContents(functionErrorFile, content) + } + + /** + * Writes the stack trace to the error file. + */ + private def writeStackTrace(e: Throwable) { + val stackTrace = new StringWriter + val printWriter = new PrintWriter(stackTrace) + printWriter.println(function.description) + e.printStackTrace(printWriter) + printWriter.close + IOUtils.writeContents(functionErrorFile, stackTrace.toString) + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala index 18031dc4e..d583a55ef 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -1,43 +1,20 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.InProcessFunction -import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.util.IOUtils /** * Runs a function that executes in process and does not fork out an external process. 
*/ -class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProcessFunction] with Logging { +class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProcessFunction] { private var runStatus: RunnerStatus.Value = _ def start() = { - try { - if (logger.isDebugEnabled) { - logger.debug("Starting: " + function.commandDirectory + " > " + function.description) - } else { - logger.info("Starting: " + function.description) - } - - function.deleteLogs() - function.deleteOutputs() - function.mkOutputDirectories() - runStatus = RunnerStatus.RUNNING - function.run() - function.doneOutputs.foreach(_.createNewFile()) - writeDone() - runStatus = RunnerStatus.DONE - logger.info("Done: " + function.description) - } catch { - case e => { - runStatus = RunnerStatus.FAILED - try { - function.failOutputs.foreach(_.createNewFile()) - writeStackTrace(e) - } catch { - case _ => /* ignore errors in the exception handler */ - } - logger.error("Error: " + function.description, e) - } - } + runStatus = RunnerStatus.RUNNING + function.run() + val content = "%s%nDone.".format(function.description) + IOUtils.writeContents(function.jobOutputFile, content) + runStatus = RunnerStatus.DONE } def status = runStatus diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index 0a5c9bb0d..138326750 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -1,7 +1,5 @@ package org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.util.IOUtils -import java.io.{PrintWriter, StringWriter} import org.broadinstitute.sting.queue.function.QFunction /** @@ -29,30 +27,9 @@ trait JobRunner[TFunction <: QFunction] { def function: TFunction /** - * Writes the basic function description to the job done file. + * Removes all temporary files used for this job. 
*/ - protected def writeDone() { - val content = "%s%nDone.".format(function.description) - IOUtils.writeContents(function.jobOutputFile, content) - } - - /** - * Writes the contents of the error to the error file. - */ - protected def writeError(content: String) { - IOUtils.writeContents(functionErrorFile, content) - } - - /** - * Writes the stack trace to the error file. - */ - protected def writeStackTrace(e: Throwable) { - val stackTrace = new StringWriter - val printWriter = new PrintWriter(stackTrace) - printWriter.println(function.description) - e.printStackTrace(printWriter) - printWriter.close - IOUtils.writeContents(functionErrorFile, stackTrace.toString) + def removeTemporaryFiles() { } /** @@ -65,10 +42,4 @@ trait JobRunner[TFunction <: QFunction] { if (updater.isDefinedAt(value)) updater(value) } - - /** - * Returns the path to the file to use for logging errors. - * @return the path to the file to use for logging errors. - */ - protected def functionErrorFile = if (function.jobErrorFile != null) function.jobErrorFile else function.jobOutputFile } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala index ddee2eba2..c9933a2ed 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/Lsf706JobRunner.scala @@ -14,183 +14,136 @@ import java.util.Date /** * Runs jobs on an LSF compute cluster. */ -class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(function) with Logging { +class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging { // Run the static initializer for Lsf706JobRunner Lsf706JobRunner + /** Job Id of the currently executing job. */ + var jobId = -1L + + /** Last known run status */ + private var runStatus: RunnerStatus.Value = _ + /** * Dispatches the function on the LSF cluster. 
* @param function Command to run. */ def start() = { - try { - val request = new submit - for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS) - request.rLimits(i) = LibLsf.DEFAULT_RLIMIT; + val request = new submit + for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS) + request.rLimits(i) = LibLsf.DEFAULT_RLIMIT; - request.outFile = function.jobOutputFile.getPath - request.options |= LibBat.SUB_OUT_FILE + request.outFile = function.jobOutputFile.getPath + request.options |= LibBat.SUB_OUT_FILE - if (function.jobErrorFile != null) { - request.errFile = function.jobErrorFile.getPath - request.options |= LibBat.SUB_ERR_FILE - } - - if (function.jobProject != null) { - request.projectName = function.jobProject - request.options |= LibBat.SUB_PROJECT_NAME - } - - if (function.jobQueue != null) { - request.queue = function.jobQueue - request.options |= LibBat.SUB_QUEUE - } - - if (IOUtils.absolute(new File(".")) != function.commandDirectory) { - request.cwd = function.commandDirectory.getPath - request.options3 |= LibBat.SUB3_CWD - } - - if (function.jobRestartable) { - request.options |= LibBat.SUB_RERUNNABLE - } - - if (function.memoryLimit.isDefined) { - request.resReq = "rusage[mem=" + function.memoryLimit.get + "]" - request.options |= LibBat.SUB_RES_REQ - } - - if (function.description != null) { - request.jobName = function.description.take(1000) - request.options |= LibBat.SUB_JOB_NAME - } - - if (function.jobLimitSeconds.isDefined) { - request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.jobLimitSeconds.get - } - - writeExec() - request.command = "sh " + exec - - // Allow advanced users to update the request. 
- updateJobRun(request) - - if (logger.isDebugEnabled) { - logger.debug("Starting: " + function.commandDirectory + " > " + bsubCommand) - } else { - logger.info("Starting: " + bsubCommand) - } - - function.deleteLogs() - function.deleteOutputs() - function.mkOutputDirectories() - - runStatus = RunnerStatus.RUNNING - Retry.attempt(() => { - val reply = new submitReply - jobId = LibBat.lsb_submit(request, reply) - if (jobId < 0) - throw new QException(LibBat.lsb_sperror("Unable to submit job")) - }, 1, 5, 10) - logger.info("Submitted LSF job id: " + jobId) - } catch { - case e => - runStatus = RunnerStatus.FAILED - try { - removeTemporaryFiles() - function.failOutputs.foreach(_.createNewFile()) - writeStackTrace(e) - } catch { - case _ => /* ignore errors in the exception handler */ - } - logger.error("Error: " + bsubCommand, e) + if (function.jobErrorFile != null) { + request.errFile = function.jobErrorFile.getPath + request.options |= LibBat.SUB_ERR_FILE } + + if (function.jobProject != null) { + request.projectName = function.jobProject + request.options |= LibBat.SUB_PROJECT_NAME + } + + if (function.jobQueue != null) { + request.queue = function.jobQueue + request.options |= LibBat.SUB_QUEUE + } + + if (IOUtils.absolute(new File(".")) != function.commandDirectory) { + request.cwd = function.commandDirectory.getPath + request.options3 |= LibBat.SUB3_CWD + } + + if (function.jobRestartable) { + request.options |= LibBat.SUB_RERUNNABLE + } + + if (function.memoryLimit.isDefined) { + request.resReq = "rusage[mem=" + function.memoryLimit.get + "]" + request.options |= LibBat.SUB_RES_REQ + } + + if (function.description != null) { + request.jobName = function.description.take(1000) + request.options |= LibBat.SUB_JOB_NAME + } + + if (function.jobLimitSeconds.isDefined) { + request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.jobLimitSeconds.get + } + + writeExec() + request.command = "sh " + exec + + // Allow advanced users to update the request. 
+ updateJobRun(request) + + runStatus = RunnerStatus.RUNNING + Retry.attempt(() => { + val reply = new submitReply + jobId = LibBat.lsb_submit(request, reply) + if (jobId < 0) + throw new QException(LibBat.lsb_sperror("Unable to submit job")) + }, 1, 5, 10) + logger.info("Submitted LSF job id: " + jobId) } /** * Updates and returns the status. */ def status = { + var jobStatus = LibBat.JOB_STAT_NULL + var exitStatus = 0 + var exitInfo = 0 + var endTime: NativeLong = null + + LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) try { - var jobStatus = LibBat.JOB_STAT_NULL - var exitStatus = 0 - var exitInfo = 0 - var endTime: NativeLong = null - - LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) - try { - val jobInfo = LibBat.lsb_readjobinfo(null) - if (jobInfo == null) { - jobStatus = LibBat.JOB_STAT_UNKWN - exitStatus = 0 - exitInfo = 0 - endTime = null - } else { - jobStatus = jobInfo.status - exitStatus = jobInfo.exitStatus - exitInfo = jobInfo.exitInfo - endTime = jobInfo.endTime - } - } finally { - LibBat.lsb_closejobinfo() - } - - logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo)) - - if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) { - val now = new Date().getTime - - if (firstUnknownTime.isEmpty) { - firstUnknownTime = Some(now) - logger.debug("First unknown status for job id: " + jobId) - } - - if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) { - // Unknown status has been returned for a while now. 
- runStatus = RunnerStatus.FAILED - try { - removeTemporaryFiles() - function.failOutputs.foreach(_.createNewFile()) - } catch { - case _ => /* ignore errors in the error handler */ - } - logger.error("Error: " + bsubCommand + ", unknown status for " + unknownStatusMaxSeconds + " seconds.") - } + val jobInfo = LibBat.lsb_readjobinfo(null) + if (jobInfo == null) { + jobStatus = LibBat.JOB_STAT_UNKWN + exitStatus = 0 + exitInfo = 0 + endTime = null } else { - // Reset the last time an unknown status was seen. - firstUnknownTime = None - - if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { - // Exited function that (probably) won't be retried. - runStatus = RunnerStatus.FAILED - try { - removeTemporaryFiles() - function.failOutputs.foreach(_.createNewFile()) - } catch { - case _ => /* ignore errors in the error handler */ - } - logger.error("Error: " + bsubCommand) - tailError() - } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { - // Done successfully. - removeTemporaryFiles() - function.doneOutputs.foreach(_.createNewFile()) - runStatus = RunnerStatus.DONE - logger.info("Done: " + bsubCommand) - } + jobStatus = jobInfo.status + exitStatus = jobInfo.exitStatus + exitInfo = jobInfo.exitInfo + endTime = jobInfo.endTime } - } catch { - case e => + } finally { + LibBat.lsb_closejobinfo() + } + + logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo)) + + if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) { + val now = new Date().getTime + + if (firstUnknownTime.isEmpty) { + firstUnknownTime = Some(now) + logger.debug("First unknown status for job id: " + jobId) + } + + if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) { + // Unknown status has been returned for a while now. 
runStatus = RunnerStatus.FAILED - try { - removeTemporaryFiles() - function.failOutputs.foreach(_.createNewFile()) - writeStackTrace(e) - } catch { - case _ => /* ignore errors in the exception handler */ - } - logger.error("Error: " + bsubCommand, e) + logger.error("Unknown status for %d seconds: job id %d: %s".format(unknownStatusMaxSeconds, jobId, function.description)) + } + } else { + // Reset the last time an unknown status was seen. + firstUnknownTime = None + + if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { + // Exited function that (probably) won't be retried. + runStatus = RunnerStatus.FAILED + } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { + // Done successfully. + runStatus = RunnerStatus.DONE + } } runStatus diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala deleted file mode 100644 index fb937f2ab..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ /dev/null @@ -1,17 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -import org.broadinstitute.sting.queue.function.CommandLineFunction -import org.broadinstitute.sting.queue.util._ - -/** - * Runs jobs on an LSF compute cluster. - */ -abstract class LsfJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging { - protected var runStatus: RunnerStatus.Value = _ - - /** Job Id of the currently executing job. */ - var jobId = -1L - - // TODO: Full bsub command for debugging. 
- protected def bsubCommand = "bsub " + function.commandLine -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala index b62e37f7b..5aebebb66 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala @@ -1,74 +1,34 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.CommandLineFunction -import org.broadinstitute.sting.queue.util.{JobExitException, Logging, ShellJob} +import org.broadinstitute.sting.queue.util.ShellJob /** * Runs jobs one at a time locally */ -class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging { +class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner { private var runStatus: RunnerStatus.Value = _ /** * Runs the function on the local shell. * @param function Command to run. */ - def start() = { - try { - val job = new ShellJob + def start() { + val job = new ShellJob - job.workingDir = function.commandDirectory - job.outputFile = function.jobOutputFile - job.errorFile = function.jobErrorFile + job.workingDir = function.commandDirectory + job.outputFile = function.jobOutputFile + job.errorFile = function.jobErrorFile - writeExec() - job.shellScript = exec + writeExec() + job.shellScript = exec - // Allow advanced users to update the job. - updateJobRun(job) + // Allow advanced users to update the job. 
+ updateJobRun(job) - if (logger.isDebugEnabled) { - logger.debug("Starting: " + function.commandDirectory + " > " + function.commandLine) - } else { - logger.info("Starting: " + function.commandLine) - } - - logger.info("Output written to " + function.jobOutputFile) - if (function.jobErrorFile != null) - logger.info("Errors written to " + function.jobErrorFile) - - function.deleteLogs() - function.deleteOutputs() - function.mkOutputDirectories() - runStatus = RunnerStatus.RUNNING - job.run() - removeTemporaryFiles() - function.doneOutputs.foreach(_.createNewFile()) - runStatus = RunnerStatus.DONE - logger.info("Done: " + function.commandLine) - } catch { - case jee: JobExitException => - runStatus = RunnerStatus.FAILED - try { - removeTemporaryFiles() - function.failOutputs.foreach(_.createNewFile()) - writeError(jee.getMessage) - } catch { - case _ => /* ignore errors in the exception handler */ - } - logger.error("Error: " + function.commandLine) - logger.error(jee.stdErr) - case e => - runStatus = RunnerStatus.FAILED - try { - removeTemporaryFiles() - function.failOutputs.foreach(_.createNewFile()) - writeStackTrace(e) - } catch { - case _ => /* ignore errors in the exception handler */ - } - logger.error("Error: " + function.commandLine, e) - } + runStatus = RunnerStatus.RUNNING + job.run() + runStatus = RunnerStatus.DONE } def status = runStatus diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala index 610e832a5..a7dfae4f3 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTest.scala @@ -3,28 +3,84 @@ package org.broadinstitute.sting.queue.pipeline import org.broadinstitute.sting.utils.Utils import org.testng.Assert import org.broadinstitute.sting.commandline.CommandLineProgram -import org.broadinstitute.sting.queue.util.ProcessController import 
org.broadinstitute.sting.queue.QCommandLine +import java.io.File +import org.broadinstitute.sting.queue.util.{TextFormatUtils, ProcessController} +import java.util.Date +import java.text.SimpleDateFormat +import org.broadinstitute.sting.{WalkerTest, BaseTest} object PipelineTest { private var runningCommandLines = Set.empty[QCommandLine] + private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" + val run = System.getProperty("pipeline.run") == "run" + def executeTest(name: String, pipelineTest: PipelineTestSpec) { + println(Utils.dupString('-', 80)); + executeTest(name, pipelineTest.args, pipelineTest.expectedException) + if (run) { + assertMatchingMD5s(name, pipelineTest.fileMD5s) + if (pipelineTest.evalSpec != null) + validateEval(name, pipelineTest.evalSpec) + println(" => %s PASSED".format(name)) + } + else + println(" => %s PASSED DRY RUN".format(name)) + } + + private def assertMatchingMD5s(name: String, fileMD5s: Traversable[(File, String)]) { + var failed = 0 + for ((file, expectedMD5) <- fileMD5s) { + val calculatedMD5 = BaseTest.testFileMD5(name, file, expectedMD5, false) + if (calculatedMD5 != expectedMD5) failed += 1 + } + if (failed > 0) + Assert.fail("%d MD5%s did not match.".format(failed, TextFormatUtils.plural(failed))) + } + + private def validateEval(name: String, evalSpec: PipelineTestEvalSpec) { + // write the report to the shared validation data location + val formatter = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss") + val reportLocation = "%s%s/validation.%s.eval".format(validationReportsDataLocation, name, formatter.format(new Date)) + new File(reportLocation).getParentFile.mkdirs + + // Run variant eval generating the report and validating the pipeline vcf. 
+ var walkerCommand = "-T VariantEval -R %s -B:eval,VCF %s -E %s -reportType R -reportLocation %s" + .format(evalSpec.reference, evalSpec.vcf, evalSpec.evalModules.mkString(" -E "), reportLocation) + + if (evalSpec.dbsnp != null) { + val dbsnpArg = if (evalSpec.dbsnp.getName.toLowerCase.endsWith(".vcf")) "-B:dbsnp,VCF" else "-D" + walkerCommand += " %s %s".format(dbsnpArg, evalSpec.dbsnp) + } + + if (evalSpec.intervals != null) + walkerCommand += " -L %s".format(evalSpec.intervals) + + for (validation <- evalSpec.validations) { + walkerCommand += " -summary %s".format(validation.metric) + walkerCommand += " -validate '%1$s >= %2$s' -validate '%1$s <= %3$s'".format( + validation.metric, validation.min, validation.max) + } + + WalkerTest.executeTest(name + "-validate", walkerCommand, null) + } + /** * execute the test * @param name the name of the test * @param args the argument list * @param expectedException the expected exception or null if no exception is expected. 
*/ - def executeTest(name: String, args: String, expectedException: Class[_]) = { + def executeTest(name: String, args: String, expectedException: Class[_]) { var command = Utils.escapeExpressions(args) // add the logging level to each of the integration test commands - command = Utils.appendArray(command, "-l", "WARN", "-startFromScratch", "-tempDir", tempDir(name), "-runDir", runDir(name)) + command = Utils.appendArray(command, "-bsub", "-l", "WARN", "-startFromScratch", "-tempDir", tempDir(name), "-runDir", runDir(name)) if (run) - command = Utils.appendArray(command, "-run", "-bsub") + command = Utils.appendArray(command, "-run") // run the executable var gotAnException = false @@ -68,13 +124,14 @@ object PipelineTest { } } + val currentDir = new File(".").getAbsolutePath def testDir(testName: String) = "pipelinetests/%s/".format(testName) def runDir(testName: String) = testDir(testName) + "run/" def tempDir(testName: String) = testDir(testName) + "temp/" Runtime.getRuntime.addShutdownHook(new Thread { /** Cleanup as the JVM shuts down. */ - override def run = { + override def run { try { ProcessController.shutdown() } catch { diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTestEvalSpec.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTestEvalSpec.scala new file mode 100644 index 000000000..9bcd716b6 --- /dev/null +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTestEvalSpec.scala @@ -0,0 +1,44 @@ +package org.broadinstitute.sting.queue.pipeline + +import java.io.File + +/** + * Data validations to evaluate on a VCF using VariantEval. + */ +class PipelineTestEvalSpec { + // TODO: Reuse the Project "YAML" object for reference, intervals, etc. 
+ + /** VCF to eval */ + var vcf: File = _ + + /** Reference for the VCF */ + var reference: File = _ + + /** Intervals for the VCF */ + var intervals: File = _ + + /** DBSNP to use for comparisons, via -B:dbsnp,VCF or -D */ + var dbsnp: File = _ + + /** List of eval modules to output. */ + var evalModules = List("CompOverlap", "CountFunctionalClasses", "CountVariants", "SimpleMetricsBySample", "TiTvVariantEvaluator") + + /** Validations to assert. */ + var validations: List[PipelineValidation] = Nil +} + +/** A VariantEval JEXL and range of values to validate. */ +class PipelineValidation(val metric: String, val min: String, val max: String) { +} + +/** A VariantEval JEXL and target to validate within a 1% tolerance. */ +class IntegerValidation(metric: String, target: Int) + extends PipelineValidation(metric, + (target * .99).floor.toInt.toString, (target * 1.01).ceil.toInt.toString) { +} + +/** A VariantEval JEXL and target to validate within a 1% tolerance. */ +class DoubleValidation(metric: String, target: Double) + extends PipelineValidation(metric, + "%.2f".format((target * 99).floor / 100), "%.2f".format((target * 101).ceil / 100)) { +} diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTestSpec.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTestSpec.scala new file mode 100644 index 000000000..5d1e0f4d7 --- /dev/null +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/PipelineTestSpec.scala @@ -0,0 +1,30 @@ +package org.broadinstitute.sting.queue.pipeline + +import java.io.File + +class PipelineTestSpec { + + /** The arguments to pass to the Queue test, ex: "-S scala/qscript/examples/HelloWorld.scala" */ + var args: String = _ + + /** Expected MD5 results for each file path. */ + var fileMD5s = Map.empty[File, String] + + /** VariantEval validations to run on a VCF after the pipeline has completed. */ + var evalSpec: PipelineTestEvalSpec = _ + + /** Expected exception from the test. 
*/ + var expectedException: Class[_ <: Exception] = null + + def this(args: String, fileMD5s: Traversable[(File, String)]) = { + this() + this.args = args + this.fileMD5s = fileMD5s.toMap + } + + def this(args: String, expectedException: Class[_ <: Exception]) = { + this() + this.args = args + this.expectedException = expectedException + } +} diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala new file mode 100644 index 000000000..6b43479bb --- /dev/null +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/examples/HelloWorldPipelineTest.scala @@ -0,0 +1,16 @@ +package org.broadinstitute.sting.queue.pipeline.examples + +import org.testng.annotations.Test +import org.broadinstitute.sting.queue.pipeline.{PipelineTest, PipelineTestSpec} + +class HelloWorldPipelineTest { + @Test + def testHelloWorld { + var testName = "helloworld" + val spec = new PipelineTestSpec + spec.args = "-S scala/qscript/examples/HelloWorld.scala -jobPrefix HelloWorld -jobQueue hour" + // TODO: working example of MD5 usage. 
+ // spec.fileMD5s += new File(PipelineTest.runDir(testName) + "hello.out") -> "0123456789abcdef0123456789abcdef" + PipelineTest.executeTest(testName, spec) + } +} diff --git a/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala similarity index 64% rename from scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala rename to scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala index 61a4bf120..592985b88 100644 --- a/scala/test/org/broadinstitute/sting/queue/pipeline/FullCallingPipelineTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/pipeline/playground/FullCallingPipelineTest.scala @@ -1,19 +1,16 @@ -package org.broadinstitute.sting.queue.pipeline +package org.broadinstitute.sting.queue.pipeline.playground import org.testng.annotations.{DataProvider, Test} import collection.JavaConversions._ import java.io.File import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, PipelineProject, Pipeline} import org.broadinstitute.sting.utils.yaml.YamlUtils -import org.broadinstitute.sting.{WalkerTest, BaseTest} -import java.text.SimpleDateFormat -import java.util.Date +import org.broadinstitute.sting.BaseTest +import org.broadinstitute.sting.queue.pipeline._ -class FullCallingPipelineTest extends BaseTest { +class FullCallingPipelineTest { def datasets = List(k1gChr20Dataset, k1gExomeDataset) - private final val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" - // In fullCallingPipeline.q VariantEval is always compared against 129. // Until the newvarianteval is finalized which will allow java import of the prior results, // we re-run VariantEval to validate the run, and replicate that behavior here. 
@@ -93,12 +90,11 @@ class FullCallingPipelineTest extends BaseTest { var cleanType = "cleaned" // Run the pipeline with the expected inputs. - val currentDir = new File(".").getAbsolutePath var pipelineCommand = ("-retry 1 -S scala/qscript/playground/fullCallingPipeline.q" + " -jobProject %s -Y %s" + " -tearScript %s/R/DataProcessingReport/GetTearsheetStats.R" + " --gatkjar %s/dist/GenomeAnalysisTK.jar") - .format(projectName, yamlFile, currentDir, currentDir) + .format(projectName, yamlFile, PipelineTest.currentDir, PipelineTest.currentDir) if (!dataset.runIndelRealigner) { pipelineCommand += " -skipCleaning" @@ -108,37 +104,18 @@ class FullCallingPipelineTest extends BaseTest { if (dataset.jobQueue != null) pipelineCommand += " -jobQueue " + dataset.jobQueue + val pipelineSpec = new PipelineTestSpec + pipelineSpec.args = pipelineCommand + + pipelineSpec.evalSpec = new PipelineTestEvalSpec + pipelineSpec.evalSpec.vcf = new File(PipelineTest.runDir(testName) + "SnpCalls/%s.%s.annotated.handfiltered.vcf".format(projectName, cleanType)) + pipelineSpec.evalSpec.reference = dataset.pipeline.getProject.getReferenceFile + pipelineSpec.evalSpec.intervals = dataset.pipeline.getProject.getIntervalList + pipelineSpec.evalSpec.dbsnp = variantEvalDbsnpFile + pipelineSpec.evalSpec.validations = dataset.validations + // Run the test, at least checking if the command compiles - PipelineTest.executeTest(testName, pipelineCommand, null) - - // If actually running, evaluate the output validating the expressions. 
- if (PipelineTest.run) { - // path where the pipeline should have output the handfiltered vcf - val handFilteredVcf = PipelineTest.runDir(testName) + "SnpCalls/%s.%s.annotated.handfiltered.vcf".format(projectName, cleanType) - - // eval modules to record in the validation directory - val evalModules = List("CompOverlap", "CountFunctionalClasses", "CountVariants", "SimpleMetricsBySample", "TiTvVariantEvaluator") - - // write the report to the shared validation data location - val formatter = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss") - val reportLocation = "%s%s/validation.%s.eval".format(validationReportsDataLocation, testName, formatter.format(new Date)) - new File(reportLocation).getParentFile.mkdirs - - // Run variant eval generating the report and validating the pipeline vcfs. - var walkerCommand = ("-T VariantEval -R %s -D %s -B:eval,VCF %s" + - " -E %s -reportType R -reportLocation %s -L %s") - .format( - dataset.pipeline.getProject.getReferenceFile, variantEvalDbsnpFile, handFilteredVcf, - evalModules.mkString(" -E "), reportLocation, dataset.pipeline.getProject.getIntervalList) - - for (validation <- dataset.validations) { - walkerCommand += " -summary %s".format(validation.metric) - walkerCommand += " -validate '%1$s >= %2$s' -validate '%1$s <= %3$s'".format( - validation.metric, validation.min, validation.max) - } - - WalkerTest.executeTest("fullCallingPipelineValidate-" + projectName, walkerCommand, null) - } + PipelineTest.executeTest(testName, pipelineSpec) } class PipelineDataset( @@ -149,22 +126,8 @@ class FullCallingPipelineTest extends BaseTest { override def toString = pipeline.getProject.getName } - class PipelineValidation(val metric: String, val min: String, val max: String) { - } - - class IntegerValidation(metric: String, target: Int) - extends PipelineValidation(metric, - (target * .99).floor.toInt.toString, (target * 1.01).ceil.toInt.toString) { - } - - class DoubleValidation(metric: String, target: Double) - extends 
PipelineValidation(metric, - "%.2f".format((target * 99).floor / 100), "%.2f".format((target * 101).ceil / 100)) { - } - private def writeTempYaml(pipeline: Pipeline) = { - val tempFile = File.createTempFile(pipeline.getProject.getName + "-", ".yaml") - tempFile.deleteOnExit + val tempFile = BaseTest.createTempFile(pipeline.getProject.getName + "-", ".yaml") YamlUtils.dump(pipeline, tempFile) tempFile } diff --git a/scala/test/org/broadinstitute/sting/queue/util/ShellJobUnitTest.scala b/scala/test/org/broadinstitute/sting/queue/util/ShellJobUnitTest.scala index bec771e2a..a2e6fb7c0 100644 --- a/scala/test/org/broadinstitute/sting/queue/util/ShellJobUnitTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/util/ShellJobUnitTest.scala @@ -2,10 +2,9 @@ package org.broadinstitute.sting.queue.util import org.broadinstitute.sting.BaseTest import org.testng.annotations.Test -import java.io.File import org.testng.Assert -class ShellJobUnitTest extends BaseTest { +class ShellJobUnitTest { @Test def testEcho { val job = new ShellJob @@ -33,19 +32,19 @@ class ShellJobUnitTest extends BaseTest { job = new ShellJob job.shellScript = writeScript("echo #") - job.outputFile = File.createTempFile("temp", "") + job.outputFile = BaseTest.createTempFile("temp", "") job.run() Assert.assertEquals(IOUtils.readContents(job.outputFile).trim, "") job = new ShellJob job.shellScript = writeScript("""echo \#""") - job.outputFile = File.createTempFile("temp", "") + job.outputFile = BaseTest.createTempFile("temp", "") job.run() Assert.assertEquals(IOUtils.readContents(job.outputFile).trim, "#") job = new ShellJob job.shellScript = writeScript("""echo \\#""") - job.outputFile = File.createTempFile("temp", "") + job.outputFile = BaseTest.createTempFile("temp", "") job.run() Assert.assertEquals(IOUtils.readContents(job.outputFile).trim, """\#""") } @@ -66,11 +65,9 @@ class ShellJobUnitTest extends BaseTest { job.run() } - private val tempDir = new File(System.getProperty("java.io.tmpdir")) - 
private def writeScript(contents: String) = { - val script = IOUtils.writeTempFile(contents, "temp", "", tempDir) - script.deleteOnExit - script + val file = BaseTest.createTempFile("temp", "") + IOUtils.writeContents(file, contents) + file } }