Moved MD5 utils from WalkerTest to BaseTest for use by PipelineTests.

Moved VariantEval validation from FCPTest to PipelineTest.
Cleaned up some duplicate code for writing temp files during tests.
Moved FCPTest to playground namespace to match move for FCP.q.
Added a basic HelloWorldPipelineTest for the HelloWorld QScript. 
Moved duplicated error handling from JobRunners into the FunctionEdge.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5068 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-01-25 04:11:49 +00:00
parent 9db02059ac
commit 9923e05e0a
16 changed files with 598 additions and 526 deletions

View File

@ -1,9 +1,11 @@
package org.broadinstitute.sting;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.*;
import org.apache.log4j.spi.LoggingEvent;
import org.broadinstitute.sting.commandline.CommandLineUtils;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.testng.Assert;
import java.io.*;
import java.math.BigInteger;
@ -64,7 +66,12 @@ public abstract class BaseTest {
public static final String b37dbSNP129 = dbsnpDataLocation + "dbsnp_129_b37.rod";
public static final String b37dbSNP132 = dbsnpDataLocation + "dbsnp_132_b37.vcf";
public final String testDir = "testdata/";
/**
* Subdirectory under the ant build directory where we store integration test md5 results
*/
public static final String MD5_FILE_DB_SUBDIR = "integrationtests";
public static final String testDir = "testdata/";
/** before the class starts up */
static {
@ -177,5 +184,159 @@ public abstract class BaseTest {
}
}
/**
 * Ensures the on-disk MD5 database directory ({@link #MD5_FILE_DB_SUBDIR}) exists,
 * creating it if necessary.
 *
 * @throws ReviewedStingException if the directory is missing and cannot be created.
 */
protected static void ensureMd5DbDirectory() {
    // todo -- make path
    File dir = new File(MD5_FILE_DB_SUBDIR);
    if ( ! dir.exists() ) {
        System.out.printf("##### Creating MD5 db %s%n", MD5_FILE_DB_SUBDIR);
        // Re-check isDirectory() after a failed mkdir(): a concurrent test process may
        // have created the directory between the exists() check and the mkdir() call,
        // in which case mkdir() returns false even though the directory now exists.
        if ( ! dir.mkdir() && ! dir.isDirectory() ) {
            throw new ReviewedStingException("Infrastructure failure: failed to create md5 directory " + MD5_FILE_DB_SUBDIR);
        }
    }
}
/**
 * Maps an MD5 hex digest to its entry in the integration-test MD5 database directory.
 *
 * @param md5 MD5 hex digest identifying the stored result.
 * @return file named "&lt;md5&gt;.integrationtest" inside the MD5 db directory.
 */
protected static File getFileForMD5(final String md5) {
    return new File(MD5_FILE_DB_SUBDIR + "/" + md5 + ".integrationtest");
}
/**
 * Archives the results file in the MD5 database under its digest-derived name,
 * unless an entry for that digest already exists.
 *
 * @param md5 MD5 hex digest of the results file's contents.
 * @param resultsFile file whose contents should be stored in the db.
 * @throws ReviewedStingException if copying into the db fails.
 */
private static void updateMD5Db(final String md5, final File resultsFile) {
    // todo -- copy results file to DB dir if needed under filename for md5
    final File dbFile = getFileForMD5(md5);
    if ( ! dbFile.exists() ) {
        // the file isn't already in the db, copy it over
        System.out.printf("##### Updating MD5 file: %s%n", dbFile.getPath());
        try {
            FileUtils.copyFile(resultsFile, dbFile);
        } catch ( IOException e ) {
            // Preserve the underlying IOException as the cause; the original dropped it,
            // losing the stack trace. The two-arg constructor is already used elsewhere
            // in this file (see createTempFile).
            throw new ReviewedStingException(e.getMessage(), e);
        }
    } else {
        System.out.printf("##### MD5 file is up to date: %s%n", dbFile.getPath());
    }
}
/**
 * Looks up the MD5 database entry for a digest.
 *
 * @param md5 MD5 hex digest to look up.
 * @param valueIfNotFound value to return when the db holds no entry for the digest.
 * @return path of the stored db file, or valueIfNotFound when absent.
 */
private static String getMD5Path(final String md5, final String valueIfNotFound) {
    // todo -- look up the result in the directory and return the path if it exists
    final File entry = getFileForMD5(md5);
    if (entry.exists()) {
        return entry.getPath();
    }
    return valueIfNotFound;
}
/**
 * Reads the entire contents of a file into a byte array.
 *
 * @param file file to read.
 * @return all bytes of the file.
 * @throws IOException if the file is larger than a byte array can hold,
 *         cannot be completely read, or any other I/O error occurs.
 */
public static byte[] getBytesFromFile(File file) throws IOException {
    // Get the size of the file
    long length = file.length();
    // The original left this branch empty and fell through to the (int) cast,
    // silently truncating huge files; fail loudly instead.
    if (length > Integer.MAX_VALUE) {
        throw new IOException("File is too large to read into memory: " + file.getName());
    }
    InputStream is = new FileInputStream(file);
    try {
        // Create the byte array to hold the data
        byte[] bytes = new byte[(int) length];
        // Read in the bytes
        int offset = 0;
        int numRead;
        while (offset < bytes.length
                && (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) {
            offset += numRead;
        }
        // Ensure all the bytes have been read in
        if (offset < bytes.length) {
            throw new IOException("Could not completely read file " + file.getName());
        }
        return bytes;
    } finally {
        // Always release the stream, even when a read fails (original leaked it on error).
        is.close();
    }
}
/**
 * Tests a file MD5 against an expected value, returning the MD5. NOTE: This function WILL throw an exception if the MD5s are different.
 * @param name Name of the test.
 * @param resultsFile File to MD5.
 * @param expectedMD5 Expected MD5 value.
 * @param parameterize If true or if expectedMD5 is an empty string, will print out the calculated MD5 instead of error text.
 * @return The calculated MD5.
 */
public static String assertMatchingMD5(final String name, final File resultsFile, final String expectedMD5, final boolean parameterize) {
    final String calculatedMD5 = testFileMD5(name, resultsFile, expectedMD5, parameterize);
    // In parameterization mode (or with no stated expectation) the MD5 is only reported,
    // never asserted; testFileMD5 has already printed the calculated value.
    final boolean skipAssertion = parameterize || expectedMD5.equals("");
    if (!skipAssertion) {
        Assert.assertEquals(calculatedMD5, expectedMD5, name + " Mismatching MD5s");
        System.out.println(String.format(" => %s PASSED", name));
    }
    return calculatedMD5;
}
/**
 * Tests a file MD5 against an expected value, returning the MD5. NOTE: This function WILL NOT throw an exception if the MD5s are different.
 * @param name Name of the test.
 * @param resultsFile File to MD5.
 * @param expectedMD5 Expected MD5 value.
 * @param parameterize If true or if expectedMD5 is an empty string, will print out the calculated MD5 instead of error text.
 * @return The calculated MD5.
 */
public static String testFileMD5(final String name, final File resultsFile, final String expectedMD5, final boolean parameterize) {
    try {
        byte[] bytesOfMessage = getBytesFromFile(resultsFile);
        byte[] thedigest = MessageDigest.getInstance("MD5").digest(bytesOfMessage);
        // Format the 128-bit digest as a zero-padded 32-character lowercase hex string.
        // (Replaces the original BigInteger.toString(16) plus manual while-loop padding.)
        String filemd5sum = String.format("%032x", new BigInteger(1, thedigest));
        //
        // copy md5 to integrationtests
        //
        updateMD5Db(filemd5sum, resultsFile);
        if (parameterize || expectedMD5.equals("")) {
            System.out.println(String.format("PARAMETERIZATION[%s]: file %s has md5 = %s, stated expectation is %s, equal? = %b",
                    name, resultsFile, filemd5sum, expectedMD5, filemd5sum.equals(expectedMD5)));
        } else {
            System.out.println(String.format("Checking MD5 for %s [calculated=%s, expected=%s]", resultsFile, filemd5sum, expectedMD5));
            System.out.flush();
            if ( ! expectedMD5.equals(filemd5sum) ) {
                // we are going to fail for real in assertEquals (so we are counted by the testing framework).
                // prepare ourselves for the comparison
                // (message typo fixed: "is going fail" -> "is going to fail")
                System.out.printf("##### Test %s is going to fail #####%n", name);
                String pathToExpectedMD5File = getMD5Path(expectedMD5, "[No DB file found]");
                String pathToFileMD5File = getMD5Path(filemd5sum, "[No DB file found]");
                System.out.printf("##### Path to expected file (MD5=%s): %s%n", expectedMD5, pathToExpectedMD5File);
                System.out.printf("##### Path to calculated file (MD5=%s): %s%n", filemd5sum, pathToFileMD5File);
                System.out.printf("##### Diff command: diff %s %s%n", pathToExpectedMD5File, pathToFileMD5File);
                // todo -- add support for simple inline display of the first N differences for text file
            }
        }
        return filemd5sum;
    } catch (Exception e) {
        throw new RuntimeException("Failed to read bytes from calls file: " + resultsFile, e);
    }
}
/**
 * Creates a temp file that will be deleted on exit after tests are complete.
 * @param name Prefix of the file.
 * @param extension Extension to concat to the end of the file.
 * @return A file in the temporary directory starting with name, ending with extension, which will be deleted after the program exits.
 */
public static File createTempFile(String name, String extension) {
    File tempFile;
    try {
        tempFile = File.createTempFile(name, extension);
    } catch (IOException ex) {
        throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex);
    }
    tempFile.deleteOnExit();
    return tempFile;
}
}

View File

@ -35,15 +35,8 @@ import org.broadinstitute.sting.utils.collections.Pair;
import org.broadinstitute.sting.utils.Utils;
import org.broadinstitute.sting.utils.exceptions.StingException;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.*;
public class WalkerTest extends BaseTest {
@ -51,95 +44,12 @@ public class WalkerTest extends BaseTest {
private File outputFileLocation = null;
private static final boolean ENABLE_REPORTING = false;
/**
* Subdirectory under the ant build directory where we store integration test md5 results
*/
public static final String MD5_FILE_DB_SUBDIR = "integrationtests";
public void setOutputFileLocation(File outputFileLocation) {
this.outputFileLocation = outputFileLocation;
}
/**
 * Creates the on-disk MD5 database directory if it does not already exist.
 * NOTE(review): deletion-side duplicate of the copy moved to BaseTest in this commit.
 */
private static void ensureMd5DbDirectory() {
// todo -- make path
File dir = new File(MD5_FILE_DB_SUBDIR);
if ( ! dir.exists() ) {
System.out.printf("##### Creating MD5 db %s%n", MD5_FILE_DB_SUBDIR);
// NOTE(review): mkdir() can fail spuriously if another process creates the
// directory between the exists() check and this call.
if ( ! dir.mkdir() ) {
throw new ReviewedStingException("Infrastructure failure: failed to create md5 directory " + MD5_FILE_DB_SUBDIR);
}
}
}
/**
 * Maps an MD5 hex digest to its file in the MD5 database directory.
 * NOTE(review): deletion-side duplicate of the copy moved to BaseTest in this commit.
 */
protected static File getFileForMD5(final String md5) {
final String basename = String.format("%s.integrationtest", md5);
return new File(MD5_FILE_DB_SUBDIR + "/" + basename);
}
/**
 * Copies resultsFile into the MD5 database under its digest name if absent.
 * NOTE(review): deletion-side duplicate of the copy moved to BaseTest in this commit.
 */
private static void updateMD5Db(final String md5, final File resultsFile) {
// todo -- copy results file to DB dir if needed under filename for md5
final File dbFile = getFileForMD5(md5);
if ( ! dbFile.exists() ) {
// the file isn't already in the db, copy it over
System.out.printf("##### Updating MD5 file: %s%n", dbFile.getPath());
try {
FileUtils.copyFile(resultsFile, dbFile);
} catch ( IOException e ) {
// NOTE(review): drops the IOException cause; the two-arg constructor would preserve it.
throw new ReviewedStingException(e.getMessage());
}
} else {
System.out.printf("##### MD5 file is up to date: %s%n", dbFile.getPath());
}
}
/**
 * Returns the db path for a digest, or valueIfNotFound when no entry exists.
 * NOTE(review): deletion-side duplicate of the copy moved to BaseTest in this commit.
 */
private static String getMD5Path(final String md5, final String valueIfNotFound) {
// todo -- look up the result in the directory and return the path if it exists
final File dbFile = getFileForMD5(md5);
return dbFile.exists() ? dbFile.getPath() : valueIfNotFound;
}
public String assertMatchingMD5(final String name, final File resultsFile, final String expectedMD5) {
try {
byte[] bytesOfMessage = getBytesFromFile(resultsFile);
byte[] thedigest = MessageDigest.getInstance("MD5").digest(bytesOfMessage);
BigInteger bigInt = new BigInteger(1, thedigest);
String filemd5sum = bigInt.toString(16);
while (filemd5sum.length() < 32) filemd5sum = "0" + filemd5sum; // pad to length 32
//
// copy md5 to integrationtests
//
updateMD5Db(filemd5sum, resultsFile);
if (parameterize() || expectedMD5.equals("")) {
System.out.println(String.format("PARAMETERIZATION[%s]: file %s has md5 = %s, stated expectation is %s, equal? = %b",
name, resultsFile, filemd5sum, expectedMD5, filemd5sum.equals(expectedMD5)));
} else {
System.out.println(String.format("Checking MD5 for %s [calculated=%s, expected=%s]", resultsFile, filemd5sum, expectedMD5));
System.out.flush();
if ( ! expectedMD5.equals(filemd5sum) ) {
// we are going to fail for real in assertEquals (so we are counted by the testing framework).
// prepare ourselves for the comparison
System.out.printf("##### Test %s is going fail #####%n", name);
String pathToExpectedMD5File = getMD5Path(expectedMD5, "[No DB file found]");
String pathToFileMD5File = getMD5Path(filemd5sum, "[No DB file found]");
System.out.printf("##### Path to expected file (MD5=%s): %s%n", expectedMD5, pathToExpectedMD5File);
System.out.printf("##### Path to calculated file (MD5=%s): %s%n", filemd5sum, pathToFileMD5File);
System.out.printf("##### Diff command: diff %s %s%n", pathToExpectedMD5File, pathToFileMD5File);
// todo -- add support for simple inline display of the first N differences for text file
}
Assert.assertEquals(filemd5sum,expectedMD5,name + " Mismatching MD5s");
System.out.println(String.format(" => %s PASSED", name));
}
return filemd5sum;
} catch (Exception e) {
throw new RuntimeException("Failed to read bytes from calls file: " + resultsFile, e);
}
return assertMatchingMD5(name, resultsFile, expectedMD5, parameterize());
}
public void maybeValidateSupplementaryFile(final String name, final File resultFile) {
@ -167,38 +77,6 @@ public class WalkerTest extends BaseTest {
return md5s;
}
/**
 * Reads the entire file into a byte array.
 * NOTE(review): deletion-side duplicate of the copy moved to BaseTest in this commit.
 * The empty length &gt; Integer.MAX_VALUE branch silently truncates huge files via the
 * (int) cast, and the stream is not closed when a read throws.
 */
public static byte[] getBytesFromFile(File file) throws IOException {
InputStream is = new FileInputStream(file);
// Get the size of the file
long length = file.length();
if (length > Integer.MAX_VALUE) {
// File is too large
}
// Create the byte array to hold the data
byte[] bytes = new byte[(int) length];
// Read in the bytes
int offset = 0;
int numRead = 0;
while (offset < bytes.length
&& (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) {
offset += numRead;
}
// Ensure all the bytes have been read in
if (offset < bytes.length) {
throw new IOException("Could not completely read file " + file.getName());
}
// Close the input stream and return bytes
is.close();
return bytes;
}
public class WalkerTestSpec {
String args = "";
int nOutputFiles = -1;
@ -299,16 +177,6 @@ public class WalkerTest extends BaseTest {
}
}
/**
 * Creates a temp file that is deleted when the JVM exits.
 * NOTE(review): deletion-side duplicate of the createTempFile moved to BaseTest in this commit.
 */
public File createTempFile(String name, String extension) {
try {
File fl = File.createTempFile(name, extension);
fl.deleteOnExit();
return fl;
} catch (IOException ex) {
throw new ReviewedStingException("Cannot create temp file: " + ex.getMessage(), ex);
}
}
/**
* execute the test, given the following:
* @param name the name of the test

View File

@ -1,6 +1,5 @@
package org.broadinstitute.sting.utils.interval;
import net.sf.picard.reference.IndexedFastaSequenceFile;
import net.sf.picard.reference.ReferenceSequenceFile;
import org.broadinstitute.sting.BaseTest;
import org.testng.Assert;
@ -14,7 +13,6 @@ import org.testng.annotations.Test;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -362,16 +360,10 @@ public class IntervalUtilsUnitTest extends BaseTest {
}
private List<File> testFiles(String prefix, int count, String suffix) {
try {
ArrayList<File> files = new ArrayList<File>();
for (int i = 1; i <= count; i++) {
File tmpFile = File.createTempFile(prefix + i, suffix);
tmpFile.deleteOnExit();
files.add(tmpFile);
}
return files;
} catch (IOException e) {
throw new UserException.BadTmpDir("Unable to create temp file: " + e);
ArrayList<File> files = new ArrayList<File>();
for (int i = 1; i <= count; i++) {
files.add(createTempFile(prefix + i, suffix));
}
return files;
}
}

View File

@ -8,6 +8,7 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
* Runs a command line function.
*/
trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
/** A generated exec shell script. */
protected var exec: File = _
@ -42,24 +43,8 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
this.exec = IOUtils.writeTempFile(exec.toString, ".exec", "", jobStatusDir)
}
/**
* Removes all temporary files used for this LSF job.
*/
def removeTemporaryFiles() = {
override def removeTemporaryFiles() {
super.removeTemporaryFiles()
IOUtils.tryDelete(exec)
}
/**
* Outputs the last lines of the error logs.
*/
protected def tailError() = {
val errorFile = functionErrorFile
if (IOUtils.waitFor(errorFile, 120)) {
val tailLines = IOUtils.tail(errorFile, 100)
val nl = "%n".format()
logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl)))
} else {
logger.error("Unable to access log file: %s".format(errorFile))
}
}
}

View File

@ -1,6 +1,8 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.QFunction
import java.io.{StringWriter, PrintWriter}
import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
/**
* An edge in the QGraph that runs a QFunction.
@ -8,7 +10,7 @@ import org.broadinstitute.sting.queue.function.QFunction
* and then the runner is specified later when the time comes to
* execute the function in the edge.
*/
class FunctionEdge(var function: QFunction) extends QEdge {
class FunctionEdge(var function: QFunction) extends QEdge with Logging {
var runner: JobRunner[_] =_
/**
@ -30,27 +32,99 @@ class FunctionEdge(var function: QFunction) extends QEdge {
RunnerStatus.PENDING
}
/**
 * Starts this edge's function via its JobRunner.
 * Logs what is about to run, clears stale logs/outputs, creates output
 * directories, then delegates to runner.start(). Any exception marks the
 * edge FAILED, touches the .fail marker files, records the stack trace,
 * and logs the error without rethrowing.
 */
def start() {
try {
// Include the working directory in the log only at debug level.
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + function.description)
} else {
logger.info("Starting: " + function.description)
}
logger.info("Output written to " + function.jobOutputFile)
if (function.jobErrorFile != null)
logger.info("Errors written to " + function.jobErrorFile)
// Remove artifacts from any previous attempt before (re)starting.
function.deleteLogs()
function.deleteOutputs()
function.mkOutputDirectories()
runner.start()
} catch {
case e =>
currentStatus = RunnerStatus.FAILED
try {
// Best-effort cleanup: drop temp files, touch fail markers, record the trace.
runner.removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + function.description, e)
}
}
/**
* Returns the current status of the edge.
*/
def status = {
if (currentStatus == RunnerStatus.PENDING || currentStatus == RunnerStatus.RUNNING)
if (runner != null)
currentStatus = runner.status
if (currentStatus == RunnerStatus.PENDING || currentStatus == RunnerStatus.RUNNING) {
if (runner != null) {
try {
currentStatus = runner.status
if (currentStatus == RunnerStatus.FAILED) {
try {
runner.removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
} catch {
case _ => /* ignore errors in the error handler */
}
logger.error("Error: " + function.description)
tailError()
} else if (currentStatus == RunnerStatus.DONE) {
try {
runner.removeTemporaryFiles()
function.doneOutputs.foreach(_.createNewFile())
} catch {
case _ => /* ignore errors in the done handler */
}
logger.info("Done: " + function.description)
}
} catch {
case e =>
currentStatus = RunnerStatus.FAILED
try {
runner.removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error retrieving status: " + function.description, e)
}
}
}
currentStatus
}
/**
 * Explicitly sets the status of the runner to done.
 */
def markAsDone() {
currentStatus = RunnerStatus.DONE
}
/**
* Marks this edge as skipped as it is not needed for the current run.
*/
def markAsSkipped() = {
def markAsSkipped() {
currentStatus = RunnerStatus.SKIPPED
}
/**
* Resets the edge to pending status.
*/
def resetToPending(cleanOutputs: Boolean) = {
def resetToPending(cleanOutputs: Boolean) {
currentStatus = RunnerStatus.PENDING
if (cleanOutputs)
function.deleteOutputs()
@ -60,4 +134,45 @@ class FunctionEdge(var function: QFunction) extends QEdge {
def inputs = function.inputs
def outputs = function.outputs
override def dotString = function.dotString
/**
* Returns the path to the file to use for logging errors.
* @return the path to the file to use for logging errors.
*/
private def functionErrorFile = if (function.jobErrorFile != null) function.jobErrorFile else function.jobOutputFile
/**
 * Outputs the last lines of the error logs.
 */
private def tailError() = {
  val errorFile = functionErrorFile
  // Wait up to two minutes for the log file to become readable; a remote job's
  // output may take time to appear on shared storage.
  if (IOUtils.waitFor(errorFile, 120)) {
    val maxLines = 100
    val tailLines = IOUtils.tail(errorFile, maxLines)
    val nl = "%n".format()
    // If tail() returned fewer than maxLines lines we have the whole file, so label
    // the dump "Contents"; otherwise it was truncated to the last maxLines lines.
    // (The original condition was inverted: size <= maxLines was always true, so
    // "Contents" was unreachable and short files were mislabeled "Last 100 lines".)
    // NOTE(review): assumes IOUtils.tail returns at most maxLines lines -- confirm.
    val summary = if (tailLines.size < maxLines) "Contents" else "Last %d lines".format(maxLines)
    logger.error("%s of %s:%n%s".format(summary, errorFile, tailLines.mkString(nl)))
  } else {
    logger.error("Unable to access log file: %s".format(errorFile))
  }
}
/**
 * Writes the contents of the error to the error file.
 * @param content error text to record.
 */
// NOTE(review): assumes IOUtils.writeContents replaces (not appends to) the
// file contents -- confirm against IOUtils.
private def writeError(content: String) {
IOUtils.writeContents(functionErrorFile, content)
}
/**
 * Writes the stack trace to the error file.
 * The function description is written first so the trace can be matched
 * back to the job that produced it.
 */
private def writeStackTrace(e: Throwable) {
  val buffer = new StringWriter
  val writer = new PrintWriter(buffer)
  writer.println(function.description)
  e.printStackTrace(writer)
  writer.close()
  IOUtils.writeContents(functionErrorFile, buffer.toString)
}
}

View File

@ -1,43 +1,20 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.InProcessFunction
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.util.IOUtils
/**
* Runs a function that executes in process and does not fork out an external process.
*/
class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProcessFunction] with Logging {
class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProcessFunction] {
private var runStatus: RunnerStatus.Value = _
def start() = {
try {
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + function.description)
} else {
logger.info("Starting: " + function.description)
}
function.deleteLogs()
function.deleteOutputs()
function.mkOutputDirectories()
runStatus = RunnerStatus.RUNNING
function.run()
function.doneOutputs.foreach(_.createNewFile())
writeDone()
runStatus = RunnerStatus.DONE
logger.info("Done: " + function.description)
} catch {
case e => {
runStatus = RunnerStatus.FAILED
try {
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + function.description, e)
}
}
runStatus = RunnerStatus.RUNNING
function.run()
val content = "%s%nDone.".format(function.description)
IOUtils.writeContents(function.jobOutputFile, content)
runStatus = RunnerStatus.DONE
}
def status = runStatus

View File

@ -1,7 +1,5 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.util.IOUtils
import java.io.{PrintWriter, StringWriter}
import org.broadinstitute.sting.queue.function.QFunction
/**
@ -29,30 +27,9 @@ trait JobRunner[TFunction <: QFunction] {
def function: TFunction
/**
* Writes the basic function description to the job done file.
* Removes all temporary files used for this job.
*/
protected def writeDone() {
val content = "%s%nDone.".format(function.description)
IOUtils.writeContents(function.jobOutputFile, content)
}
/**
* Writes the contents of the error to the error file.
*/
protected def writeError(content: String) {
IOUtils.writeContents(functionErrorFile, content)
}
/**
* Writes the stack trace to the error file.
*/
protected def writeStackTrace(e: Throwable) {
val stackTrace = new StringWriter
val printWriter = new PrintWriter(stackTrace)
printWriter.println(function.description)
e.printStackTrace(printWriter)
printWriter.close
IOUtils.writeContents(functionErrorFile, stackTrace.toString)
def removeTemporaryFiles() {
}
/**
@ -65,10 +42,4 @@ trait JobRunner[TFunction <: QFunction] {
if (updater.isDefinedAt(value))
updater(value)
}
/**
* Returns the path to the file to use for logging errors.
* @return the path to the file to use for logging errors.
*/
protected def functionErrorFile = if (function.jobErrorFile != null) function.jobErrorFile else function.jobOutputFile
}

View File

@ -14,183 +14,136 @@ import java.util.Date
/**
* Runs jobs on an LSF compute cluster.
*/
class Lsf706JobRunner(function: CommandLineFunction) extends LsfJobRunner(function) with Logging {
class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging {
// Run the static initializer for Lsf706JobRunner
Lsf706JobRunner
/** Job Id of the currently executing job. */
var jobId = -1L
/** Last known run status */
private var runStatus: RunnerStatus.Value = _
/**
* Dispatches the function on the LSF cluster.
* @param function Command to run.
*/
def start() = {
try {
val request = new submit
for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS)
request.rLimits(i) = LibLsf.DEFAULT_RLIMIT;
val request = new submit
for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS)
request.rLimits(i) = LibLsf.DEFAULT_RLIMIT;
request.outFile = function.jobOutputFile.getPath
request.options |= LibBat.SUB_OUT_FILE
request.outFile = function.jobOutputFile.getPath
request.options |= LibBat.SUB_OUT_FILE
if (function.jobErrorFile != null) {
request.errFile = function.jobErrorFile.getPath
request.options |= LibBat.SUB_ERR_FILE
}
if (function.jobProject != null) {
request.projectName = function.jobProject
request.options |= LibBat.SUB_PROJECT_NAME
}
if (function.jobQueue != null) {
request.queue = function.jobQueue
request.options |= LibBat.SUB_QUEUE
}
if (IOUtils.absolute(new File(".")) != function.commandDirectory) {
request.cwd = function.commandDirectory.getPath
request.options3 |= LibBat.SUB3_CWD
}
if (function.jobRestartable) {
request.options |= LibBat.SUB_RERUNNABLE
}
if (function.memoryLimit.isDefined) {
request.resReq = "rusage[mem=" + function.memoryLimit.get + "]"
request.options |= LibBat.SUB_RES_REQ
}
if (function.description != null) {
request.jobName = function.description.take(1000)
request.options |= LibBat.SUB_JOB_NAME
}
if (function.jobLimitSeconds.isDefined) {
request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.jobLimitSeconds.get
}
writeExec()
request.command = "sh " + exec
// Allow advanced users to update the request.
updateJobRun(request)
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + bsubCommand)
} else {
logger.info("Starting: " + bsubCommand)
}
function.deleteLogs()
function.deleteOutputs()
function.mkOutputDirectories()
runStatus = RunnerStatus.RUNNING
Retry.attempt(() => {
val reply = new submitReply
jobId = LibBat.lsb_submit(request, reply)
if (jobId < 0)
throw new QException(LibBat.lsb_sperror("Unable to submit job"))
}, 1, 5, 10)
logger.info("Submitted LSF job id: " + jobId)
} catch {
case e =>
runStatus = RunnerStatus.FAILED
try {
removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + bsubCommand, e)
if (function.jobErrorFile != null) {
request.errFile = function.jobErrorFile.getPath
request.options |= LibBat.SUB_ERR_FILE
}
if (function.jobProject != null) {
request.projectName = function.jobProject
request.options |= LibBat.SUB_PROJECT_NAME
}
if (function.jobQueue != null) {
request.queue = function.jobQueue
request.options |= LibBat.SUB_QUEUE
}
if (IOUtils.absolute(new File(".")) != function.commandDirectory) {
request.cwd = function.commandDirectory.getPath
request.options3 |= LibBat.SUB3_CWD
}
if (function.jobRestartable) {
request.options |= LibBat.SUB_RERUNNABLE
}
if (function.memoryLimit.isDefined) {
request.resReq = "rusage[mem=" + function.memoryLimit.get + "]"
request.options |= LibBat.SUB_RES_REQ
}
if (function.description != null) {
request.jobName = function.description.take(1000)
request.options |= LibBat.SUB_JOB_NAME
}
if (function.jobLimitSeconds.isDefined) {
request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.jobLimitSeconds.get
}
writeExec()
request.command = "sh " + exec
// Allow advanced users to update the request.
updateJobRun(request)
runStatus = RunnerStatus.RUNNING
Retry.attempt(() => {
val reply = new submitReply
jobId = LibBat.lsb_submit(request, reply)
if (jobId < 0)
throw new QException(LibBat.lsb_sperror("Unable to submit job"))
}, 1, 5, 10)
logger.info("Submitted LSF job id: " + jobId)
}
/**
* Updates and returns the status.
*/
def status = {
var jobStatus = LibBat.JOB_STAT_NULL
var exitStatus = 0
var exitInfo = 0
var endTime: NativeLong = null
LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB)
try {
var jobStatus = LibBat.JOB_STAT_NULL
var exitStatus = 0
var exitInfo = 0
var endTime: NativeLong = null
LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB)
try {
val jobInfo = LibBat.lsb_readjobinfo(null)
if (jobInfo == null) {
jobStatus = LibBat.JOB_STAT_UNKWN
exitStatus = 0
exitInfo = 0
endTime = null
} else {
jobStatus = jobInfo.status
exitStatus = jobInfo.exitStatus
exitInfo = jobInfo.exitInfo
endTime = jobInfo.endTime
}
} finally {
LibBat.lsb_closejobinfo()
}
logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo))
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) {
val now = new Date().getTime
if (firstUnknownTime.isEmpty) {
firstUnknownTime = Some(now)
logger.debug("First unknown status for job id: " + jobId)
}
if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runStatus = RunnerStatus.FAILED
try {
removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
} catch {
case _ => /* ignore errors in the error handler */
}
logger.error("Error: " + bsubCommand + ", unknown status for " + unknownStatusMaxSeconds + " seconds.")
}
val jobInfo = LibBat.lsb_readjobinfo(null)
if (jobInfo == null) {
jobStatus = LibBat.JOB_STAT_UNKWN
exitStatus = 0
exitInfo = 0
endTime = null
} else {
// Reset the last time an unknown status was seen.
firstUnknownTime = None
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
// Exited function that (probably) won't be retried.
runStatus = RunnerStatus.FAILED
try {
removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
} catch {
case _ => /* ignore errors in the error handler */
}
logger.error("Error: " + bsubCommand)
tailError()
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
// Done successfully.
removeTemporaryFiles()
function.doneOutputs.foreach(_.createNewFile())
runStatus = RunnerStatus.DONE
logger.info("Done: " + bsubCommand)
}
jobStatus = jobInfo.status
exitStatus = jobInfo.exitStatus
exitInfo = jobInfo.exitInfo
endTime = jobInfo.endTime
}
} catch {
case e =>
} finally {
LibBat.lsb_closejobinfo()
}
logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo))
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) {
val now = new Date().getTime
if (firstUnknownTime.isEmpty) {
firstUnknownTime = Some(now)
logger.debug("First unknown status for job id: " + jobId)
}
if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runStatus = RunnerStatus.FAILED
try {
removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + bsubCommand, e)
logger.error("Unknown status for %d seconds: job id %d: %s".format(unknownStatusMaxSeconds, jobId, function.description))
}
} else {
// Reset the last time an unknown status was seen.
firstUnknownTime = None
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
// Exited function that (probably) won't be retried.
runStatus = RunnerStatus.FAILED
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
// Done successfully.
runStatus = RunnerStatus.DONE
}
}
runStatus

View File

@ -1,17 +0,0 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util._
/**
* Runs jobs on an LSF compute cluster.
*/
/**
 * Abstract base for LSF-backed job runners: holds the last known run status
 * and the LSF job id shared by concrete runner implementations.
 */
abstract class LsfJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging {
/** Last known run status, maintained by subclasses. */
protected var runStatus: RunnerStatus.Value = _
/** Job Id of the currently executing job. */
var jobId = -1L
// TODO: Full bsub command for debugging.
/** Approximation of the equivalent bsub command line, used in log messages. */
protected def bsubCommand = "bsub " + function.commandLine
}

View File

@ -1,74 +1,34 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util.{JobExitException, Logging, ShellJob}
import org.broadinstitute.sting.queue.util.ShellJob
/**
* Runs jobs one at a time locally
*/
class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner with Logging {
class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRunner {
private var runStatus: RunnerStatus.Value = _
/**
* Runs the function on the local shell.
* @param function Command to run.
*/
def start() = {
try {
val job = new ShellJob
def start() {
val job = new ShellJob
job.workingDir = function.commandDirectory
job.outputFile = function.jobOutputFile
job.errorFile = function.jobErrorFile
job.workingDir = function.commandDirectory
job.outputFile = function.jobOutputFile
job.errorFile = function.jobErrorFile
writeExec()
job.shellScript = exec
writeExec()
job.shellScript = exec
// Allow advanced users to update the job.
updateJobRun(job)
// Allow advanced users to update the job.
updateJobRun(job)
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + function.commandLine)
} else {
logger.info("Starting: " + function.commandLine)
}
logger.info("Output written to " + function.jobOutputFile)
if (function.jobErrorFile != null)
logger.info("Errors written to " + function.jobErrorFile)
function.deleteLogs()
function.deleteOutputs()
function.mkOutputDirectories()
runStatus = RunnerStatus.RUNNING
job.run()
removeTemporaryFiles()
function.doneOutputs.foreach(_.createNewFile())
runStatus = RunnerStatus.DONE
logger.info("Done: " + function.commandLine)
} catch {
case jee: JobExitException =>
runStatus = RunnerStatus.FAILED
try {
removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
writeError(jee.getMessage)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + function.commandLine)
logger.error(jee.stdErr)
case e =>
runStatus = RunnerStatus.FAILED
try {
removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + function.commandLine, e)
}
runStatus = RunnerStatus.RUNNING
job.run()
runStatus = RunnerStatus.DONE
}
def status = runStatus

View File

@ -3,28 +3,84 @@ package org.broadinstitute.sting.queue.pipeline
import org.broadinstitute.sting.utils.Utils
import org.testng.Assert
import org.broadinstitute.sting.commandline.CommandLineProgram
import org.broadinstitute.sting.queue.util.ProcessController
import org.broadinstitute.sting.queue.QCommandLine
import java.io.File
import org.broadinstitute.sting.queue.util.{TextFormatUtils, ProcessController}
import java.util.Date
import java.text.SimpleDateFormat
import org.broadinstitute.sting.{WalkerTest, BaseTest}
object PipelineTest {
private var runningCommandLines = Set.empty[QCommandLine]
private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/"
val run = System.getProperty("pipeline.run") == "run"
def executeTest(name: String, pipelineTest: PipelineTestSpec) {
println(Utils.dupString('-', 80));
executeTest(name, pipelineTest.args, pipelineTest.expectedException)
if (run) {
assertMatchingMD5s(name, pipelineTest.fileMD5s)
if (pipelineTest.evalSpec != null)
validateEval(name, pipelineTest.evalSpec)
println(" => %s PASSED".format(name))
}
else
println(" => %s PASSED DRY RUN".format(name))
}
private def assertMatchingMD5s(name: String, fileMD5s: Traversable[(File, String)]) {
var failed = 0
for ((file, expectedMD5) <- fileMD5s) {
val calculatedMD5 = BaseTest.testFileMD5(name, file, expectedMD5, false)
failed += 1
}
if (failed > 0)
Assert.fail("%d MD5%s did not match.".format(failed, TextFormatUtils.plural(failed)))
}
private def validateEval(name: String, evalSpec: PipelineTestEvalSpec) {
// write the report to the shared validation data location
val formatter = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss")
val reportLocation = "%s%s/validation.%s.eval".format(validationReportsDataLocation, name, formatter.format(new Date))
new File(reportLocation).getParentFile.mkdirs
// Run variant eval generating the report and validating the pipeline vcf.
var walkerCommand = "-T VariantEval -R %s -B:eval,VCF %s -E %s -reportType R -reportLocation %s -L %s"
.format(evalSpec.reference, evalSpec.vcf, evalSpec.evalModules.mkString(" -E "), reportLocation, evalSpec.intervals)
if (evalSpec.dbsnp != null) {
val dbsnpArg = if (evalSpec.dbsnp.getName.toLowerCase.endsWith(".vcf")) "-B:dbsnp,VCF" else "-D"
walkerCommand += " %s %s".format(dbsnpArg, evalSpec.dbsnp)
}
if (evalSpec.intervals != null)
walkerCommand += " -L %s".format(evalSpec.intervals)
for (validation <- evalSpec.validations) {
walkerCommand += " -summary %s".format(validation.metric)
walkerCommand += " -validate '%1$s >= %2$s' -validate '%1$s <= %3$s'".format(
validation.metric, validation.min, validation.max)
}
WalkerTest.executeTest(name + "-validate", walkerCommand, null)
}
/**
* execute the test
* @param name the name of the test
* @param args the argument list
* @param expectedException the expected exception or null if no exception is expected.
*/
def executeTest(name: String, args: String, expectedException: Class[_]) = {
def executeTest(name: String, args: String, expectedException: Class[_]) {
var command = Utils.escapeExpressions(args)
// add the logging level to each of the integration test commands
command = Utils.appendArray(command, "-l", "WARN", "-startFromScratch", "-tempDir", tempDir(name), "-runDir", runDir(name))
command = Utils.appendArray(command, "-bsub", "-l", "WARN", "-startFromScratch", "-tempDir", tempDir(name), "-runDir", runDir(name))
if (run)
command = Utils.appendArray(command, "-run", "-bsub")
command = Utils.appendArray(command, "-run")
// run the executable
var gotAnException = false
@ -68,13 +124,14 @@ object PipelineTest {
}
}
val currentDir = new File(".").getAbsolutePath
def testDir(testName: String) = "pipelinetests/%s/".format(testName)
def runDir(testName: String) = testDir(testName) + "run/"
def tempDir(testName: String) = testDir(testName) + "temp/"
Runtime.getRuntime.addShutdownHook(new Thread {
/** Cleanup as the JVM shuts down. */
override def run = {
override def run {
try {
ProcessController.shutdown()
} catch {

View File

@ -0,0 +1,44 @@
package org.broadinstitute.sting.queue.pipeline
import java.io.File
/**
* Data validations to evaluate on a VCF using VariantEval.
*/
class PipelineTestEvalSpec {
// TODO: Reuse the Project "YAML" object for reference, intervals, etc.
/** VCF to eval */
var vcf: File = _
/** Reference for the VCF */
var reference: File = _
/** Intervals for the VCF */
var intervals: File = _
/** DBSNP to use for comparisons, via -B:dbsnp,VCF or -D */
var dbsnp: File = _
/** List of eval modules to output. */
var evalModules = List("CompOverlap", "CountFunctionalClasses", "CountVariants", "SimpleMetricsBySample", "TiTvVariantEvaluator")
/** Validations to assert. */
var validations: List[PipelineValidation] = Nil
}
/** A VariantEval JEXL and range of values to validate. */
class PipelineValidation(val metric: String, val min: String, val max: String) {
}
/** A VariantEval JEXL and target to validate within a 1% tolerance. */
class IntegerValidation(metric: String, target: Int)
extends PipelineValidation(metric,
(target * .99).floor.toInt.toString, (target * 1.01).ceil.toInt.toString) {
}
/** A VariantEval JEXL and target to validate within a 1% tolerance. */
class DoubleValidation(metric: String, target: Double)
extends PipelineValidation(metric,
"%.2f".format((target * 99).floor / 100), "%.2f".format((target * 101).ceil / 100)) {
}

View File

@ -0,0 +1,30 @@
package org.broadinstitute.sting.queue.pipeline
import java.io.File
class PipelineTestSpec {
/** The arguments to pass to the Queue test, ex: "-S scala/qscript/examples/HelloWorld.scala" */
var args: String = _
/** Expected MD5 results for each file path. */
var fileMD5s = Map.empty[File, String]
/** VariantEval validations to run on a VCF after the pipeline has completed. */
var evalSpec: PipelineTestEvalSpec = _
/** Expected exception from the test. */
var expectedException: Class[_ <: Exception] = null
def this(args: String, fileMD5s: Traversable[(File, String)]) = {
this()
this.args = args
this.fileMD5s = fileMD5s.toMap
}
def this(args: String, expectedException: Class[_ <: Exception]) = {
this()
this.args = args
this.expectedException = expectedException
}
}

View File

@ -0,0 +1,16 @@
package org.broadinstitute.sting.queue.pipeline.examples
import org.testng.annotations.Test
import org.broadinstitute.sting.queue.pipeline.{PipelineTest, PipelineTestSpec}
class HelloWorldPipelineTest {
@Test
def testHelloWorld {
var testName = "helloworld"
val spec = new PipelineTestSpec
spec.args = "-S scala/qscript/examples/HelloWorld.scala -jobPrefix HelloWorld -jobQueue hour"
// TODO: working example of MD5 usage.
// spec.fileMD5s += new File(PipelineTest.runDir(testName) + "hello.out") -> "0123456789abcdef0123456789abcdef"
PipelineTest.executeTest(testName, spec)
}
}

View File

@ -1,19 +1,16 @@
package org.broadinstitute.sting.queue.pipeline
package org.broadinstitute.sting.queue.pipeline.playground
import org.testng.annotations.{DataProvider, Test}
import collection.JavaConversions._
import java.io.File
import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, PipelineProject, Pipeline}
import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.broadinstitute.sting.{WalkerTest, BaseTest}
import java.text.SimpleDateFormat
import java.util.Date
import org.broadinstitute.sting.BaseTest
import org.broadinstitute.sting.queue.pipeline._
class FullCallingPipelineTest extends BaseTest {
class FullCallingPipelineTest {
def datasets = List(k1gChr20Dataset, k1gExomeDataset)
private final val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/"
// In fullCallingPipeline.q VariantEval is always compared against 129.
// Until the newvarianteval is finalized which will allow java import of the prior results,
// we re-run VariantEval to validate the run, and replicate that behavior here.
@ -93,12 +90,11 @@ class FullCallingPipelineTest extends BaseTest {
var cleanType = "cleaned"
// Run the pipeline with the expected inputs.
val currentDir = new File(".").getAbsolutePath
var pipelineCommand = ("-retry 1 -S scala/qscript/playground/fullCallingPipeline.q" +
" -jobProject %s -Y %s" +
" -tearScript %s/R/DataProcessingReport/GetTearsheetStats.R" +
" --gatkjar %s/dist/GenomeAnalysisTK.jar")
.format(projectName, yamlFile, currentDir, currentDir)
.format(projectName, yamlFile, PipelineTest.currentDir, PipelineTest.currentDir)
if (!dataset.runIndelRealigner) {
pipelineCommand += " -skipCleaning"
@ -108,37 +104,18 @@ class FullCallingPipelineTest extends BaseTest {
if (dataset.jobQueue != null)
pipelineCommand += " -jobQueue " + dataset.jobQueue
val pipelineSpec = new PipelineTestSpec
pipelineSpec.args = pipelineCommand
pipelineSpec.evalSpec = new PipelineTestEvalSpec
pipelineSpec.evalSpec.vcf = new File(PipelineTest.runDir(testName) + "SnpCalls/%s.%s.annotated.handfiltered.vcf".format(projectName, cleanType))
pipelineSpec.evalSpec.reference = dataset.pipeline.getProject.getReferenceFile
pipelineSpec.evalSpec.intervals = dataset.pipeline.getProject.getIntervalList
pipelineSpec.evalSpec.dbsnp = variantEvalDbsnpFile
pipelineSpec.evalSpec.validations = dataset.validations
// Run the test, at least checking if the command compiles
PipelineTest.executeTest(testName, pipelineCommand, null)
// If actually running, evaluate the output validating the expressions.
if (PipelineTest.run) {
// path where the pipeline should have output the handfiltered vcf
val handFilteredVcf = PipelineTest.runDir(testName) + "SnpCalls/%s.%s.annotated.handfiltered.vcf".format(projectName, cleanType)
// eval modules to record in the validation directory
val evalModules = List("CompOverlap", "CountFunctionalClasses", "CountVariants", "SimpleMetricsBySample", "TiTvVariantEvaluator")
// write the report to the shared validation data location
val formatter = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss")
val reportLocation = "%s%s/validation.%s.eval".format(validationReportsDataLocation, testName, formatter.format(new Date))
new File(reportLocation).getParentFile.mkdirs
// Run variant eval generating the report and validating the pipeline vcfs.
var walkerCommand = ("-T VariantEval -R %s -D %s -B:eval,VCF %s" +
" -E %s -reportType R -reportLocation %s -L %s")
.format(
dataset.pipeline.getProject.getReferenceFile, variantEvalDbsnpFile, handFilteredVcf,
evalModules.mkString(" -E "), reportLocation, dataset.pipeline.getProject.getIntervalList)
for (validation <- dataset.validations) {
walkerCommand += " -summary %s".format(validation.metric)
walkerCommand += " -validate '%1$s >= %2$s' -validate '%1$s <= %3$s'".format(
validation.metric, validation.min, validation.max)
}
WalkerTest.executeTest("fullCallingPipelineValidate-" + projectName, walkerCommand, null)
}
PipelineTest.executeTest(testName, pipelineSpec)
}
class PipelineDataset(
@ -149,22 +126,8 @@ class FullCallingPipelineTest extends BaseTest {
override def toString = pipeline.getProject.getName
}
class PipelineValidation(val metric: String, val min: String, val max: String) {
}
class IntegerValidation(metric: String, target: Int)
extends PipelineValidation(metric,
(target * .99).floor.toInt.toString, (target * 1.01).ceil.toInt.toString) {
}
class DoubleValidation(metric: String, target: Double)
extends PipelineValidation(metric,
"%.2f".format((target * 99).floor / 100), "%.2f".format((target * 101).ceil / 100)) {
}
private def writeTempYaml(pipeline: Pipeline) = {
val tempFile = File.createTempFile(pipeline.getProject.getName + "-", ".yaml")
tempFile.deleteOnExit
val tempFile = BaseTest.createTempFile(pipeline.getProject.getName + "-", ".yaml")
YamlUtils.dump(pipeline, tempFile)
tempFile
}

View File

@ -2,10 +2,9 @@ package org.broadinstitute.sting.queue.util
import org.broadinstitute.sting.BaseTest
import org.testng.annotations.Test
import java.io.File
import org.testng.Assert
class ShellJobUnitTest extends BaseTest {
class ShellJobUnitTest {
@Test
def testEcho {
val job = new ShellJob
@ -33,19 +32,19 @@ class ShellJobUnitTest extends BaseTest {
job = new ShellJob
job.shellScript = writeScript("echo #")
job.outputFile = File.createTempFile("temp", "")
job.outputFile = BaseTest.createTempFile("temp", "")
job.run()
Assert.assertEquals(IOUtils.readContents(job.outputFile).trim, "")
job = new ShellJob
job.shellScript = writeScript("""echo \#""")
job.outputFile = File.createTempFile("temp", "")
job.outputFile = BaseTest.createTempFile("temp", "")
job.run()
Assert.assertEquals(IOUtils.readContents(job.outputFile).trim, "#")
job = new ShellJob
job.shellScript = writeScript("""echo \\#""")
job.outputFile = File.createTempFile("temp", "")
job.outputFile = BaseTest.createTempFile("temp", "")
job.run()
Assert.assertEquals(IOUtils.readContents(job.outputFile).trim, """\#""")
}
@ -66,11 +65,9 @@ class ShellJobUnitTest extends BaseTest {
job.run()
}
private val tempDir = new File(System.getProperty("java.io.tmpdir"))
private def writeScript(contents: String) = {
val script = IOUtils.writeTempFile(contents, "temp", "", tempDir)
script.deleteOnExit
script
val file = BaseTest.createTempFile("temp", "")
IOUtils.writeContents(file, contents)
file
}
}