From 7f25019f3791817419c3471ec74bb5dcb46e4beb Mon Sep 17 00:00:00 2001 From: kshakir Date: Thu, 7 Oct 2010 19:08:02 +0000 Subject: [PATCH] Inprocess functions by default now log what output files they are running for. On -run cleaning up .done and .fail files for jobs that will be run. Added detection to Firehose YAML generator shell script for (g)awk versions that ignore "\n" in patterns. Removed obsolete mergeText and splitIntervals shell scripts. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4452 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/queue/engine/DryRunner.scala | 38 --------- .../sting/queue/engine/FunctionEdge.scala | 8 +- .../sting/queue/engine/QGraph.scala | 84 ++++++++++++++----- .../queue/function/InProcessFunction.scala | 2 +- shell/getFirehosePipelineYaml.sh | 36 ++++---- shell/mergeText.sh | 36 -------- shell/splitIntervals.sh | 55 ------------ 7 files changed, 90 insertions(+), 169 deletions(-) delete mode 100644 scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala mode change 100644 => 100755 shell/getFirehosePipelineYaml.sh delete mode 100755 shell/mergeText.sh delete mode 100755 shell/splitIntervals.sh diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala deleted file mode 100644 index 28d73c7fb..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala +++ /dev/null @@ -1,38 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -import org.broadinstitute.sting.queue.util.Logging -import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} - -/** - * Only logs the command to run. Doesn't actually run it. - */ -class DryRunner(function: QFunction) extends JobRunner with Logging { - /** - * Dry runs the function logging the command lines. - * @param function Command to run. - */ - // TODO: Why do we need the dry runner? Can we just use a dryRun() method to log per JobRunner? - def start() = { - function match { - case clf: CommandLineFunction => { - if (logger.isDebugEnabled) { - logger.debug(clf.commandDirectory + " > " + clf.commandLine) - } else { - logger.info(clf.commandLine) - } - logger.info("Output written to " + clf.jobOutputFile) - if (clf.jobErrorFile != null) { - logger.info("Errors written to " + clf.jobErrorFile) - } else { - if (logger.isDebugEnabled) - logger.info("Errors also written to " + clf.jobOutputFile) - } - } - case qFunction => { - logger.info(qFunction.description) - } - } - } - - def status = RunnerStatus.DONE -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index ce300b7a2..64ac20c53 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -16,7 +16,7 @@ class FunctionEdge(var function: QFunction) extends QEdge { val failOutputs = function.failOutputs if (failOutputs.exists(_.exists)) RunnerStatus.FAILED - else if (doneOutputs.forall(_.exists)) + else if (doneOutputs.size > 0 && doneOutputs.forall(_.exists)) RunnerStatus.DONE else RunnerStatus.PENDING @@ -29,10 +29,10 @@ class FunctionEdge(var function: QFunction) extends QEdge { currentStatus } - def resetPending() = { + def resetToPending() = { currentStatus = RunnerStatus.PENDING - function.doneOutputs.foreach(_.delete) - function.doneOutputs.foreach(_.delete) + function.doneOutputs.foreach(_.delete()) + function.failOutputs.foreach(_.delete()) } def inputs = function.inputs diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 9dda763cf..f8c27bf47 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -49,7 +49,9 @@ class QGraph extends Logging { val numMissingValues = fillGraph val isReady = numMissingValues == 0 - if (isReady || this.dryRun) { + if (this.dryRun) { + dryRunJobs() + } else if (isReady) { logger.info("Running jobs.") runJobs() } @@ -229,16 +231,43 @@ class QGraph extends Logging { numMissingValues } + /** + * Dry-runs the jobs by traversing the graph. + */ + private def dryRunJobs() = { + traverseFunctions(edge => { + edge.function match { + case clf: CommandLineFunction => { + if (logger.isDebugEnabled) { + logger.debug(clf.commandDirectory + " > " + clf.commandLine) + } else { + logger.info(clf.commandLine) + } + logger.info("Output written to " + clf.jobOutputFile) + if (clf.jobErrorFile != null) { + logger.info("Errors written to " + clf.jobErrorFile) + } else { + if (logger.isDebugEnabled) + logger.info("Errors also written to " + clf.jobOutputFile) + } + } + case qFunction => { + logger.info(qFunction.description) + } + } + }) + } + /** * Runs the jobs by traversing the graph. */ private def runJobs() = { - foreachFunction(f => { + traverseFunctions(edge => { val isDone = !this.startClean && - f.status == RunnerStatus.DONE && - this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) + edge.status == RunnerStatus.DONE && + this.previousFunctions(edge).forall(_.status == RunnerStatus.DONE) if (!isDone) - f.resetPending() + edge.resetToPending() }) var readyJobs = getReadyJobs @@ -266,20 +295,16 @@ class QGraph extends Logging { } private def newRunner(f: QFunction) = { - if (this.dryRun) - new DryRunner(f) - else { - f match { - case cmd: CommandLineFunction => - if (this.bsubAllJobs) - new LsfJobRunner(cmd) - else - new ShellJobRunner(cmd) - case inProc: InProcessFunction => - new InProcessRunner(inProc) - case _ => - throw new QException("Unexpected function: " + f) - } + f match { + case cmd: CommandLineFunction => + if (this.bsubAllJobs) + new LsfJobRunner(cmd) + else + new ShellJobRunner(cmd) + case inProc: InProcessFunction => + new InProcessRunner(inProc) + case _ => + throw new QException("Unexpected function: " + f) } } @@ -464,10 +489,27 @@ class QGraph extends Logging { private def foreachFunction(f: (FunctionEdge) => Unit) = { jobGraph.edgeSet.foreach{ case functionEdge: FunctionEdge => f(functionEdge) - case _ => + case map: MappingEdge => /* do nothing for mapping functions */ } } + /** + * Utility function for running a method over all functions, but traversing the nodes in order of dependency. + * @param edgeFunction Function to run for each FunctionEdge. + */ + private def traverseFunctions(f: (FunctionEdge) => Unit) = { + val iterator = new TopologicalOrderIterator(this.jobGraph) + iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] { + override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = { + event.getEdge match { + case functionEdge: FunctionEdge => f(functionEdge) + case map: MappingEdge => /* do nothing for mapping functions */ + } + } + }) + iterator.foreach(_ => {}) + } + /** * Outputs the graph to a .dot file. * http://en.wikipedia.org/wiki/DOT_language @@ -493,7 +535,7 @@ class QGraph extends Logging { * @return true if any of the jobs in the graph have a status of failed. */ def hasFailed = { - this.jobGraph.edgeSet.exists(edge => { + !this.dryRun && this.jobGraph.edgeSet.exists(edge => { edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED }) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala index 754d2761b..078907386 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala @@ -8,5 +8,5 @@ import java.io.File trait InProcessFunction extends QFunction { def run() def useStatusOutput(file: File) = true - def description = this.getClass.getSimpleName + def description = (List(this.getClass.getSimpleName) ++ this.outputs.map(_.getAbsolutePath)).mkString(" ") } diff --git a/shell/getFirehosePipelineYaml.sh b/shell/getFirehosePipelineYaml.sh old mode 100644 new mode 100755 index 6966c2785..46e45bc09 --- a/shell/getFirehosePipelineYaml.sh +++ b/shell/getFirehosePipelineYaml.sh @@ -32,24 +32,32 @@ FIREHOSE_ANNOTATIONS=(reference_file dbsnp_file interval_list \ # YAML templates -PROJECT_YAML_TEMPLATE='" \ - project: { \ - name: '"$ENTITY_SET_ID"', \ - referenceFile: %s, \ - dbsnpFile: %s, \ - intervalList: %s \ +PROJECT_YAML_TEMPLATE='"\n\ + project: {\n\ + name: '"$ENTITY_SET_ID"',\n\ + referenceFile: %s,\n\ + dbsnpFile: %s,\n\ + intervalList: %s\n\ },", $1, $2, $3' -SAMPLE_YAML_TEMPLATE='" \ - { \ - id: %s, \ - bamFiles: { recalibrated: %s }, \ - tags: { \ - SQUIDProject: %s, \ - CollaboratorID: %s \ - } \ +SAMPLE_YAML_TEMPLATE='"\n\ + {\n\ + id: %s,\n\ + bamFiles: { recalibrated: %s },\n\ + tags: {\n\ + SQUIDProject: %s,\n\ + CollaboratorID: %s\n\ + }\n\ }", $4, $5, $6, $7' +TEST_AWK_COUNT=`echo '\n' | awk '{print $0}' | wc -c` +if [ "$TEST_AWK_COUNT" -eq 2 ]; then + # Strip the extra \n from the lines if awk of \n is + # a newline and not the two characters slash-n (on mac) + PROJECT_YAML_TEMPLATE="${PROJECT_YAML_TEMPLATE//\\\n/}" + SAMPLE_YAML_TEMPLATE="${SAMPLE_YAML_TEMPLATE//\\\n/}" +fi + index=0 count=${#FIREHOSE_ANNOTATIONS[@]} FIREHOSE_VARIABLES="" diff --git a/shell/mergeText.sh b/shell/mergeText.sh deleted file mode 100755 index 3cd0a4819..000000000 --- a/shell/mergeText.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/sh - -# Merges a set of files, skipping over common headers. - -if [ $# -lt 2 ]; then - echo "Usage: $0 [ .. ]" - exit 1 -elif [ $# -eq 2 ]; then - cp $2 $1 -else - outputFile=$1 - shift - - test -e $outputFile && rm -f $outputFile - - exec 3< $1 - exec 4< $2 - - startLine=1 - while true; do - read -u 3 header1 - if [ $? -ne 0 ]; then break; fi - read -u 4 header2 - if [ $? -ne 0 ]; then break; fi - if [ "$header1" != "$header2" ]; then break; fi - echo "$header1" >> $outputFile - ((startLine++)) - done - - exec 3<&- - exec 4<&- - - for inputFile in $@; do - tail -n +$startLine $inputFile >> $outputFile - done -fi diff --git a/shell/splitIntervals.sh b/shell/splitIntervals.sh deleted file mode 100755 index ca4512673..000000000 --- a/shell/splitIntervals.sh +++ /dev/null @@ -1,55 +0,0 @@ -#!/bin/sh - -# Splits an interval list into multiple files - -if [ $# -lt 2 ]; then - echo "Usage: $0 [ .. ]" - exit 1 -else - inputFile=$1 - shift - - totalLines=$(wc -l < $inputFile) - - exec 3< $inputFile - - numHeaders=0 - while true; do - read -u 3 nextLine - if [ $? -ne 0 ]; then break; fi - if [[ $nextLine != @* ]]; then break; fi - ((numHeaders++)) - done - - numFiles=$# - ((numIntervals = totalLines - numHeaders)) - - if [ $numIntervals -lt $numFiles ]; then - echo "Error: Number of intervals $numIntervals is less than the number of files $numFiles." - exec 3<&- - exit 1 - fi - - ((linesPerFile = numIntervals / numFiles)) - ((remainder = numIntervals % numFiles)) - - ((linesPerFile++)) - - fileNumber=0 - for outputFile in $@; do - - # Earlier files with get the remainder until it's no longer needed. - if [ $fileNumber -eq $remainder ]; then ((linesPerFile--)); fi - ((fileNumber++)) - - head -n $numHeaders $inputFile > $outputFile - - for ((line=0; line<$linesPerFile; line++)); do - echo "$nextLine" >> $outputFile - read -u 3 nextLine - if [ $? -ne 0 ]; then break; fi - done - done - - exec 3<&- -fi