Added pipeline for calling FCP in batches called MultiFullCallingPipeline.

Bug smashes for the MCFP:
  Synchronized access to LSF library and modifications to the QGraph.
  If values are missing from the graph with -run make sure to exit with a non-zero.
  Refactored QGraph to pre-generate a unique Int for each QNode speeding up getHashCode/equals inside the graph.
  Added jobPriority and removed jobLimitSeconds from QFunction.
  All scatter gather is by default in a single sub directory queueScatterGather.
  Moved some FCPTest into BaseTest/PipelineTest for use by MFCPTest.
  Rev'ed the 1000G bams used for validation from v1 to v2 and added code to look for the bams before running other tests.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5247 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-02-15 18:26:14 +00:00
parent 7598f5f6a7
commit d185c2961f
17 changed files with 826 additions and 404 deletions

View File

@ -66,6 +66,10 @@ public abstract class BaseTest {
public static final String b37dbSNP129 = dbsnpDataLocation + "dbsnp_129_b37.rod"; public static final String b37dbSNP129 = dbsnpDataLocation + "dbsnp_129_b37.rod";
public static final String b37dbSNP132 = dbsnpDataLocation + "dbsnp_132_b37.vcf"; public static final String b37dbSNP132 = dbsnpDataLocation + "dbsnp_132_b37.vcf";
public static final String intervalsLocation = GATKDataLocation;
public static final String hg19Intervals = intervalsLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list";
public static final String hg19Chr20Intervals = intervalsLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.chr20.interval_list";
public static final String networkTempDir = "/broad/shptmp/"; public static final String networkTempDir = "/broad/shptmp/";
public static final File networkTempDirFile = new File(networkTempDir); public static final File networkTempDirFile = new File(networkTempDir);

View File

@ -86,6 +86,9 @@ public class LibBatIntegrationTest extends BaseTest {
req.outFile = outFile.getPath(); req.outFile = outFile.getPath();
req.options |= LibBat.SUB_OUT_FILE; req.options |= LibBat.SUB_OUT_FILE;
req.userPriority = 100;
req.options2 |= LibBat.SUB2_JOB_PRIORITY;
req.command = "echo \"Hello world.\""; req.command = "echo \"Hello world.\"";
submitReply reply = new submitReply(); submitReply reply = new submitReply();

View File

@ -0,0 +1,88 @@
import collection.JavaConversions
import org.broadinstitute.sting.queue.function.JarCommandLineFunction
import org.broadinstitute.sting.queue.QScript
import org.broadinstitute.sting.queue.util.IOUtils
import org.broadinstitute.sting.utils.text.XReadLines
class MultiFullCallingPipeline extends QScript {
qscript =>
@Input(doc="Sting home", shortName="stingHome")
var stingHome: File = _
@Input(doc="yaml lists to run", shortName="YL")
var yamlList: File = _
@Argument(doc="number of jobs per batch", shortName="BS")
var batchSize: Int = _
@Argument(doc="pipeline status to", shortName="PS", required = false)
var pipelineStatusTo: String = _
@Argument(doc="pipeline job queue", shortName="PJQ", required = false)
var pipelineJobQueue: String = _
@Argument(doc="pipeline short queue", shortName="PSQ", required = false)
var pipelineShortQueue: String = _
@Argument(doc="pipeline priority", shortName="PP", required = false)
var pipelinePriority: Option[Int] = None
def script {
// Global arguments for all pipeline runs
stingHome = IOUtils.absolute(stingHome)
val queueJar = new File(stingHome, "dist/Queue.jar")
val pipelineScript = new File(stingHome, "scala/qscript/playground/FullCallingPipeline.q")
val gatkJar = new File(stingHome, "dist/GenomeAnalysisTK.jar")
val tearScript = new File(stingHome, "R/DataProcessingReport/GetTearsheetStats.R")
// Parse the yaml list
var yamls = List.empty[File]
for (yaml <- JavaConversions.asScalaIterator(new XReadLines(yamlList)))
yamls :+= new File(yaml)
// The list of previous outputs
val lastOuts = new Array[File](batchSize)
for (yamlGroup <- yamls.grouped(batchSize)) {
for ((yaml, i) <- yamlGroup.zipWithIndex) {
// Get the last output for index(i), which is null for the first job.
val lastOut = lastOuts(i)
// Run the pipeline on the yaml waiting for the last output.
val runPipeline = new RunPipeline(yaml, lastOut)
// Add this run to the graph.
add(runPipeline)
// Have the next job at index(i) wait for this output file.
lastOuts(i) = runPipeline.jobOutputFile
}
}
/**
* Runs a yaml in a pipeline only after a previous pipeline
* run has produced the passed in output file.
*/
class RunPipeline(yamlFile: File, lastOutput: File) extends JarCommandLineFunction {
@Input(doc="output file to wait for", required=false)
var waitJobOutputFile = lastOutput
commandDirectory = yamlFile.getParentFile
jobOutputFile = IOUtils.absolute(commandDirectory, "queue.out")
jarFile = queueJar
memoryLimit = Some(1)
private var yamlName = yamlFile.getName.stripSuffix(".yaml")
override def commandLine = super.commandLine +
optional(" -statusTo ", qscript.pipelineStatusTo) +
optional(" -jobQueue ", qscript.pipelineJobQueue) +
optional(" -shortJobQueue ", qscript.pipelineShortQueue) +
optional(" -jobPriority ", qscript.pipelinePriority) +
" -S %s --gatkjar %s -tearScript %s -jobProject %s -jobPrefix %s -Y %s -bsub -run"
.format(pipelineScript, gatkJar, tearScript, yamlName, yamlName, yamlFile)
override def dotString = "Queue: " + yamlName
}
}
}

View File

@ -48,11 +48,11 @@ class QCommandLine extends CommandLineProgram with Logging {
logger.info("Added " + script.functions.size + " functions") logger.info("Added " + script.functions.size + " functions")
} }
qGraph.run qGraph.run()
if (qGraph.hasFailed) { if (!qGraph.success) {
logger.info("Done with errors") logger.info("Done with errors")
qGraph.logFailed qGraph.logFailed()
1 1
} else { } else {
logger.info("Done") logger.info("Done")

View File

@ -17,6 +17,9 @@ class QSettings {
@Argument(fullName="job_project", shortName="jobProject", doc="Default project for compute farm jobs.", required=false) @Argument(fullName="job_project", shortName="jobProject", doc="Default project for compute farm jobs.", required=false)
var jobProject: String = "Queue" var jobProject: String = "Queue"
@Argument(fullName="job_priority", shortName="jobPriority", doc="Default priority for jobs.", required=false)
var jobPriority: Option[Int] = None
@Argument(fullName="job_scatter_gather_directory", shortName="jobSGDir", doc="Default directory to place scatter gather output for compute farm jobs.", required=false) @Argument(fullName="job_scatter_gather_directory", shortName="jobSGDir", doc="Default directory to place scatter gather output for compute farm jobs.", required=false)
var jobScatterGatherDirectory: File = _ var jobScatterGatherDirectory: File = _

View File

@ -10,7 +10,7 @@ import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
* and then the runner is specified later when the time comes to * and then the runner is specified later when the time comes to
* execute the function in the edge. * execute the function in the edge.
*/ */
class FunctionEdge(var function: QFunction) extends QEdge with Logging { class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNode) extends QEdge with Logging {
var runner: JobRunner[_] =_ var runner: JobRunner[_] =_
/** /**
@ -131,8 +131,6 @@ class FunctionEdge(var function: QFunction) extends QEdge with Logging {
runner = null runner = null
} }
def inputs = function.inputs
def outputs = function.outputs
override def dotString = function.dotString override def dotString = function.dotString
/** /**

View File

@ -31,125 +31,132 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
* @param function Command to run. * @param function Command to run.
*/ */
def start() = { def start() = {
val request = new submit Lsf706JobRunner.lsfLibLock.synchronized {
for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS) val request = new submit
for (i <- 0 until LibLsf.LSF_RLIM_NLIMITS)
request.rLimits(i) = LibLsf.DEFAULT_RLIMIT; request.rLimits(i) = LibLsf.DEFAULT_RLIMIT;
request.outFile = function.jobOutputFile.getPath request.outFile = function.jobOutputFile.getPath
request.options |= LibBat.SUB_OUT_FILE request.options |= LibBat.SUB_OUT_FILE
if (function.jobErrorFile != null) { if (function.jobErrorFile != null) {
request.errFile = function.jobErrorFile.getPath request.errFile = function.jobErrorFile.getPath
request.options |= LibBat.SUB_ERR_FILE request.options |= LibBat.SUB_ERR_FILE
} }
if (function.jobProject != null) { if (function.jobProject != null) {
request.projectName = function.jobProject request.projectName = function.jobProject
request.options |= LibBat.SUB_PROJECT_NAME request.options |= LibBat.SUB_PROJECT_NAME
} }
if (function.jobQueue != null) { if (function.jobQueue != null) {
request.queue = function.jobQueue request.queue = function.jobQueue
request.options |= LibBat.SUB_QUEUE request.options |= LibBat.SUB_QUEUE
} }
if (IOUtils.absolute(new File(".")) != function.commandDirectory) { if (IOUtils.absolute(new File(".")) != function.commandDirectory) {
request.cwd = function.commandDirectory.getPath request.cwd = function.commandDirectory.getPath
request.options3 |= LibBat.SUB3_CWD request.options3 |= LibBat.SUB3_CWD
} }
if (function.jobRestartable) { if (function.jobRestartable) {
request.options |= LibBat.SUB_RERUNNABLE request.options |= LibBat.SUB_RERUNNABLE
} }
if (function.memoryLimit.isDefined) { if (function.memoryLimit.isDefined) {
request.resReq = "rusage[mem=" + function.memoryLimit.get + "]" request.resReq = "rusage[mem=" + function.memoryLimit.get + "]"
request.options |= LibBat.SUB_RES_REQ request.options |= LibBat.SUB_RES_REQ
} }
if (function.description != null) { if (function.description != null) {
request.jobName = function.description.take(1000) request.jobName = function.description.take(1000)
request.options |= LibBat.SUB_JOB_NAME request.options |= LibBat.SUB_JOB_NAME
} }
if (function.jobPriority.isDefined) {
request.userPriority = function.jobPriority.get
request.options2 |= LibBat.SUB2_JOB_PRIORITY
}
if (function.jobLimitSeconds.isDefined) {
request.rLimits(LibLsf.LSF_RLIMIT_RUN) = function.jobLimitSeconds.get
} else {
request.rLimits(LibLsf.LSF_RLIMIT_RUN) = Lsf706JobRunner.getRlimitRun(function.jobQueue) request.rLimits(LibLsf.LSF_RLIMIT_RUN) = Lsf706JobRunner.getRlimitRun(function.jobQueue)
writeExec()
request.command = "sh " + exec
// Allow advanced users to update the request.
updateJobRun(request)
runStatus = RunnerStatus.RUNNING
Retry.attempt(() => {
val reply = new submitReply
jobId = LibBat.lsb_submit(request, reply)
if (jobId < 0)
throw new QException(LibBat.lsb_sperror("Unable to submit job"))
}, 1, 5, 10)
logger.info("Submitted LSF job id: " + jobId)
} }
writeExec()
request.command = "sh " + exec
// Allow advanced users to update the request.
updateJobRun(request)
runStatus = RunnerStatus.RUNNING
Retry.attempt(() => {
val reply = new submitReply
jobId = LibBat.lsb_submit(request, reply)
if (jobId < 0)
throw new QException(LibBat.lsb_sperror("Unable to submit job"))
}, 1, 5, 10)
logger.info("Submitted LSF job id: " + jobId)
} }
/** /**
* Updates and returns the status. * Updates and returns the status.
*/ */
def status = { def status = {
var jobStatus = LibBat.JOB_STAT_UNKWN Lsf706JobRunner.lsfLibLock.synchronized {
var exitStatus = 0 var jobStatus = LibBat.JOB_STAT_UNKWN
var exitInfo = 0 var exitStatus = 0
var endTime: NativeLong = null var exitInfo = 0
var endTime: NativeLong = null
val result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) val result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB)
if (result < 0) if (result < 0)
throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId)) throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId))
try { try {
if (result > 0) { if (result > 1)
val more = new IntByReference(result) throw new QException(LibBat.lsb_sperror("Recieved " + result + " LSF results for job id: " + jobId))
val jobInfo = LibBat.lsb_readjobinfo(more) else if (result == 1) {
if (jobInfo == null) val more = new IntByReference(result)
throw new QException(LibBat.lsb_sperror("lsb_readjobinfo returned null for job id: " + jobId)) val jobInfo = LibBat.lsb_readjobinfo(more)
jobStatus = jobInfo.status if (jobInfo == null)
exitStatus = jobInfo.exitStatus throw new QException(LibBat.lsb_sperror("lsb_readjobinfo returned null for job id: " + jobId))
exitInfo = jobInfo.exitInfo jobStatus = jobInfo.status
endTime = jobInfo.endTime exitStatus = jobInfo.exitStatus
exitInfo = jobInfo.exitInfo
endTime = jobInfo.endTime
}
} finally {
LibBat.lsb_closejobinfo()
} }
} finally {
LibBat.lsb_closejobinfo() logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo))
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) {
val now = new Date().getTime
if (firstUnknownTime.isEmpty) {
firstUnknownTime = Some(now)
logger.debug("First unknown status for job id: " + jobId)
}
if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runStatus = RunnerStatus.FAILED
logger.error("Unknown status for %d seconds: job id %d: %s".format(unknownStatusMaxSeconds, jobId, function.description))
}
} else {
// Reset the last time an unknown status was seen.
firstUnknownTime = None
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
// Exited function that (probably) won't be retried.
runStatus = RunnerStatus.FAILED
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
// Done successfully.
runStatus = RunnerStatus.DONE
}
}
runStatus
} }
logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo))
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) {
val now = new Date().getTime
if (firstUnknownTime.isEmpty) {
firstUnknownTime = Some(now)
logger.debug("First unknown status for job id: " + jobId)
}
if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runStatus = RunnerStatus.FAILED
logger.error("Unknown status for %d seconds: job id %d: %s".format(unknownStatusMaxSeconds, jobId, function.description))
}
} else {
// Reset the last time an unknown status was seen.
firstUnknownTime = None
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
// Exited function that (probably) won't be retried.
runStatus = RunnerStatus.FAILED
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
// Done successfully.
runStatus = RunnerStatus.DONE
}
}
runStatus
} }
/** /**
@ -171,6 +178,8 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
} }
object Lsf706JobRunner extends Logging { object Lsf706JobRunner extends Logging {
private val lsfLibLock = new Object
init() init()
/** The name of the default queue. */ /** The name of the default queue. */
@ -183,8 +192,10 @@ object Lsf706JobRunner extends Logging {
* Initialize the Lsf library. * Initialize the Lsf library.
*/ */
private def init() = { private def init() = {
if (LibBat.lsb_init("Queue") < 0) lsfLibLock.synchronized {
throw new QException(LibBat.lsb_sperror("lsb_init() failed")) if (LibBat.lsb_init("Queue") < 0)
throw new QException(LibBat.lsb_sperror("lsb_init() failed"))
}
} }
/** /**
@ -194,33 +205,35 @@ object Lsf706JobRunner extends Logging {
* @return the run limit in seconds for the queue. * @return the run limit in seconds for the queue.
*/ */
def getRlimitRun(queue: String) = { def getRlimitRun(queue: String) = {
if (queue == null) { lsfLibLock.synchronized {
if (defaultQueue != null) { if (queue == null) {
queueRlimitRun(defaultQueue) if (defaultQueue != null) {
} else { queueRlimitRun(defaultQueue)
// Get the info on the default queue. } else {
val numQueues = new IntByReference(1) // Get the info on the default queue.
val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue"))
defaultQueue = queueInfo.queue
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += defaultQueue -> limit
limit
}
} else {
queueRlimitRun.get(queue) match {
case Some(limit) => limit
case None =>
// Cache miss. Go get the run limits from LSF.
val queues = new StringArray(Array[String](queue))
val numQueues = new IntByReference(1) val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0) val queueInfo = LibBat.lsb_queueinfo(null, numQueues, null, null, 0)
if (queueInfo == null) if (queueInfo == null)
throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue)) throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for the default queue"))
defaultQueue = queueInfo.queue
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN) val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += queue -> limit queueRlimitRun += defaultQueue -> limit
limit limit
}
} else {
queueRlimitRun.get(queue) match {
case Some(limit) => limit
case None =>
// Cache miss. Go get the run limits from LSF.
val queues = new StringArray(Array[String](queue))
val numQueues = new IntByReference(1)
val queueInfo = LibBat.lsb_queueinfo(queues, numQueues, null, null, 0)
if (queueInfo == null)
throw new QException(LibBat.lsb_sperror("Unable to get LSF queue info for queue: " + queue))
val limit = queueInfo.rLimits(LibLsf.LSF_RLIMIT_RUN)
queueRlimitRun += queue -> limit
limit
}
} }
} }
} }
@ -230,23 +243,25 @@ object Lsf706JobRunner extends Logging {
* @param runners Runners to stop. * @param runners Runners to stop.
*/ */
def tryStop(runners: List[Lsf706JobRunner]) { def tryStop(runners: List[Lsf706JobRunner]) {
for (jobRunners <- runners.filterNot(_.jobId < 0).grouped(10)) { lsfLibLock.synchronized {
try { for (jobRunners <- runners.filterNot(_.jobId < 0).grouped(10)) {
val njobs = jobRunners.size try {
val signalJobs = new signalBulkJobs val njobs = jobRunners.size
signalJobs.jobs = { val signalJobs = new signalBulkJobs
val jobIds = new Memory(8 * njobs) signalJobs.jobs = {
jobIds.write(0, jobRunners.map(_.jobId).toArray, 0, njobs) val jobIds = new Memory(8 * njobs)
jobIds jobIds.write(0, jobRunners.map(_.jobId).toArray, 0, njobs)
} jobIds
signalJobs.njobs = njobs }
signalJobs.signal = 9 signalJobs.njobs = njobs
signalJobs.signal = 9
if (LibBat.lsb_killbulkjobs(signalJobs) < 0) if (LibBat.lsb_killbulkjobs(signalJobs) < 0)
throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed")) throw new QException(LibBat.lsb_sperror("lsb_killbulkjobs failed"))
} catch { } catch {
case e => case e =>
logger.error("Unable to kill all jobs.", e) logger.error("Unable to kill all jobs.", e)
}
} }
} }
} }

View File

@ -1,12 +1,10 @@
package org.broadinstitute.sting.queue.engine package org.broadinstitute.sting.queue.engine
import java.io.File
/** /**
* Utility class to map a set of inputs to set of outputs. * Utility class to map a set of inputs to set of outputs.
* The QGraph uses this function internally to map between user defined functions. * The QGraph uses this function internally to map between user defined functions.
*/ */
class MappingEdge(val inputs: Set[File], val outputs: Set[File]) extends QEdge { class MappingEdge(val inputs: QNode, val outputs: QNode) extends QEdge {
/** /**
* For debugging purposes returns <map>. * For debugging purposes returns <map>.
* @return <map> * @return <map>

View File

@ -1,23 +1,32 @@
package org.broadinstitute.sting.queue.engine package org.broadinstitute.sting.queue.engine
import java.io.File
/** /**
* An edge in the QGraph * An edge in the QGraph
*/ */
trait QEdge { trait QEdge {
/** /**
* Set of inputs for this function. * List of inputs for this function sorted by path.
*/ */
def inputs: Set[File] def inputs: QNode
/** /**
* Set of outputs for this function. * List of outputs for this function sorted by path.
*/ */
def outputs: Set[File] def outputs: QNode
/** /**
* The function description in .dot files * The function description in .dot files
*/ */
def dotString = "" def dotString = ""
override def hashCode = inputs.hashCode + outputs.hashCode
override def equals(obj: Any) = {
obj match {
case other: QEdge =>
this.inputs == other.inputs &&
this.outputs == other.outputs
case _ => false
}
}
} }

View File

@ -22,8 +22,10 @@ class QGraph extends Logging {
var debugMode = false var debugMode = false
private def dryRun = !settings.run private def dryRun = !settings.run
private var numMissingValues = 0
private val jobGraph = newGraph private val jobGraph = newGraph
private var shuttingDown = false private var running = true
private val runningLock = new Object
private val nl = "%n".format() private val nl = "%n".format()
private val inProcessManager = new InProcessJobManager private val inProcessManager = new InProcessJobManager
@ -35,9 +37,15 @@ class QGraph extends Logging {
*/ */
def add(command: QFunction) { def add(command: QFunction) {
try { try {
command.qSettings = settings.qSettings runningLock.synchronized {
command.freeze if (running) {
addEdge(new FunctionEdge(command)) command.qSettings = settings.qSettings
command.freeze
val inputs = getQNode(command.inputs.toList.sortWith(_.compareTo(_) < 0))
val outputs = getQNode(command.outputs.toList.sortWith(_.compareTo(_) < 0))
addEdge(new FunctionEdge(command, inputs, outputs))
}
}
} catch { } catch {
case e: Exception => case e: Exception =>
throw new QException("Error adding function: " + command, e) throw new QException("Error adding function: " + command, e)
@ -47,42 +55,45 @@ class QGraph extends Logging {
/** /**
* Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph. * Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph.
*/ */
def run = { def run() {
runningLock.synchronized {
if (running) {
IOUtils.checkTempDir(settings.qSettings.tempDirectory)
fillGraph
val isReady = numMissingValues == 0
IOUtils.checkTempDir(settings.qSettings.tempDirectory) if (this.jobGraph.edgeSet.isEmpty) {
val numMissingValues = fillGraph logger.warn("Nothing to run! Were any Functions added?");
val isReady = numMissingValues == 0 } else if (settings.getStatus) {
logger.info("Checking pipeline status.")
logStatus()
} else if (this.dryRun) {
dryRunJobs()
} else if (isReady) {
logger.info("Running jobs.")
runJobs()
}
if (this.jobGraph.edgeSet.isEmpty) { if (numMissingValues > 0) {
logger.warn("Nothing to run! Were any Functions added?"); logger.error("Total missing values: " + numMissingValues)
} else if (settings.getStatus) { }
logger.info("Checking pipeline status.")
logStatus()
} else if (this.dryRun) {
dryRunJobs()
} else if (isReady) {
logger.info("Running jobs.")
runJobs()
}
if (numMissingValues > 0) { if (running && isReady && this.dryRun) {
logger.error("Total missing values: " + numMissingValues) logger.info("Dry run completed successfully!")
} logger.info("Re-run with \"-run\" to execute the functions.")
}
if (isReady && this.dryRun) { }
logger.info("Dry run completed successfully!")
logger.info("Re-run with \"-run\" to execute the functions.")
} }
} }
private def fillGraph = { private def fillGraph {
logger.info("Generating graph.") logger.info("Generating graph.")
fill fill
if (settings.dotFile != null) if (settings.dotFile != null)
renderToDot(settings.dotFile) renderToDot(settings.dotFile)
var numMissingValues = validate validate()
if (numMissingValues == 0 && settings.bsubAllJobs) { if (running && numMissingValues == 0 && settings.bsubAllJobs) {
logger.info("Generating scatter gather jobs.") logger.info("Generating scatter gather jobs.")
val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge)) val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge))
@ -98,20 +109,18 @@ class QGraph extends Logging {
logger.info("Removing original jobs.") logger.info("Removing original jobs.")
this.jobGraph.removeAllEdges(scatterGathers) this.jobGraph.removeAllEdges(scatterGathers)
prune prune()
logger.info("Adding scatter gather jobs.") logger.info("Adding scatter gather jobs.")
addedFunctions.foreach(this.add(_)) addedFunctions.foreach(function => if (running) this.add(function))
logger.info("Regenerating graph.") logger.info("Regenerating graph.")
fill fill
val scatterGatherDotFile = if (settings.expandedDotFile != null) settings.expandedDotFile else settings.dotFile val scatterGatherDotFile = if (settings.expandedDotFile != null) settings.expandedDotFile else settings.dotFile
if (scatterGatherDotFile != null) if (scatterGatherDotFile != null)
renderToDot(scatterGatherDotFile) renderToDot(scatterGatherDotFile)
numMissingValues = validate validate()
} }
numMissingValues
} }
private def scatterGatherable(edge: QEdge) = { private def scatterGatherable(edge: QEdge) = {
@ -153,23 +162,25 @@ class QGraph extends Logging {
* Fills in the graph using mapping functions, then removes out of date * Fills in the graph using mapping functions, then removes out of date
* jobs, then cleans up mapping functions and nodes that aren't need. * jobs, then cleans up mapping functions and nodes that aren't need.
*/ */
private def fill = { private def fill() {
fillIn fillIn()
prune prune()
} }
/** /**
* Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs. * Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs.
*/ */
private def fillIn = { private def fillIn() {
// clone since edgeSet is backed by the graph // clone since edgeSet is backed by the graph
asScalaSet(jobGraph.edgeSet).clone.foreach { asScalaSet(jobGraph.edgeSet).clone.foreach(edge => {
case cmd: FunctionEdge => { if (running) edge match {
addCollectionOutputs(cmd.outputs) case cmd: FunctionEdge => {
addCollectionInputs(cmd.inputs) addCollectionOutputs(cmd.outputs)
addCollectionInputs(cmd.inputs)
}
case map: MappingEdge => /* do nothing for mapping edges */
} }
case map: MappingEdge => /* do nothing for mapping edges */ })
}
} }
private def getReadyJobs = { private def getReadyJobs = {
@ -190,37 +201,40 @@ class QGraph extends Logging {
/** /**
* Removes mapping edges that aren't being used, and nodes that don't belong to anything. * Removes mapping edges that aren't being used, and nodes that don't belong to anything.
*/ */
private def prune = { private def prune() {
var pruning = true var pruning = true
while (pruning) { while (pruning) {
pruning = false pruning = false
val filler = jobGraph.edgeSet.filter(isFiller(_)) val filler = jobGraph.edgeSet.filter(isFiller(_))
if (filler.size > 0) { if (filler.size > 0) {
jobGraph.removeAllEdges(filler) jobGraph.removeAllEdges(filler)
pruning = true pruning = running
} }
} }
jobGraph.removeAllVertices(jobGraph.vertexSet.filter(isOrphan(_))) if (running)
jobGraph.removeAllVertices(jobGraph.vertexSet.filter(isOrphan(_)))
} }
/** /**
* Validates that the functions in the graph have no missing values and that there are no cycles. * Validates that the functions in the graph have no missing values and that there are no cycles.
* @return Number of missing values.
*/ */
private def validate = { private def validate() {
var numMissingValues = 0 asScalaSet(jobGraph.edgeSet).foreach(
asScalaSet(jobGraph.edgeSet).foreach { edge =>
case cmd: FunctionEdge => if (running) edge match
val missingFieldValues = cmd.function.missingFields {
if (missingFieldValues.size > 0) { case cmd: FunctionEdge =>
numMissingValues += missingFieldValues.size val missingFieldValues = cmd.function.missingFields
logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.function.description)) if (missingFieldValues.size > 0) {
for (missing <- missingFieldValues) numMissingValues += missingFieldValues.size
logger.error(" " + missing) logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.function.description))
for (missing <- missingFieldValues)
logger.error(" " + missing)
}
case map: MappingEdge => /* do nothing for mapping edges */
} }
case map: MappingEdge => /* do nothing for mapping edges */ )
}
val detector = new CycleDetector(jobGraph) val detector = new CycleDetector(jobGraph)
if (detector.detectCycles) { if (detector.detectCycles) {
@ -229,27 +243,31 @@ class QGraph extends Logging {
logger.error(" " + cycle) logger.error(" " + cycle)
throw new QException("Cycles were detected in the graph.") throw new QException("Cycles were detected in the graph.")
} }
numMissingValues
} }
/** /**
* Dry-runs the jobs by traversing the graph. * Dry-runs the jobs by traversing the graph.
*/ */
private def dryRunJobs() = { private def dryRunJobs() {
updateGraphStatus(false) updateGraphStatus(false)
var readyJobs = getReadyJobs var readyJobs = getReadyJobs
while (!shuttingDown && readyJobs.size > 0) { while (running && readyJobs.size > 0) {
logger.debug("+++++++")
readyJobs.foreach(edge => { readyJobs.foreach(edge => {
logEdge(edge) if (running) {
edge.markAsDone logEdge(edge)
edge.markAsDone
}
}) })
readyJobs = getReadyJobs readyJobs = getReadyJobs
} }
} }
private def logEdge(edge: FunctionEdge) = { private def logEdge(edge: FunctionEdge) {
logger.info("-------") logger.info("-------")
if (logger.isDebugEnabled) {
logger.debug("Inputs: " + edge.inputs)
}
logger.info(StringUtils.capitalize(edge.status.toString) + ": " + edge.function.description) logger.info(StringUtils.capitalize(edge.status.toString) + ": " + edge.function.description)
if (logger.isDebugEnabled) if (logger.isDebugEnabled)
logger.debug(edge.function.commandDirectory + " > " + edge.function.description) logger.debug(edge.function.commandDirectory + " > " + edge.function.description)
@ -261,7 +279,7 @@ class QGraph extends Logging {
/** /**
* Logs job statuses by traversing the graph and looking for status-related files * Logs job statuses by traversing the graph and looking for status-related files
*/ */
private def logStatus() = { private def logStatus() {
updateGraphStatus(false) updateGraphStatus(false)
doStatus(status => logger.info(status)) doStatus(status => logger.info(status))
} }
@ -269,7 +287,7 @@ class QGraph extends Logging {
/** /**
* Runs the jobs by traversing the graph. * Runs the jobs by traversing the graph.
*/ */
private def runJobs() = { private def runJobs() {
try { try {
if (settings.bsubAllJobs) if (settings.bsubAllJobs)
commandLineManager = new Lsf706JobManager commandLineManager = new Lsf706JobManager
@ -284,7 +302,7 @@ class QGraph extends Logging {
var readyJobs = getReadyJobs var readyJobs = getReadyJobs
var runningJobs = Set.empty[FunctionEdge] var runningJobs = Set.empty[FunctionEdge]
while (!shuttingDown && readyJobs.size + runningJobs.size > 0) { while (running && readyJobs.size + runningJobs.size > 0) {
var exitedJobs = List.empty[FunctionEdge] var exitedJobs = List.empty[FunctionEdge]
var failedJobs = List.empty[FunctionEdge] var failedJobs = List.empty[FunctionEdge]
@ -296,12 +314,14 @@ class QGraph extends Logging {
exitedJobs.foreach(runner => runningJobs -= runner) exitedJobs.foreach(runner => runningJobs -= runner)
readyJobs.foreach(f => { readyJobs.foreach(f => {
f.runner = newRunner(f.function) if (running) {
f.start() f.runner = newRunner(f.function)
f.status match { f.start()
case RunnerStatus.RUNNING => runningJobs += f f.status match {
case RunnerStatus.FAILED => failedJobs :+= f case RunnerStatus.RUNNING => runningJobs += f
case RunnerStatus.DONE => /* do nothing and move on */ case RunnerStatus.FAILED => failedJobs :+= f
case RunnerStatus.DONE => /* do nothing and move on */
}
} }
}) })
@ -329,7 +349,7 @@ class QGraph extends Logging {
* Updates the status of edges in the graph. * Updates the status of edges in the graph.
* @param cleanOutputs If true will delete outputs when setting edges to pending. * @param cleanOutputs If true will delete outputs when setting edges to pending.
*/ */
private def updateGraphStatus(cleanOutputs: Boolean) = { private def updateGraphStatus(cleanOutputs: Boolean) {
traverseFunctions(edge => checkDone(edge, cleanOutputs)) traverseFunctions(edge => checkDone(edge, cleanOutputs))
} }
@ -340,7 +360,7 @@ class QGraph extends Logging {
* @param edge Edge to check to see if it's done or can be skipped. * @param edge Edge to check to see if it's done or can be skipped.
* @param cleanOutputs If true will delete outputs when setting edges to pending. * @param cleanOutputs If true will delete outputs when setting edges to pending.
*/ */
private def checkDone(edge: FunctionEdge, cleanOutputs: Boolean) = { private def checkDone(edge: FunctionEdge, cleanOutputs: Boolean) {
if (edge.function.isIntermediate) { if (edge.function.isIntermediate) {
// By default we do not need to run intermediate edges. // By default we do not need to run intermediate edges.
// Mark any intermediate edges as skipped, if they're not already done. // Mark any intermediate edges as skipped, if they're not already done.
@ -365,7 +385,7 @@ class QGraph extends Logging {
* @param previous Previous edges that provide inputs to edge. * @param previous Previous edges that provide inputs to edge.
* @param cleanOutputs If true will clean up the output files when resetting skipped jobs to pending. * @param cleanOutputs If true will clean up the output files when resetting skipped jobs to pending.
*/ */
private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge], cleanOutputs: Boolean): Unit = { private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge], cleanOutputs: Boolean) {
for (previousEdge <- previous.filter(_.status == RunnerStatus.SKIPPED)) { for (previousEdge <- previous.filter(_.status == RunnerStatus.SKIPPED)) {
previousEdge.resetToPending(cleanOutputs) previousEdge.resetToPending(cleanOutputs)
resetPreviousSkipped(previousEdge, this.previousFunctions(previousEdge), cleanOutputs) resetPreviousSkipped(previousEdge, this.previousFunctions(previousEdge), cleanOutputs)
@ -383,8 +403,8 @@ class QGraph extends Logging {
} }
} }
private def emailFailedJobs(failed: List[FunctionEdge]) = { private def emailFailedJobs(failed: List[FunctionEdge]) {
if (settings.statusEmailTo.size > 0) { if (running && settings.statusEmailTo.size > 0) {
val emailMessage = new EmailMessage val emailMessage = new EmailMessage
emailMessage.from = settings.statusEmailFrom emailMessage.from = settings.statusEmailFrom
emailMessage.to = settings.statusEmailTo emailMessage.to = settings.statusEmailTo
@ -394,7 +414,7 @@ class QGraph extends Logging {
} }
} }
private def checkRetryJobs(failed: List[FunctionEdge]) = { private def checkRetryJobs(failed: List[FunctionEdge]) {
if (settings.retries > 0) { if (settings.retries > 0) {
for (failedJob <- failed) { for (failedJob <- failed) {
if (failedJob.function.jobRestartable && failedJob.retries < settings.retries) { if (failedJob.function.jobRestartable && failedJob.retries < settings.retries) {
@ -410,8 +430,8 @@ class QGraph extends Logging {
} }
} }
private def emailStatus() = { private def emailStatus() {
if (settings.statusEmailTo.size > 0) { if (running && settings.statusEmailTo.size > 0) {
var failed = List.empty[FunctionEdge] var failed = List.empty[FunctionEdge]
foreachFunction(edge => { foreachFunction(edge => {
if (edge.status == RunnerStatus.FAILED) { if (edge.status == RunnerStatus.FAILED) {
@ -433,7 +453,7 @@ class QGraph extends Logging {
} }
} }
private def addFailedFunctions(emailMessage: EmailMessage, failed: List[FunctionEdge]) = { private def addFailedFunctions(emailMessage: EmailMessage, failed: List[FunctionEdge]) {
val logs = failed.flatMap(edge => logFiles(edge)) val logs = failed.flatMap(edge => logFiles(edge))
if (emailMessage.body == null) if (emailMessage.body == null)
@ -549,7 +569,7 @@ class QGraph extends Logging {
/** /**
* Updates a status map with scatter/gather status information (e.g. counts) * Updates a status map with scatter/gather status information (e.g. counts)
*/ */
private def updateAnalysisStatus(stats: AnalysisStatus, edge: FunctionEdge) = { private def updateAnalysisStatus(stats: AnalysisStatus, edge: FunctionEdge) {
if (edge.function.isInstanceOf[GatherFunction]) { if (edge.function.isInstanceOf[GatherFunction]) {
updateSGStatus(stats.gather, edge) updateSGStatus(stats.gather, edge)
} else if (edge.function.isInstanceOf[CloneFunction]) { } else if (edge.function.isInstanceOf[CloneFunction]) {
@ -559,7 +579,7 @@ class QGraph extends Logging {
} }
} }
private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) = { private def updateSGStatus(stats: ScatterGatherStatus, edge: FunctionEdge) {
stats.total += 1 stats.total += 1
edge.status match { edge.status match {
case RunnerStatus.DONE => stats.done += 1 case RunnerStatus.DONE => stats.done += 1
@ -584,57 +604,56 @@ class QGraph extends Logging {
* @return A new graph * @return A new graph
*/ */
private def newGraph = new SimpleDirectedGraph[QNode, QEdge](new EdgeFactory[QNode, QEdge] { private def newGraph = new SimpleDirectedGraph[QNode, QEdge](new EdgeFactory[QNode, QEdge] {
def createEdge(input: QNode, output: QNode) = new MappingEdge(input.files, output.files)}) def createEdge(input: QNode, output: QNode) = new MappingEdge(input, output)})
private def addEdge(edge: QEdge) = { private var nextNodeId = 0
val inputs = QNode(edge.inputs) private def getQNode(files: List[File]) = {
val outputs = QNode(edge.outputs) jobGraph.vertexSet.find(node => node.files == files) match {
val newSource = jobGraph.addVertex(inputs) case Some(node) =>
val newTarget = jobGraph.addVertex(outputs) node
val removedEdges = jobGraph.removeAllEdges(inputs, outputs) case None =>
val added = jobGraph.addEdge(inputs, outputs, edge) if (nextNodeId % 100 == 0)
if (this.debugMode) { logger.debug("adding QNode: " + nextNodeId)
logger.debug("Mapped from: " + inputs) val node = new QNode(nextNodeId, files)
logger.debug("Mapped to: " + outputs) nextNodeId += 1
logger.debug("Mapped via: " + edge) jobGraph.addVertex(node)
logger.debug("Removed edges: " + removedEdges) node
logger.debug("New source?: " + newSource)
logger.debug("New target?: " + newTarget)
logger.debug("")
} }
} }
/** private def addEdge(edge: QEdge) {
* Checks to see if the set of files has more than one file and if so adds input mappings between the set and the individual files. jobGraph.removeAllEdges(edge.inputs, edge.outputs)
* @param files Set to check. jobGraph.addEdge(edge.inputs, edge.outputs, edge)
*/
private def addCollectionInputs(files: Set[File]): Unit = {
if (files.size > 1)
for (file <- files)
addMappingEdge(Set(file), files)
} }
/** /**
* Checks to see if the set of files has more than one file and if so adds output mappings between the individual files and the set. * Adds input mappings between the node's files and the individual files.
* @param files Set to check. * @param inputs Input node.
*/ */
private def addCollectionOutputs(files: Set[File]): Unit = { private def addCollectionInputs(inputs: QNode) {
if (files.size > 1) if (inputs.files.size > 1)
for (file <- files) for (file <- inputs.files) {
addMappingEdge(files, Set(file)) if (running) {
val input = getQNode(List(file))
if (!jobGraph.containsEdge(input, inputs))
addEdge(new MappingEdge(input, inputs))
}
}
} }
/** /**
* Adds a directed graph edge between the input set and the output set if there isn't a direct relationship between the two nodes already. * Adds output mappings between the node's files and the individual files.
* @param input Input set of files. * @param outputs Output node.
* @param output Output set of files.
*/ */
private def addMappingEdge(input: Set[File], output: Set[File]) = { private def addCollectionOutputs(outputs: QNode) {
val hasEdge = input == output || if (outputs.files.size > 1)
jobGraph.getEdge(QNode(input), QNode(output)) != null || for (file <- outputs.files) {
jobGraph.getEdge(QNode(output), QNode(input)) != null if (running) {
if (!hasEdge) val output = getQNode(List(file))
addEdge(new MappingEdge(input, output)) if (!jobGraph.containsEdge(outputs, output))
addEdge(new MappingEdge(outputs, output))
}
}
} }
/** /**
@ -644,13 +663,12 @@ class QGraph extends Logging {
* @return true if the edge is not needed in the graph. * @return true if the edge is not needed in the graph.
*/ */
private def isFiller(edge: QEdge) = { private def isFiller(edge: QEdge) = {
if (edge.isInstanceOf[MappingEdge]) { edge match {
if (jobGraph.outgoingEdgesOf(jobGraph.getEdgeTarget(edge)).size == 0) case mapping: MappingEdge =>
true jobGraph.outgoingEdgesOf(jobGraph.getEdgeTarget(edge)).size == 0 &&
else if (jobGraph.incomingEdgesOf(jobGraph.getEdgeSource(edge)).size == 0) jobGraph.incomingEdgesOf(jobGraph.getEdgeSource(edge)).size == 0
true case _ => false
else false }
} else false
} }
/** /**
@ -658,8 +676,10 @@ class QGraph extends Logging {
* @param node Node (set of files) to check. * @param node Node (set of files) to check.
* @return true if this set of files is not needed in the graph. * @return true if this set of files is not needed in the graph.
*/ */
private def isOrphan(node: QNode) = private def isOrphan(node: QNode) = {
(jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0 jobGraph.incomingEdgesOf(node).size == 0 &&
jobGraph.outgoingEdgesOf(node).size == 0
}
/** /**
* Utility function for running a method over all function edges. * Utility function for running a method over all function edges.
@ -670,7 +690,7 @@ class QGraph extends Logging {
.filter(_.isInstanceOf[FunctionEdge]) .filter(_.isInstanceOf[FunctionEdge])
.asInstanceOf[List[FunctionEdge]] .asInstanceOf[List[FunctionEdge]]
.sortWith(compare(_,_)) .sortWith(compare(_,_))
.foreach(f(_)) .foreach(edge => if (running) f(edge))
} }
private def compare(f1: FunctionEdge, f2: FunctionEdge): Boolean = private def compare(f1: FunctionEdge, f2: FunctionEdge): Boolean =
@ -699,21 +719,23 @@ class QGraph extends Logging {
* Utility function for running a method over all functions, but traversing the nodes in order of dependency. * Utility function for running a method over all functions, but traversing the nodes in order of dependency.
* @param edgeFunction Function to run for each FunctionEdge. * @param edgeFunction Function to run for each FunctionEdge.
*/ */
private def traverseFunctions(f: (FunctionEdge) => Unit) = { private def traverseFunctions(f: (FunctionEdge) => Unit) {
val iterator = new TopologicalOrderIterator(this.jobGraph) val iterator = new TopologicalOrderIterator(this.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] { iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] {
override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = { override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = {
event.getEdge match { if (running) {
case functionEdge: FunctionEdge => f(functionEdge) event.getEdge match {
case map: MappingEdge => /* do nothing for mapping functions */ case functionEdge: FunctionEdge => f(functionEdge)
case map: MappingEdge => /* do nothing for mapping functions */
}
} }
} }
}) })
iterator.foreach(_ => {}) iterator.foreach(_ => {})
} }
private def deleteIntermediateOutputs() = { private def deleteIntermediateOutputs() {
if (!settings.keepIntermediates && !hasFailed) { if (running && !settings.keepIntermediates && success) {
logger.info("Deleting intermediate files.") logger.info("Deleting intermediate files.")
traverseFunctions(edge => { traverseFunctions(edge => {
if (edge.function.isIntermediate) { if (edge.function.isIntermediate) {
@ -729,7 +751,7 @@ class QGraph extends Logging {
* http://en.wikipedia.org/wiki/DOT_language * http://en.wikipedia.org/wiki/DOT_language
* @param file Path to output the .dot file. * @param file Path to output the .dot file.
*/ */
private def renderToDot(file: java.io.File) = { private def renderToDot(file: java.io.File) {
val out = new java.io.FileWriter(file) val out = new java.io.FileWriter(file)
// todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a // todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a
@ -745,46 +767,61 @@ class QGraph extends Logging {
} }
/** /**
* Returns true if any of the jobs in the graph have a status of failed. * Returns true if no functions have missing values nor a status of failed.
* @return true if any of the jobs in the graph have a status of failed. * @return true if no functions have missing values nor a status of failed.
*/ */
def hasFailed = { def success = {
!this.dryRun && this.jobGraph.edgeSet.exists(edge => { if (numMissingValues > 0) {
edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED false
}) } else if (this.dryRun) {
true
} else {
!this.jobGraph.edgeSet.exists(edge => {
edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED
})
}
} }
def logFailed = { def logFailed() {
foreachFunction(edge => { foreachFunction(edge => {
if (edge.status == RunnerStatus.FAILED) if (edge.status == RunnerStatus.FAILED)
logEdge(edge) logEdge(edge)
}) })
} }
/**
* Returns true if the graph was shutdown instead of exiting on its own.
*/
def isShutdown = !running
/** /**
* Kills any forked jobs still running. * Kills any forked jobs still running.
*/ */
def shutdown() { def shutdown() {
shuttingDown = true // Signal the main thread to shutdown.
val runners = getRunningJobs.map(_.runner) running = false
val manager = commandLineManager.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]] // Wait for the thread to finish and exit normally.
if (manager != null) { runningLock.synchronized {
val managerRunners = runners val runners = getRunningJobs.map(_.runner)
.filter(runner => manager.runnerType.isAssignableFrom(runner.getClass)) val manager = commandLineManager.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]]
.asInstanceOf[List[JobRunner[QFunction]]] if (manager != null) {
if (managerRunners.size > 0) val managerRunners = runners
.filter(runner => manager.runnerType.isAssignableFrom(runner.getClass))
.asInstanceOf[List[JobRunner[QFunction]]]
if (managerRunners.size > 0)
try {
manager.tryStop(managerRunners)
} catch {
case e => /* ignore */
}
}
runners.foreach(runner =>
try { try {
manager.tryStop(managerRunners) runner.removeTemporaryFiles()
} catch { } catch {
case e => /* ignore */ case e => /* ignore */
} }
)
} }
runners.foreach(runner =>
try {
runner.removeTemporaryFiles()
} catch {
case e => /* ignore */
}
)
} }
} }

View File

@ -4,6 +4,17 @@ import java.io.File
/** /**
* Represents a state between QFunctions the directed acyclic QGraph * Represents a state between QFunctions the directed acyclic QGraph
* @param files The set of files that represent this node state. * @param files The list of files that represent this node state ordered by file name.
*/ */
case class QNode (val files: Set[File]) class QNode (val id: Int, val files: List[File]) {
override def equals(obj: Any) = {
obj match {
case other: QNode => this.id == other.id
case _ => false
}
}
override def hashCode = id
override def toString = files.toString
}

View File

@ -37,12 +37,8 @@ trait QFunction extends Logging {
/** Order the function was added to the graph. */ /** Order the function was added to the graph. */
var addOrder: List[Int] = Nil var addOrder: List[Int] = Nil
/** /** Job priority */
* EXPERIMENTAL AND NOT SUPPORTED!! var jobPriority: Option[Int] = None
* Limits the number of seconds that the job will run.
* TODO: Replace with full resource specifications.
*/
var jobLimitSeconds: Option[Int] = None
/** Whether a job is restartable */ /** Whether a job is restartable */
var jobRestartable = true var jobRestartable = true
@ -70,7 +66,7 @@ trait QFunction extends Logging {
function.commandDirectory = this.commandDirectory function.commandDirectory = this.commandDirectory
function.jobTempDir = this.jobTempDir function.jobTempDir = this.jobTempDir
function.addOrder = this.addOrder function.addOrder = this.addOrder
function.jobLimitSeconds = this.jobLimitSeconds function.jobPriority = this.jobPriority
function.jobRestartable = this.jobRestartable function.jobRestartable = this.jobRestartable
function.updateJobRun = this.updateJobRun function.updateJobRun = this.updateJobRun
function.isIntermediate = this.isIntermediate function.isIntermediate = this.isIntermediate
@ -319,6 +315,9 @@ trait QFunction extends Logging {
if (jobTempDir == null) if (jobTempDir == null)
jobTempDir = qSettings.tempDirectory jobTempDir = qSettings.tempDirectory
if (jobPriority.isEmpty)
jobPriority = qSettings.jobPriority
// Do not set the temp dir relative to the command directory // Do not set the temp dir relative to the command directory
jobTempDir = IOUtils.absolute(jobTempDir) jobTempDir = IOUtils.absolute(jobTempDir)

View File

@ -149,9 +149,11 @@ trait ScatterGatherableFunction extends CommandLineFunction {
super.freezeFieldValues super.freezeFieldValues
if (this.scatterGatherDirectory == null) { if (this.scatterGatherDirectory == null) {
this.scatterGatherDirectory = qSettings.jobScatterGatherDirectory if (qSettings.jobScatterGatherDirectory != null) {
if (this.scatterGatherDirectory == null) this.scatterGatherDirectory = IOUtils.absolute(qSettings.jobScatterGatherDirectory)
this.scatterGatherDirectory = this.commandDirectory } else {
this.scatterGatherDirectory = IOUtils.absolute(this.commandDirectory, "queueScatterGather")
}
} }
} }

View File

@ -1,16 +1,59 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.pipeline package org.broadinstitute.sting.queue.pipeline
import collection.JavaConversions._
import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.utils.Utils
import org.testng.Assert import org.testng.Assert
import org.broadinstitute.sting.commandline.CommandLineProgram import org.broadinstitute.sting.commandline.CommandLineProgram
import java.io.File
import org.broadinstitute.sting.queue.util.{TextFormatUtils, ProcessController}
import java.util.Date import java.util.Date
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import org.broadinstitute.sting.{WalkerTest, BaseTest} import org.broadinstitute.sting.{WalkerTest, BaseTest}
import org.broadinstitute.sting.queue.{QException, QCommandLine} import org.broadinstitute.sting.queue.{QException, QCommandLine}
import org.broadinstitute.sting.datasources.pipeline.{Pipeline, PipelineProject, PipelineSample}
import org.broadinstitute.sting.queue.util.{Logging, ProcessController}
import java.io.{FileNotFoundException, File}
object PipelineTest { object PipelineTest extends BaseTest with Logging {
case class K1gBam(squidId: String, sampleId: String, version: Int)
/** 1000G BAMs used for validation */
val k1gBams = List(
new K1gBam("C474", "NA19651", 2),
new K1gBam("C474", "NA19655", 2),
new K1gBam("C474", "NA19669", 2),
new K1gBam("C454", "NA19834", 2),
new K1gBam("C460", "HG01440", 2),
new K1gBam("C456", "NA12342", 2),
new K1gBam("C456", "NA12748", 2),
new K1gBam("C474", "NA19649", 2),
new K1gBam("C474", "NA19652", 2),
new K1gBam("C474", "NA19654", 2))
validateK1gBams()
/** The path to the current Sting directory. Useful when specifying Sting resources. */ /** The path to the current Sting directory. Useful when specifying Sting resources. */
val currentStingDir = new File(".").getAbsolutePath val currentStingDir = new File(".").getAbsolutePath
@ -18,6 +61,10 @@ object PipelineTest {
/** The path to the current build of the GATK jar in the currentStingDir. */ /** The path to the current build of the GATK jar in the currentStingDir. */
val currentGATK = new File(currentStingDir, "dist/GenomeAnalysisTK.jar") val currentGATK = new File(currentStingDir, "dist/GenomeAnalysisTK.jar")
private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/"
val run = System.getProperty("pipeline.run") == "run"
/** /**
* Returns the top level output path to this test. * Returns the top level output path to this test.
* @param testName The name of the test passed to PipelineTest.executeTest() * @param testName The name of the test passed to PipelineTest.executeTest()
@ -48,12 +95,54 @@ object PipelineTest {
*/ */
def fileMD5(testName: String, filePath: String, md5: String) = (new File(runDir(testName) + filePath), md5) def fileMD5(testName: String, filePath: String, md5: String) = (new File(runDir(testName) + filePath), md5)
private var runningCommandLines = Set.empty[QCommandLine] /**
* Creates a new pipeline from a project.
* @param project Pipeline project info.
* @param samples List of samples.
* @return a new pipeline project.
*/
def createPipeline(project: PipelineProject, samples: List[PipelineSample]) = {
val pipeline = new Pipeline
pipeline.setProject(project)
pipeline.setSamples(samples)
pipeline
}
private val validationReportsDataLocation = "/humgen/gsa-hpprojects/GATK/validationreports/submitted/" /**
* Creates a new pipeline project for hg19 with b37 132 dbsnp for genotyping, and b37 129 dbsnp for eval.
* @param projectName Name of the project.
* @param chr20 True if only chr20 should be evaluated or the whole exome.
* @return a new pipeline project.
*/
def createHg19Project(projectName: String, chr20: Boolean) = {
val project = new PipelineProject
project.setName(projectName)
project.setReferenceFile(new File(BaseTest.hg19Reference))
project.setGenotypeDbsnp(new File(BaseTest.b37dbSNP132))
project.setEvalDbsnp(new File(BaseTest.b37dbSNP129))
project.setRefseqTable(new File(BaseTest.hg19Refseq))
project.setIntervalList(new File(if (chr20) BaseTest.hg19Chr20Intervals else BaseTest.hg19Intervals))
project
}
val run = System.getProperty("pipeline.run") == "run" /**
* Creates a 1000G pipeline sample from one of the bams.
* @param idPrefix Text to prepend to the sample name.
* @param k1gBam bam to create the sample for.
* @return the created pipeline sample.
*/
def createK1gSample(idPrefix: String, k1gBam: K1gBam) = {
val sample = new PipelineSample
sample.setId(idPrefix + "_" + k1gBam.sampleId)
sample.setBamFiles(Map("cleaned" -> getPicardBam(k1gBam)))
sample.setTags(Map("SQUIDProject" -> k1gBam.squidId, "CollaboratorID" -> k1gBam.sampleId))
sample
}
/**
* Runs the pipelineTest.
* @param pipelineTest test to run.
*/
def executeTest(pipelineTest: PipelineTestSpec) { def executeTest(pipelineTest: PipelineTestSpec) {
val name = pipelineTest.name val name = pipelineTest.name
if (name == null) if (name == null)
@ -78,7 +167,7 @@ object PipelineTest {
failed += 1 failed += 1
} }
if (failed > 0) if (failed > 0)
Assert.fail("%d of %d MD5%s did not match.".format(failed, fileMD5s.size, TextFormatUtils.plural(failed))) Assert.fail("%d of %d MD5s did not match.".format(failed, fileMD5s.size))
} }
private def validateEval(name: String, evalSpec: PipelineTestEvalSpec) { private def validateEval(name: String, evalSpec: PipelineTestEvalSpec) {
@ -115,7 +204,7 @@ object PipelineTest {
* @param jobQueue the queue to run the job on. Defaults to hour if jobQueue is null. * @param jobQueue the queue to run the job on. Defaults to hour if jobQueue is null.
* @param expectedException the expected exception or null if no exception is expected. * @param expectedException the expected exception or null if no exception is expected.
*/ */
def executeTest(name: String, args: String, jobQueue: String, expectedException: Class[_]) { private def executeTest(name: String, args: String, jobQueue: String, expectedException: Class[_]) {
var command = Utils.escapeExpressions(args) var command = Utils.escapeExpressions(args)
// add the logging level to each of the integration test commands // add the logging level to each of the integration test commands
@ -172,6 +261,46 @@ object PipelineTest {
} }
} }
/**
* Throws an exception if any of the 1000G bams do not exist and warns if they are out of date.
*/
private def validateK1gBams() {
var missingBams = List.empty[File]
for (k1gBam <- k1gBams) {
val latest = getLatestVersion(k1gBam)
val bam = getPicardBam(k1gBam)
if (k1gBam.version != latest)
logger.warn("1000G bam is not the latest version %d: %s".format(latest, k1gBam))
if (!bam.exists)
missingBams :+= bam
}
if (missingBams.size > 0) {
val nl = "%n".format()
throw new FileNotFoundException("The following 1000G bam files are missing.%n%s".format(missingBams.mkString(nl)))
}
}
private def getPicardBam(k1gBam: K1gBam): File =
getPicardBam(k1gBam.squidId, k1gBam.sampleId, k1gBam.version)
private def getPicardBam(squidId: String, sampleId: String, version: Int): File =
new File(getPicardDir(squidId, sampleId, version), sampleId + ".bam")
private def getPicardDir(squidId: String, sampleId: String, version: Int) =
new File("/seq/picard_aggregation/%1$s/%2$s/v%3$s/".format(squidId, sampleId, version))
private def getLatestVersion(k1gBam: K1gBam): Int =
getLatestVersion(k1gBam.squidId, k1gBam.sampleId, k1gBam.version)
private def getLatestVersion(squidId: String, sampleId: String, startVersion: Int): Int = {
var version = startVersion
while (new File(getPicardDir(squidId, sampleId, version + 1), "finished.txt").exists)
version += 1
version
}
private var runningCommandLines = Set.empty[QCommandLine]
Runtime.getRuntime.addShutdownHook(new Thread { Runtime.getRuntime.addShutdownHook(new Thread {
/** Cleanup as the JVM shuts down. */ /** Cleanup as the JVM shuts down. */
override def run { override def run {

View File

@ -1,3 +1,27 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.pipeline.examples package org.broadinstitute.sting.queue.pipeline.examples
import org.testng.annotations.Test import org.testng.annotations.Test
@ -7,8 +31,24 @@ class HelloWorldPipelineTest {
@Test @Test
def testHelloWorld { def testHelloWorld {
val spec = new PipelineTestSpec val spec = new PipelineTestSpec
spec.name = "helloworld" spec.name = "HelloWorld"
spec.args = "-S scala/qscript/examples/HelloWorld.scala"
PipelineTest.executeTest(spec)
}
@Test
def testHelloWorldWithPrefix {
val spec = new PipelineTestSpec
spec.name = "HelloWorldWithPrefix"
spec.args = "-S scala/qscript/examples/HelloWorld.scala -jobPrefix HelloWorld" spec.args = "-S scala/qscript/examples/HelloWorld.scala -jobPrefix HelloWorld"
PipelineTest.executeTest(spec) PipelineTest.executeTest(spec)
} }
@Test
def testHelloWorldWithPriority {
val spec = new PipelineTestSpec
spec.name = "HelloWorldWithPriority"
spec.args = "-S scala/qscript/examples/HelloWorld.scala -jobPriority 100"
PipelineTest.executeTest(spec)
}
} }

View File

@ -1,31 +1,41 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.pipeline.playground package org.broadinstitute.sting.queue.pipeline.playground
import org.testng.annotations.{DataProvider, Test} import org.testng.annotations.{DataProvider, Test}
import collection.JavaConversions._ import collection.JavaConversions._
import java.io.File import java.io.File
import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, PipelineProject, Pipeline} import org.broadinstitute.sting.datasources.pipeline.{PipelineSample, Pipeline}
import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.broadinstitute.sting.BaseTest
import org.broadinstitute.sting.queue.pipeline._ import org.broadinstitute.sting.queue.pipeline._
class FullCallingPipelineTest { class FullCallingPipelineTest {
def datasets = List(k1gChr20Dataset, k1gExomeDataset) def datasets = List(k1gChr20Dataset, k1gExomeDataset)
val k1gBams = List(
new K1gBam("C474", "NA19651", 1),
new K1gBam("C474", "NA19655", 1),
new K1gBam("C474", "NA19669", 1),
new K1gBam("C454", "NA19834", 1),
new K1gBam("C460", "HG01440", 1),
new K1gBam("C456", "NA12342", 1),
new K1gBam("C456", "NA12748", 1),
new K1gBam("C474", "NA19649", 1),
new K1gBam("C474", "NA19652", 1),
new K1gBam("C474", "NA19654", 1))
val k1gChr20Dataset = { val k1gChr20Dataset = {
val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20") val dataset = newK1gDataset("Barcoded_1000G_WEx_chr20", true)
dataset.pipeline.getProject.setIntervalList(new File(BaseTest.GATKDataLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.chr20.interval_list"))
dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.all.counter.nCalledLoci", 1348) dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.all.counter.nCalledLoci", 1348)
dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.known.counter.nCalledLoci", 1124) dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.known.counter.nCalledLoci", 1124)
@ -38,8 +48,7 @@ class FullCallingPipelineTest {
} }
val k1gExomeDataset = { val k1gExomeDataset = {
val dataset = newK1gDataset("Barcoded_1000G_WEx") val dataset = newK1gDataset("Barcoded_1000G_WEx", false)
dataset.pipeline.getProject.setIntervalList(new File(BaseTest.GATKDataLocation + "whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list"))
dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.all.counter.nCalledLoci", 50755) dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.all.counter.nCalledLoci", 50755)
dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.known.counter.nCalledLoci", 40894) dataset.validations :+= new IntegerValidation("eval.dbsnp.all.called.known.counter.nCalledLoci", 40894)
@ -53,34 +62,12 @@ class FullCallingPipelineTest {
dataset dataset
} }
class K1gBam(val squidId: String, val sampleId: String, val version: Int) def newK1gDataset(projectName: String, chr20: Boolean) = {
val project = PipelineTest.createHg19Project(projectName, chr20)
def newK1gDataset(projectName: String) = {
val project = new PipelineProject
project.setName(projectName)
project.setReferenceFile(new File(BaseTest.hg19Reference))
project.setGenotypeDbsnp(new File(BaseTest.b37dbSNP132))
project.setEvalDbsnp(new File(BaseTest.b37dbSNP129))
project.setRefseqTable(new File(BaseTest.hg19Refseq))
var samples = List.empty[PipelineSample] var samples = List.empty[PipelineSample]
for (k1gBam <- k1gBams) { for (k1gBam <- PipelineTest.k1gBams)
val sample = new PipelineSample samples :+= PipelineTest.createK1gSample(projectName, k1gBam)
sample.setId(projectName + "_" + k1gBam.sampleId) new PipelineDataset(PipelineTest.createPipeline(project, samples))
sample.setBamFiles(Map("recalibrated" -> new File("/seq/picard_aggregation/%1$s/%2$s/v%3$s/%2$s.bam"
.format(k1gBam.squidId, k1gBam.sampleId, k1gBam.version))))
sample.setTags(Map("SQUIDProject" -> k1gBam.squidId, "CollaboratorID" -> k1gBam.sampleId))
samples :+= sample
}
val pipeline = new Pipeline
pipeline.setProject(project)
pipeline.setSamples(samples)
val dataset = new PipelineDataset
dataset.pipeline = pipeline
dataset
} }
@DataProvider(name="datasets")//, parallel=true) @DataProvider(name="datasets")//, parallel=true)
@ -92,49 +79,40 @@ class FullCallingPipelineTest {
val projectName = dataset.pipeline.getProject.getName val projectName = dataset.pipeline.getProject.getName
val testName = "FullCallingPipeline-" + projectName val testName = "FullCallingPipeline-" + projectName
val yamlFile = writeYaml(testName, dataset.pipeline) val yamlFile = writeYaml(testName, dataset.pipeline)
var cleanType = "cleaned"
// Run the pipeline with the expected inputs. // Run the pipeline with the expected inputs.
var pipelineCommand = ("-retry 1 -S scala/qscript/playground/FullCallingPipeline.q" + val pipelineCommand = ("-retry 1 -S scala/qscript/playground/FullCallingPipeline.q" +
" -jobProject %s -Y %s" + " -jobProject %s -Y %s" +
" -tearScript %s/R/DataProcessingReport/GetTearsheetStats.R" + " -tearScript %s/R/DataProcessingReport/GetTearsheetStats.R" +
" --gatkjar %s") " --gatkjar %s")
.format(projectName, yamlFile, PipelineTest.currentStingDir, PipelineTest.currentGATK) .format(projectName, yamlFile, PipelineTest.currentStingDir, PipelineTest.currentGATK)
if (!dataset.runIndelRealigner) {
pipelineCommand += " -skipCleaning"
cleanType = "uncleaned"
}
val pipelineSpec = new PipelineTestSpec val pipelineSpec = new PipelineTestSpec
pipelineSpec.name = testName pipelineSpec.name = testName
pipelineSpec.args = pipelineCommand pipelineSpec.args = pipelineCommand
pipelineSpec.jobQueue = dataset.jobQueue pipelineSpec.jobQueue = dataset.jobQueue
pipelineSpec.evalSpec = new PipelineTestEvalSpec pipelineSpec.evalSpec = new PipelineTestEvalSpec
pipelineSpec.evalSpec.vcf = new File(PipelineTest.runDir(testName) + "SnpCalls/%s.%s.annotated.handfiltered.vcf".format(projectName, cleanType)) pipelineSpec.evalSpec.vcf = new File(PipelineTest.runDir(testName) + "SnpCalls/%s.cleaned.annotated.handfiltered.vcf".format(projectName))
pipelineSpec.evalSpec.reference = dataset.pipeline.getProject.getReferenceFile pipelineSpec.evalSpec.reference = dataset.pipeline.getProject.getReferenceFile
pipelineSpec.evalSpec.intervals = dataset.pipeline.getProject.getIntervalList pipelineSpec.evalSpec.intervals = dataset.pipeline.getProject.getIntervalList
pipelineSpec.evalSpec.dbsnp = dataset.pipeline.getProject.getEvalDbsnp pipelineSpec.evalSpec.dbsnp = dataset.pipeline.getProject.getEvalDbsnp
pipelineSpec.evalSpec.validations = dataset.validations pipelineSpec.evalSpec.validations = dataset.validations
// Run the test, at least checking if the command compiles
PipelineTest.executeTest(pipelineSpec) PipelineTest.executeTest(pipelineSpec)
} }
class PipelineDataset(
var pipeline: Pipeline = null,
var validations: List[PipelineValidation] = Nil,
var jobQueue: String = null,
var runIndelRealigner: Boolean = false) {
override def toString = pipeline.getProject.getName
}
private def writeYaml(testName: String, pipeline: Pipeline) = { private def writeYaml(testName: String, pipeline: Pipeline) = {
val runDir = PipelineTest.runDir(testName) val runDir = PipelineTest.runDir(testName)
new File(runDir).mkdirs
val yamlFile = new File(runDir, pipeline.getProject.getName + ".yaml") val yamlFile = new File(runDir, pipeline.getProject.getName + ".yaml")
yamlFile.getParentFile.mkdirs
YamlUtils.dump(pipeline, yamlFile) YamlUtils.dump(pipeline, yamlFile)
yamlFile yamlFile
} }
class PipelineDataset(var pipeline: Pipeline = null,
var validations: List[PipelineValidation] = Nil,
var jobQueue: String = null) {
override def toString = pipeline.getProject.getName
}
} }

View File

@ -0,0 +1,108 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.pipeline.playground
import collection.JavaConversions._
import org.broadinstitute.sting.datasources.pipeline.Pipeline
import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.testng.annotations.{Test, DataProvider}
import org.broadinstitute.sting.queue.pipeline.{PipelineTestSpec, PipelineTest}
import java.io.{PrintWriter, File}
import org.apache.commons.io.IOUtils
class MultiFullCallingPipelineTest {
def datasets = List(k1gChr20Dataset)
val k1gChr20Dataset = newK1gDataset("Barcoded_1000G_WEx_chr20", true, "hour")
val k1gExomeDataset = newK1gDataset("Barcoded_1000G_WEx", false, "gsa")
def newK1gDataset(datasetName: String, chr20: Boolean, pipelineJobQueue: String) = {
var dataset = new MultiPipelineDataset
dataset.name = datasetName
dataset.pipelineJobQueue = pipelineJobQueue
for (k1gBam <- PipelineTest.k1gBams) {
val project = PipelineTest.createHg19Project("SingleSample_" + k1gBam.sampleId, chr20)
val sample = PipelineTest.createK1gSample("Sample", k1gBam)
dataset.samplePipelines :+= PipelineTest.createPipeline(project, List(sample))
}
dataset
}
@DataProvider(name="datasets")//, parallel=true)
final def convertDatasets: Array[Array[AnyRef]] =
datasets.map(dataset => Array(dataset.asInstanceOf[AnyRef])).toArray
@Test(dataProvider="datasets", enabled=false)
def testMultiFullCallingPipeline(dataset: MultiPipelineDataset) = {
val projectName = dataset.name
val testName = "MultiFullCallingPipeline-" + projectName
var yamlFiles = List.empty[File]
for (samplePipeline <- dataset.samplePipelines)
yamlFiles :+= writeYaml(testName, samplePipeline)
val yamlList = writeYamlList(testName, yamlFiles)
// Run the pipeline with the expected inputs.
val pipelineCommand = ("-retry 1 -BS 3 -PP 100 -S scala/qscript/playground/MultiFullCallingPipeline.scala" +
" -jobProject %s -YL %s -PJQ %s -stingHome %s")
.format(projectName, yamlList, dataset.pipelineJobQueue, PipelineTest.currentStingDir)
val pipelineSpec = new PipelineTestSpec
pipelineSpec.name = testName
pipelineSpec.args = pipelineCommand
pipelineSpec.jobQueue = "gsa"
PipelineTest.executeTest(pipelineSpec)
}
private def writeYaml(testName: String, pipeline: Pipeline) = {
val runDir = PipelineTest.runDir(testName)
val yamlFile = new File(runDir, pipeline.getProject.getName + "/" + pipeline.getProject.getName + ".yaml").getAbsoluteFile
yamlFile.getParentFile.mkdirs
YamlUtils.dump(pipeline, yamlFile)
yamlFile
}
private def writeYamlList(testName: String, yamlFiles: List[File]) = {
val runDir = PipelineTest.runDir(testName)
val yamlList = new File(runDir, testName + "_yamls.list").getAbsoluteFile
yamlList.getParentFile.mkdirs
val writer = new PrintWriter(yamlList)
try {
for (yamlFile <- yamlFiles)
writer.println(yamlFile.toString)
} finally {
IOUtils.closeQuietly(writer)
}
yamlList
}
class MultiPipelineDataset (var name: String = null,
var pipelineJobQueue: String = null,
var samplePipelines: List[Pipeline] = Nil) {
override def toString = name
}
}