Instead of the pipeline's cleaner only writing BAMs with the target intervals, it now pulls the list of contigs from the target intervals and outputs reads in those contigs.

Added a brute force -retry <count> option to Queue for transient errors.
Waiting up to 2 minutes for the LSF logs to appear before trying to display the errors from the logs.
Updates to the local job runner error logging when a job fails.
Refactored QGraph's settings as duplicate code was getting out of control.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4563 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-10-22 22:22:30 +00:00
parent 9fe71dc452
commit e9c6f681a4
11 changed files with 284 additions and 225 deletions

View File

@ -1,4 +1,3 @@
import net.sf.picard.reference.FastaSequenceFile
import org.broadinstitute.sting.commandline.ArgumentSource import org.broadinstitute.sting.commandline.ArgumentSource
import org.broadinstitute.sting.datasources.pipeline.Pipeline import org.broadinstitute.sting.datasources.pipeline.Pipeline
import org.broadinstitute.sting.gatk.walkers.genotyper.GenotypeCalculationModel.Model import org.broadinstitute.sting.gatk.walkers.genotyper.GenotypeCalculationModel.Model
@ -8,7 +7,7 @@ import org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction
import org.broadinstitute.sting.queue.extensions.samtools._ import org.broadinstitute.sting.queue.extensions.samtools._
import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, CloneFunction, ScatterFunction} import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, CloneFunction, ScatterFunction}
import org.broadinstitute.sting.queue.util.IOUtils import org.broadinstitute.sting.queue.util.IOUtils
import org.broadinstitute.sting.queue.{QException, QScript} import org.broadinstitute.sting.queue.QScript
import collection.JavaConversions._ import collection.JavaConversions._
import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.utils.yaml.YamlUtils
import org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType import org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType
@ -37,6 +36,9 @@ class fullCallingPipeline extends QScript {
@Input(doc="per-sample downsampling level",shortName="dcov",required=false) @Input(doc="per-sample downsampling level",shortName="dcov",required=false)
var downsampling_coverage = 300 var downsampling_coverage = 300
@Input(doc="level of parallelism for IndelRealigner. By default uses number of contigs.", shortName="cleanerScatter", required=false)
var num_cleaner_scatter_jobs: Option[Int] = None
@Input(doc="level of parallelism for UnifiedGenotyper", shortName="snpScatter", required=false) @Input(doc="level of parallelism for UnifiedGenotyper", shortName="snpScatter", required=false)
var num_snp_scatter_jobs = 20 var num_snp_scatter_jobs = 20
@ -58,6 +60,9 @@ class fullCallingPipeline extends QScript {
@Argument(doc="Job queue for large memory jobs (>4 to 16GB)", shortName="bigMemQueue", required=false) @Argument(doc="Job queue for large memory jobs (>4 to 16GB)", shortName="bigMemQueue", required=false)
var big_mem_queue: String = _ var big_mem_queue: String = _
@Argument(doc="Job queue for short run jobs (<1hr)", shortName="shortJobQueue", required=false)
var short_job_queue: String = _
private var pipeline: Pipeline = _ private var pipeline: Pipeline = _
trait CommandLineGATKArgs extends CommandLineGATK { trait CommandLineGATKArgs extends CommandLineGATK {
@ -75,123 +80,118 @@ class fullCallingPipeline extends QScript {
pipeline = YamlUtils.load(classOf[Pipeline], qscript.yamlFile) pipeline = YamlUtils.load(classOf[Pipeline], qscript.yamlFile)
val projectBase: String = qscript.pipeline.getProject.getName val projectBase: String = qscript.pipeline.getProject.getName
val cleanedBase: String = projectBase + ".cleaned" if (qscript.skip_cleaning) {
val uncleanedBase: String = projectBase + ".uncleaned" //endToEnd(projectBase + ".uncleaned", "recalibrated", adprRscript, seq, expKind)
endToEnd(projectBase + ".uncleaned", "recalibrated")
// there are commands that use all the bam files
val recalibratedSamples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains("recalibrated"))
//val adprRScript = qscript.adprScript
//val seq = qscript.machine
//val expKind = qscript.protocol
// count number of contigs (needed for indel cleaning parallelism)
val contigCount = IntervalScatterFunction.countContigs(
qscript.pipeline.getProject.getReferenceFile,
List(qscript.pipeline.getProject.getIntervalList.toString))
for ( sample <- recalibratedSamples ) {
val sampleId = sample.getId
// put unclean bams in unclean genotypers in advance, create the extension files
val bam = sample.getBamFiles.get("recalibrated")
if (!sample.getBamFiles.contains("cleaned")) {
sample.getBamFiles.put("cleaned", swapExt("CleanedBams", bam,"bam","cleaned.bam"))
}
val cleaned_bam = sample.getBamFiles.get("cleaned")
val indel_targets = swapExt("CleanedBams/IntermediateFiles/"+sampleId, bam,"bam","realigner_targets.interval_list")
// create the cleaning commands
val targetCreator = new RealignerTargetCreator with CommandLineGATKArgs
targetCreator.jobOutputFile = new File(".queue/logs/Cleaning/%s/RealignerTargetCreator.out".format(sampleId))
targetCreator.jobName = "CreateTargets_"+sampleId
targetCreator.analysisName = "CreateTargets_"+sampleId
targetCreator.input_file :+= bam
targetCreator.out = indel_targets
targetCreator.memoryLimit = Some(2)
targetCreator.isIntermediate = true
val realigner = new IndelRealigner with CommandLineGATKArgs
realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId))
realigner.analysisName = "RealignBam_"+sampleId
realigner.input_file = targetCreator.input_file
realigner.targetIntervals = targetCreator.out
realigner.scatterCount = contigCount
// may need to explicitly run fix mates
var fixMates = new PicardBamJarFunction {
jobOutputFile = new File(".queue/logs/Cleaning/%s/FixMates.out".format(sampleId))
// Declare inputs/outputs for dependency tracking.
@Input(doc="unfixed bam") var unfixed: File = _
@Output(doc="fixed bam") var fixed: File = _
def inputBams = List(unfixed)
def outputBam = fixed
}
// realigner.out = cleaned_bam
// realigner.setupGatherFunction = { case (f: BamGatherFunction, _) => f.jarFile = qscript.picardFixMatesJar }
// realigner.jobQueue = "week"
// if scatter count is > 1, do standard scatter gather, if not, explicitly set up fix mates
if (realigner.scatterCount > 1) {
realigner.out = cleaned_bam
// While gathering run fix mates.
realigner.setupScatterFunction = {
case scatter: ScatterFunction =>
scatter.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather".format(sampleId))
scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter.out".format(sampleId))
}
realigner.setupCloneFunction = {
case (clone: CloneFunction, index: Int) =>
clone.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Scatter_%s".format(sampleId, index))
clone.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter_%s.out".format(sampleId, index))
}
realigner.setupGatherFunction = {
case (gather: BamGatherFunction, source: ArgumentSource) =>
gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName))
gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/FixMates.out".format(sampleId))
gather.memoryLimit = Some(6)
gather.jarFile = qscript.picardFixMatesJar
// Don't pass this AS=true to fix mates!
gather.assumeSorted = None
case (gather: GatherFunction, source: ArgumentSource) =>
gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName))
gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Gather_%s.out".format(sampleId, source.field.getName))
}
} else {
realigner.out = swapExt("CleanedBams/IntermediateFiles/"+sampleId,bam,"bam","unfixed.cleaned.bam")
realigner.isIntermediate = true
// Explicitly run fix mates if the function won't be scattered.
fixMates.memoryLimit = Some(6)
fixMates.jarFile = qscript.picardFixMatesJar
fixMates.unfixed = realigner.out
fixMates.fixed = cleaned_bam
fixMates.analysisName = "FixMates_"+sampleId
// Add the fix mates explicitly
}
var samtoolsindex = new SamtoolsIndexFunction
samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId))
samtoolsindex.bamFile = cleaned_bam
samtoolsindex.analysisName = "index_cleaned_"+sampleId
if (!qscript.skip_cleaning) {
if ( realigner.scatterCount > 1 ) {
add(targetCreator,realigner,samtoolsindex)
} else {
add(targetCreator,realigner,fixMates,samtoolsindex)
}
}
}
// actually make calls
if (!qscript.skip_cleaning) {
//endToEnd(cleanedBase, "cleaned", adprRscript, seq, expKind)
endToEnd(cleanedBase, "cleaned")
} else { } else {
//endToEnd(uncleanedBase, "recalibrated", adprRscript, seq, expKind) // there are commands that use all the bam files
endToEnd(uncleanedBase, "recalibrated") val recalibratedSamples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains("recalibrated"))
//val adprRScript = qscript.adprScript
//val seq = qscript.machine
//val expKind = qscript.protocol
// get contigs (needed for indel cleaning parallelism)
val contigs = IntervalScatterFunction.distinctContigs(
qscript.pipeline.getProject.getReferenceFile,
List(qscript.pipeline.getProject.getIntervalList.toString))
for ( sample <- recalibratedSamples ) {
val sampleId = sample.getId
// put unclean bams in unclean genotypers in advance, create the extension files
val bam = sample.getBamFiles.get("recalibrated")
if (!sample.getBamFiles.contains("cleaned")) {
sample.getBamFiles.put("cleaned", swapExt("CleanedBams", bam,"bam","cleaned.bam"))
}
val cleaned_bam = sample.getBamFiles.get("cleaned")
val indel_targets = swapExt("CleanedBams/IntermediateFiles/"+sampleId, bam,"bam","realigner_targets.interval_list")
// create the cleaning commands
val targetCreator = new RealignerTargetCreator with CommandLineGATKArgs
targetCreator.jobOutputFile = new File(".queue/logs/Cleaning/%s/RealignerTargetCreator.out".format(sampleId))
targetCreator.jobName = "CreateTargets_"+sampleId
targetCreator.analysisName = "CreateTargets_"+sampleId
targetCreator.input_file :+= bam
targetCreator.out = indel_targets
targetCreator.memoryLimit = Some(2)
targetCreator.isIntermediate = true
val realigner = new IndelRealigner with CommandLineGATKArgs
realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId))
realigner.analysisName = "RealignBam_"+sampleId
realigner.input_file = targetCreator.input_file
realigner.targetIntervals = targetCreator.out
realigner.intervals = Nil
realigner.intervalsString = contigs
realigner.scatterCount = {
if (num_cleaner_scatter_jobs.isDefined)
num_cleaner_scatter_jobs.get min contigs.size
else
contigs.size
}
// if scatter count is > 1, do standard scatter gather, if not, explicitly set up fix mates
if (realigner.scatterCount > 1) {
realigner.out = cleaned_bam
// While gathering run fix mates.
realigner.setupScatterFunction = {
case scatter: ScatterFunction =>
scatter.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather".format(sampleId))
scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter.out".format(sampleId))
}
realigner.setupCloneFunction = {
case (clone: CloneFunction, index: Int) =>
clone.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Scatter_%s".format(sampleId, index))
clone.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter_%s.out".format(sampleId, index))
}
realigner.setupGatherFunction = {
case (gather: BamGatherFunction, source: ArgumentSource) =>
gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName))
gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/FixMates.out".format(sampleId))
gather.memoryLimit = Some(6)
gather.jarFile = qscript.picardFixMatesJar
// Don't pass this AS=true to fix mates!
gather.assumeSorted = None
case (gather: GatherFunction, source: ArgumentSource) =>
gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName))
gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Gather_%s.out".format(sampleId, source.field.getName))
}
add(targetCreator,realigner)
} else {
realigner.out = swapExt("CleanedBams/IntermediateFiles/"+sampleId,bam,"bam","unfixed.cleaned.bam")
realigner.isIntermediate = true
// Explicitly run fix mates if the function won't be scattered.
val fixMates = new PicardBamJarFunction {
// Declare inputs/outputs for dependency tracking.
@Input(doc="unfixed bam") var unfixed: File = _
@Output(doc="fixed bam") var fixed: File = _
def inputBams = List(unfixed)
def outputBam = fixed
}
fixMates.jobOutputFile = new File(".queue/logs/Cleaning/%s/FixMates.out".format(sampleId))
fixMates.memoryLimit = Some(6)
fixMates.jarFile = qscript.picardFixMatesJar
fixMates.unfixed = realigner.out
fixMates.fixed = cleaned_bam
fixMates.analysisName = "FixMates_"+sampleId
// Add the fix mates explicitly
add(targetCreator,realigner,fixMates)
}
var samtoolsindex = new SamtoolsIndexFunction
samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId))
samtoolsindex.bamFile = cleaned_bam
samtoolsindex.analysisName = "index_cleaned_"+sampleId
samtoolsindex.jobQueue = qscript.short_job_queue
add(samtoolsindex)
}
//endToEnd(projectBase + ".cleaned", "cleaned", adprRscript, seq, expKind)
endToEnd(projectBase + ".cleaned", "cleaned")
} }
} }

View File

@ -2,9 +2,9 @@ package org.broadinstitute.sting.queue
import java.io.File import java.io.File
import java.util.Arrays import java.util.Arrays
import org.broadinstitute.sting.queue.engine.QGraph
import org.broadinstitute.sting.commandline._ import org.broadinstitute.sting.commandline._
import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.queue.engine.{QGraphSettings, QGraph}
/** /**
* Entry point of Queue. Compiles and runs QScripts passed in to the command line. * Entry point of Queue. Compiles and runs QScripts passed in to the command line.
@ -14,38 +14,8 @@ class QCommandLine extends CommandLineProgram with Logging {
@ClassType(classOf[File]) @ClassType(classOf[File])
private var scripts = List.empty[File] private var scripts = List.empty[File]
@Argument(fullName="bsub_all_jobs", shortName="bsub", doc="Use bsub to submit jobs", required=false)
private var bsubAllJobs = false
@Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. Without this flag set only performs a dry run.", required=false)
private var run = false
@Argument(fullName="dot_graph", shortName="dot", doc="Outputs the queue graph to a .dot file. See: http://en.wikipedia.org/wiki/DOT_language", required=false)
private var dotFile: File = _
@Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false)
private var expandedDotFile: File = _
@Argument(fullName="start_from_scratch", shortName="startFromScratch", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false)
private var startFromScratch = false
@Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden
private var runScripts = false
@Argument(fullName="status",shortName="status",doc="Get status of jobs for the qscript",required=false)
private var getStatus = false
@ArgumentCollection @ArgumentCollection
private val qSettings = new QSettings private val settings = new QGraphSettings
@Argument(fullName="status_email_from", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false)
private var statusEmailFrom: String = System.getProperty("user.name") + "@" + SystemUtils.domainName
@Argument(fullName="status_email_to", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false)
private var statusEmailTo: List[String] = Nil
@Argument(fullName="delete_intermediate_outputs", shortName="deleteIntermediates", doc="After a successful run delete the outputs of any Function marked as intermediate.", required=false)
private var deleteIntermediates = false
/** /**
* Takes the QScripts passed in, runs their script() methods, retrieves their generated * Takes the QScripts passed in, runs their script() methods, retrieves their generated
@ -54,16 +24,8 @@ class QCommandLine extends CommandLineProgram with Logging {
def execute = { def execute = {
val qGraph = new QGraph val qGraph = new QGraph
qGraph.dryRun = !(run || runScripts) qGraph.settings = settings
qGraph.bsubAllJobs = bsubAllJobs
qGraph.startFromScratch = startFromScratch
qGraph.dotFile = dotFile
qGraph.expandedDotFile = expandedDotFile
qGraph.qSettings = qSettings
qGraph.debugMode = debugMode == true qGraph.debugMode = debugMode == true
qGraph.statusEmailFrom = statusEmailFrom
qGraph.statusEmailTo = statusEmailTo
qGraph.deleteIntermediates = deleteIntermediates
val scripts = qScriptManager.createScripts() val scripts = qScriptManager.createScripts()
for (script <- scripts) { for (script <- scripts) {
@ -83,10 +45,7 @@ class QCommandLine extends CommandLineProgram with Logging {
} }
}) })
if (getStatus) qGraph.run
qGraph.checkStatus
else
qGraph.run
if (qGraph.hasFailed) { if (qGraph.hasFailed) {
logger.info("Done with errors") logger.info("Done with errors")

View File

@ -11,6 +11,14 @@ import org.broadinstitute.sting.queue.function.QFunction
class FunctionEdge(var function: QFunction) extends QEdge { class FunctionEdge(var function: QFunction) extends QEdge {
var runner: JobRunner =_ var runner: JobRunner =_
/**
* The number of times this edge has been reset for a retry after failing.
*/
var retries = 0
/**
* Initializes with the current status of the function.
*/
private var currentStatus = { private var currentStatus = {
val isDone = function.isDone val isDone = function.isDone
val isFail = function.isFail val isFail = function.isFail
@ -22,6 +30,9 @@ class FunctionEdge(var function: QFunction) extends QEdge {
RunnerStatus.PENDING RunnerStatus.PENDING
} }
/**
* Returns the current status of the edge.
*/
def status = { def status = {
if (currentStatus == RunnerStatus.PENDING || currentStatus == RunnerStatus.RUNNING) if (currentStatus == RunnerStatus.PENDING || currentStatus == RunnerStatus.RUNNING)
if (runner != null) if (runner != null)
@ -29,14 +40,21 @@ class FunctionEdge(var function: QFunction) extends QEdge {
currentStatus currentStatus
} }
/**
* Marks this edge as skipped as it is not needed for the current run.
*/
def markAsSkipped() = { def markAsSkipped() = {
currentStatus = RunnerStatus.SKIPPED currentStatus = RunnerStatus.SKIPPED
} }
/**
* Resets the edge to pending status.
*/
def resetToPending(cleanOutputs: Boolean) = { def resetToPending(cleanOutputs: Boolean) = {
currentStatus = RunnerStatus.PENDING currentStatus = RunnerStatus.PENDING
if (cleanOutputs) if (cleanOutputs)
function.deleteOutputs() function.deleteOutputs()
runner = null
} }
def inputs = function.inputs def inputs = function.inputs

View File

@ -33,13 +33,18 @@ trait JobRunner {
IOUtils.writeContents(function.jobOutputFile, content) IOUtils.writeContents(function.jobOutputFile, content)
} }
/**
 * Writes the given content to the file selected by functionErrorFile.
 * @param content Text to write.
 */
protected def writeError(content: String) =
  IOUtils.writeContents(functionErrorFile, content)
protected def writeStackTrace(e: Throwable) = { protected def writeStackTrace(e: Throwable) = {
val stackTrace = new StringWriter val stackTrace = new StringWriter
val printWriter = new PrintWriter(stackTrace) val printWriter = new PrintWriter(stackTrace)
printWriter.println(function.description) printWriter.println(function.description)
e.printStackTrace(printWriter) e.printStackTrace(printWriter)
printWriter.close printWriter.close
val outputFile = if (function.jobErrorFile != null) function.jobErrorFile else function.jobOutputFile IOUtils.writeContents(functionErrorFile, stackTrace.toString)
IOUtils.writeContents(outputFile, stackTrace.toString)
} }
/**
 * The file that receives error text: the function's jobErrorFile when one is set,
 * otherwise the function's jobOutputFile.
 */
private def functionErrorFile =
  Option(function.jobErrorFile).getOrElse(function.jobOutputFile)
} }

View File

@ -151,9 +151,13 @@ class LsfJobRunner(val function: CommandLineFunction) extends DispatchJobRunner
*/ */
private def tailError() = { private def tailError() = {
val errorFile = if (job.errorFile != null) job.errorFile else job.outputFile val errorFile = if (job.errorFile != null) job.errorFile else job.outputFile
val tailLines = IOUtils.tail(errorFile, 100) if (FileUtils.waitFor(errorFile, 120)) {
val nl = "%n".format() val tailLines = IOUtils.tail(errorFile, 100)
logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) val nl = "%n".format()
logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl)))
} else {
logger.error("Unable to access log file: %s".format(errorFile))
}
} }
/** /**

View File

@ -9,7 +9,7 @@ import org.jgrapht.EdgeFactory
import org.jgrapht.ext.DOTExporter import org.jgrapht.ext.DOTExporter
import java.io.File import java.io.File
import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent}
import org.broadinstitute.sting.queue.{QSettings, QException} import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction} import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction}
import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction} import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction}
import org.broadinstitute.sting.queue.util.{EmailMessage, JobExitException, LsfKillJob, Logging} import org.broadinstitute.sting.queue.util.{EmailMessage, JobExitException, LsfKillJob, Logging}
@ -19,17 +19,10 @@ import org.apache.commons.lang.StringUtils
* The internal dependency tracker between sets of function input and output files. * The internal dependency tracker between sets of function input and output files.
*/ */
class QGraph extends Logging { class QGraph extends Logging {
var dryRun = true var settings: QGraphSettings = _
var bsubAllJobs = false
var startFromScratch = false
var dotFile: File = _
var expandedDotFile: File = _
var qSettings: QSettings = _
var debugMode = false var debugMode = false
var deleteIntermediates = false
var statusEmailFrom: String = _
var statusEmailTo: List[String] = _
private def dryRun = !settings.run
private val jobGraph = newGraph private val jobGraph = newGraph
private var shuttingDown = false private var shuttingDown = false
private val nl = "%n".format() private val nl = "%n".format()
@ -40,7 +33,7 @@ class QGraph extends Logging {
*/ */
def add(command: QFunction) { def add(command: QFunction) {
try { try {
command.qSettings = this.qSettings command.qSettings = settings.qSettings
command.freeze command.freeze
addEdge(new FunctionEdge(command)) addEdge(new FunctionEdge(command))
} catch { } catch {
@ -56,7 +49,10 @@ class QGraph extends Logging {
val numMissingValues = fillGraph val numMissingValues = fillGraph
val isReady = numMissingValues == 0 val isReady = numMissingValues == 0
if (this.dryRun) { if (settings.getStatus) {
logger.info("Checking pipeline status.")
logStatus()
} else if (this.dryRun) {
dryRunJobs() dryRunJobs()
} else if (isReady) { } else if (isReady) {
logger.info("Running jobs.") logger.info("Running jobs.")
@ -76,11 +72,11 @@ class QGraph extends Logging {
private def fillGraph = { private def fillGraph = {
logger.info("Generating graph.") logger.info("Generating graph.")
fill fill
if (dotFile != null) if (settings.dotFile != null)
renderToDot(dotFile) renderToDot(settings.dotFile)
var numMissingValues = validate var numMissingValues = validate
if (numMissingValues == 0 && bsubAllJobs) { if (numMissingValues == 0 && settings.bsubAllJobs) {
logger.info("Generating scatter gather jobs.") logger.info("Generating scatter gather jobs.")
val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge)) val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge))
@ -103,7 +99,7 @@ class QGraph extends Logging {
logger.info("Regenerating graph.") logger.info("Regenerating graph.")
fill fill
val scatterGatherDotFile = if (expandedDotFile != null) expandedDotFile else dotFile val scatterGatherDotFile = if (settings.expandedDotFile != null) settings.expandedDotFile else settings.dotFile
if (scatterGatherDotFile != null) if (scatterGatherDotFile != null)
renderToDot(scatterGatherDotFile) renderToDot(scatterGatherDotFile)
numMissingValues = validate numMissingValues = validate
@ -124,14 +120,6 @@ class QGraph extends Logging {
} }
} }
def checkStatus = {
// build up the full DAG with scatter-gather jobs
fillGraph
logger.info("Checking pipeline status.")
updateGraphStatus(false)
logStatus
}
/** /**
* Walks up the graph looking for the previous command line edges. * Walks up the graph looking for the previous command line edges.
* @param function Function to examine for a previous command line job. * @param function Function to examine for a previous command line job.
@ -257,12 +245,20 @@ class QGraph extends Logging {
logger.info("Error: " + edge.function.jobErrorFile.getAbsolutePath) logger.info("Error: " + edge.function.jobErrorFile.getAbsolutePath)
} }
/**
 * Refreshes the graph status via updateGraphStatus(false), then writes every
 * status line produced by doStatus to the logger at info level.
 */
private def logStatus() = {
  updateGraphStatus(false)
  doStatus { statusLine => logger.info(statusLine) }
}
/** /**
* Runs the jobs by traversing the graph. * Runs the jobs by traversing the graph.
*/ */
private def runJobs() = { private def runJobs() = {
try { try {
if (startFromScratch) { if (settings.startFromScratch) {
logger.info("Removing outputs from previous runs.") logger.info("Removing outputs from previous runs.")
foreachFunction(_.resetToPending(true)) foreachFunction(_.resetToPending(true))
} else } else
@ -291,8 +287,10 @@ class QGraph extends Logging {
} }
}) })
if (failedJobs.size > 0) if (failedJobs.size > 0) {
emailFailedJobs(failedJobs) emailFailedJobs(failedJobs)
checkRetryJobs(failedJobs)
}
if (readyJobs.size == 0 && runningJobs.size > 0) if (readyJobs.size == 0 && runningJobs.size > 0)
Thread.sleep(30000L) Thread.sleep(30000L)
@ -359,7 +357,7 @@ class QGraph extends Logging {
private def newRunner(f: QFunction) = { private def newRunner(f: QFunction) = {
f match { f match {
case cmd: CommandLineFunction => case cmd: CommandLineFunction =>
if (this.bsubAllJobs) if (settings.bsubAllJobs)
new LsfJobRunner(cmd) new LsfJobRunner(cmd)
else else
new ShellJobRunner(cmd) new ShellJobRunner(cmd)
@ -371,18 +369,34 @@ class QGraph extends Logging {
} }
private def emailFailedJobs(failed: List[FunctionEdge]) = { private def emailFailedJobs(failed: List[FunctionEdge]) = {
if (statusEmailTo.size > 0) { if (settings.statusEmailTo.size > 0) {
val emailMessage = new EmailMessage val emailMessage = new EmailMessage
emailMessage.from = statusEmailFrom emailMessage.from = settings.statusEmailFrom
emailMessage.to = statusEmailTo emailMessage.to = settings.statusEmailTo
emailMessage.subject = "Queue function: Failure" emailMessage.subject = "Queue function: Failure"
addFailedFunctions(emailMessage, failed) addFailedFunctions(emailMessage, failed)
emailMessage.trySend(qSettings.emailSettings) emailMessage.trySend(settings.qSettings.emailSettings)
}
}
private def checkRetryJobs(failed: List[FunctionEdge]) = {
if (settings.retries > 0) {
for (failedJob <- failed) {
if (failedJob.retries < settings.retries) {
failedJob.retries += 1
failedJob.resetToPending(true)
logger.info("Reset for retry attempt %d of %d: %s".format(
failedJob.retries, settings.retries, failedJob.function.description))
} else {
logger.info("Giving up after retrying %d times: %s".format(
settings.retries, failedJob.function.description))
}
}
} }
} }
private def emailStatus() = { private def emailStatus() = {
if (statusEmailTo.size > 0) { if (settings.statusEmailTo.size > 0) {
var failed = List.empty[FunctionEdge] var failed = List.empty[FunctionEdge]
foreachFunction(edge => { foreachFunction(edge => {
if (edge.status == RunnerStatus.FAILED) { if (edge.status == RunnerStatus.FAILED) {
@ -391,8 +405,8 @@ class QGraph extends Logging {
}) })
val emailMessage = new EmailMessage val emailMessage = new EmailMessage
emailMessage.from = statusEmailFrom emailMessage.from = settings.statusEmailFrom
emailMessage.to = statusEmailTo emailMessage.to = settings.statusEmailTo
emailMessage.body = getStatus + nl emailMessage.body = getStatus + nl
if (failed.size == 0) { if (failed.size == 0) {
emailMessage.subject = "Queue run: Success" emailMessage.subject = "Queue run: Success"
@ -400,7 +414,7 @@ class QGraph extends Logging {
emailMessage.subject = "Queue run: Failure" emailMessage.subject = "Queue run: Failure"
addFailedFunctions(emailMessage, failed) addFailedFunctions(emailMessage, failed)
} }
emailMessage.trySend(qSettings.emailSettings) emailMessage.trySend(settings.qSettings.emailSettings)
} }
} }
@ -417,12 +431,20 @@ class QGraph extends Logging {
|Logs: |Logs:
|%s%n |%s%n
|""".stripMargin.trim.format( |""".stripMargin.trim.format(
failed.map(_.function.description).mkString(nl+nl), failed.map(edge => failedDescription(edge)).mkString(nl+nl),
logs.map(_.getAbsolutePath).mkString(nl)) logs.map(_.getAbsolutePath).mkString(nl))
emailMessage.attachments = logs emailMessage.attachments = logs
} }
/**
 * Builds the description of a failed edge for the failure report.
 * When retries are enabled the description is prefixed with an
 * "Attempt x of y." line, where x is the edge's retry count plus one and
 * y is the configured retry count plus one.
 * @param failed The edge whose function failed.
 * @return The optional attempt line followed by the function's description.
 */
private def failedDescription(failed: FunctionEdge) = {
  val attemptPrefix =
    if (settings.retries > 0)
      "Attempt %d of %d.%n".format(failed.retries + 1, settings.retries + 1)
    else
      ""
  attemptPrefix + failed.function.description
}
private def logFiles(edge: FunctionEdge) = { private def logFiles(edge: FunctionEdge) = {
var failedOutputs = List.empty[File] var failedOutputs = List.empty[File]
failedOutputs :+= edge.function.jobOutputFile failedOutputs :+= edge.function.jobOutputFile
@ -450,13 +472,6 @@ class QGraph extends Logging {
var skipped = 0 var skipped = 0
} }
/**
* Logs job statuses by traversing the graph and looking for status-related files
*/
private def logStatus = {
doStatus(status => logger.info(status))
}
/** /**
* Gets job statuses by traversing the graph and looking for status-related files * Gets job statuses by traversing the graph and looking for status-related files
*/ */
@ -683,7 +698,7 @@ class QGraph extends Logging {
} }
private def deleteIntermediateOutputs() = { private def deleteIntermediateOutputs() = {
if (this.deleteIntermediates && !hasFailed) { if (settings.deleteIntermediates && !hasFailed) {
logger.info("Deleting intermediate files.") logger.info("Deleting intermediate files.")
traverseFunctions(edge => { traverseFunctions(edge => {
if (edge.function.isIntermediate) { if (edge.function.isIntermediate) {

View File

@ -0,0 +1,44 @@
package org.broadinstitute.sting.queue.engine
import java.io.File
import org.broadinstitute.sting.queue.QSettings
import org.broadinstitute.sting.commandline.{ArgumentCollection, Argument}
import org.broadinstitute.sting.queue.util.SystemUtils
/**
 * Bundles the command line arguments that configure a QGraph.
 * Each field is bound to a command line argument through its annotation.
 */
class QGraphSettings {
  // Nested argument collection holding the QSettings.
  @ArgumentCollection
  val qSettings: QSettings = new QSettings

  // Whether jobs are submitted with bsub.
  @Argument(fullName="bsub_all_jobs", shortName="bsub", doc="Use bsub to submit jobs", required=false)
  var bsubAllJobs: Boolean = false

  // Whether to actually run the scripts; when false only a dry run is performed.
  @Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. Without this flag set only performs a dry run.", required=false)
  var run: Boolean = false

  // Optional .dot file for the queue graph.
  @Argument(fullName="dot_graph", shortName="dot", doc="Outputs the queue graph to a .dot file. See: http://en.wikipedia.org/wiki/DOT_language", required=false)
  var dotFile: File = _

  // Optional .dot file for the expanded scatter gather graph.
  @Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false)
  var expandedDotFile: File = _

  // Whether to rerun functions whose outputs already exist from a previous run.
  @Argument(fullName="start_from_scratch", shortName="startFromScratch", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false)
  var startFromScratch: Boolean = false

  // Whether to report job status.
  @Argument(fullName="status",shortName="status",doc="Get status of jobs for the qscript",required=false)
  var getStatus: Boolean = false

  // Sender address for status emails; defaults to <user.name>@<SystemUtils.domainName>.
  @Argument(fullName="status_email_from", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false)
  var statusEmailFrom: String = System.getProperty("user.name") + "@" + SystemUtils.domainName

  // Recipient addresses for status emails; empty by default.
  @Argument(fullName="status_email_to", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false)
  var statusEmailTo: List[String] = Nil

  // Whether outputs of functions marked as intermediate are deleted after a successful run.
  @Argument(fullName="delete_intermediate_outputs", shortName="deleteIntermediates", doc="After a successful run delete the outputs of any Function marked as intermediate.", required=false)
  var deleteIntermediates: Boolean = false

  // Number of times to retry a failed command; 0 means no retries.
  @Argument(fullName="retry_failed", shortName="retry", doc="Retry the specified number of times after a command fails. Defaults to no retries.", required=false)
  var retries: Int = 0
}

View File

@ -1,7 +1,7 @@
package org.broadinstitute.sting.queue.engine package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.util.{Logging, ShellJob}
import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util.{JobExitException, Logging, ShellJob}
/** /**
* Runs jobs one at a time locally * Runs jobs one at a time locally
@ -40,6 +40,16 @@ class ShellJobRunner(val function: CommandLineFunction) extends JobRunner with L
runStatus = RunnerStatus.DONE runStatus = RunnerStatus.DONE
logger.info("Done: " + function.commandLine) logger.info("Done: " + function.commandLine)
} catch { } catch {
case jee: JobExitException =>
runStatus = RunnerStatus.FAILED
try {
function.failOutputs.foreach(_.createNewFile())
writeError(jee.getMessage)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + function.commandLine)
logger.error(jee.stdErr)
case e => case e =>
runStatus = RunnerStatus.FAILED runStatus = RunnerStatus.FAILED
try { try {

View File

@ -20,7 +20,8 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction {
var splitByContig = false var splitByContig = false
private var referenceSequence: File = _ private var referenceSequence: File = _
private var intervalField: ArgumentSource = _ private var intervalsField: ArgumentSource = _
private var intervalsStringField: ArgumentSource = _
private var intervals: List[String] = Nil private var intervals: List[String] = Nil
def isScatterGatherable(originalFunction: ScatterGatherableFunction) = { def isScatterGatherable(originalFunction: ScatterGatherableFunction) = {
@ -35,18 +36,20 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction {
this.referenceSequence = gatk.reference_sequence this.referenceSequence = gatk.reference_sequence
this.intervals ++= gatk.intervalsString this.intervals ++= gatk.intervalsString
this.intervals ++= gatk.intervals.map(_.toString) this.intervals ++= gatk.intervals.map(_.toString)
this.intervalField = QFunction.findField(originalFunction.getClass, "intervals") this.intervalsField = QFunction.findField(originalFunction.getClass, "intervals")
this.intervalsStringField = QFunction.findField(originalFunction.getClass, "intervalsString")
} }
def initCloneInputs(cloneFunction: CloneFunction, index: Int) = { def initCloneInputs(cloneFunction: CloneFunction, index: Int) = {
cloneFunction.setFieldValue(this.intervalField, List(new File("scatter.intervals"))) cloneFunction.setFieldValue(this.intervalsField, List(new File("scatter.intervals")))
cloneFunction.setFieldValue(this.intervalsStringField, List.empty[String])
} }
def bindCloneInputs(cloneFunction: CloneFunction, index: Int) = { def bindCloneInputs(cloneFunction: CloneFunction, index: Int) = {
val scatterPart = cloneFunction.getFieldValue(this.intervalField) val scatterPart = cloneFunction.getFieldValue(this.intervalsField)
.asInstanceOf[List[File]] .asInstanceOf[List[File]]
.map(file => IOUtils.subDir(cloneFunction.commandDirectory, file)) .map(file => IOUtils.subDir(cloneFunction.commandDirectory, file))
cloneFunction.setFieldValue(this.intervalField, scatterPart) cloneFunction.setFieldValue(this.intervalsField, scatterPart)
this.scatterParts ++= scatterPart this.scatterParts ++= scatterPart
} }
@ -71,18 +74,18 @@ object IntervalScatterFunction {
locs.toList locs.toList
} }
def countContigs(reference: File, intervals: List[String]) = { def distinctContigs(reference: File, intervals: List[String]) = {
val referenceSource = new ReferenceDataSource(reference) val referenceSource = new ReferenceDataSource(reference)
val locs = parseLocs(referenceSource, intervals) val locs = parseLocs(referenceSource, intervals)
var count = 0
var contig: String = null var contig: String = null
var contigs = List.empty[String]
for (loc <- locs) { for (loc <- locs) {
if (contig != loc.getContig) { if (contig != loc.getContig) {
count += 1
contig = loc.getContig contig = loc.getContig
contigs :+= contig
} }
} }
count contigs
} }
def scatter(reference: File, intervals: List[String], scatterParts: List[File], splitByContig: Boolean) = { def scatter(reference: File, intervals: List[String], scatterParts: List[File], splitByContig: Boolean) = {

View File

@ -5,7 +5,7 @@ import org.broadinstitute.sting.queue.QException
/** /**
* Captures the exit code and error text from a failed process. * Captures the exit code and error text from a failed process.
*/ */
class JobExitException(var exitText: String, var commandLine: Array[String], var exitCode: Int, var stdErr: String) class JobExitException(val exitText: String, val commandLine: Array[String], val exitCode: Int, val stdErr: String)
extends QException("%s%nCommand line:%n%s%nExit code: %s%nStandard error contained: %n%s" extends QException("%s%nCommand line:%n%s%nExit code: %s%nStandard error contained: %n%s"
.format(exitText, commandLine.mkString(" "), exitCode, stdErr)) { .format(exitText, commandLine.mkString(" "), exitCode, stdErr)) {
} }

View File

@ -21,8 +21,9 @@ class IntervalScatterFunctionUnitTest extends BaseTest {
@Test @Test
def testCountContigs = { def testCountContigs = {
Assert.assertEquals(3, IntervalScatterFunction.countContigs(reference, List("1:1-1", "2:1-1", "3:2-2"))) Assert.assertEquals(List("1"), IntervalScatterFunction.distinctContigs(reference, List(BaseTest.validationDataLocation + "chr1_b36_pilot3.interval_list")))
Assert.assertEquals(1, IntervalScatterFunction.countContigs(reference, List(BaseTest.validationDataLocation + "chr1_b36_pilot3.interval_list"))) Assert.assertEquals(List("1","2","3"), IntervalScatterFunction.distinctContigs(reference, List("1:1-1", "2:1-1", "3:2-2")))
Assert.assertEquals(List("1","2","3"), IntervalScatterFunction.distinctContigs(reference, List("2:1-1", "1:1-1", "3:2-2")))
} }
@Test @Test