Replaced pattern matched pipeline spec with annotated objects.

Old version is no longer available.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3558 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-06-15 04:43:46 +00:00
parent b99a5e06f3
commit 32fc221ffe
41 changed files with 839 additions and 531 deletions

View File

@ -42,8 +42,8 @@
<dependency org="commons-io" name="commons-io" rev="1.3.2" conf="default"/>
<!-- Scala dependancies -->
<dependency org="org.scala-lang" name="scala-compiler" rev="2.8.0.RC2" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-library" rev="2.8.0.RC2" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-compiler" rev="2.8.0.RC5" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-library" rev="2.8.0.RC5" conf="scala->default"/>
<!-- Queue additional dependencies -->
<dependency org="commons-lang" name="commons-lang" rev="2.5" conf="queue->default"/>

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies the type of an input our output field.
* Retains it during runtime to work around type erasure.
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface ClassType {
Class value();
}

View File

@ -0,0 +1,38 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies an input to a QueueFunction
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Input {
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies an internal setting for a QueueFunction.
* Not an input or output but should be copied with a function.
* Internals should have default values that should be handled, i.e. they are always @Optional
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Internal {
}

View File

@ -0,0 +1,38 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies an input or output to a QueueFunction is optional
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Optional {
}

View File

@ -0,0 +1,38 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies an output to a QueueFunction
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Output {
}

View File

@ -8,13 +8,13 @@ import java.io.{FileInputStream, File}
import java.util.Properties
class QArguments(args: Array[String]) {
var useBsub = false
var bsubAllJobs = false
var dryRun = false
val scripts = new ListBuffer[String]
var inputPaths = List.empty[File]
var argMap = Map.empty[String, String]
parseArgs(args)
val userArgs = parseArgs(args)
private def parseArgs(args: Array[String]) = {
var filtered = new ListBuffer[String]
@ -26,13 +26,15 @@ class QArguments(args: Array[String]) {
if (isFlagged(filtered, "-dry"))
dryRun = true
if (isFlagged(filtered, "-bsub"))
useBsub = true
bsubAllJobs = true
for (arg <- getArgs(filtered, "-P"))
addArg(arg)
for (arg <- getArgs(filtered, "-I"))
addFile(arg)
for (arg <- getArgs(filtered, "-S"))
scripts.append(arg)
List(filtered:_*)
}
private def isFlagged(filtered: ListBuffer[String], search: String) = {
@ -65,22 +67,24 @@ class QArguments(args: Array[String]) {
var file = new File(arg)
if (arg.contains("=") && !file.exists) {
val tokens = arg.split("=", 2)
argMap = argMap.updated(tokens(0), tokens(1))
} else if (file.exists && arg.endsWith(".properties")) {
argMap += tokens(0) -> tokens(1)
} else if (arg.endsWith(".properties")) {
if (!file.exists)
throw new QException("File not found: " + file.getAbsolutePath)
var props = new Properties
props.load(new FileInputStream(file))
for ((name, value) <- props)
argMap = argMap.updated(name, value)
argMap += name -> value
} else {
throw new QException("Invalid property: " + arg)
}
}
def addFile(arg: String): Unit = {
var file = new File(arg)
if (arg.endsWith(".list")) {
inputPaths :+= file
if (arg.endsWith(".list"))
new XReadLines(file).iterator.foreach(addFile(_))
} else {
inputPaths = inputPaths ::: List(file)
}
}
}

View File

@ -13,6 +13,7 @@ object QCommandLine extends Application with Logging {
new QArguments(args)
} catch {
case exception => {
println(exception)
println(usage)
System.exit(-1)
}
@ -34,20 +35,13 @@ object QCommandLine extends Application with Logging {
System.exit(-1)
}
if (qArgs.inputPaths.size == 0) {
println("Error: No inputs specified")
println(usage)
System.exit(-1)
}
val newArgs = new ListBuffer[String]
newArgs.appendAll(args)
QArguments.strip(newArgs, "-S")
newArgs.prepend("-nocompdaemon", "-classpath", ClasspathUtils.manifestAwareClassPath, qArgs.scripts.head)
MainGenericRunner.main(newArgs.toArray)
// NOTE: This line is not reached because something in MainGenericRunner is exiting the VM.
// NOTE: This line is not reached because the MainGenericRunner exits the VM.
logger.debug("exiting")
}
}

View File

@ -0,0 +1,103 @@
package org.broadinstitute.sting.queue
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.engine.QGraph
/**
* Syntactic sugar for filling in a pipeline using a Scala script.
*/
object QScript {
// Type aliases so users don't have to import
type File = java.io.File
type Input = org.broadinstitute.sting.queue.util.Input
type Output = org.broadinstitute.sting.queue.util.Output
type Optional = org.broadinstitute.sting.queue.util.Optional
type ClassType = org.broadinstitute.sting.queue.util.ClassType
type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction
type GatkFunction = org.broadinstitute.sting.queue.function.gatk.GatkFunction
// The arguments for executing pipelines
private var qArgs: QArguments = _
// A default pipeline. Can also use multiple 'new Pipeline()'
private val pipeline = new Pipeline
/**
* Initializes the QArguments and returns a list of the rest of the user args.
*/
def setArgs(params: Array[String]) = {
qArgs = new QArguments(params)
qArgs.userArgs
}
/**
* Returns a list of files that were specified with "-I <file>" on the command line
* or inside a .list file.
*/
def inputs(extension: String) = qArgs.inputPaths.filter(_.getName.endsWith(extension))
/**
* Exchanges the extension on a file.
*/
def swapExt(file: File, oldExtension: String, newExtension: String) =
new File(file.getName.stripSuffix(oldExtension) + newExtension)
/**
* Adds one or more command line functions for dispatch later during run()
*/
def add(functions: CommandLineFunction*) = pipeline.add(functions:_*)
/**
* Sets the @Input and @Output values for all the functions
*/
def setParams(): Unit = pipeline.setParams()
/**
* Sets the @Input and @Output values for a single function
*/
def setParams(function: CommandLineFunction): Unit = pipeline.setParams(function)
/**
* Executes functions that have been added to the pipeline.
*/
def run() = pipeline.run()
/**
* Encapsulates a set of functions to run together.
*/
protected class Pipeline {
private var functions = List.empty[CommandLineFunction]
/**
* Adds one or more command line functions for dispatch later during run()
*/
def add(functions: CommandLineFunction*) =
this.functions :::= List(functions:_*)
/**
* Sets the @Input and @Output values for all the functions
*/
def setParams(): Unit =
for (function <- functions) setParams(function)
/**
* Sets the @Input and @Output values for a single function
*/
def setParams(function: CommandLineFunction): Unit =
for ((name, value) <- qArgs.argMap) function.setValue(name, value)
/**
* Executes functions that have been added to the pipeline.
*/
def run() = {
val qGraph = new QGraph
qGraph.dryRun = qArgs.dryRun
qGraph.bsubAllJobs = qArgs.bsubAllJobs
for (function <- functions)
qGraph.add(function)
qGraph.fillIn
qGraph.run
}
}
}

View File

@ -0,0 +1,17 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.util.{Logging, ProcessUtils}
import org.broadinstitute.sting.queue.function.CommandLineFunction
/**
* Runs jobs one at a time locally
*/
trait CommandLineRunner extends Logging {
def run(function: CommandLineFunction, qGraph: QGraph) = {
var commandLine = function.commandLine
logger.info(commandLine)
if (!qGraph.dryRun)
ProcessUtils.runCommandAndWait(function.commandLine, function.commandDirectory)
}
}

View File

@ -0,0 +1,56 @@
package org.broadinstitute.sting.queue.engine
import edu.mit.broad.core.lsf.LocalLsfJob
import collection.JavaConversions._
import management.ManagementFactory
import java.io.File
import java.util.ArrayList
import org.broadinstitute.sting.queue.function.{DispatchFunction, QFunction}
trait DispatchJobRunner {
type DispatchJobType
private var dispatchJobs = Map.empty[DispatchFunction, DispatchJobType]
protected def newJobName = DispatchJobRunner.nextJobName
def dispatch(function: DispatchFunction, qGraph: QGraph)
protected def addJob(function: DispatchFunction, dispatchJob: DispatchJobType) =
dispatchJobs += function -> dispatchJob
/**
* Walks up the graph looking for the previous LsfJobs
*/
protected def previousJobs(function: QFunction, qGraph: QGraph) : List[DispatchJobType] = {
var previous = List.empty[DispatchJobType]
val source = qGraph.jobGraph.getEdgeSource(function)
for (incomingEdge <- qGraph.jobGraph.incomingEdgesOf(source)) {
incomingEdge match {
// Stop recursing when we find a job along the edge and return its job id
case dispatchFunction: DispatchFunction => previous :+= dispatchJobs(dispatchFunction)
// For any other type of edge find the LSF jobs preceding the edge
case qFunction: QFunction => previous :::= previousJobs(qFunction, qGraph)
}
}
previous
}
}
object DispatchJobRunner {
private val jobNamePrefix = "Q-" + {
var prefix = ManagementFactory.getRuntimeMXBean.getName
val index = prefix.indexOf(".")
if (index >= 0)
prefix = prefix.substring(0, index)
prefix
}
private var jobIndex = 0
private def nextJobName = {
jobIndex += 1
jobNamePrefix + "-" + jobIndex
}
}

View File

@ -0,0 +1,57 @@
package org.broadinstitute.sting.queue.engine
import collection.JavaConversions._
import edu.mit.broad.core.lsf.LocalLsfJob
import org.broadinstitute.sting.queue.function.DispatchFunction
import java.io.File
import java.util.ArrayList
import org.broadinstitute.sting.queue.util.Logging
trait LsfJobRunner extends DispatchJobRunner with Logging {
type DispatchJobType = LocalLsfJob
def dispatch(function: DispatchFunction, qGraph: QGraph) = {
var jobName = function.jobName
if (jobName == null)
jobName = newJobName
var jobOutputFile = function.jobOutputFile
if (jobOutputFile == null)
jobOutputFile = new File(jobName + ".out")
var jobErrorFile = function.jobErrorFile
if (jobErrorFile == null)
jobErrorFile = new File(jobName + ".err")
val job = new LocalLsfJob
job.setName(jobName)
job.setOutputFile(jobOutputFile)
job.setErrFile(jobErrorFile)
job.setWorkingDir(function.commandDirectory)
job.setProject(function.jobProject)
job.setQueue(function.jobQueue)
job.setCommand(function.commandLine)
var extraArgs = List("-r")
if (function.memoryLimit.isDefined)
extraArgs :::= List("-R", "rusage[mem=" + function.memoryLimit.get + "]")
val previous = previousJobs(function, qGraph)
if (previous.size > 0)
extraArgs :::= List("-w", dependencyExpression(previous))
job.setExtraBsubArgs(new ArrayList(extraArgs))
addJob(function, job)
logger.info(job.getBsubCommand.mkString(" "))
if (!qGraph.dryRun)
job.start
}
private def dependencyExpression(jobs: List[LocalLsfJob]) = {
jobs.toSet[LocalLsfJob].map(_.getName).mkString("ended(\"", "\") && ended(\"", "\")")
}
}

View File

@ -1,16 +0,0 @@
package org.broadinstitute.sting.queue.engine
import graphing.{ExplicitJobGrapher, TreeJobGrapher}
import scheduling.ResourceNode
/**
* Syntactic sugar for filling in a pipeline using a Scala script.
*/
object Pipeline {
def addRule(rule: (Any, Any), commandString: String): Unit = TreeJobGrapher.addRule(rule, commandString)
def run(args: Array[String], sources: Any, targets: Any): Unit = TreeJobGrapher.run(args, sources, targets)
def node() = ExplicitJobGrapher.node()
def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = ExplicitJobGrapher.addEdge(rule, commandString)
def run(): Unit = ExplicitJobGrapher.run()
}

View File

@ -1,9 +0,0 @@
package org.broadinstitute.sting.queue.engine
/**
* Defines a basic command to run
* TODO: Allow overriding arguments per command such as the job queue
*/
class QCommand(val commandString: String) extends QModelEdge {
override def toString = commandString
}

View File

@ -1,43 +0,0 @@
package org.broadinstitute.sting.queue.engine
import org.apache.commons.lang.builder.{EqualsBuilder, HashCodeBuilder}
import java.io.File
import org.apache.commons.lang.StringUtils
import org.broadinstitute.sting.queue.QException
/**
* Represents a file extension along with several tags.
* TODO: Use the tags to map rules between wildcards, ex: *.vcf -> *.eval
*/
class QFile(val fileType: String, val parts: String*) {
val extension = (List(parts:_*) ::: List(fileType)).mkString(".")
override def toString = extension
override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1)
override def hashCode = HashCodeBuilder.reflectionHashCode(this)
def matchesFile(path: String): Boolean = matchesFile(new File(path))
def matchesFile(file: File): Boolean = file.getAbsolutePath.endsWith(extension)
def baseName(path: String): String = baseName(new File(path))
def baseName(file: File): String = StringUtils.removeEnd(file.getAbsolutePath, extension)
def fullName(baseName: String) = baseName + extension
}
object QFile {
def getFiles(files: Any) : List[QFile] = {
files match {
case null => List.empty[QFile]
case Nil => List.empty[QFile]
case path: String => List(new QFile(path))
case file: QFile => List(file)
// Any List or Tuple add the members to this list
case product: Product => {
var list = List.empty[QFile]
for (fileList <- product.productIterator.toList.map(getFiles(_))) {
list :::= fileList
}
list
}
case x => throw new QException("Unknown file type: " + x)
}
}
}

View File

@ -0,0 +1,71 @@
package org.broadinstitute.sting.queue.engine
import org.jgrapht.graph.SimpleDirectedGraph
import scala.collection.JavaConversions
import scala.collection.immutable.ListMap
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, QFunction}
class QGraph extends Logging {
var dryRun = true
var bsubAllJobs = false
val jobGraph = new SimpleDirectedGraph[QNode, QFunction](classOf[QFunction])
def numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size
def add(command: CommandLineFunction) {
add(command, true)
}
/**
* Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs.
*/
def fillIn = {
// clone since edgeSet is backed by the graph
for (function <- JavaConversions.asSet(jobGraph.edgeSet).clone) {
val inputs = function.inputs
val outputs = function.outputs
if (inputs.size > 1)
for ((name, input) <- inputs)
addNullEdge(ListMap(name -> input), inputs)
if (outputs.size > 1)
for ((name, output) <- outputs)
addNullEdge(outputs, ListMap(name -> output))
}
}
def run = {
var isReady = true
for (function <- JavaConversions.asSet(jobGraph.edgeSet)) {
val missingValues = function.missingValues
if (missingValues.size > 0) {
isReady = false
logger.error(function match {
case cmd: CommandLineFunction => "Missing values for function: %s".format(cmd.commandLine)
case x => "Missing values:"
})
for (missing <- missingValues) {
logger.error(" " + missing)
}
}
}
if (isReady || this.dryRun)
(new TopologicalJobScheduler(this) with LsfJobRunner).runJobs
}
private def add(f: QFunction, replace: Boolean) {
val inputs = QNode(f.inputs.values.filter(_ != null).toSet)
val outputs = QNode(f.outputs.values.filter(_ != null).toSet)
jobGraph.addVertex(inputs)
jobGraph.addVertex(outputs)
if (replace)
jobGraph.removeAllEdges(inputs, outputs)
jobGraph.addEdge(inputs, outputs, f)
}
private def addNullEdge(input: ListMap[String, Any], output: ListMap[String, Any]) = {
add(new MappingFunction(input, output), false)
}
}

View File

@ -1,7 +0,0 @@
package org.broadinstitute.sting.queue.engine
/**
* Used for modeling before an edge gets
* replicated into an actual ResourceEdge
*/
class QModelEdge

View File

@ -0,0 +1,6 @@
package org.broadinstitute.sting.queue.engine
/**
* Represents a state between QFunctions the directed acyclic QGraph
*/
case class QNode (private val items: Set[Any])

View File

@ -1,3 +0,0 @@
package org.broadinstitute.sting.queue.engine
class QRule (val inputs: List[QFile], val outputs: List[QFile], var command: QCommand)

View File

@ -0,0 +1,31 @@
package org.broadinstitute.sting.queue.engine
import org.jgrapht.traverse.TopologicalOrderIterator
import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter}
import collection.JavaConversions._
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, DispatchFunction, QFunction}
/**
* Loops over the job graph running jobs as the edges are traversed
*/
abstract class TopologicalJobScheduler(private val qGraph: QGraph)
extends CommandLineRunner with DispatchJobRunner with Logging {
protected val iterator = new TopologicalOrderIterator(this.qGraph.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] {
override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match {
case f: DispatchFunction if (TopologicalJobScheduler.this.qGraph.bsubAllJobs) => dispatch(f, qGraph)
case f: CommandLineFunction => run(f, qGraph)
case f: MappingFunction => /* do nothing for mapping functions */
}
})
def runJobs = {
logger.info("Number of jobs: %s".format(this.qGraph.numJobs))
for (target <- iterator) {
// Do nothing for now, let event handler respond
}
}
}

View File

@ -1,27 +0,0 @@
package org.broadinstitute.sting.queue.engine.graphing
import org.broadinstitute.sting.queue.engine.scheduling.{ResourceNode, ExecEdge}
import org.broadinstitute.sting.queue.engine.QCommand
class ExplicitJobGrapher extends JobGrapher
object ExplicitJobGrapher {
private val grapher = new ExplicitJobGrapher
def node() = new ResourceNode
def addEdge(rule: (ResourceNode, ResourceNode), commandString: String): Unit = {
addEdge(rule._1, rule._2, new QCommand(commandString))
}
private def addEdge(source: ResourceNode, target: ResourceNode, command: QCommand) = {
val resourceEdge = new ExecEdge(command.commandString)
grapher.jobGraph.addVertex(source)
grapher.jobGraph.addVertex(target)
grapher.jobGraph.addEdge(source, target, resourceEdge)
}
def run() = {
grapher.run()
}
}

View File

@ -1,31 +0,0 @@
package org.broadinstitute.sting.queue.engine.graphing
import org.broadinstitute.sting.queue.QArguments
import org.broadinstitute.sting.queue.engine.scheduling._
import org.broadinstitute.sting.queue.util.Logging
import org.jgrapht.graph.SimpleDirectedGraph
abstract class JobGrapher() extends Logging {
/**
* Jobs to be run.
* Can be populated adhoc or during createJobGraph()
*/
protected val jobGraph = new SimpleDirectedGraph[ResourceNode, ResourceEdge](classOf[ResourceEdge])
var qArgs: QArguments = _
def run() = {
createJobGraph()
val scheduler = createScheduler()
scheduler.runJobs
}
protected def createJobGraph() = {}
private def createScheduler() : JobScheduler = {
qArgs.useBsub match {
case false => new SimpleJobScheduler(jobGraph, qArgs)
case true => new DispatchJobScheduler(jobGraph, qArgs)
}
}
}

View File

@ -1,143 +0,0 @@
package org.broadinstitute.sting.queue.engine.graphing
import org.jgrapht.graph.SimpleDirectedGraph
import org.jgrapht.alg.BellmanFordShortestPath
import org.broadinstitute.sting.queue.{QArguments, QException}
import collection.mutable.ListBuffer
import collection.JavaConversions._
import org.broadinstitute.sting.queue.engine.scheduling.{ResourceEdge, ExecEdge, MapResourceNode}
import org.broadinstitute.sting.queue.engine.{QModelEdge, QCommand, QFile, QRule}
/**
* Converts a set of rules provided by the user and a list of files into a graph of jobs to run
*/
class TreeJobGrapher extends JobGrapher {
private val modelGraph = new SimpleDirectedGraph[List[QFile], QModelEdge](classOf[QModelEdge])
private val rules = new ListBuffer[QRule]
private var sourceFiles = List.empty[QFile]
private var targetFiles = List.empty[QFile]
// Used to tag a model edge for Element <-> List
private class QCollectionEdge extends QModelEdge
override protected def createJobGraph() = {
createModelGraph()
var missingPaths = List.empty[Tuple2[QFile,QFile]]
for (sourceFile <- sourceFiles) {
for (targetFile <- targetFiles) {
var shortestPath = BellmanFordShortestPath.findPathBetween(modelGraph, List(sourceFile), List(targetFile))
if (shortestPath == null)
missingPaths = missingPaths ::: List((sourceFile, targetFile))
else
addPaths(shortestPath, qArgs)
}
}
for ((sourceFile, targetFile) <- missingPaths) {
logger.error(String.format("No command path found between %s --> %s", sourceFile, targetFile))
}
if (missingPaths.size > 0)
throw new QException("Not all inputs and outputs found in the pipeline graph")
}
private def createModelGraph() = {
// Look for rules with more than one input or output and add
// internal dependencies between the elements and the list
for (rule <- rules) {
if (rule.inputs.size > 1) {
for (input <- rule.inputs) {
modelGraph.addVertex(List(input))
modelGraph.addVertex(rule.inputs)
modelGraph.addEdge(List(input), rule.inputs, new QCollectionEdge)
}
}
if (rule.outputs.size > 1) {
for (output <- rule.outputs) {
modelGraph.addVertex(rule.outputs)
modelGraph.addVertex(List(output))
modelGraph.addEdge(rule.outputs, List(output), new QCollectionEdge)
}
}
}
// Add the explicit rules
for (rule <- rules) {
modelGraph.addVertex(rule.inputs)
modelGraph.addVertex(rule.outputs)
modelGraph.addEdge(rule.inputs, rule.outputs, rule.command)
}
}
private def addPaths(shortestPath: java.util.List[QModelEdge], qArgs: QArguments) {
for (inputFile <- qArgs.inputPaths.map(_.getAbsolutePath)) {
val source = modelGraph.getEdgeSource(shortestPath.head).head
if (source.matchesFile(inputFile)) {
val baseName = source.baseName(inputFile)
val target = modelGraph.getEdgeTarget(shortestPath.last).head
addPathsToTarget(baseName, List(target))
}
}
}
private def addPathsToTarget(baseName: String, targets: List[QFile]) : Unit = {
for (command <- modelGraph.incomingEdgesOf(targets)) {
val sources = modelGraph.getEdgeSource(command)
addJobGraphEdge(baseName, sources, targets, command)
addPathsToTarget(baseName, sources)
}
}
private def addJobGraphEdge(baseName: String, sources: List[QFile], targets: List[QFile], command: QModelEdge) {
val resourceSource = new MapResourceNode(mapFiles(baseName, sources))
val resourceTarget = new MapResourceNode(mapFiles(baseName, targets))
val resourceEdge = command match {
case qCommand: QCommand => new ExecEdge(qCommand.commandString)
case qTransition: QCollectionEdge => new ResourceEdge
}
jobGraph.addVertex(resourceSource)
jobGraph.addVertex(resourceTarget)
jobGraph.addEdge(resourceSource, resourceTarget, resourceEdge)
}
/**
* Creates a mapping of the files based on the baseName.
* key: file extension
* value: full name
* Used by the JobScheduler to lookup values as the commands are expanded at exec time.
*/
private def mapFiles(baseName: String, files: List[QFile]) : Map[String, String] = {
Map(files.map(file => (file.extension, file.fullName(baseName))):_*)
}
}
/**
* Syntactic sugar for filling in a pipeline using a Scala script.
*/
object TreeJobGrapher {
private val grapher = new TreeJobGrapher
/**
* Sugar that allows addRule( inputs -> outputs, command )
*/
def addRule(rule: (Any, Any), commandString: String): Unit = {
val inputs = QFile.getFiles(rule._1)
val outputs = QFile.getFiles(rule._2)
val command = new QCommand(commandString)
addRule(inputs, outputs, command)
}
private def addRule(inputs: List[QFile], outputs: List[QFile], command: QCommand): Unit = {
grapher.rules += new QRule(inputs, outputs, command)
}
def run(args: Array[String], sources: Any, targets: Any) = {
grapher.qArgs = new QArguments(args)
grapher.sourceFiles = QFile.getFiles(sources)
grapher.targetFiles = QFile.getFiles(targets)
grapher.run()
}
}

View File

@ -1,79 +0,0 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.DirectedGraph
import edu.mit.broad.core.lsf.LocalLsfJob
import collection.JavaConversions._
import management.ManagementFactory
import java.io.File
import java.util.ArrayList
import org.broadinstitute.sting.queue.QArguments
/**
* Dispatches jobs to LSF and then returns.
*/
class DispatchJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
extends TopologicalJobScheduler(jobGraph, qArgs) {
private var lsfJobs = Map.empty[ExecEdge, LocalLsfJob]
private var lsfJobIndex = 0
private val jvmName = ManagementFactory.getRuntimeMXBean.getName
private val jobNamePrefix = "Q-" + jvmName
def processExec(exec: ExecEdge) = {
lsfJobIndex += 1
val job = new LocalLsfJob
val jobName = jobNamePrefix + "-" + lsfJobIndex
val outputFile = jobName + ".out"
val errorFile = jobName + ".err"
val workingDir = lookup(exec, "jobWorkingDir", ".")
val lsfProject = lookup(exec, "jobProject", "Queue")
val queue = lookup(exec, "jobQueue", "broad")
val memory = lookup(exec, "jobMemory", "2")
var extraArgs = List("-r", "-R", "rusage[mem=" + memory + "]")
val sourceJobs = sourceLsfJobs(exec)
if (sourceJobs.size > 0) {
extraArgs :::= List("-w", dependencyExpression(sourceJobs))
}
job.setName(jobName)
job.setExtraBsubArgs(new ArrayList(extraArgs))
job.setProject(lsfProject)
job.setWorkingDir(new File(workingDir))
job.setProject(lsfProject)
job.setCommand(exec.commandString)
job.setOutputFile(new File(workingDir, outputFile))
job.setErrFile(new File(workingDir, errorFile))
job.setQueue(queue)
lsfJobs = lsfJobs.updated(exec, job)
logger.info(job.getBsubCommand.mkString(" "))
if (!qArgs.dryRun)
job.start
}
/**
* Walks up the graph looking for the previous LsfJobs for this node
*/
private def sourceLsfJobs(edge: ResourceEdge) : List[LocalLsfJob] = {
var sourceJobs = List.empty[LocalLsfJob]
val source = this.jobGraph.getEdgeSource(edge)
for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) {
incomingEdge match {
// Stop recursing when we find a job along this edge and return it's job id
case exec: ExecEdge => sourceJobs :::= List(lsfJobs(exec))
// For any other type of edge find the LSF jobs preceeding the edge
case resourceEdge: ResourceEdge => sourceJobs :::= sourceLsfJobs(resourceEdge)
}
}
sourceJobs
}
private def dependencyExpression(jobs: List[LocalLsfJob]) = {
jobs.toSet[LocalLsfJob].map(_.getName).mkString("ended(\"", "\") && ended(\"", "\")")
}
}

View File

@ -1,17 +0,0 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.apache.commons.lang.text.{StrLookup, StrSubstitutor}
import java.lang.String
class ExecEdge(private val templateCommandString: String)
extends ResourceEdge {
private var convertedCommandString: String = _
def commandString = convertedCommandString
override def traverse(graph: JobScheduler) = {
// Lookup any variable using the target node, or any of it's input nodes.
val sub = new StrSubstitutor(new StrLookup { def lookup(key: String) = graph.lookup(ExecEdge.this, key, null) })
convertedCommandString = sub.replace(templateCommandString)
graph.processExec(this)
}
}

View File

@ -1,65 +0,0 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.DirectedGraph
import org.broadinstitute.sting.queue.util.Logging
import collection.JavaConversions._
import org.broadinstitute.sting.queue.QArguments
abstract class JobScheduler(protected val jobGraph: DirectedGraph[ResourceNode, ResourceEdge],
protected val qArgs: QArguments) extends Logging {
private var missingKeys = Set.empty[String]
def runJobs
def numJobs = jobGraph.edgeSet.size
def processExec(exec: ExecEdge) : Unit
/**
* Emulates storing of properties per node by looking up values on
* the current edge/target-node or any preceding nodes in the graph.
*/
def lookup(edge: ResourceEdge, key: String, default: String) : String = {
val value = lookupRecursive(edge, key) match {
case Some(value) => value
case None => qArgs.argMap.getOrElse(key, default)
}
if (value == null)
missingKeys = missingKeys ++ Set(key)
value
}
protected def logMissingKeys = {
if (qArgs.dryRun && !missingKeys.isEmpty) {
logger.warn("Missing keys:")
for (key <- missingKeys)
logger.warn(" ${" + key + "}")
}
}
private def lookupRecursive(edge: ResourceEdge, key: String) : Option[String] = {
var value = edge.lookup(key)
if (value.isDefined)
return value
value = this.jobGraph.getEdgeTarget(edge).lookup(key)
if (value.isDefined)
return value
return lookupRecursive(this.jobGraph.getEdgeSource(edge), key)
}
private def lookupRecursive(node: ResourceNode, key: String) : Option[String] = {
var value = node.lookup(key)
if (value.isDefined)
return value
for (edge <- this.jobGraph.incomingEdgesOf(node)) {
value = lookupRecursive(edge, key)
if (value.isDefined)
return value
}
return None
}
}

View File

@ -1,5 +0,0 @@
package org.broadinstitute.sting.queue.engine.scheduling
class MapResourceNode (private val resources: Map[String,String]) extends ResourceNode {
override def lookup(key: String) : Option[String] = this.resources.get(key)
}

View File

@ -1,6 +0,0 @@
package org.broadinstitute.sting.queue.engine.scheduling
class ResourceEdge {
def traverse(graph: JobScheduler): Unit = {}
def lookup(key: String) : Option[String] = None
}

View File

@ -1,9 +0,0 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.apache.commons.lang.builder.{HashCodeBuilder, EqualsBuilder}
class ResourceNode() {
override def equals(p1: Any) = EqualsBuilder.reflectionEquals(this, p1)
override def hashCode = HashCodeBuilder.reflectionHashCode(this)
def lookup(key: String) : Option[String] = None
}

View File

@ -1,19 +0,0 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.broadinstitute.sting.queue.util.ProcessUtils
import org.broadinstitute.sting.queue.QArguments
import org.jgrapht.DirectedGraph
/**
* Runs jobs one at a time locally
*/
class SimpleJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
extends TopologicalJobScheduler(jobGraph, qArgs) {
def processExec(exec: ExecEdge) = {
var commandString = exec.commandString
logger.info(commandString)
if (!qArgs.dryRun)
ProcessUtils.runCommandAndWait(commandString)
}
}

View File

@ -1,29 +0,0 @@
package org.broadinstitute.sting.queue.engine.scheduling
import org.jgrapht.DirectedGraph
import org.jgrapht.traverse.TopologicalOrderIterator
import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter}
import collection.JavaConversions._
import org.broadinstitute.sting.queue.QArguments
/**
* Loops over the job graph running jobs as the edges are traversed
*/
abstract class TopologicalJobScheduler(jobGraph: DirectedGraph[ResourceNode, ResourceEdge], qArgs: QArguments)
extends JobScheduler(jobGraph, qArgs) {
protected val iterator = new TopologicalOrderIterator(this.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[ResourceNode, ResourceEdge] {
override def edgeTraversed(event: EdgeTraversalEvent[ResourceNode, ResourceEdge]) =
event.getEdge.traverse(TopologicalJobScheduler.this)
})
override def runJobs = {
logger.info(String.format("Running %s jobs.", this.numJobs.toString))
for (target <- iterator) {
// Do nothing for now, let event handler respond
}
logMissingKeys
}
}

View File

@ -0,0 +1,92 @@
package org.broadinstitute.sting.queue.function
import java.io.File
import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.queue.engine.{CommandLineRunner, QGraph}
trait CommandLineFunction extends QFunction with DispatchFunction {
/**
* The command line to run locally or via grid computing.
*/
def commandLine: String
/**
* The directory where the command should run.
*/
@Internal
var commandDirectory: File = new File(".")
/**
* Repeats parameters with a prefix/suffix if they are set otherwise returns "".
* Skips null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString.
*/
protected def repeat(prefix: String, params: Seq[_], suffix: String = "", separator: String = "") =
params.filter(param => hasValue(param)).map(param => prefix + toValue(param) + suffix).mkString(separator)
/**
* Returns parameter with a prefix/suffix if it is set otherwise returns "".
* Does not output null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString.
*/
protected def optional(prefix: String, param: Any, suffix: String = "") =
if (hasValue(param)) prefix + toValue(param) + suffix else ""
/**
* Sets a field value using the name of the field.
* Field must be annotated with @Input, @Output, or @Internal
* @returns true if the value was found and set
*/
def setValue(name: String, value: String) = {
ReflectionUtils.getField(this, name) match {
case Some(field) =>
val isInput = ReflectionUtils.hasAnnotation(field, classOf[Input])
val isOutput = ReflectionUtils.hasAnnotation(field, classOf[Output])
val isInternal = ReflectionUtils.hasAnnotation(field, classOf[Internal])
if (isInput || isOutput || isInternal) {
ReflectionUtils.setValue(this, field, value)
}
true
case None => false
}
}
private lazy val fields = ReflectionUtils.getAllFields(this.getClass)
private def internals = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Internal])
def inputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Input])
def outputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Output])
override def missingValues = {
val missingInputs = missingFields(inputs)
val missingOutputs = missingFields(outputs)
missingInputs | missingOutputs
}
private def missingFields(fields: Map[String, Any]) = {
var missing = Set.empty[String]
for ((name, value) <- fields) {
val isOptional = ReflectionUtils.getField(this, name) match {
case Some(field) => ReflectionUtils.hasAnnotation(field, classOf[Optional])
case None => false
}
if (!isOptional)
if (!hasValue(value))
missing += name
}
missing
}
private def hasValue(param: Any) = param match {
case null => false
case Nil => false
case None => false
case _ => true
}
private def toValue(param: Any): String = param match {
case null => ""
case Nil => ""
case None => ""
case Some(x) => x.toString
case x => x.toString
}
}

View File

@ -0,0 +1,19 @@
package org.broadinstitute.sting.queue.function
import java.io.File
import org.broadinstitute.sting.queue.util.Internal
trait DispatchFunction extends QFunction with MemoryLimitedFunction {
def commandLine: String
var commandDirectory: File
var jobName: String = _
var jobOutputFile: File = _
var jobErrorFile: File = _
@Internal
var jobProject = "Queue"
@Internal
var jobQueue = "broad"
}

View File

@ -0,0 +1,6 @@
package org.broadinstitute.sting.queue.function
trait IntervalFunction {
type Intervals = String
var intervals: Intervals
}

View File

@ -0,0 +1,14 @@
package org.broadinstitute.sting.queue.function
import org.broadinstitute.sting.queue.engine.QGraph
import scala.collection.immutable.ListMap
/**
* Utility class to map a set of inputs to set of outputs.
* The QGraph uses this function internally to return
*/
class MappingFunction(private val in: ListMap[String, Any], private val out: ListMap[String, Any]) extends QFunction {
def inputs = in
def outputs = out
def run(qGraph: QGraph) = null
}

View File

@ -0,0 +1,10 @@
package org.broadinstitute.sting.queue.function
import org.broadinstitute.sting.queue.util.{Input, Optional, ClassType}
trait MemoryLimitedFunction {
@Input
@Optional
@ClassType(classOf[Int])
var memoryLimit: Option[Int] = None
}

View File

@ -0,0 +1,10 @@
package org.broadinstitute.sting.queue.function
import org.broadinstitute.sting.queue.engine.QGraph
import scala.collection.immutable.ListMap
trait QFunction {
def inputs: ListMap[String, Any]
def outputs: ListMap[String, Any]
def missingValues = Set.empty[String]
}

View File

@ -0,0 +1,35 @@
package org.broadinstitute.sting.queue.function.gatk
import java.io.File
import org.broadinstitute.sting.queue.util.{ClassType, Input, Optional}
import org.broadinstitute.sting.queue.function.{MemoryLimitedFunction, IntervalFunction, CommandLineFunction}
trait GatkFunction extends CommandLineFunction with MemoryLimitedFunction with IntervalFunction {
@Input
@Optional
var javaTmpDir: String = _
@Input
var gatkJar: String = _
@Input
var referenceFile: String = _
@Input
@Optional
@ClassType(classOf[File])
var bamFiles: List[File] = Nil
@Input
@Optional
var dbsnp: File = _
@Input
@Optional
var intervals: Intervals = new Intervals("all")
protected def gatkCommandLine(walker: String) =
"java%s%s -jar %s -T %s -R %s%s%s "
.format(optional(" -Xmx", memoryLimit, "g"), optional(" -Djava.io.tmpdir=", javaTmpDir),
gatkJar, walker, referenceFile, repeat(" -I ", bamFiles), optional(" -D ", dbsnp), optional(" -L ", intervals))
}

View File

@ -7,7 +7,7 @@ import org.apache.log4j._
*/
trait Logging {
private val className = this.getClass.getName
lazy val logger = configuredLogger
protected lazy val logger = configuredLogger
def configuredLogger = {
Logging.configureLogging

View File

@ -3,6 +3,7 @@ package org.broadinstitute.sting.queue.util
import org.broadinstitute.sting.utils.text.XReadLines
import collection.mutable.ListBuffer
import collection.JavaConversions._
import java.io.File
object ProcessUtils extends Logging {
@ -15,10 +16,10 @@ object ProcessUtils extends Logging {
val running = new ListBuffer[Process]()
def runCommandAndWait(command: String) = {
def runCommandAndWait(command: String, directory: File) = {
logger.debug("Running command: " + command)
var builder = new ProcessBuilder("sh", "-c", command)
var builder = new ProcessBuilder("sh", "-c", command).directory(directory)
var process = builder.start
running += process

View File

@ -0,0 +1,96 @@
package org.broadinstitute.sting.queue.util
import java.lang.reflect.Field
import org.broadinstitute.sting.queue.QException
import java.lang.annotation.Annotation
import scala.concurrent.JavaConversions
import scala.concurrent.JavaConversions._
import scala.collection.immutable.ListMap
object ReflectionUtils {
def getField(obj: AnyRef, name: String) = getAllFields(obj.getClass).find(_.getName == name)
def hasAnnotation(field: Field, annotation: Class[_ <: Annotation]) = field.getAnnotation(annotation) != null
def getFieldsAnnotatedWith(obj: AnyRef, fields: List[Field], annotation: Class[_ <: Annotation]) =
ListMap(fields.filter(field => hasAnnotation(field, annotation))
.map(field => (field.getName -> fieldGetter(field).invoke(obj))) :_*)
def getAllTypes(clazz: Class[_]) = {
var types = List.empty[Class[_]]
var c = clazz
while (c != null) {
types :+= c
c = c.getSuperclass
}
types
}
def getAllFields(clazz: Class[_]) = getAllTypes(clazz).map(_.getDeclaredFields).flatMap(_.toList)
def setValue(obj: AnyRef, field: Field, value: String) = {
val getter = fieldGetter(field)
val setter = fieldSetter(field)
if (getter == null)
throw new QException("Field may be private? Unable to find getter for field: " + field)
if (getter == null)
throw new QException("Field may be a val instead of var? Unable to find setter for field: " + field)
if (classOf[Seq[_]].isAssignableFrom(field.getType)) {
if (!field.isAnnotationPresent(classOf[ClassType]))
throw new QException("@ClassType must be specified due to type erasure for field: " + field)
val fieldType = field.getAnnotation(classOf[ClassType]).asInstanceOf[ClassType].value
val typeValue = coerce(fieldType, value)
var list = getter.invoke(obj).asInstanceOf[Seq[_]]
list :+= typeValue
setter.invoke(obj, list)
} else if (classOf[Option[_]].isAssignableFrom(field.getType)) {
if (!field.isAnnotationPresent(classOf[ClassType]))
throw new QException("@ClassType must be specified due to type erasure for field: " + field)
val fieldType = field.getAnnotation(classOf[ClassType]).asInstanceOf[ClassType].value
val typeValue = coerce(fieldType, value)
setter.invoke(obj, Some(typeValue))
} else {
val fieldType = field.getType
val typeValue = coerce(fieldType, value)
setter.invoke(obj, typeValue.asInstanceOf[AnyRef])
}
}
private[util] def fieldGetter(field: Field) = field.getDeclaringClass.getMethod(field.getName)
private[util] def fieldSetter(field: Field) = field.getDeclaringClass.getMethod(field.getName+"_$eq", field.getType)
private def coerce(clazz: Class[_], value: String) = {
if (classOf[String] == clazz) value
else if (classOf[Boolean] == clazz) value.toBoolean
else if (classOf[Byte] == clazz) value.toByte
else if (classOf[Short] == clazz) value.toShort
else if (classOf[Int] == clazz) value.toInt
else if (classOf[Long] == clazz) value.toLong
else if (classOf[Float] == clazz) value.toFloat
else if (classOf[Double] == clazz) value.toDouble
else if (hasStringConstructor(clazz))
clazz.getConstructor(classOf[String]).newInstance(value)
else throw new QException("Unable to coerce value '%s' to type '%s'.".format(value, clazz))
}
private def hasStringConstructor(clazz: Class[_]) = {
clazz.getConstructors.exists(constructor => {
val parameters = constructor.getParameterTypes
parameters.size == 1 && parameters.head == classOf[String]
})
}
}