From e9c6f681a4d0223bff04575e93c0ab23e0f9cea9 Mon Sep 17 00:00:00 2001 From: kshakir Date: Fri, 22 Oct 2010 22:22:30 +0000 Subject: [PATCH] Instead of the pipeline's cleaner only writing BAMs with the target intervals, now pulling the list of contigs from the target intervals and outputting reads in those contigs. Added a brute force -retry option to Queue for transient errors. Waiting up to 2 minutes for the LSF logs to appear before trying to display the errors from the logs. Updates to the local job runner error logging when a job fails. Refactored QGraph's settings as duplicate code was getting out of control. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4563 348d0f76-0448-11de-a6fe-93d51630548a --- scala/qscript/fullCallingPipeline.q | 236 +++++++++--------- .../sting/queue/QCommandLine.scala | 49 +--- .../sting/queue/engine/FunctionEdge.scala | 18 ++ .../sting/queue/engine/JobRunner.scala | 9 +- .../sting/queue/engine/LsfJobRunner.scala | 10 +- .../sting/queue/engine/QGraph.scala | 103 ++++---- .../sting/queue/engine/QGraphSettings.scala | 44 ++++ .../sting/queue/engine/ShellJobRunner.scala | 12 +- .../gatk/IntervalScatterFunction.scala | 21 +- .../sting/queue/util/JobExitException.scala | 2 +- .../IntervalScatterFunctionUnitTest.scala | 5 +- 11 files changed, 284 insertions(+), 225 deletions(-) create mode 100644 scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala diff --git a/scala/qscript/fullCallingPipeline.q b/scala/qscript/fullCallingPipeline.q index a5099e26c..eae68b9f3 100755 --- a/scala/qscript/fullCallingPipeline.q +++ b/scala/qscript/fullCallingPipeline.q @@ -1,4 +1,3 @@ -import net.sf.picard.reference.FastaSequenceFile import org.broadinstitute.sting.commandline.ArgumentSource import org.broadinstitute.sting.datasources.pipeline.Pipeline import org.broadinstitute.sting.gatk.walkers.genotyper.GenotypeCalculationModel.Model @@ -8,7 +7,7 @@ import 
org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction import org.broadinstitute.sting.queue.extensions.samtools._ import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, CloneFunction, ScatterFunction} import org.broadinstitute.sting.queue.util.IOUtils -import org.broadinstitute.sting.queue.{QException, QScript} +import org.broadinstitute.sting.queue.QScript import collection.JavaConversions._ import org.broadinstitute.sting.utils.yaml.YamlUtils import org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType @@ -37,6 +36,9 @@ class fullCallingPipeline extends QScript { @Input(doc="per-sample downsampling level",shortName="dcov",required=false) var downsampling_coverage = 300 + @Input(doc="level of parallelism for IndelRealigner. By default uses number of contigs.", shortName="cleanerScatter", required=false) + var num_cleaner_scatter_jobs: Option[Int] = None + @Input(doc="level of parallelism for UnifiedGenotyper", shortName="snpScatter", required=false) var num_snp_scatter_jobs = 20 @@ -58,6 +60,9 @@ class fullCallingPipeline extends QScript { @Argument(doc="Job queue for large memory jobs (>4 to 16GB)", shortName="bigMemQueue", required=false) var big_mem_queue: String = _ + @Argument(doc="Job queue for short run jobs (<1hr)", shortName="shortJobQueue", required=false) + var short_job_queue: String = _ + private var pipeline: Pipeline = _ trait CommandLineGATKArgs extends CommandLineGATK { @@ -75,123 +80,118 @@ class fullCallingPipeline extends QScript { pipeline = YamlUtils.load(classOf[Pipeline], qscript.yamlFile) val projectBase: String = qscript.pipeline.getProject.getName - val cleanedBase: String = projectBase + ".cleaned" - val uncleanedBase: String = projectBase + ".uncleaned" - - // there are commands that use all the bam files - val recalibratedSamples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains("recalibrated")) - //val adprRScript = qscript.adprScript - //val seq = qscript.machine - 
//val expKind = qscript.protocol - - // count number of contigs (needed for indel cleaning parallelism) - val contigCount = IntervalScatterFunction.countContigs( - qscript.pipeline.getProject.getReferenceFile, - List(qscript.pipeline.getProject.getIntervalList.toString)) - - for ( sample <- recalibratedSamples ) { - val sampleId = sample.getId - // put unclean bams in unclean genotypers in advance, create the extension files - val bam = sample.getBamFiles.get("recalibrated") - if (!sample.getBamFiles.contains("cleaned")) { - sample.getBamFiles.put("cleaned", swapExt("CleanedBams", bam,"bam","cleaned.bam")) - } - - val cleaned_bam = sample.getBamFiles.get("cleaned") - val indel_targets = swapExt("CleanedBams/IntermediateFiles/"+sampleId, bam,"bam","realigner_targets.interval_list") - - // create the cleaning commands - val targetCreator = new RealignerTargetCreator with CommandLineGATKArgs - targetCreator.jobOutputFile = new File(".queue/logs/Cleaning/%s/RealignerTargetCreator.out".format(sampleId)) - targetCreator.jobName = "CreateTargets_"+sampleId - targetCreator.analysisName = "CreateTargets_"+sampleId - targetCreator.input_file :+= bam - targetCreator.out = indel_targets - targetCreator.memoryLimit = Some(2) - targetCreator.isIntermediate = true - - val realigner = new IndelRealigner with CommandLineGATKArgs - realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId)) - realigner.analysisName = "RealignBam_"+sampleId - realigner.input_file = targetCreator.input_file - realigner.targetIntervals = targetCreator.out - realigner.scatterCount = contigCount - - // may need to explicitly run fix mates - var fixMates = new PicardBamJarFunction { - jobOutputFile = new File(".queue/logs/Cleaning/%s/FixMates.out".format(sampleId)) - // Declare inputs/outputs for dependency tracking. 
- @Input(doc="unfixed bam") var unfixed: File = _ - @Output(doc="fixed bam") var fixed: File = _ - def inputBams = List(unfixed) - def outputBam = fixed - } - - // realigner.out = cleaned_bam - // realigner.setupGatherFunction = { case (f: BamGatherFunction, _) => f.jarFile = qscript.picardFixMatesJar } - // realigner.jobQueue = "week" - - // if scatter count is > 1, do standard scatter gather, if not, explicitly set up fix mates - if (realigner.scatterCount > 1) { - realigner.out = cleaned_bam - // While gathering run fix mates. - realigner.setupScatterFunction = { - case scatter: ScatterFunction => - scatter.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather".format(sampleId)) - scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter.out".format(sampleId)) - } - realigner.setupCloneFunction = { - case (clone: CloneFunction, index: Int) => - clone.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Scatter_%s".format(sampleId, index)) - clone.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter_%s.out".format(sampleId, index)) - } - realigner.setupGatherFunction = { - case (gather: BamGatherFunction, source: ArgumentSource) => - gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName)) - gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/FixMates.out".format(sampleId)) - gather.memoryLimit = Some(6) - gather.jarFile = qscript.picardFixMatesJar - // Don't pass this AS=true to fix mates! 
- gather.assumeSorted = None - case (gather: GatherFunction, source: ArgumentSource) => - gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName)) - gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Gather_%s.out".format(sampleId, source.field.getName)) - } - } else { - realigner.out = swapExt("CleanedBams/IntermediateFiles/"+sampleId,bam,"bam","unfixed.cleaned.bam") - realigner.isIntermediate = true - - // Explicitly run fix mates if the function won't be scattered. - - fixMates.memoryLimit = Some(6) - fixMates.jarFile = qscript.picardFixMatesJar - fixMates.unfixed = realigner.out - fixMates.fixed = cleaned_bam - fixMates.analysisName = "FixMates_"+sampleId - // Add the fix mates explicitly - } - - var samtoolsindex = new SamtoolsIndexFunction - samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId)) - samtoolsindex.bamFile = cleaned_bam - samtoolsindex.analysisName = "index_cleaned_"+sampleId - - if (!qscript.skip_cleaning) { - if ( realigner.scatterCount > 1 ) { - add(targetCreator,realigner,samtoolsindex) - } else { - add(targetCreator,realigner,fixMates,samtoolsindex) - } - } - } - - // actually make calls - if (!qscript.skip_cleaning) { - //endToEnd(cleanedBase, "cleaned", adprRscript, seq, expKind) - endToEnd(cleanedBase, "cleaned") + if (qscript.skip_cleaning) { + //endToEnd(projectBase + ".uncleaned", "recalibrated", adprRscript, seq, expKind) + endToEnd(projectBase + ".uncleaned", "recalibrated") } else { - //endToEnd(uncleanedBase, "recalibrated", adprRscript, seq, expKind) - endToEnd(uncleanedBase, "recalibrated") + // there are commands that use all the bam files + val recalibratedSamples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains("recalibrated")) + //val adprRScript = qscript.adprScript + //val seq = qscript.machine + //val expKind = qscript.protocol + + // get contigs (needed for 
indel cleaning parallelism) + val contigs = IntervalScatterFunction.distinctContigs( + qscript.pipeline.getProject.getReferenceFile, + List(qscript.pipeline.getProject.getIntervalList.toString)) + + for ( sample <- recalibratedSamples ) { + val sampleId = sample.getId + // put unclean bams in unclean genotypers in advance, create the extension files + val bam = sample.getBamFiles.get("recalibrated") + if (!sample.getBamFiles.contains("cleaned")) { + sample.getBamFiles.put("cleaned", swapExt("CleanedBams", bam,"bam","cleaned.bam")) + } + + val cleaned_bam = sample.getBamFiles.get("cleaned") + val indel_targets = swapExt("CleanedBams/IntermediateFiles/"+sampleId, bam,"bam","realigner_targets.interval_list") + + // create the cleaning commands + val targetCreator = new RealignerTargetCreator with CommandLineGATKArgs + targetCreator.jobOutputFile = new File(".queue/logs/Cleaning/%s/RealignerTargetCreator.out".format(sampleId)) + targetCreator.jobName = "CreateTargets_"+sampleId + targetCreator.analysisName = "CreateTargets_"+sampleId + targetCreator.input_file :+= bam + targetCreator.out = indel_targets + targetCreator.memoryLimit = Some(2) + targetCreator.isIntermediate = true + + val realigner = new IndelRealigner with CommandLineGATKArgs + realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId)) + realigner.analysisName = "RealignBam_"+sampleId + realigner.input_file = targetCreator.input_file + realigner.targetIntervals = targetCreator.out + realigner.intervals = Nil + realigner.intervalsString = contigs + realigner.scatterCount = { + if (num_cleaner_scatter_jobs.isDefined) + num_cleaner_scatter_jobs.get min contigs.size + else + contigs.size + } + + // if scatter count is > 1, do standard scatter gather, if not, explicitly set up fix mates + if (realigner.scatterCount > 1) { + realigner.out = cleaned_bam + // While gathering run fix mates. 
+ realigner.setupScatterFunction = { + case scatter: ScatterFunction => + scatter.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather".format(sampleId)) + scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter.out".format(sampleId)) + } + realigner.setupCloneFunction = { + case (clone: CloneFunction, index: Int) => + clone.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Scatter_%s".format(sampleId, index)) + clone.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter_%s.out".format(sampleId, index)) + } + realigner.setupGatherFunction = { + case (gather: BamGatherFunction, source: ArgumentSource) => + gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName)) + gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/FixMates.out".format(sampleId)) + gather.memoryLimit = Some(6) + gather.jarFile = qscript.picardFixMatesJar + // Don't pass this AS=true to fix mates! + gather.assumeSorted = None + case (gather: GatherFunction, source: ArgumentSource) => + gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName)) + gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Gather_%s.out".format(sampleId, source.field.getName)) + } + + add(targetCreator,realigner) + } else { + realigner.out = swapExt("CleanedBams/IntermediateFiles/"+sampleId,bam,"bam","unfixed.cleaned.bam") + realigner.isIntermediate = true + + // Explicitly run fix mates if the function won't be scattered. + val fixMates = new PicardBamJarFunction { + // Declare inputs/outputs for dependency tracking. 
+ @Input(doc="unfixed bam") var unfixed: File = _ + @Output(doc="fixed bam") var fixed: File = _ + def inputBams = List(unfixed) + def outputBam = fixed + } + + fixMates.jobOutputFile = new File(".queue/logs/Cleaning/%s/FixMates.out".format(sampleId)) + fixMates.memoryLimit = Some(6) + fixMates.jarFile = qscript.picardFixMatesJar + fixMates.unfixed = realigner.out + fixMates.fixed = cleaned_bam + fixMates.analysisName = "FixMates_"+sampleId + + // Add the fix mates explicitly + add(targetCreator,realigner,fixMates) + } + + var samtoolsindex = new SamtoolsIndexFunction + samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId)) + samtoolsindex.bamFile = cleaned_bam + samtoolsindex.analysisName = "index_cleaned_"+sampleId + samtoolsindex.jobQueue = qscript.short_job_queue + add(samtoolsindex) + } + + //endToEnd(projectBase + ".cleaned", "cleaned", adprRscript, seq, expKind) + endToEnd(projectBase + ".cleaned", "cleaned") } } diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index ee9a35448..e10d5176e 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -2,9 +2,9 @@ package org.broadinstitute.sting.queue import java.io.File import java.util.Arrays -import org.broadinstitute.sting.queue.engine.QGraph import org.broadinstitute.sting.commandline._ import org.broadinstitute.sting.queue.util._ +import org.broadinstitute.sting.queue.engine.{QGraphSettings, QGraph} /** * Entry point of Queue. Compiles and runs QScripts passed in to the command line. 
@@ -14,38 +14,8 @@ class QCommandLine extends CommandLineProgram with Logging { @ClassType(classOf[File]) private var scripts = List.empty[File] - @Argument(fullName="bsub_all_jobs", shortName="bsub", doc="Use bsub to submit jobs", required=false) - private var bsubAllJobs = false - - @Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. Without this flag set only performs a dry run.", required=false) - private var run = false - - @Argument(fullName="dot_graph", shortName="dot", doc="Outputs the queue graph to a .dot file. See: http://en.wikipedia.org/wiki/DOT_language", required=false) - private var dotFile: File = _ - - @Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false) - private var expandedDotFile: File = _ - - @Argument(fullName="start_from_scratch", shortName="startFromScratch", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false) - private var startFromScratch = false - - @Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden - private var runScripts = false - - @Argument(fullName="status",shortName="status",doc="Get status of jobs for the qscript",required=false) - private var getStatus = false - @ArgumentCollection - private val qSettings = new QSettings - - @Argument(fullName="status_email_from", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false) - private var statusEmailFrom: String = System.getProperty("user.name") + "@" + SystemUtils.domainName - - @Argument(fullName="status_email_to", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false) - private var statusEmailTo: List[String] = Nil - - @Argument(fullName="delete_intermediate_outputs", shortName="deleteIntermediates", doc="After a successful run 
delete the outputs of any Function marked as intermediate.", required=false) - private var deleteIntermediates = false + private val settings = new QGraphSettings /** * Takes the QScripts passed in, runs their script() methods, retrieves their generated @@ -54,16 +24,8 @@ class QCommandLine extends CommandLineProgram with Logging { def execute = { val qGraph = new QGraph - qGraph.dryRun = !(run || runScripts) - qGraph.bsubAllJobs = bsubAllJobs - qGraph.startFromScratch = startFromScratch - qGraph.dotFile = dotFile - qGraph.expandedDotFile = expandedDotFile - qGraph.qSettings = qSettings + qGraph.settings = settings qGraph.debugMode = debugMode == true - qGraph.statusEmailFrom = statusEmailFrom - qGraph.statusEmailTo = statusEmailTo - qGraph.deleteIntermediates = deleteIntermediates val scripts = qScriptManager.createScripts() for (script <- scripts) { @@ -83,10 +45,7 @@ class QCommandLine extends CommandLineProgram with Logging { } }) - if (getStatus) - qGraph.checkStatus - else - qGraph.run + qGraph.run if (qGraph.hasFailed) { logger.info("Done with errors") diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index b3bb52d98..f51f2c575 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -11,6 +11,14 @@ import org.broadinstitute.sting.queue.function.QFunction class FunctionEdge(var function: QFunction) extends QEdge { var runner: JobRunner =_ + /** + * The number of times this edge has been run. + */ + var retries = 0 + + /** + * Initializes with the current status of the function. + */ private var currentStatus = { val isDone = function.isDone val isFail = function.isFail @@ -22,6 +30,9 @@ class FunctionEdge(var function: QFunction) extends QEdge { RunnerStatus.PENDING } + /** + * Returns the current status of the edge. 
+ */ def status = { if (currentStatus == RunnerStatus.PENDING || currentStatus == RunnerStatus.RUNNING) if (runner != null) @@ -29,14 +40,21 @@ class FunctionEdge(var function: QFunction) extends QEdge { currentStatus } + /** + * Marks this edge as skipped as it is not needed for the current run. + */ def markAsSkipped() = { currentStatus = RunnerStatus.SKIPPED } + /** + * Resets the edge to pending status. + */ def resetToPending(cleanOutputs: Boolean) = { currentStatus = RunnerStatus.PENDING if (cleanOutputs) function.deleteOutputs() + runner = null } def inputs = function.inputs diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index 3be4fc543..16d3058ae 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -33,13 +33,18 @@ trait JobRunner { IOUtils.writeContents(function.jobOutputFile, content) } + protected def writeError(content: String) = { + IOUtils.writeContents(functionErrorFile, content) + } + protected def writeStackTrace(e: Throwable) = { val stackTrace = new StringWriter val printWriter = new PrintWriter(stackTrace) printWriter.println(function.description) e.printStackTrace(printWriter) printWriter.close - val outputFile = if (function.jobErrorFile != null) function.jobErrorFile else function.jobOutputFile - IOUtils.writeContents(outputFile, stackTrace.toString) + IOUtils.writeContents(functionErrorFile, stackTrace.toString) } + + private def functionErrorFile = if (function.jobErrorFile != null) function.jobErrorFile else function.jobOutputFile } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index 59f9f9baa..6dab7e210 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ 
b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -151,9 +151,13 @@ class LsfJobRunner(val function: CommandLineFunction) extends DispatchJobRunner */ private def tailError() = { val errorFile = if (job.errorFile != null) job.errorFile else job.outputFile - val tailLines = IOUtils.tail(errorFile, 100) - val nl = "%n".format() - logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) + if (FileUtils.waitFor(errorFile, 120)) { + val tailLines = IOUtils.tail(errorFile, 100) + val nl = "%n".format() + logger.error("Last %d lines of %s:%n%s".format(tailLines.size, errorFile, tailLines.mkString(nl))) + } else { + logger.error("Unable to access log file: %s".format(errorFile)) + } } /** diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 1d7dda969..59f181da6 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -9,7 +9,7 @@ import org.jgrapht.EdgeFactory import org.jgrapht.ext.DOTExporter import java.io.File import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} -import org.broadinstitute.sting.queue.{QSettings, QException} +import org.broadinstitute.sting.queue.QException import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction} import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction} import org.broadinstitute.sting.queue.util.{EmailMessage, JobExitException, LsfKillJob, Logging} @@ -19,17 +19,10 @@ import org.apache.commons.lang.StringUtils * The internal dependency tracker between sets of function input and output files. 
*/ class QGraph extends Logging { - var dryRun = true - var bsubAllJobs = false - var startFromScratch = false - var dotFile: File = _ - var expandedDotFile: File = _ - var qSettings: QSettings = _ + var settings: QGraphSettings = _ var debugMode = false - var deleteIntermediates = false - var statusEmailFrom: String = _ - var statusEmailTo: List[String] = _ + private def dryRun = !settings.run private val jobGraph = newGraph private var shuttingDown = false private val nl = "%n".format() @@ -40,7 +33,7 @@ class QGraph extends Logging { */ def add(command: QFunction) { try { - command.qSettings = this.qSettings + command.qSettings = settings.qSettings command.freeze addEdge(new FunctionEdge(command)) } catch { @@ -56,7 +49,10 @@ class QGraph extends Logging { val numMissingValues = fillGraph val isReady = numMissingValues == 0 - if (this.dryRun) { + if (settings.getStatus) { + logger.info("Checking pipeline status.") + logStatus() + } else if (this.dryRun) { dryRunJobs() } else if (isReady) { logger.info("Running jobs.") @@ -76,11 +72,11 @@ class QGraph extends Logging { private def fillGraph = { logger.info("Generating graph.") fill - if (dotFile != null) - renderToDot(dotFile) + if (settings.dotFile != null) + renderToDot(settings.dotFile) var numMissingValues = validate - if (numMissingValues == 0 && bsubAllJobs) { + if (numMissingValues == 0 && settings.bsubAllJobs) { logger.info("Generating scatter gather jobs.") val scatterGathers = jobGraph.edgeSet.filter(edge => scatterGatherable(edge)) @@ -103,7 +99,7 @@ class QGraph extends Logging { logger.info("Regenerating graph.") fill - val scatterGatherDotFile = if (expandedDotFile != null) expandedDotFile else dotFile + val scatterGatherDotFile = if (settings.expandedDotFile != null) settings.expandedDotFile else settings.dotFile if (scatterGatherDotFile != null) renderToDot(scatterGatherDotFile) numMissingValues = validate @@ -124,14 +120,6 @@ class QGraph extends Logging { } } - def checkStatus = { - // build up 
the full DAG with scatter-gather jobs - fillGraph - logger.info("Checking pipeline status.") - updateGraphStatus(false) - logStatus - } - /** * Walks up the graph looking for the previous command line edges. * @param function Function to examine for a previous command line job. @@ -257,12 +245,20 @@ class QGraph extends Logging { logger.info("Error: " + edge.function.jobErrorFile.getAbsolutePath) } + /** + * Logs job statuses by traversing the graph and looking for status-related files + */ + private def logStatus() = { + updateGraphStatus(false) + doStatus(status => logger.info(status)) + } + /** * Runs the jobs by traversing the graph. */ private def runJobs() = { try { - if (startFromScratch) { + if (settings.startFromScratch) { logger.info("Removing outputs from previous runs.") foreachFunction(_.resetToPending(true)) } else @@ -291,8 +287,10 @@ class QGraph extends Logging { } }) - if (failedJobs.size > 0) + if (failedJobs.size > 0) { emailFailedJobs(failedJobs) + checkRetryJobs(failedJobs) + } if (readyJobs.size == 0 && runningJobs.size > 0) Thread.sleep(30000L) @@ -359,7 +357,7 @@ class QGraph extends Logging { private def newRunner(f: QFunction) = { f match { case cmd: CommandLineFunction => - if (this.bsubAllJobs) + if (settings.bsubAllJobs) new LsfJobRunner(cmd) else new ShellJobRunner(cmd) @@ -371,18 +369,34 @@ class QGraph extends Logging { } private def emailFailedJobs(failed: List[FunctionEdge]) = { - if (statusEmailTo.size > 0) { + if (settings.statusEmailTo.size > 0) { val emailMessage = new EmailMessage - emailMessage.from = statusEmailFrom - emailMessage.to = statusEmailTo + emailMessage.from = settings.statusEmailFrom + emailMessage.to = settings.statusEmailTo emailMessage.subject = "Queue function: Failure" addFailedFunctions(emailMessage, failed) - emailMessage.trySend(qSettings.emailSettings) + emailMessage.trySend(settings.qSettings.emailSettings) + } + } + + private def checkRetryJobs(failed: List[FunctionEdge]) = { + if (settings.retries > 
0) { + for (failedJob <- failed) { + if (failedJob.retries < settings.retries) { + failedJob.retries += 1 + failedJob.resetToPending(true) + logger.info("Reset for retry attempt %d of %d: %s".format( + failedJob.retries, settings.retries, failedJob.function.description)) + } else { + logger.info("Giving up after retrying %d times: %s".format( + settings.retries, failedJob.function.description)) + } + } } } private def emailStatus() = { - if (statusEmailTo.size > 0) { + if (settings.statusEmailTo.size > 0) { var failed = List.empty[FunctionEdge] foreachFunction(edge => { if (edge.status == RunnerStatus.FAILED) { @@ -391,8 +405,8 @@ class QGraph extends Logging { }) val emailMessage = new EmailMessage - emailMessage.from = statusEmailFrom - emailMessage.to = statusEmailTo + emailMessage.from = settings.statusEmailFrom + emailMessage.to = settings.statusEmailTo emailMessage.body = getStatus + nl if (failed.size == 0) { emailMessage.subject = "Queue run: Success" @@ -400,7 +414,7 @@ class QGraph extends Logging { emailMessage.subject = "Queue run: Failure" addFailedFunctions(emailMessage, failed) } - emailMessage.trySend(qSettings.emailSettings) + emailMessage.trySend(settings.qSettings.emailSettings) } } @@ -417,12 +431,20 @@ class QGraph extends Logging { |Logs: |%s%n |""".stripMargin.trim.format( - failed.map(_.function.description).mkString(nl+nl), + failed.map(edge => failedDescription(edge)).mkString(nl+nl), logs.map(_.getAbsolutePath).mkString(nl)) emailMessage.attachments = logs } + private def failedDescription(failed: FunctionEdge) = { + var description = new StringBuilder + if (settings.retries > 0) + description.append("Attempt %d of %d.%n".format(failed.retries + 1, settings.retries + 1)) + description.append(failed.function.description) + description.toString + } + private def logFiles(edge: FunctionEdge) = { var failedOutputs = List.empty[File] failedOutputs :+= edge.function.jobOutputFile @@ -450,13 +472,6 @@ class QGraph extends Logging { var skipped = 
0 } - /** - * Logs job statuses by traversing the graph and looking for status-related files - */ - private def logStatus = { - doStatus(status => logger.info(status)) - } - /** * Gets job statuses by traversing the graph and looking for status-related files */ @@ -683,7 +698,7 @@ class QGraph extends Logging { } private def deleteIntermediateOutputs() = { - if (this.deleteIntermediates && !hasFailed) { + if (settings.deleteIntermediates && !hasFailed) { logger.info("Deleting intermediate files.") traverseFunctions(edge => { if (edge.function.isIntermediate) { diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala new file mode 100644 index 000000000..d2389eef3 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -0,0 +1,44 @@ +package org.broadinstitute.sting.queue.engine + +import java.io.File +import org.broadinstitute.sting.queue.QSettings +import org.broadinstitute.sting.commandline.{ArgumentCollection, Argument} +import org.broadinstitute.sting.queue.util.SystemUtils + +/** + * Command line options for a QGraph. + */ +class QGraphSettings { + @ArgumentCollection + val qSettings = new QSettings + + @Argument(fullName="bsub_all_jobs", shortName="bsub", doc="Use bsub to submit jobs", required=false) + var bsubAllJobs = false + + @Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. Without this flag set only performs a dry run.", required=false) + var run = false + + @Argument(fullName="dot_graph", shortName="dot", doc="Outputs the queue graph to a .dot file. See: http://en.wikipedia.org/wiki/DOT_language", required=false) + var dotFile: File = _ + + @Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. 
Otherwise overwrites the dot_graph", required=false) + var expandedDotFile: File = _ + + @Argument(fullName="start_from_scratch", shortName="startFromScratch", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false) + var startFromScratch = false + + @Argument(fullName="status",shortName="status",doc="Get status of jobs for the qscript",required=false) + var getStatus = false + + @Argument(fullName="status_email_from", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false) + var statusEmailFrom: String = System.getProperty("user.name") + "@" + SystemUtils.domainName + + @Argument(fullName="status_email_to", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false) + var statusEmailTo: List[String] = Nil + + @Argument(fullName="delete_intermediate_outputs", shortName="deleteIntermediates", doc="After a successful run delete the outputs of any Function marked as intermediate.", required=false) + var deleteIntermediates = false + + @Argument(fullName="retry_failed", shortName="retry", doc="Retry the specified number of times after a command fails. 
Defaults to no retries.", required=false) + var retries = 0 +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala index aecb79619..2fcb39c43 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala @@ -1,7 +1,7 @@ package org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.util.{Logging, ShellJob} import org.broadinstitute.sting.queue.function.CommandLineFunction +import org.broadinstitute.sting.queue.util.{JobExitException, Logging, ShellJob} /** * Runs jobs one at a time locally @@ -40,6 +40,16 @@ class ShellJobRunner(val function: CommandLineFunction) extends JobRunner with L runStatus = RunnerStatus.DONE logger.info("Done: " + function.commandLine) } catch { + case jee: JobExitException => + runStatus = RunnerStatus.FAILED + try { + function.failOutputs.foreach(_.createNewFile()) + writeError(jee.getMessage) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error: " + function.commandLine) + logger.error(jee.stdErr) case e => runStatus = RunnerStatus.FAILED try { diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala index 833291f86..9d6f801cb 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala @@ -20,7 +20,8 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction { var splitByContig = false private var referenceSequence: File = _ - private var intervalField: ArgumentSource = _ + private var intervalsField: ArgumentSource = _ + private var intervalsStringField: ArgumentSource = _ private var intervals: 
List[String] = Nil def isScatterGatherable(originalFunction: ScatterGatherableFunction) = { @@ -35,18 +36,20 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction { this.referenceSequence = gatk.reference_sequence this.intervals ++= gatk.intervalsString this.intervals ++= gatk.intervals.map(_.toString) - this.intervalField = QFunction.findField(originalFunction.getClass, "intervals") + this.intervalsField = QFunction.findField(originalFunction.getClass, "intervals") + this.intervalsStringField = QFunction.findField(originalFunction.getClass, "intervalsString") } def initCloneInputs(cloneFunction: CloneFunction, index: Int) = { - cloneFunction.setFieldValue(this.intervalField, List(new File("scatter.intervals"))) + cloneFunction.setFieldValue(this.intervalsField, List(new File("scatter.intervals"))) + cloneFunction.setFieldValue(this.intervalsStringField, List.empty[String]) } def bindCloneInputs(cloneFunction: CloneFunction, index: Int) = { - val scatterPart = cloneFunction.getFieldValue(this.intervalField) + val scatterPart = cloneFunction.getFieldValue(this.intervalsField) .asInstanceOf[List[File]] .map(file => IOUtils.subDir(cloneFunction.commandDirectory, file)) - cloneFunction.setFieldValue(this.intervalField, scatterPart) + cloneFunction.setFieldValue(this.intervalsField, scatterPart) this.scatterParts ++= scatterPart } @@ -71,18 +74,18 @@ object IntervalScatterFunction { locs.toList } - def countContigs(reference: File, intervals: List[String]) = { + def distinctContigs(reference: File, intervals: List[String]) = { val referenceSource = new ReferenceDataSource(reference) val locs = parseLocs(referenceSource, intervals) - var count = 0 var contig: String = null + var contigs = List.empty[String] for (loc <- locs) { if (contig != loc.getContig) { - count += 1 contig = loc.getContig + contigs :+= contig } } - count + contigs } def scatter(reference: File, intervals: List[String], scatterParts: List[File], splitByContig: Boolean) = { 
diff --git a/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala b/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala index 39c12d2a6..827b6132f 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/JobExitException.scala @@ -5,7 +5,7 @@ import org.broadinstitute.sting.queue.QException /** * Captures the exit code and error text from a failed process. */ -class JobExitException(var exitText: String, var commandLine: Array[String], var exitCode: Int, var stdErr: String) +class JobExitException(val exitText: String, val commandLine: Array[String], val exitCode: Int, val stdErr: String) extends QException("%s%nCommand line:%n%s%nExit code: %s%nStandard error contained: %n%s" .format(exitText, commandLine.mkString(" "), exitCode, stdErr)) { } diff --git a/scala/test/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunctionUnitTest.scala b/scala/test/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunctionUnitTest.scala index 2b158cc8f..25cc33157 100644 --- a/scala/test/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunctionUnitTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunctionUnitTest.scala @@ -21,8 +21,9 @@ class IntervalScatterFunctionUnitTest extends BaseTest { @Test def testCountContigs = { - Assert.assertEquals(3, IntervalScatterFunction.countContigs(reference, List("1:1-1", "2:1-1", "3:2-2"))) - Assert.assertEquals(1, IntervalScatterFunction.countContigs(reference, List(BaseTest.validationDataLocation + "chr1_b36_pilot3.interval_list"))) + Assert.assertEquals(List("1"), IntervalScatterFunction.distinctContigs(reference, List(BaseTest.validationDataLocation + "chr1_b36_pilot3.interval_list"))) + Assert.assertEquals(List("1","2","3"), IntervalScatterFunction.distinctContigs(reference, List("1:1-1", "2:1-1", "3:2-2"))) + Assert.assertEquals(List("1","2","3"), 
IntervalScatterFunction.distinctContigs(reference, List("2:1-1", "1:1-1", "3:2-2"))) } @Test