Inprocess functions by default now log what output files they are running for.

On -run cleaning up .done and .fail files for jobs that will be run.
Added detection to Firehose YAML generator shell script for (g)awk versions that ignore "\n" in patterns.
Removed obsolete mergeText and splitIntervals shell scripts.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4452 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-10-07 19:08:02 +00:00
parent 0de658534d
commit 7f25019f37
7 changed files with 90 additions and 169 deletions

View File

@ -1,38 +0,0 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction}
/**
* Only logs the command to run. Doesn't actually run it.
*/
class DryRunner(function: QFunction) extends JobRunner with Logging {
/**
* Dry runs the function logging the command lines.
* @param function Command to run.
*/
// TODO: Why do we need the dry runner? Can we just use a dryRun() method to log per JobRunner?
def start() = {
function match {
case clf: CommandLineFunction => {
if (logger.isDebugEnabled) {
logger.debug(clf.commandDirectory + " > " + clf.commandLine)
} else {
logger.info(clf.commandLine)
}
logger.info("Output written to " + clf.jobOutputFile)
if (clf.jobErrorFile != null) {
logger.info("Errors written to " + clf.jobErrorFile)
} else {
if (logger.isDebugEnabled)
logger.info("Errors also written to " + clf.jobOutputFile)
}
}
case qFunction => {
logger.info(qFunction.description)
}
}
}
def status = RunnerStatus.DONE
}

View File

@ -16,7 +16,7 @@ class FunctionEdge(var function: QFunction) extends QEdge {
val failOutputs = function.failOutputs val failOutputs = function.failOutputs
if (failOutputs.exists(_.exists)) if (failOutputs.exists(_.exists))
RunnerStatus.FAILED RunnerStatus.FAILED
else if (doneOutputs.forall(_.exists)) else if (doneOutputs.size > 0 && doneOutputs.forall(_.exists))
RunnerStatus.DONE RunnerStatus.DONE
else else
RunnerStatus.PENDING RunnerStatus.PENDING
@ -29,10 +29,10 @@ class FunctionEdge(var function: QFunction) extends QEdge {
currentStatus currentStatus
} }
def resetPending() = { def resetToPending() = {
currentStatus = RunnerStatus.PENDING currentStatus = RunnerStatus.PENDING
function.doneOutputs.foreach(_.delete) function.doneOutputs.foreach(_.delete())
function.doneOutputs.foreach(_.delete) function.failOutputs.foreach(_.delete())
} }
def inputs = function.inputs def inputs = function.inputs

View File

@ -49,7 +49,9 @@ class QGraph extends Logging {
val numMissingValues = fillGraph val numMissingValues = fillGraph
val isReady = numMissingValues == 0 val isReady = numMissingValues == 0
if (isReady || this.dryRun) { if (this.dryRun) {
dryRunJobs()
} else if (isReady) {
logger.info("Running jobs.") logger.info("Running jobs.")
runJobs() runJobs()
} }
@ -229,16 +231,43 @@ class QGraph extends Logging {
numMissingValues numMissingValues
} }
/**
* Dry-runs the jobs by traversing the graph.
*/
private def dryRunJobs() = {
traverseFunctions(edge => {
edge.function match {
case clf: CommandLineFunction => {
if (logger.isDebugEnabled) {
logger.debug(clf.commandDirectory + " > " + clf.commandLine)
} else {
logger.info(clf.commandLine)
}
logger.info("Output written to " + clf.jobOutputFile)
if (clf.jobErrorFile != null) {
logger.info("Errors written to " + clf.jobErrorFile)
} else {
if (logger.isDebugEnabled)
logger.info("Errors also written to " + clf.jobOutputFile)
}
}
case qFunction => {
logger.info(qFunction.description)
}
}
})
}
/** /**
* Runs the jobs by traversing the graph. * Runs the jobs by traversing the graph.
*/ */
private def runJobs() = { private def runJobs() = {
foreachFunction(f => { traverseFunctions(edge => {
val isDone = !this.startClean && val isDone = !this.startClean &&
f.status == RunnerStatus.DONE && edge.status == RunnerStatus.DONE &&
this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) this.previousFunctions(edge).forall(_.status == RunnerStatus.DONE)
if (!isDone) if (!isDone)
f.resetPending() edge.resetToPending()
}) })
var readyJobs = getReadyJobs var readyJobs = getReadyJobs
@ -266,20 +295,16 @@ class QGraph extends Logging {
} }
private def newRunner(f: QFunction) = { private def newRunner(f: QFunction) = {
if (this.dryRun) f match {
new DryRunner(f) case cmd: CommandLineFunction =>
else { if (this.bsubAllJobs)
f match { new LsfJobRunner(cmd)
case cmd: CommandLineFunction => else
if (this.bsubAllJobs) new ShellJobRunner(cmd)
new LsfJobRunner(cmd) case inProc: InProcessFunction =>
else new InProcessRunner(inProc)
new ShellJobRunner(cmd) case _ =>
case inProc: InProcessFunction => throw new QException("Unexpected function: " + f)
new InProcessRunner(inProc)
case _ =>
throw new QException("Unexpected function: " + f)
}
} }
} }
@ -464,10 +489,27 @@ class QGraph extends Logging {
private def foreachFunction(f: (FunctionEdge) => Unit) = { private def foreachFunction(f: (FunctionEdge) => Unit) = {
jobGraph.edgeSet.foreach{ jobGraph.edgeSet.foreach{
case functionEdge: FunctionEdge => f(functionEdge) case functionEdge: FunctionEdge => f(functionEdge)
case _ => case map: MappingEdge => /* do nothing for mapping functions */
} }
} }
/**
* Utility function for running a method over all functions, but traversing the nodes in order of dependency.
* @param edgeFunction Function to run for each FunctionEdge.
*/
private def traverseFunctions(f: (FunctionEdge) => Unit) = {
val iterator = new TopologicalOrderIterator(this.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QEdge] {
override def edgeTraversed(event: EdgeTraversalEvent[QNode, QEdge]) = {
event.getEdge match {
case functionEdge: FunctionEdge => f(functionEdge)
case map: MappingEdge => /* do nothing for mapping functions */
}
}
})
iterator.foreach(_ => {})
}
/** /**
* Outputs the graph to a .dot file. * Outputs the graph to a .dot file.
* http://en.wikipedia.org/wiki/DOT_language * http://en.wikipedia.org/wiki/DOT_language
@ -493,7 +535,7 @@ class QGraph extends Logging {
* @return true if any of the jobs in the graph have a status of failed. * @return true if any of the jobs in the graph have a status of failed.
*/ */
def hasFailed = { def hasFailed = {
this.jobGraph.edgeSet.exists(edge => { !this.dryRun && this.jobGraph.edgeSet.exists(edge => {
edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED edge.isInstanceOf[FunctionEdge] && edge.asInstanceOf[FunctionEdge].status == RunnerStatus.FAILED
}) })
} }

View File

@ -8,5 +8,5 @@ import java.io.File
trait InProcessFunction extends QFunction { trait InProcessFunction extends QFunction {
def run() def run()
def useStatusOutput(file: File) = true def useStatusOutput(file: File) = true
def description = this.getClass.getSimpleName def description = (List(this.getClass.getSimpleName) ++ this.outputs.map(_.getAbsolutePath)).mkString(" ")
} }

36
shell/getFirehosePipelineYaml.sh 100644 → 100755
View File

@ -32,24 +32,32 @@ FIREHOSE_ANNOTATIONS=(reference_file dbsnp_file interval_list \
# YAML templates # YAML templates
PROJECT_YAML_TEMPLATE='" \ PROJECT_YAML_TEMPLATE='"\n\
project: { \ project: {\n\
name: '"$ENTITY_SET_ID"', \ name: '"$ENTITY_SET_ID"',\n\
referenceFile: %s, \ referenceFile: %s,\n\
dbsnpFile: %s, \ dbsnpFile: %s,\n\
intervalList: %s \ intervalList: %s\n\
},", $1, $2, $3' },", $1, $2, $3'
SAMPLE_YAML_TEMPLATE='" \ SAMPLE_YAML_TEMPLATE='"\n\
{ \ {\n\
id: %s, \ id: %s,\n\
bamFiles: { recalibrated: %s }, \ bamFiles: { recalibrated: %s },\n\
tags: { \ tags: {\n\
SQUIDProject: %s, \ SQUIDProject: %s,\n\
CollaboratorID: %s \ CollaboratorID: %s\n\
} \ }\n\
}", $4, $5, $6, $7' }", $4, $5, $6, $7'
TEST_AWK_COUNT=`echo '\n' | awk '{print $0}' | wc -c`
if [ "$TEST_AWK_COUNT" -eq 2 ]; then
# Strip the extra \n from the lines if awk of \n is
# a newline and not the two characters slash-n (on mac)
PROJECT_YAML_TEMPLATE="${PROJECT_YAML_TEMPLATE//\\\n/}"
SAMPLE_YAML_TEMPLATE="${SAMPLE_YAML_TEMPLATE//\\\n/}"
fi
index=0 index=0
count=${#FIREHOSE_ANNOTATIONS[@]} count=${#FIREHOSE_ANNOTATIONS[@]}
FIREHOSE_VARIABLES="" FIREHOSE_VARIABLES=""

View File

@ -1,36 +0,0 @@
#!/bin/sh
# Merges a set of files, skipping over common headers.
if [ $# -lt 2 ]; then
echo "Usage: $0 <output> <input> [ .. <input> ]"
exit 1
elif [ $# -eq 2 ]; then
cp $2 $1
else
outputFile=$1
shift
test -e $outputFile && rm -f $outputFile
exec 3< $1
exec 4< $2
startLine=1
while true; do
read -u 3 header1
if [ $? -ne 0 ]; then break; fi
read -u 4 header2
if [ $? -ne 0 ]; then break; fi
if [ "$header1" != "$header2" ]; then break; fi
echo "$header1" >> $outputFile
((startLine++))
done
exec 3<&-
exec 4<&-
for inputFile in $@; do
tail -n +$startLine $inputFile >> $outputFile
done
fi

View File

@ -1,55 +0,0 @@
#!/bin/sh
# Splits an interval list into multiple files
if [ $# -lt 2 ]; then
echo "Usage: $0 <input> <output> [ .. <output> ]"
exit 1
else
inputFile=$1
shift
totalLines=$(wc -l < $inputFile)
exec 3< $inputFile
numHeaders=0
while true; do
read -u 3 nextLine
if [ $? -ne 0 ]; then break; fi
if [[ $nextLine != @* ]]; then break; fi
((numHeaders++))
done
numFiles=$#
((numIntervals = totalLines - numHeaders))
if [ $numIntervals -lt $numFiles ]; then
echo "Error: Number of intervals $numIntervals is less than the number of files $numFiles."
exec 3<&-
exit 1
fi
((linesPerFile = numIntervals / numFiles))
((remainder = numIntervals % numFiles))
((linesPerFile++))
fileNumber=0
for outputFile in $@; do
# Earlier files with get the remainder until it's no longer needed.
if [ $fileNumber -eq $remainder ]; then ((linesPerFile--)); fi
((fileNumber++))
head -n $numHeaders $inputFile > $outputFile
for ((line=0; line<$linesPerFile; line++)); do
echo "$nextLine" >> $outputFile
read -u 3 nextLine
if [ $? -ne 0 ]; then break; fi
done
done
exec 3<&-
fi