diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala deleted file mode 100644 index 28d73c7fb..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/DryRunner.scala +++ /dev/null @@ -1,38 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -import org.broadinstitute.sting.queue.util.Logging -import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} - -/** - * Only logs the command to run. Doesn't actually run it. - */ -class DryRunner(function: QFunction) extends JobRunner with Logging { - /** - * Dry runs the function logging the command lines. - * @param function Command to run. - */ - // TODO: Why do we need the dry runner? Can we just use a dryRun() method to log per JobRunner? - def start() = { - function match { - case clf: CommandLineFunction => { - if (logger.isDebugEnabled) { - logger.debug(clf.commandDirectory + " > " + clf.commandLine) - } else { - logger.info(clf.commandLine) - } - logger.info("Output written to " + clf.jobOutputFile) - if (clf.jobErrorFile != null) { - logger.info("Errors written to " + clf.jobErrorFile) - } else { - if (logger.isDebugEnabled) - logger.info("Errors also written to " + clf.jobOutputFile) - } - } - case qFunction => { - logger.info(qFunction.description) - } - } - } - - def status = RunnerStatus.DONE -} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index ce300b7a2..64ac20c53 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -16,7 +16,7 @@ class FunctionEdge(var function: QFunction) extends QEdge { val failOutputs = function.failOutputs if (failOutputs.exists(_.exists)) RunnerStatus.FAILED - else if (doneOutputs.forall(_.exists)) + else if (doneOutputs.size > 0 && doneOutputs.forall(_.exists)) RunnerStatus.DONE else RunnerStatus.PENDING @@ -29,10 +29,10 @@ class FunctionEdge(var function: QFunction) extends QEdge { currentStatus } - def resetPending() = { + def resetToPending() = { currentStatus = RunnerStatus.PENDING - function.doneOutputs.foreach(_.delete) - function.doneOutputs.foreach(_.delete) + function.doneOutputs.foreach(_.delete()) + function.failOutputs.foreach(_.delete()) } def inputs = function.inputs diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 9dda763cf..f8c27bf47 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -49,7 +49,9 @@ class QGraph extends Logging { val numMissingValues = fillGraph val isReady = numMissingValues == 0 - if (isReady || this.dryRun) { + if (this.dryRun) { + dryRunJobs() + } else if (isReady) { logger.info("Running jobs.") runJobs() } @@ -229,16 +231,43 @@ class QGraph extends Logging { numMissingValues } + /** + * Dry-runs the jobs by traversing the graph. + */ + private def dryRunJobs() = { + traverseFunctions(edge => { + edge.function match { + case clf: CommandLineFunction => { + if (logger.isDebugEnabled) { + logger.debug(clf.commandDirectory + " > " + clf.commandLine) + } else { + logger.info(clf.commandLine) + } + logger.info("Output written to " + clf.jobOutputFile) + if (clf.jobErrorFile != null) { + logger.info("Errors written to " + clf.jobErrorFile) + } else { + if (logger.isDebugEnabled) + logger.info("Errors also written to " + clf.jobOutputFile) + } + } + case qFunction => { + logger.info(qFunction.description) + } + } + }) + } + /** * Runs the jobs by traversing the graph. */ private def runJobs() = { - foreachFunction(f => { + traverseFunctions(edge => { val isDone = !this.startClean && - f.status == RunnerStatus.DONE && - this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) + edge.status == RunnerStatus.DONE && + this.previousFunctions(edge).forall(_.status == RunnerStatus.DONE) if (!isDone) - f.resetPending() + edge.resetToPending() }) var readyJobs = getReadyJobs @@ -266,20 +295,16 @@ class QGraph extends Logging { } private def newRunner(f: QFunction) = { - if (this.dryRun) - new DryRunner(f) - else { - f match { - case cmd: CommandLineFunction => - if (this.bsubAllJobs) - new LsfJobRunner(cmd) - else - new ShellJobRunner(cmd) - case inProc: InProcessFunction => - new InProcessRunner(inProc) - case _ => - throw new QException("Unexpected function: " + f) - } + f match { + case cmd: CommandLineFunction => + if (this.bsubAllJobs) + new LsfJobRunner(cmd) + else + new ShellJobRunner(cmd) + case inProc: InProcessFunction => + new InProcessRunner(inProc) + case _ => + throw new QException("Unexpected function: " + f) } } @@ -464,10 +489,27 @@ class QGraph extends Logging { private def foreachFunction(f: (FunctionEdge) => Unit) = { jobGraph.edgeSet.foreach{ case functionEdge: FunctionEdge => f(functionEdge) - case _ => + case map: MappingEdge => /* do nothing for mapping functions */ } } + /** + * Utility function for running a method over all functions, but traversing the nodes in order of dependency. + * @param edgeFunction Function to run for each FunctionEdge. + */ + private def traverseFunctions(f: (FunctionEdge) => Unit) = { + val iterator = new TopologicalOrderIterator(this.jobGraph) + iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] { + override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = { + event.getEdge match { + case functionEdge: FunctionEdge => f(functionEdge) + case map: MappingEdge => /* do nothing for mapping functions */ + } + } + }) + iterator.foreach(_ => {}) + } + /** * Outputs the graph to a .dot file. * http://en.wikipedia.org/wiki/DOT_language @@ -493,7 +535,7 @@ class QGraph extends Logging { * @return true if any of the jobs in the graph have a status of failed. */ def hasFailed = { - this.jobGraph.edgeSet.exists(edge => { + !this.dryRun && this.jobGraph.edgeSet.exists(edge => { edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED }) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala index 754d2761b..078907386 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala @@ -8,5 +8,5 @@ import java.io.File trait InProcessFunction extends QFunction { def run() def useStatusOutput(file: File) = true - def description = this.getClass.getSimpleName + def description = (List(this.getClass.getSimpleName) ++ this.outputs.map(_.getAbsolutePath)).mkString(" ") } diff --git a/shell/getFirehosePipelineYaml.sh b/shell/getFirehosePipelineYaml.sh old mode 100644 new mode 100755 index 6966c2785..46e45bc09 --- a/shell/getFirehosePipelineYaml.sh +++ b/shell/getFirehosePipelineYaml.sh @@ -32,24 +32,32 @@ FIREHOSE_ANNOTATIONS=(reference_file dbsnp_file interval_list \ # YAML templates -PROJECT_YAML_TEMPLATE='" \ - project: { \ - name: '"$ENTITY_SET_ID"', \ - referenceFile: %s, \ - dbsnpFile: %s, \ - intervalList: %s \ +PROJECT_YAML_TEMPLATE='"\n\ + project: {\n\ + name: '"$ENTITY_SET_ID"',\n\ + referenceFile: %s,\n\ + dbsnpFile: %s,\n\ + intervalList: %s\n\ },", $1, $2, $3' -SAMPLE_YAML_TEMPLATE='" \ - { \ - id: %s, \ - bamFiles: { recalibrated: %s }, \ - tags: { \ - SQUIDProject: %s, \ - CollaboratorID: %s \ - } \ +SAMPLE_YAML_TEMPLATE='"\n\ + {\n\ + id: %s,\n\ + bamFiles: { recalibrated: %s },\n\ + tags: {\n\ + SQUIDProject: %s,\n\ + CollaboratorID: %s\n\ + }\n\ }", $4, $5, $6, $7' +TEST_AWK_COUNT=`echo '\n' | awk '{print $0}' | wc -c` +if [ "$TEST_AWK_COUNT" -eq 2 ]; then + # Strip the extra \n from the lines if awk of \n is + # a newline and not the two characters slash-n (on mac) + PROJECT_YAML_TEMPLATE="${PROJECT_YAML_TEMPLATE//\\\n/}" + SAMPLE_YAML_TEMPLATE="${SAMPLE_YAML_TEMPLATE//\\\n/}" +fi + index=0 count=${#FIREHOSE_ANNOTATIONS[@]} FIREHOSE_VARIABLES="" diff --git a/shell/mergeText.sh b/shell/mergeText.sh deleted file mode 100755 index 3cd0a4819..000000000 --- a/shell/mergeText.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/sh - -# Merges a set of files, skipping over common headers. - -if [ $# -lt 2 ]; then - echo "Usage: $0 [ .. ]" - exit 1 -elif [ $# -eq 2 ]; then - cp $2 $1 -else - outputFile=$1 - shift - - test -e $outputFile && rm -f $outputFile - - exec 3< $1 - exec 4< $2 - - startLine=1 - while true; do - read -u 3 header1 - if [ $? -ne 0 ]; then break; fi - read -u 4 header2 - if [ $? -ne 0 ]; then break; fi - if [ "$header1" != "$header2" ]; then break; fi - echo "$header1" >> $outputFile - ((startLine++)) - done - - exec 3<&- - exec 4<&- - - for inputFile in $@; do - tail -n +$startLine $inputFile >> $outputFile - done -fi diff --git a/shell/splitIntervals.sh b/shell/splitIntervals.sh deleted file mode 100755 index ca4512673..000000000 --- a/shell/splitIntervals.sh +++ /dev/null @@ -1,55 +0,0 @@ -#!/bin/sh - -# Splits an interval list into multiple files - -if [ $# -lt 2 ]; then - echo "Usage: $0 [ .. ]" - exit 1 -else - inputFile=$1 - shift - - totalLines=$(wc -l < $inputFile) - - exec 3< $inputFile - - numHeaders=0 - while true; do - read -u 3 nextLine - if [ $? -ne 0 ]; then break; fi - if [[ $nextLine != @* ]]; then break; fi - ((numHeaders++)) - done - - numFiles=$# - ((numIntervals = totalLines - numHeaders)) - - if [ $numIntervals -lt $numFiles ]; then - echo "Error: Number of intervals $numIntervals is less than the number of files $numFiles." - exec 3<&- - exit 1 - fi - - ((linesPerFile = numIntervals / numFiles)) - ((remainder = numIntervals % numFiles)) - - ((linesPerFile++)) - - fileNumber=0 - for outputFile in $@; do - - # Earlier files with get the remainder until it's no longer needed. - if [ $fileNumber -eq $remainder ]; then ((linesPerFile--)); fi - ((fileNumber++)) - - head -n $numHeaders $inputFile > $outputFile - - for ((line=0; line<$linesPerFile; line++)); do - echo "$nextLine" >> $outputFile - read -u 3 nextLine - if [ $? -ne 0 ]; then break; fi - done - done - - exec 3<&- -fi