A cleaned-up, functioning early-access version of Queue for others to play with and provide feedback about next steps.

Current version only has syntactic sugar for accessing the graph via rules, e.g. "bam" -> "bam.bai", "samtools index ${bam}", and DOES NOT have sugar for constructing your own graph.
Usage info on the internal wiki at https://iwww.broadinstitute.org/gsa/wiki/index.php/Queue


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3420 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-05-23 20:21:09 +00:00
parent a280a0ff0d
commit e9ee55d7dd
22 changed files with 712 additions and 10 deletions

View File

@ -294,14 +294,12 @@
<antcall target="dist"/>
<property name="scala.src" value="scala/src" />
<property name="scala.classes" value="scala/classes" />
<pathconvert property="scalajar.classpath" pathsep=";">
<flattenmapper/>
<fileset dir="${basedir}" includes="dist/*.jar"/>
</pathconvert>
<path id="scala.classpath">
<pathelement location="lib/scala-compiler-2.7.7.jar"/>
<pathelement location="lib/scala-library-2.7.7.jar"/>
<fileset dir="lib">
<include name="scala-compiler-*.jar"/>
<include name="scala-library-*.jar"/>
</fileset>
<fileset dir="${dist.dir}">
<patternset id="scalaStuff">
<include name="*.jar"/>
@ -314,16 +312,61 @@
<mkdir dir="${scala.classes}"/>
<echo>Building Scala...</echo>
<scalac srcdir="${scala.src}" destdir="${scala.classes}" classpathref="scala.classpath" force="changed">
<include name="**/*.scala"/>
<include name="*.scala"/>
</scalac>
<jar jarfile="${dist.dir}/GATKScala.jar">
<fileset dir="${scala.classes}">
<include name="**/*.class"/>
<include name="*.class"/>
</fileset>
</jar>
</target>
<!-- Queue target -->
<target name="queue" description="build Queue">
    <!-- Pull the Queue-specific dependencies declared under the "queue" ivy configuration -->
    <antcall target="resolve">
        <param name="ivy.conf" value="queue"/>
    </antcall>
    <!-- Queue compiles against the jars produced by the main distribution build -->
    <antcall target="dist"/>
    <property name="queue.src" value="scala/src" />
    <property name="queue.classes" value="scala/classes" />
    <!-- Flattened, space-separated jar names used for the manifest Class-Path attribute below -->
    <pathconvert property="queuejar.classpath" pathsep=" ">
        <flattenmapper/>
        <fileset dir="${dist.dir}" includes="*.jar"/>
    </pathconvert>
    <!-- Compile classpath: the scala compiler/library plus everything in dist -->
    <path id="queue.classpath">
        <fileset dir="lib">
            <include name="scala-compiler-*.jar"/>
            <include name="scala-library-*.jar"/>
        </fileset>
        <fileset dir="${dist.dir}">
            <patternset>
                <include name="*.jar"/>
            </patternset>
        </fileset>
    </path>
    <!-- Defines the scalac task used below -->
    <taskdef resource="scala/tools/ant/antlib.xml">
        <classpath refid="queue.classpath"/>
    </taskdef>
    <mkdir dir="${queue.classes}"/>
    <echo>Building Queue...</echo>
    <scalac srcdir="${queue.src}" destdir="${queue.classes}" classpathref="queue.classpath">
        <include name="org/broadinstitute/sting/queue/**/*.scala"/>
    </scalac>
    <!-- Executable jar: Main-Class plus a Class-Path of the flattened sibling jar names -->
    <jar jarfile="${dist.dir}/Queue.jar">
        <fileset dir="${queue.classes}">
            <include name="org/broadinstitute/sting/queue/**/*.class"/>
        </fileset>
        <manifest>
            <attribute name="Main-Class" value="org.broadinstitute.sting.queue.QCommandLine" />
            <attribute name="Class-Path" value="${queuejar.classpath}" />
        </manifest>
    </jar>
</target>
<!-- ***************************************************************************** -->
<!-- *********** Tests and associated tasks ********* -->
<!-- ***************************************************************************** -->

10
ivy.xml
View File

@ -4,6 +4,7 @@
<configurations>
<conf name="default" description="the core dependencies for the GATK"/>
<conf name="scala" extends="default" description="the dependencies for scala"/>
<conf name="queue" extends="scala" description="the dependencies for Queue"/>
</configurations>
<dependencies defaultconf="default">
<dependency org="net.sf" name="sam" rev="latest.integration" conf="default"/>
@ -39,7 +40,12 @@
<dependency org="commons-logging" name="commons-logging" rev="1.1.1" conf="default"/>
<!-- Scala dependencies -->
<dependency org="org.scala-lang" name="scala-compiler" rev="2.7.7" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-library" rev="2.7.7" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-compiler" rev="2.8.0.RC2" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-library" rev="2.8.0.RC2" conf="scala->default"/>
<!-- Queue additional dependencies -->
<dependency org="commons-lang" name="commons-lang" rev="2.5" conf="queue->default"/>
<dependency org="edu.mit.broad" name="broad-core-all" rev="2.8" conf="queue->default"/>
</dependencies>
</ivy-module>

View File

@ -0,0 +1,73 @@
package org.broadinstitute.sting.queue
import collection.mutable.ListBuffer
import engine.Pipeline
import org.broadinstitute.sting.queue.util.Logging
/**
 * Parses the Queue-specific command-line arguments, recording the results
 * in Pipeline's global state and the scripts buffer.
 */
object QArguments {
  // Scripts (-S) collected from the command line, in the order given.
  val scripts = new ListBuffer[String]

  /** Pulls the Queue flags out of the full command line. */
  def parseArgs(args: Array[String]) {
    filterArgs(args)
  }

  /**
   * Pull out any args that are meant for QCommandLine.
   * Returns the remaining, unrecognized arguments.
   */
  private def filterArgs(args: Array[String]) = {
    var filtered = new ListBuffer[String]
    filtered.appendAll(args)
    if (isFlagged(filtered, "-debug"))
      Logging.enableDebug
    if (isFlagged(filtered, "-dry"))
      Pipeline.dryRun = true
    if (isFlagged(filtered, "-bsub"))
      Pipeline.useBsub = true
    for (arg <- getArgs(filtered, "-P"))
      Pipeline.addArg(arg)
    for (arg <- getArgs(filtered, "-I"))
      Pipeline.addFile(arg)
    for (arg <- getArgs(filtered, "-S"))
      scripts.append(arg)
    filtered
  }

  /** Removes every occurrence of the flag, returning true if it was present at least once. */
  private def isFlagged(filtered: ListBuffer[String], search: String) = {
    var found = false
    var index = filtered.indexOf(search)
    while (index >= 0) {
      found = true
      filtered.remove(index)
      index = filtered.indexOf(search)
    }
    found
  }

  /**
   * Removes every "search value" pair from the list, returning the values.
   * Throws a QException when the flag appears as the last argument with no
   * value after it (previously an uninformative IndexOutOfBoundsException).
   */
  private def getArgs(filtered: ListBuffer[String], search: String) = {
    var found = new ListBuffer[String]
    var index = filtered.indexOf(search)
    while (index >= 0) {
      if (index + 1 >= filtered.size)
        throw new QException("Missing value for argument: " + search)
      found.append(filtered(index + 1))
      filtered.remove(index, 2)
      index = filtered.indexOf(search)
    }
    found
  }

  /** Removes every "search value" pair, tolerating a trailing flag with no value. */
  def strip(filtered: ListBuffer[String], search: String) = {
    var index = filtered.indexOf(search)
    while (index >= 0) {
      // Remove the flag plus its value, or just the flag if it is the last element.
      filtered.remove(index, scala.math.min(2, filtered.size - index))
      index = filtered.indexOf(search)
    }
  }
}

View File

@ -0,0 +1,50 @@
package org.broadinstitute.sting.queue
import engine.Pipeline
import tools.nsc.MainGenericRunner
import org.broadinstitute.sting.queue.util.ClasspathUtils
import org.apache.log4j._
import collection.mutable.ListBuffer
import org.broadinstitute.sting.queue.util.Logging
/**
 * Entry point for Queue: parses the Queue flags out of the command line,
 * then runs each -S script through the Scala script runner.
 */
object QCommandLine extends Application with Logging {
  var usage = """usage: java -jar Queue.jar [ -P name=value ] [ -P file.properties ] [ -I input.file ] [ -I input_files.list ] [ -bsub ] [ -dry ] [ -debug ] -S pipeline.scala"""

  override def main(args: Array[String]) = {
    // Parse a copy of the args; any parse failure prints usage and exits.
    try {
      QArguments.parseArgs(args.clone)
    } catch {
      case e: Exception => {
        println(usage)
        System.exit(-1)
      }
    }
    // The scripts themselves receive all arguments except the -S pairs.
    var newArgs = new ListBuffer[String]
    newArgs.appendAll(args)
    QArguments.strip(newArgs, "-S")
    logger.debug("starting")
    if (QArguments.scripts.size == 0) {
      println("Error: Missing script")
      println(usage)
      System.exit(-1)
    }
    if (Pipeline.inputPaths.size == 0) {
      println("Error: No inputs specified")
      println(usage)
      System.exit(-1)
    }
    // Run each script in-process via the Scala runner, with the manifest-aware
    // classpath so jars referenced from manifest Class-Path entries are visible.
    for (script <- QArguments.scripts) {
      var clone = newArgs.clone
      clone.prepend("-nocompdaemon", "-classpath", ClasspathUtils.manifestAwareClassPath, script)
      MainGenericRunner.main(clone.toArray)
    }
    logger.debug("exiting")
  }
}

View File

@ -0,0 +1,4 @@
package org.broadinstitute.sting.queue
/**
 * Base runtime exception for Queue errors.
 *
 * @param message description of the failure
 * @param throwable optional underlying cause (null when there is none)
 */
class QException(private val message: String, private val throwable: Throwable = null)
  extends RuntimeException(message, throwable)

View File

@ -0,0 +1,97 @@
package org.broadinstitute.sting.queue.engine
import graphing.JobGrapher
import org.broadinstitute.sting.queue.util.Logging
import scheduling.{DispatchJobScheduler, SimpleJobScheduler}
import java.lang.String
import collection.immutable.List
import collection.JavaConversions._
import java.util.Properties
import java.io.{File, FileInputStream}
import org.broadinstitute.sting.utils.text.XReadLines
import org.broadinstitute.sting.queue.{QException, QArguments}
/**
 * Syntactic sugar for filling in a pipeline using a Scala script.
 */
object Pipeline extends Logging
{
  // Input files registered via -I (directly or through .list files).
  var inputPaths = List.empty[File]
  // TODO: Stop using globals and wrap in an execution environment. Will need when overriding values per command.
  var useBsub = false
  var dryRun = false
  // name=value pairs collected from -P flags and .properties files.
  private var argMap = Map.empty[String, String]
  private var rules = List.empty[QRule]

  /**
   * Sugar that allows addRule( inputs -> outputs, command )
   */
  def addRule(rule: (Any, Any), command: String): Unit = {
    addRule(rule._1, rule._2, command)
  }

  private def addRule(inputs: Any, outputs: Any, command: String): Unit = {
    rules :::= List(new QRule(getFiles(inputs), getFiles(outputs), new QCommand(command)))
  }

  /** Parses the args, graphs the registered rules, then runs the resulting jobs. */
  def run(args: Array[String], inputs: Any, outputs: Any) = {
    QArguments.parseArgs(args)
    var inputFiles = getFiles(inputs)
    var outputFiles = getFiles(outputs)
    var grapher = new JobGrapher(inputPaths.map(_.getCanonicalPath), argMap, rules, inputFiles, outputFiles)
    // -bsub selects LSF dispatch; otherwise jobs run locally one at a time.
    val scheduler = useBsub match {
      case false => new SimpleJobScheduler(grapher.jobs)
      case true => new DispatchJobScheduler(grapher.jobs)
    }
    scheduler.runJobs
  }

  /**
   * Parses files passed in various sugar forms into a List[QFile]
   */
  private def getFiles(files: Any) : List[QFile] = {
    files match {
      case null => List.empty[QFile]
      case Nil => List.empty[QFile]
      case path: String => List(new QFile(path))
      case file: QFile => List(file)
      // Any List or Tuple add the members to this list
      case product: Product => {
        var list = List.empty[QFile]
        for (fileList <- product.productIterator.toList.map(getFiles(_))) {
          list :::= fileList
        }
        list
      }
      case x => throw new QException("Unknown file type: " + x)
    }
  }

  /** Records a -P argument: either an inline name=value or a .properties file. */
  def addArg(arg: String) = {
    var file = new File(arg)
    if (arg.contains("=") && !file.exists) {
      val tokens = arg.split("=", 2)
      argMap = argMap.updated(tokens(0), tokens(1))
    } else if (file.exists && arg.endsWith(".properties")) {
      var props = new Properties
      props.load(new FileInputStream(file))
      for ((name, value) <- props)
        argMap = argMap.updated(name, value)
    }
  }

  /** Records a -I input: a single file, or a .list file naming one input per line. */
  def addFile(arg: String): Unit = {
    var file = new File(arg)
    if (arg.endsWith(".list")) {
      for (line <- new XReadLines(file).iterator) {
        addFile(line)
      }
    } else {
      inputPaths = inputPaths ::: List(file)
    }
  }
}

View File

@ -0,0 +1,9 @@
package org.broadinstitute.sting.queue.engine
/**
 * Defines a basic command to run.
 * TODO: Allow overriding arguments per command such as the job queue
 *
 * @param commandString the raw shell command, possibly containing ${var} placeholders
 */
class QCommand(val commandString: String) {
  /** Renders the command exactly as supplied. */
  override def toString: String = commandString
}

View File

@ -0,0 +1,22 @@
package org.broadinstitute.sting.queue.engine
import org.apache.commons.lang.builder.{EqualsBuilder, HashCodeBuilder}
import java.io.File
import org.apache.commons.lang.StringUtils
/**
 * Represents a file extension along with several tags.
 * TODO: Use the tags to map rules between wildcards, ex: *.vcf -> *.eval
 *
 * @param fileType the final extension component, ex: "bam"
 * @param parts optional tag components that precede the file type
 */
class QFile(val fileType: String, val parts: String*) {
  /** Dotted suffix: the tag parts followed by the file type, ex: "bai" or "recal.bam". */
  val extension = (parts.toList ::: List(fileType)).mkString(".")

  override def toString = extension

  // Field-by-field equality/hashing via reflection so equal extensions
  // collapse to the same graph vertex.
  override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1)
  override def hashCode = HashCodeBuilder.reflectionHashCode(this)

  /** True when the canonical form of the path ends with this extension. */
  def matchesFile(path: String): Boolean = matchesFile(new File(path))
  def matchesFile(file: File): Boolean = file.getCanonicalPath.endsWith(extension)

  /** The canonical path with this extension stripped from the end. */
  def baseName(path: String): String = baseName(new File(path))
  def baseName(file: File): String = StringUtils.removeEnd(file.getCanonicalPath, extension)

  /** Rebuilds a full file name by appending this extension to the base name. */
  def fullName(baseName: String) = baseName + extension
}

View File

@ -0,0 +1,3 @@
package org.broadinstitute.sting.queue.engine
/**
 * A single pipeline rule: the command that produces the output file types
 * from the input file types.
 */
class QRule (val inputs: List[QFile], val outputs: List[QFile], var command: QCommand)

View File

@ -0,0 +1,87 @@
package org.broadinstitute.sting.queue.engine.graphing
import org.broadinstitute.sting.queue.QException
import org.jgrapht.graph.SimpleDirectedGraph
import org.jgrapht.alg.BellmanFordShortestPath
import collection.JavaConversions._
import org.broadinstitute.sting.queue.engine.{QCommand, QFile, QRule}
import org.broadinstitute.sting.queue.engine.scheduling.{ExecEdge, ResourceEdge, ResourceNode}
import org.broadinstitute.sting.queue.util.Logging
/**
 * A basic job grapher.
 * Limitations:
 * - Only walks along graphs with rules that have a single input and a single output.
 */
class JobGrapher(
    private val inputFiles: List[String],
    private val argMap: Map[String, String],
    private val rules: List[QRule],
    private val sourceFiles: List[QFile],
    private val targetFiles: List[QFile]) extends Logging {

  // Model graph: file types as vertices, commands as the edges that transform one type into another.
  private val modelGraph = new SimpleDirectedGraph[QFile, QCommand](classOf[QCommand])
  // Job graph: concrete files as vertices, runnable commands as edges; consumed by the schedulers.
  private val jobGraph = new SimpleDirectedGraph[ResourceNode, ResourceEdge](classOf[ResourceEdge])

  createModelGraph()
  createJobGraph()

  def name = this.getClass.getName

  /** The job graph to execute. */
  def jobs = jobGraph

  // Finds a command path between every source/target file-type pair and instantiates it
  // for each matching input file. Fails if any pair has no path at all.
  private def createJobGraph() = {
    var missingPaths = List.empty[Tuple2[QFile,QFile]]
    for (sourceFile <- sourceFiles) {
      for (targetFile <- targetFiles) {
        var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, sourceFile, targetFile)
        if (shortestPath == null)
          missingPaths = missingPaths ::: List((sourceFile, targetFile))
        else
          addPaths(shortestPath)
      }
    }
    for ((sourceFile, targetFile) <- missingPaths) {
      logger.error(String.format("No command path found between %s --> %s", sourceFile, targetFile))
    }
    if (missingPaths.size > 0)
      throw new QException("Not all inputs and outputs found in the pipeline graph")
  }

  // Adds each single-input/single-output rule as an edge in the model graph.
  private def createModelGraph() = {
    for (rule <- rules) {
      if (rule.inputs.size != 1 || (rule.outputs.size != 1))
        throw new QException(this.name + " can only process rules with a single input and a single output. " +
            "inputs: " + rule.inputs + ", outputs: " + rule.outputs + ", command: " + rule.command)
      var source = rule.inputs.head
      var target = rule.outputs.head
      modelGraph.addVertex(source)
      modelGraph.addVertex(target)
      modelGraph.addEdge(source, target, rule.command)
    }
  }

  // Instantiates the path for every input file whose name matches the path's first file type.
  private def addPaths(shortestPath: java.util.List[QCommand]) {
    for (inputFile <- inputFiles)
      if (modelGraph.getEdgeSource(shortestPath.head).matchesFile(inputFile))
        addPath(shortestPath, inputFile)
  }

  // Walks the command path, creating one job edge per command with concrete
  // file names derived from the input file's base name.
  private def addPath(shortestPath: java.util.List[QCommand], inputFile: String) = {
    var sourceFile = inputFile
    for (command <- shortestPath) {
      val source = modelGraph.getEdgeSource(command)
      val target = modelGraph.getEdgeTarget(command)
      val baseName = source.baseName(sourceFile)
      val targetFile = target.fullName(baseName)
      val resourceSource = new ResourceNode(Map(source.extension -> sourceFile))
      val resourceTarget = new ResourceNode(Map(target.extension -> targetFile))
      val resourceEdge = new ExecEdge(argMap, command)
      jobGraph.addVertex(resourceSource)
      jobGraph.addVertex(resourceTarget)
      jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge)
      // The output of this command is the input to the next command along the path.
      sourceFile = targetFile
    }
  }
}

View File

@ -0,0 +1,81 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.graph.SimpleDirectedGraph
import edu.mit.broad.core.lsf.LocalLsfJob
import collection.JavaConversions._
import management.ManagementFactory
import org.broadinstitute.sting.queue.QException
import java.io.File
import java.util.{ArrayList, Properties}
import org.broadinstitute.sting.queue.engine.Pipeline
/**
 * Dispatches jobs to LSF and then returns.
 */
class DispatchJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends TopologicalJobScheduler(jobGraph) {
  // LSF job created for each traversed edge, so downstream jobs can wait on it.
  private var lsfJobs = Map.empty[ExecEdge, LocalLsfJob]
  private var lsfJobIndex = 0
  // JVM runtime name keeps job names unique across concurrent Queue runs.
  private val jvmName = ManagementFactory.getRuntimeMXBean.getName
  private val jobNamePrefix = "Q-" + jvmName

  /** Submits the edge's command as an LSF job, chained onto its upstream jobs. */
  protected def traversedExec(exec: ExecEdge) = {
    lsfJobIndex += 1
    val props = new Properties
    exec.args.foreach(x => props.put(x._1, x._2))
    val job = new LocalLsfJob
    val jobName = jobNamePrefix + "-" + lsfJobIndex
    val outputFile = jobName + ".out"
    val errorFile = jobName + ".err"
    // Job settings are overridable via -P args; these are the defaults.
    val workingDir = props.getProperty("jobWorkingDir", ".")
    val lsfProject = props.getProperty("jobProject", "Queue")
    val queue = props.getProperty("jobQueue", "broad")
    val memory = props.getProperty("jobMemory", "2")
    var extraArgs = List("-r", "-R", "rusage[mem=" + memory + "]")
    val sourceJobs = sourceLsfJobs(exec)
    if (sourceJobs.size > 0) {
      // Only start this job after all upstream jobs have completed successfully.
      extraArgs :::= List("-w", dependencyExpression(sourceJobs))
    }
    job.setName(jobName)
    job.setExtraBsubArgs(new ArrayList(extraArgs))
    job.setProject(lsfProject)
    job.setWorkingDir(new File(workingDir))
    job.setCommand(exec.commandString)
    job.setOutputFile(new File(workingDir, outputFile))
    job.setErrFile(new File(workingDir, errorFile))
    job.setQueue(queue)
    lsfJobs = lsfJobs.updated(exec, job)
    logger.info(job.getBsubCommand.mkString(" "))
    if (!Pipeline.dryRun)
      job.start
  }

  /**
   * Walks up the graph looking for the previous LsfJobs for this node
   */
  private def sourceLsfJobs(edge: ResourceEdge) : List[LocalLsfJob] = {
    var sourceJobs = List.empty[LocalLsfJob]
    val source = this.jobGraph.getEdgeSource(edge)
    for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) {
      incomingEdge match {
        // Stop recursing when we find a job along this edge and return its job id
        case exec: ExecEdge => sourceJobs :::= List(lsfJobs(exec))
        // Throw error for a new edge type that we don't know how to handle
        case default => throw new QException("Unknown edge type: " + default)
      }
    }
    sourceJobs
  }

  /** Builds the LSF dependency expression: done("a") && done("b") ... */
  private def dependencyExpression(jobs: List[LocalLsfJob]) = {
    jobs.toSet[LocalLsfJob].map(_.getName).mkString("done(\"", "\") && done(\"", "\")")
  }
}

View File

@ -0,0 +1,44 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.DirectedGraph
import org.apache.commons.lang.text.{StrLookup, StrSubstitutor}
import collection.JavaConversions._
import org.broadinstitute.sting.queue.engine.QCommand
/**
 * An edge that runs a shell command, with ${variable} placeholders resolved
 * from the command-line args and the resources of the surrounding nodes.
 */
class ExecEdge(val args: Map[String, String], private val command: QCommand) extends ResourceEdge {
  // The command with all ${...} placeholders substituted; set during traverse().
  private var convertedCommandString: String = _

  /** The fully substituted command. Only populated after traverse() has run. */
  def commandString = convertedCommandString

  override def traverse(graph: DirectedGraph[ResourceNode, ResourceEdge]) = {
    // Lookup any variable using the target node, or any of its input nodes.
    val sub = new StrSubstitutor(new NodeLookup(graph.getEdgeTarget(this), graph))
    convertedCommandString = sub.replace(command.commandString)
  }

  /** StrLookup that resolves a key from the explicit args first, then from the node graph. */
  class NodeLookup(private val targetNode: ResourceNode, private val graph: DirectedGraph[ResourceNode, ResourceEdge]) extends StrLookup {
    def lookup(key: String) = {
      var value: String = null
      if (args.contains(key))
        value = args(key)
      else
        value = lookup(key, targetNode)
      value
    }

    // Recursively searches this node's resources, then every upstream node,
    // keeping the last non-null match found.
    private def lookup(key: String, node: ResourceNode): String = {
      var value: String = null
      if (node.resources.contains(key)) {
        value = node.resources(key)
      } else {
        for (edge <- graph.incomingEdgesOf(node)) {
          lookup(key, graph.getEdgeSource(edge)) match {
            case null => {}
            case found => value = found
          }
        }
      }
      value
    }
  }
}

View File

@ -0,0 +1,10 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.graph.SimpleDirectedGraph
import org.broadinstitute.sting.queue.util.Logging
/**
 * Base class for strategies that execute the jobs in a job graph.
 */
abstract class JobScheduler(protected val jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge])
    extends Logging {
  /** Runs all the jobs in the graph. */
  def runJobs
  /** The number of jobs: one per edge in the graph. */
  def numJobs = jobGraph.edgeSet.size
}

View File

@ -0,0 +1,7 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.DirectedGraph
/**
 * An action connecting two resource nodes in the job graph.
 */
abstract class ResourceEdge {
  // Called as the scheduler walks the graph; the graph gives access to the
  // edge's source and target nodes (see TopologicalJobScheduler.traversed).
  def traverse(graph: DirectedGraph[ResourceNode, ResourceEdge]): Unit
}

View File

@ -0,0 +1,8 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.apache.commons.lang.builder.{HashCodeBuilder, EqualsBuilder}
/**
 * A vertex in the job graph holding named resources
 * (file extension -> concrete file path; see JobGrapher.addPath).
 */
class ResourceNode(val resources: Map[String,String]) {
  // Value-based equality via reflection so nodes with identical resources
  // collapse into a single graph vertex.
  override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1)
  override def hashCode = HashCodeBuilder.reflectionHashCode(this)
}

View File

@ -0,0 +1,19 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.graph.SimpleDirectedGraph
import org.broadinstitute.sting.queue.util.ProcessUtils
import org.broadinstitute.sting.queue.engine.Pipeline
/**
 * Runs jobs one at a time locally
 */
class SimpleJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends TopologicalJobScheduler(jobGraph) {
  /** Runs the edge's command in a local shell, blocking until it exits. */
  protected def traversedExec(exec: ExecEdge) = {
    var commandString = exec.commandString
    logger.info(commandString)
    // TODO: Pre-print the commands?
    if (!Pipeline.dryRun)
      ProcessUtils.runCommandAndWait(commandString, Map.empty[String, String])
  }
}

View File

@ -0,0 +1,36 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.graph.SimpleDirectedGraph
import org.jgrapht.traverse.TopologicalOrderIterator
import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter}
import collection.JavaConversions._
/**
 * Loops over the job graph running jobs as the edges are traversed
 */
abstract class TopologicalJobScheduler(jobGraph: SimpleDirectedGraph[ResourceNode, ResourceEdge]) extends JobScheduler(jobGraph) {
  // Visits nodes in dependency order; the listener below fires for each edge crossed.
  protected val iterator = new TopologicalOrderIterator(this.jobGraph)

  iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] {
    override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) = {
      traversed(event.getEdge)
    }
  })

  override def runJobs = {
    logger.info(String.format("Running %s jobs.", this.numJobs.toString))
    // Exhaust the iterator; the traversal listener above does the actual work.
    for (target <- iterator) {
      // Do nothing for now, let event handler respond
    }
  }

  // Resolves the edge's variables against the graph, then dispatches by edge type.
  protected def traversed(edge: ResourceEdge) = {
    edge.traverse(this.jobGraph)
    edge match {
      case exec: ExecEdge => traversedExec(exec)
    }
  }

  /** Runs or schedules the command for a traversed exec edge. */
  protected def traversedExec(exec: ExecEdge)
}

View File

@ -0,0 +1,18 @@
package org.broadinstitute.sting.queue.util
import collection.JavaConversions._
import org.reflections.util.ManifestAwareClasspathHelper
import java.io.File
import javax.print.URIException
import org.apache.commons.lang.StringUtils
/**
 * Builds the correct class path by examining the manifests
 */
object ClasspathUtils {
  import java.net.URISyntaxException

  /**
   * Returns the current classpath — including jars referenced via manifest
   * Class-Path entries — joined by the platform path separator.
   */
  def manifestAwareClassPath = {
    var urls = ManifestAwareClasspathHelper.getUrlsForManifestCurrentClasspath
    // URL.toURI throws java.net.URISyntaxException for URLs that are not strictly
    // RFC 2396 compliant. The previous code caught javax.print.URIException, an
    // unrelated print-service interface that URISyntaxException never implements,
    // so the getPath fallback could never fire.
    var files = urls.map(url => try {new File(url.toURI)} catch {case urie: URISyntaxException => new File(url.getPath)})
    files.mkString(File.pathSeparator)
  }
}

View File

@ -0,0 +1,31 @@
package org.broadinstitute.sting.queue.util
import org.apache.log4j._
/**
 * A mixin to add logging to a class
 */
trait Logging {
  val className = this.getClass.getName
  // Lazy so the root logger is configured before the first logger is handed out.
  lazy val logger = configuredLogger

  def configuredLogger = {
    Logging.configureLogging
    Logger.getLogger(className)
  }
}

/**
 * One-time log4j configuration shared by all Logging instances.
 */
object Logging {
  private var configured = false
  private var isDebug = false

  /** Adds the console appender and sets the level; safe to call repeatedly. */
  def configureLogging() {
    if (!configured) {
      var root = Logger.getRootLogger
      root.addAppender(new ConsoleAppender(new PatternLayout("%-5p %d{HH:mm:ss,SSS} %C{1} - %m %n")))
      root.setLevel(if(isDebug) Level.DEBUG else Level.INFO)
      configured = true
    }
  }

  /** Turns on debug-level logging, both immediately and for any later configuration. */
  def enableDebug = {isDebug = true; Logger.getRootLogger.setLevel(Level.DEBUG)}
}

View File

@ -0,0 +1,46 @@
package org.broadinstitute.sting.queue.util
import org.broadinstitute.sting.utils.text.XReadLines
import collection.mutable.ListBuffer
import collection.JavaConversions._
/**
 * Shells out commands and waits for them to finish, destroying any
 * still-running child processes when the JVM shuts down.
 */
object ProcessUtils extends Logging {
  // Processes started but not yet finished; the shutdown hook destroys these.
  // Declared before the hook is registered so the hook never sees an
  // uninitialized field.
  val running = new ListBuffer[Process]()

  Runtime.getRuntime.addShutdownHook(new Thread {
    override def run = for (process <- running.clone) {
      logger.warn("Killing: " + process)
      process.destroy
    }
  })

  /**
   * Runs the command via "sh -c" with the extra environment variables and
   * waits for it to exit.
   *
   * @return the process exit code
   */
  def runCommandAndWait(command: String, environment: Map[String, String]) = {
    logger.debug("Running command: " + command)
    var builder = new ProcessBuilder("sh", "-c", command)
    for ((key, value) <- environment) {
      logger.debug(String.format("adding env: %s = %s", key, value))
      builder.environment.put(key, value)
    }
    var process = builder.start
    running += process
    // Drain stdout/stderr BEFORE waitFor: reading only after the process exits
    // can deadlock when the child fills an OS pipe buffer nothing is consuming.
    // NOTE(review): with debug logging off the streams are still left unread,
    // as in the original code — verify commands never produce large output.
    if (logger.isDebugEnabled) {
      // stderr drains on its own thread so neither pipe can back up while this
      // thread reads stdout.
      val stderrDrainer = new Thread {
        override def run = for (line <- new XReadLines(process.getErrorStream).iterator)
          logger.error("command: " + line)
      }
      stderrDrainer.start
      for (line <- new XReadLines(process.getInputStream).iterator)
        logger.debug("command: " + line)
      stderrDrainer.join
    }
    var result = process.waitFor
    running -= process
    logger.debug("Command exited with result: " + result)
    result
  }
}

View File

@ -0,0 +1,8 @@
<ivy-module version="1.0">
<!-- .jar borrowed from https://svn.broadinstitute.org/CancerGenomeAnalysis/trunk/analysis_pipeline/process_management/lib/broad-core-all-2.8.jar -->
<info organisation="edu.mit.broad" module="broad-core-all" revision="2.8" status="integration" publication="20100522172500" />
<!-- BroadCore has a lot more dependencies. For now we are only using edu.mit.broad.core.lsf which depends on apache commons-logging -->
<dependencies>
<dependency org="commons-logging" name="commons-logging" rev="1.1.1" />
</dependencies>
</ivy-module>