Refactoring for a first version of scatter gather api with basic shell script implementations.

Modified build script so that queue is cleaned during "ant clean".



git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3611 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-06-22 18:39:20 +00:00
parent 18f62a346d
commit 30cf78fdc0
38 changed files with 938 additions and 233 deletions

101
build.xml
View File

@ -3,9 +3,10 @@
<property name="build.sysclasspath" value="first" />
<property name="source.dir" value="java/src" />
<property name="build.dir" value="build" />
<property name="dist.dir" value="dist" />
<property name="java.source.dir" value="java/src" />
<property name="java.classes" value="${build.dir}/java/classes" />
<property name="resource.file" value="StingText.properties" />
<!-- do we want to halt on failure of a junit test? default to yes (Bamboo uses 'no') -->
@ -22,7 +23,7 @@
<property environment="env"/>
<property name="target" value="${env.STING_BUILD_TYPE}" />
<dirset id="source.files" dir="${source.dir}">
<dirset id="java.source.files" dir="${java.source.dir}">
<patternset>
<include name="org/broadinstitute/sting/**"/>
<exclude name="**/playground/**" unless="include.playground" />
@ -33,7 +34,7 @@
<!-- the path for resources that need to go into the GATK jar;
any additional resources should go into this set. -->
<path id="gatk.resources">
<fileset dir="${source.dir}">
<fileset dir="${java.source.dir}">
<include name="**/**/templates/*" />
</fileset>
</path>
@ -51,7 +52,7 @@
<path id="classpath">
<path refid="runtime.dependencies" />
<pathelement location="build" />
<pathelement location="${java.classes}" />
</path>
<!-- ivy properties -->
@ -101,30 +102,31 @@
</condition>
<!-- Create the build directory structure used by compile -->
<mkdir dir="build"/>
<mkdir dir="${build.dir}"/>
<mkdir dir="${java.classes}"/>
</target>
<target name="compile" depends="init,resolve"
<target name="java.compile" depends="init,resolve"
description="compile the source">
<!-- Compile the java code from ${java.source.dir} into ${java.classes} -->
<javac srcdir="${source.dir}" destdir="build" debug="true" debuglevel="lines,vars,source" classpathref="runtime.dependencies">
<javac srcdir="${java.source.dir}" destdir="${java.classes}" debug="true" debuglevel="lines,vars,source" classpathref="runtime.dependencies">
<exclude name="**/playground/**" unless="include.playground"/>
<exclude name="**/oneoffprojects/**" unless="include.oneoffs"/>
</javac>
</target>
<target name="extracthelp" depends="init,compile"
<target name="extracthelp" depends="init,java.compile"
description="Extract help key/value pair file from the JavaDoc tags."
unless="disable.help">
<javadoc doclet="org.broadinstitute.sting.utils.help.ResourceBundleExtractorDoclet"
docletpathref="classpath"
classpathref="runtime.dependencies"
additionalparam="-build-timestamp &quot;${build.timestamp}&quot; -version-suffix .${build.version} -out ${basedir}/${build.dir}/${resource.file}">
<packageset refid="source.files"/>
additionalparam="-build-timestamp &quot;${build.timestamp}&quot; -version-suffix .${build.version} -out ${basedir}/${java.classes}/${resource.file}">
<packageset refid="java.source.files"/>
</javadoc>
</target>
<target name="dist" depends="compile,extracthelp"
<target name="dist" depends="java.compile,extracthelp"
description="generate the distribution">
<mkdir dir="${dist.dir}"/>
<delete>
@ -136,12 +138,12 @@
</copy>
<jar jarfile="${dist.dir}/StingUtils.jar">
<fileset dir="build" includes="**/utils/**/*.class"/>
<fileset dir="${java.classes}" includes="**/utils/**/*.class"/>
</jar>
<jar jarfile="${dist.dir}/GenomeAnalysisTK.jar">
<path refid="gatk.resources"/>
<fileset dir="build">
<fileset dir="${java.classes}">
<include name="${resource.file}" />
<include name="**/gatk/**/*.class" />
<include name="**/alignment/**/*.class"/>
@ -153,11 +155,11 @@
</jar>
<jar jarfile="${dist.dir}/Aligner.jar">
<fileset dir="build" includes="**/alignment/**/*.class" />
<fileset dir="${java.classes}" includes="**/alignment/**/*.class" />
</jar>
<jar jarfile="${dist.dir}/AnalyzeCovariates.jar" whenmanifestonly="skip">
<fileset dir="build">
<fileset dir="${java.classes}">
<include name="**/analyzecovariates/**/*.class" />
<include name="**/gatk/walkers/recalibration/*.class" />
</fileset>
@ -167,7 +169,7 @@
</jar>
<jar jarfile="${dist.dir}/CGUtilities.jar" whenmanifestonly="skip">
<fileset dir="build">
<fileset dir="${java.classes}">
<include name="**/*.class" />
</fileset>
<manifest>
@ -176,7 +178,7 @@
</jar>
<jar jarfile="${dist.dir}/VCFTool.jar" whenmanifestonly="skip">
<fileset dir="build">
<fileset dir="${java.classes}">
<include name="**/*.class"/>
</fileset>
<manifest>
@ -240,14 +242,14 @@
</antcall>
</target>
<target name="test.compile" depends="oneoffs">
<target name="java.test.compile" depends="oneoffs">
<echo message="Sting: Compiling test cases!"/>
<mkdir dir="${test.classes}"/>
<javac destdir="${test.classes}" debug="true" optimize="on">
<src path="${test.sources}"/>
<mkdir dir="${java.test.classes}"/>
<javac destdir="${java.test.classes}" debug="true" optimize="on">
<src path="${java.test.sources}"/>
<classpath>
<path refid="runtime.dependencies" />
<pathelement location="build"/>
<pathelement location="${java.classes}"/>
<pathelement location="lib/junit-4.4.jar"/>
</classpath>
</javac>
@ -260,8 +262,8 @@
<param name="ivy.conf" value="scala"/>
</antcall>
<antcall target="dist"/>
<property name="scala.src" value="scala/src" />
<property name="scala.classes" value="scala/classes" />
<property name="scala.source.dir" value="scala/src" />
<property name="scala.classes" value="build/scala" />
<path id="scala.classpath">
<fileset dir="lib">
@ -279,7 +281,7 @@
</taskdef>
<mkdir dir="${scala.classes}"/>
<echo>Building Scala...</echo>
<scalac srcdir="${scala.src}" destdir="${scala.classes}" classpathref="scala.classpath" force="changed">
<scalac srcdir="${scala.source.dir}" destdir="${scala.classes}" classpathref="scala.classpath" force="changed">
<include name="*.scala"/>
</scalac>
@ -296,34 +298,29 @@
<antcall target="resolve">
<param name="ivy.conf" value="queue"/>
</antcall>
<antcall target="dist"/>
<property name="queue.src" value="scala/src" />
<property name="queue.classes" value="scala/classes" />
<pathconvert property="queuejar.classpath" pathsep=" ">
<flattenmapper/>
<fileset dir="${dist.dir}" includes="*.jar"/>
</pathconvert>
<!-- Call dist so jar files are picked up for manifest. -->
<antcall target="dist" />
<property name="queue.source.dir" value="scala/src" />
<property name="queue.classes" value="${build.dir}/scala/classes" />
<path id="queue.classpath">
<fileset dir="lib">
<include name="scala-compiler-*.jar"/>
<include name="scala-library-*.jar"/>
</fileset>
<fileset dir="${dist.dir}">
<patternset>
<include name="*.jar"/>
</patternset>
</fileset>
<path refid="runtime.dependencies" />
<pathelement location="${java.classes}" />
</path>
<taskdef resource="scala/tools/ant/antlib.xml">
<classpath refid="queue.classpath"/>
</taskdef>
<mkdir dir="${queue.classes}"/>
<echo>Building Queue...</echo>
<scalac srcdir="${queue.src}" destdir="${queue.classes}" classpathref="queue.classpath">
<scalac srcdir="${queue.source.dir}" destdir="${queue.classes}" classpathref="queue.classpath">
<include name="org/broadinstitute/sting/queue/**/*.scala"/>
</scalac>
<pathconvert property="queuejar.classpath" pathsep=" ">
<flattenmapper/>
<fileset dir="${dist.dir}" includes="*.jar"/>
</pathconvert>
<jar jarfile="${dist.dir}/Queue.jar">
<fileset dir="${queue.classes}">
<include name="org/broadinstitute/sting/queue/**/*.class"/>
@ -340,9 +337,9 @@
<!-- ***************************************************************************** -->
<!-- where to put reports and tests-->
<property name="report" value="${build.dir}/report"/>
<property name="test.classes" value="${build.dir}/testclasses"/>
<property name="java.test.classes" value="${java.classes}/testclasses"/>
<property name="test.output" value="${dist.dir}/test"/>
<property name="test.sources" value="java/test"/>
<property name="java.test.sources" value="java/test"/>
<!-- provide a ceiling on the memory that unit/integration tests can consume. -->
<property name="test.maxmemory" value="2g"/>
@ -357,14 +354,14 @@
<formatter type="xml"/>
<classpath>
<pathelement location="build"/>
<pathelement location="${java.classes}"/>
<path refid="runtime.dependencies"/>
<pathelement location="${test.classes}"/>
<pathelement location="${java.test.classes}"/>
<pathelement location="lib/junit-4.4.jar"/>
</classpath>
<batchtest fork="yes" todir="${report}">
<fileset dir="${test.classes}">
<fileset dir="${java.test.classes}">
<include name="**/@{testtype}.class"/>
<exclude name="**/BaseTest.class"/>
</fileset>
@ -376,19 +373,19 @@
<!-- our three different test conditions: Test, IntegrationTest, PerformanceTest -->
<target name="test" depends="test.compile" description="Run unit tests">
<target name="test" depends="java.test.compile" description="Run unit tests">
<condition property="ttype" value="*UnitTest" else="${single}">
<not><isset property="single"/></not>
</condition>
<run-test testtype="${ttype}"/>
</target>
<target name="integrationtest" depends="test.compile" description="Run unit tests">
<!-- Runs the integration tests: every class matching *IntegrationTest, or only the
     class pattern given by the 'single' property when that property is set. -->
<target name="integrationtest" depends="java.test.compile" description="Run integration tests">
    <!-- itype falls back to ${single} whenever 'single' has been defined -->
    <condition property="itype" value="*IntegrationTest" else="${single}">
        <not><isset property="single"/></not>
    </condition>
    <run-test testtype="${itype}"/>
</target>
<target name="performancetest" depends="test.compile" description="Run unit tests">
<target name="performancetest" depends="java.test.compile" description="Run unit tests">
<condition property="ptype" value="*PerformanceTest" else="${single}">
<not><isset property="single"/></not>
</condition>
@ -401,7 +398,7 @@
<mkdir dir="javadoc"/>
<javadoc destdir="javadoc"
classpathref="runtime.dependencies">
<packageset refid="source.files"/>
<packageset refid="java.source.files"/>
</javadoc>
</target>
@ -454,7 +451,7 @@
<target name="clean" description="clean up">
<delete dir="out"/>
<delete dir="build"/>
<delete dir="${build.dir}"/>
<delete dir="lib"/>
<delete dir="staging"/>
<delete dir="${dist.dir}"/>

View File

@ -42,8 +42,8 @@
<dependency org="commons-io" name="commons-io" rev="1.3.2" conf="default"/>
<!-- Scala dependencies -->
<dependency org="org.scala-lang" name="scala-compiler" rev="2.8.0.RC5" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-library" rev="2.8.0.RC5" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-compiler" rev="2.8.0.RC6" conf="scala->default"/>
<dependency org="org.scala-lang" name="scala-library" rev="2.8.0.RC6" conf="scala->default"/>
<!-- Queue additional dependencies -->
<dependency org="commons-lang" name="commons-lang" rev="2.5" conf="queue->default"/>

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies the class to gather an output of a QFunction.
* Not an input or output but should be copied with a function.
* Internals should have default values that should be handled, i.e. they are always @Optional
* A common use for @Internal is to specify WHERE a function runs: farm queue, directory, etc.
* or to name part of a function: farm job name
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Gather {
Class value();
}

View File

@ -27,7 +27,7 @@ package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies an input to a QueueFunction
* Specifies an input to a QFunction
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented

View File

@ -27,9 +27,11 @@ package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies an internal setting for a QueueFunction.
* Specifies an internal setting for a QFunction.
* Not an input or output but should be copied with a function.
* Internals should have default values that should be handled, i.e. they are always @Optional
* A common use for @Internal is to specify WHERE a function runs: farm queue, directory, etc.
* or to name part of a function: farm job name
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented

View File

@ -27,7 +27,7 @@ package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies an input or output to a QueueFunction is optional
* Specifies an input or output to a QFunction is optional
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented

View File

@ -27,7 +27,7 @@ package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
/**
* Specifies an output to a QueueFunction
* Specifies an output to a QFunction
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.util;
import java.lang.annotation.*;
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Scatter {
Class value();
}

View File

@ -21,8 +21,9 @@ class QArguments(args: Array[String]) {
filtered.appendAll(args)
if (isFlagged(filtered, "-debug"))
Logging.enableDebug
Logging.setDebug
if (isFlagged(filtered, "-trace"))
Logging.setTrace
if (isFlagged(filtered, "-dry"))
dryRun = true
if (isFlagged(filtered, "-bsub"))

View File

@ -15,6 +15,11 @@ object QScript {
type ClassType = org.broadinstitute.sting.queue.util.ClassType
type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction
type GatkFunction = org.broadinstitute.sting.queue.function.gatk.GatkFunction
type ScatterGatherableFunction = org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction
type Scatter = org.broadinstitute.sting.queue.util.Scatter
type Gather = org.broadinstitute.sting.queue.util.Gather
type BamGatherFunction = org.broadinstitute.sting.queue.function.scattergather.BamGatherFunction
type SimpleTextGatherFunction = org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction
// The arguments for executing pipelines
private var qArgs: QArguments = _
@ -85,7 +90,7 @@ object QScript {
* Sets the @Input and @Output values for a single function
*/
def setParams(function: CommandLineFunction): Unit =
for ((name, value) <- qArgs.argMap) function.setValue(name, value)
for ((name, value) <- qArgs.argMap) function.addOrUpdateWithStringValue(name, value)
/**
* Executes functions that have been added to the pipeline.

View File

@ -8,8 +8,11 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction
*/
trait CommandLineRunner extends Logging {
def run(function: CommandLineFunction, qGraph: QGraph) = {
var commandLine = function.commandLine
logger.info(commandLine)
if (logger.isDebugEnabled) {
logger.debug(function.commandDirectory + " > " + function.commandLine)
} else {
logger.info(function.commandLine)
}
if (!qGraph.dryRun)
ProcessUtils.runCommandAndWait(function.commandLine, function.commandDirectory)

View File

@ -1,18 +1,12 @@
package org.broadinstitute.sting.queue.engine
import edu.mit.broad.core.lsf.LocalLsfJob
import collection.JavaConversions._
import management.ManagementFactory
import java.io.File
import java.util.ArrayList
import org.broadinstitute.sting.queue.function.{DispatchFunction, QFunction}
trait DispatchJobRunner {
type DispatchJobType
private var dispatchJobs = Map.empty[DispatchFunction, DispatchJobType]
protected def newJobName = DispatchJobRunner.nextJobName
def dispatch(function: DispatchFunction, qGraph: QGraph)
protected def addJob(function: DispatchFunction, dispatchJob: DispatchJobType) =
@ -32,25 +26,9 @@ trait DispatchJobRunner {
case dispatchFunction: DispatchFunction => previous :+= dispatchJobs(dispatchFunction)
// For any other type of edge find the LSF jobs preceding the edge
case qFunction: QFunction => previous :::= previousJobs(qFunction, qGraph)
case qFunction: QFunction => previous = previousJobs(qFunction, qGraph) ::: previous
}
}
previous
}
}
object DispatchJobRunner {
private val jobNamePrefix = "Q-" + {
var prefix = ManagementFactory.getRuntimeMXBean.getName
val index = prefix.indexOf(".")
if (index >= 0)
prefix = prefix.substring(0, index)
prefix
}
private var jobIndex = 0
private def nextJobName = {
jobIndex += 1
jobNamePrefix + "-" + jobIndex
}
}

View File

@ -3,7 +3,6 @@ package org.broadinstitute.sting.queue.engine
import collection.JavaConversions._
import edu.mit.broad.core.lsf.LocalLsfJob
import org.broadinstitute.sting.queue.function.DispatchFunction
import java.io.File
import java.util.ArrayList
import org.broadinstitute.sting.queue.util.Logging
@ -11,22 +10,10 @@ trait LsfJobRunner extends DispatchJobRunner with Logging {
type DispatchJobType = LocalLsfJob
def dispatch(function: DispatchFunction, qGraph: QGraph) = {
var jobName = function.jobName
if (jobName == null)
jobName = newJobName
var jobOutputFile = function.jobOutputFile
if (jobOutputFile == null)
jobOutputFile = new File(jobName + ".out")
var jobErrorFile = function.jobErrorFile
if (jobErrorFile == null)
jobErrorFile = new File(jobName + ".err")
val job = new LocalLsfJob
job.setName(jobName)
job.setOutputFile(jobOutputFile)
job.setErrFile(jobErrorFile)
job.setName(function.jobName)
job.setOutputFile(function.jobOutputFile)
job.setErrFile(function.jobErrorFile)
job.setWorkingDir(function.commandDirectory)
job.setProject(function.jobProject)
job.setQueue(function.jobQueue)
@ -45,7 +32,11 @@ trait LsfJobRunner extends DispatchJobRunner with Logging {
addJob(function, job)
logger.info(job.getBsubCommand.mkString(" "))
if (logger.isDebugEnabled) {
logger.debug(function.commandDirectory + " > " + job.getBsubCommand.mkString(" "))
} else {
logger.info(job.getBsubCommand.mkString(" "))
}
if (!qGraph.dryRun)
job.start

View File

@ -2,14 +2,19 @@ package org.broadinstitute.sting.queue.engine
import org.jgrapht.graph.SimpleDirectedGraph
import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.collection.immutable.ListMap
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, QFunction}
import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction
import org.broadinstitute.sting.queue.util.{CollectionUtils, Logging}
import org.broadinstitute.sting.queue.QException
import org.jgrapht.alg.CycleDetector
import org.jgrapht.EdgeFactory
class QGraph extends Logging {
var dryRun = true
var bsubAllJobs = false
val jobGraph = new SimpleDirectedGraph[QNode, QFunction](classOf[QFunction])
val jobGraph = newGraph
def numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size
def add(command: CommandLineFunction) {
@ -25,47 +30,135 @@ class QGraph extends Logging {
val inputs = function.inputs
val outputs = function.outputs
if (inputs.size > 1)
for ((name, input) <- inputs)
addNullEdge(ListMap(name -> input), inputs)
for ((name, input) <- inputs) {
addCollectionInputs(name, input)
if (inputs.size > 1)
addMappingEdge(ListMap(name -> input), inputs)
}
if (outputs.size > 1)
for ((name, output) <- outputs)
addNullEdge(outputs, ListMap(name -> output))
for ((name, output) <- outputs) {
addCollectionOutputs(name, output)
if (outputs.size > 1)
addMappingEdge(outputs, ListMap(name -> output))
}
}
var pruning = true
while (pruning) {
pruning = false
val filler = jobGraph.edgeSet.filter(isFiller(_))
if (filler.size > 0) {
jobGraph.removeAllEdges(filler)
pruning = true
}
}
jobGraph.removeAllVertices(jobGraph.vertexSet.filter(isOrphan(_)))
}
def run = {
var isReady = true
for (function <- JavaConversions.asSet(jobGraph.edgeSet)) {
val missingValues = function.missingValues
if (missingValues.size > 0) {
isReady = false
logger.error(function match {
case cmd: CommandLineFunction => "Missing values for function: %s".format(cmd.commandLine)
case x => "Missing values:"
})
for (missing <- missingValues) {
logger.error(" " + missing)
}
function match {
case cmd: CommandLineFunction =>
val missingValues = cmd.missingValues
if (missingValues.size > 0) {
isReady = false
logger.error("Missing values for function: %s".format(cmd.commandLine))
for (missing <- missingValues)
logger.error(" " + missing)
}
case _ =>
}
}
val detector = new CycleDetector(jobGraph)
if (detector.detectCycles) {
logger.error("Cycles were detected in the graph:")
for (cycle <- detector.findCycles)
logger.error(" " + cycle)
isReady = false
}
if (isReady || this.dryRun)
(new TopologicalJobScheduler(this) with LsfJobRunner).runJobs
}
private def add(f: QFunction, replace: Boolean) {
val inputs = QNode(f.inputs.values.filter(_ != null).toSet)
val outputs = QNode(f.outputs.values.filter(_ != null).toSet)
jobGraph.addVertex(inputs)
jobGraph.addVertex(outputs)
if (replace)
jobGraph.removeAllEdges(inputs, outputs)
jobGraph.addEdge(inputs, outputs, f)
private def newGraph = new SimpleDirectedGraph[QNode, QFunction](new EdgeFactory[QNode, QFunction] {
def createEdge(input: QNode, output: QNode) = new MappingFunction(input.valueMap, output.valueMap)})
private def add(f: QFunction, replace: Boolean): Unit = {
try {
f.freeze
f match {
case scatterGather: ScatterGatherableFunction if (bsubAllJobs && scatterGather.scatterGatherable) =>
val functions = scatterGather.generateFunctions()
if (logger.isTraceEnabled)
logger.trace("Scattered into %d parts: %s".format(functions.size, functions))
functions.foreach(add(_))
case _ =>
val inputs = QNode(f.inputs.values.filter(_ != null).toSet)
val outputs = QNode(f.outputs.values.filter(_ != null).toSet)
val newSource = jobGraph.addVertex(inputs)
val newTarget = jobGraph.addVertex(outputs)
val removedEdges = if (replace) jobGraph.removeAllEdges(inputs, outputs) else Nil
val added = jobGraph.addEdge(inputs, outputs, f)
if (logger.isTraceEnabled) {
logger.trace("Mapped from: " + inputs)
logger.trace("Mapped to: " + outputs)
logger.trace("Mapped via: " + f)
logger.trace("Removed edges: " + removedEdges)
logger.trace("New source?: " + newSource)
logger.trace("New target?: " + newTarget)
logger.trace("")
}
}
} catch {
case e: Exception =>
throw new QException("Error adding function: " + f, e)
}
}
private def addNullEdge(input: ListMap[String, Any], output: ListMap[String, Any]) = {
add(new MappingFunction(input, output), false)
private def addCollectionInputs(name: String, value: Any): Unit = {
CollectionUtils.foreach(value, (item, collection) =>
addMappingEdge(ListMap(name -> item), ListMap(name -> collection)))
}
private def addCollectionOutputs(name: String, value: Any): Unit = {
CollectionUtils.foreach(value, (item, collection) =>
addMappingEdge(ListMap(name -> collection), ListMap(name -> item)))
}
private def addMappingEdge(input: ListMap[String, Any], output: ListMap[String, Any]) =
add(new MappingFunction(input, output), false)
private def isMappingEdge(edge: QFunction) =
edge.isInstanceOf[MappingFunction]
private def isFiller(edge: QFunction) = {
if (isMappingEdge(edge)) {
val source = jobGraph.getEdgeSource(edge)
val target = jobGraph.getEdgeTarget(edge)
if (jobGraph.outgoingEdgesOf(target).size == 0 || jobGraph.incomingEdgesOf(source).size == 0)
true
else if (isLoopback(source) || isLoopback(target))
true
else false
} else false
}
private def isLoopback(node: QNode) = {
var loopback = false
val incoming = jobGraph.incomingEdgesOf(node)
val outgoing = jobGraph.outgoingEdgesOf(node)
if (incoming.size == 1 && outgoing.size == 1)
if (isMappingEdge(incoming.head) && isMappingEdge(outgoing.head))
if (jobGraph.getEdgeSource(incoming.head) == jobGraph.getEdgeTarget(outgoing.head))
loopback = true
loopback
}
private def isOrphan(node: QNode) =
(jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0
}

View File

@ -1,6 +1,20 @@
package org.broadinstitute.sting.queue.engine
import scala.collection.immutable.ListMap
/**
* Represents a state between QFunctions in the directed acyclic QGraph
*/
case class QNode (private val items: Set[Any])
case class QNode (private val items: Set[Any]) {
  /**
   * Builds a map from each non-null item's toString to the item itself,
   * in the iteration order of the underlying set.
   * Used during QGraph error reporting.
   * The EdgeFactory uses the valueMap to create new edges for the CycleDetector.
   */
  def valueMap =
    items.foldLeft(ListMap.empty[String, Any]) { (collected, entry) =>
      // null entries carry no name to key on, so they are left out
      if (entry == null) collected
      else collected + (entry.toString -> entry)
    }
}

View File

@ -12,20 +12,28 @@ import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunc
abstract class TopologicalJobScheduler(private val qGraph: QGraph)
extends CommandLineRunner with DispatchJobRunner with Logging {
protected val iterator = new TopologicalOrderIterator(this.qGraph.jobGraph)
protected val iterator = new TopologicalOrderIterator(qGraph.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] {
override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match {
case f: DispatchFunction if (TopologicalJobScheduler.this.qGraph.bsubAllJobs) => dispatch(f, qGraph)
case f: DispatchFunction if (qGraph.bsubAllJobs) => dispatch(f, qGraph)
case f: CommandLineFunction => run(f, qGraph)
case f: MappingFunction => /* do nothing for mapping functions */
}
})
def runJobs = {
logger.info("Number of jobs: %s".format(this.qGraph.numJobs))
logger.info("Number of jobs: %s".format(qGraph.numJobs))
if (logger.isTraceEnabled)
logger.trace("Number of nodes: %s".format(qGraph.jobGraph.vertexSet.size))
var numNodes = 0
for (target <- iterator) {
if (logger.isTraceEnabled)
logger.trace("Visiting: " + target)
numNodes += 1
// Do nothing for now, let event handler respond
}
if (logger.isTraceEnabled)
logger.trace("Done walking %s nodes.".format(numNodes))
}
}

View File

@ -3,20 +3,9 @@ package org.broadinstitute.sting.queue.function
import java.io.File
import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.queue.engine.{CommandLineRunner, QGraph}
import java.lang.reflect.Field
trait CommandLineFunction extends QFunction with DispatchFunction {
/**
* The command line to run locally or via grid computing.
*/
def commandLine: String
/**
* The directory where the command should run.
*/
@Internal
var commandDirectory: File = new File(".")
trait CommandLineFunction extends InputOutputFunction with DispatchFunction {
/**
* Repeats parameters with a prefix/suffix if they are set otherwise returns "".
* Skips null, Nil, None. Unwraps Some(x) to x. Everything else is called with x.toString.
@ -31,50 +20,25 @@ trait CommandLineFunction extends QFunction with DispatchFunction {
protected def optional(prefix: String, param: Any, suffix: String = "") =
if (hasValue(param)) prefix + toValue(param) + suffix else ""
/**
* Sets a field value using the name of the field.
* Field must be annotated with @Input, @Output, or @Internal
* @returns true if the value was found and set
*/
def setValue(name: String, value: String) = {
ReflectionUtils.getField(this, name) match {
case Some(field) =>
val isInput = ReflectionUtils.hasAnnotation(field, classOf[Input])
val isOutput = ReflectionUtils.hasAnnotation(field, classOf[Output])
val isInternal = ReflectionUtils.hasAnnotation(field, classOf[Internal])
if (isInput || isOutput || isInternal) {
ReflectionUtils.setValue(this, field, value)
}
true
case None => false
}
}
private lazy val fields = ReflectionUtils.getAllFields(this.getClass)
private def internals = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Internal])
def inputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Input])
def outputs = ReflectionUtils.getFieldsAnnotatedWith(this, fields, classOf[Output])
override def missingValues = {
val missingInputs = missingFields(inputs)
val missingOutputs = missingFields(outputs)
def missingValues = {
val missingInputs = missingFields(inputFields)
val missingOutputs = missingFields(outputFields)
missingInputs | missingOutputs
}
private def missingFields(fields: Map[String, Any]) = {
private def missingFields(fields: List[Field]) = {
var missing = Set.empty[String]
for ((name, value) <- fields) {
val isOptional = ReflectionUtils.getField(this, name) match {
case Some(field) => ReflectionUtils.hasAnnotation(field, classOf[Optional])
case None => false
}
for (field <- fields) {
val isOptional = ReflectionUtils.hasAnnotation(field, classOf[Optional])
if (!isOptional)
if (!hasValue(value))
missing += name
if (!hasValue(ReflectionUtils.getValue(this, field)))
missing += field.getName
}
missing
}
protected def hasFieldValue(field: Field) = hasValue(this.getFieldValue(field))
private def hasValue(param: Any) = param match {
case null => false
case Nil => false

View File

@ -1,14 +1,35 @@
package org.broadinstitute.sting.queue.function
import java.io.File
import org.broadinstitute.sting.queue.util.Internal
import java.lang.management.ManagementFactory
import org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction
import org.broadinstitute.sting.queue.util._
trait DispatchFunction extends QFunction with MemoryLimitedFunction {
trait DispatchFunction extends InputOutputFunction {
def commandLine: String
var commandDirectory: File
@Input
@Optional
var memoryLimit: Option[Int] = None
/**
* The directory where the command should run.
*/
@Internal
var commandDirectory: File = IOUtils.CURRENT_DIR
@Internal
var jobNamePrefix: String = _
@Internal
var jobName: String = _
@Output
@Gather(classOf[SimpleTextGatherFunction])
var jobOutputFile: File = _
@Output
@Gather(classOf[SimpleTextGatherFunction])
var jobErrorFile: File = _
@Internal
@ -16,4 +37,54 @@ trait DispatchFunction extends QFunction with MemoryLimitedFunction {
@Internal
var jobQueue = "broad"
override def freeze = {
if (jobNamePrefix == null)
jobNamePrefix = DispatchFunction.processNamePrefix
if (jobName == null)
jobName = DispatchFunction.nextJobName(jobNamePrefix)
if (jobOutputFile == null)
jobOutputFile = new File(jobName + ".out")
if (jobErrorFile == null)
jobErrorFile = new File(jobName + ".err")
commandDirectory = IOUtils.absolute(IOUtils.CURRENT_DIR, commandDirectory)
super.freeze
}
/**
* Override the canon function to change any relative path to an absolute path.
*/
override protected def canon(value: Any) = {
value match {
case file: File => IOUtils.absolute(commandDirectory, file)
case x => super.canon(x)
}
}
def absolute(file: File) = IOUtils.absolute(commandDirectory, file)
def temp(subDir: String) = IOUtils.sub(commandDirectory, jobName + "-" + subDir)
override def toString = commandLine
}
/**
 * Companion helpers that generate default job names for dispatch functions.
 */
object DispatchFunction {
  /**
   * "Q-" followed by the JVM runtime name, cut off before the first "." if there is one.
   * NOTE(review): the runtime name is commonly "pid@hostname", but the JVM does not guarantee that format.
   */
  private val processNamePrefix = {
    val runtimeName = ManagementFactory.getRuntimeMXBean.getName
    val firstDot = runtimeName.indexOf(".")
    "Q-" + (if (firstDot >= 0) runtimeName.substring(0, firstDot) else runtimeName)
  }

  /** Counter shared by every job name generated in this process; incremented before each use. */
  private var jobIndex = 0

  /**
   * Returns the next job name in the form "prefix-N", where N starts at 1.
   */
  private def nextJobName(prefix: String) = {
    jobIndex += 1
    prefix + "-" + jobIndex
  }
}

View File

@ -0,0 +1,67 @@
package org.broadinstitute.sting.queue.function
import java.lang.reflect.Field
import org.broadinstitute.sting.queue.util._
/**
* A function with @Inputs and @Outputs tagging fields that can be set by the user in a QScript
*/
trait InputOutputFunction extends QFunction with Cloneable {
  /** Reads the current value of the field on this function through its getter method. */
  def getFieldValue(field: Field) = ReflectionUtils.getValue(this, field)

  /** Writes a new value to the field on this function through its setter method. */
  def setFieldValue(field: Field, value: Any) = ReflectionUtils.setValue(this, field, value)

  /** All annotated fields: inputs first, then outputs, then internals. */
  def functionFields: List[Field] = inputFields ::: outputFields ::: internalFields

  /** Fields annotated with @Input. */
  def inputFields = ReflectionUtils.filterFields(fields, classOf[Input])

  /** Fields annotated with @Output. */
  def outputFields = ReflectionUtils.filterFields(fields, classOf[Output])

  /** Fields annotated with @Internal. */
  def internalFields = ReflectionUtils.filterFields(fields, classOf[Internal])

  // Every field declared on this class and the types it inherits from, looked up once on first use.
  private lazy val fields = ReflectionUtils.getAllFields(this.getClass)

  /** Name -> value of the @Input fields. */
  def inputs = ReflectionUtils.getFieldNamesValues(this, inputFields)

  /** Name -> value of the @Output fields. */
  def outputs = ReflectionUtils.getFieldNamesValues(this, outputFields)

  /** Name -> value of the @Internal fields. */
  def internals = ReflectionUtils.getFieldNamesValues(this, internalFields)

  /**
   * Sets a field value using the name of the field.
   * Field must be annotated with @Input, @Output, or @Internal
   * @param name name of the field to set
   * @param value string form of the value to add or update
   * @return true if the value was found and set; false when no field has that
   *         name or the field carries none of the three annotations
   */
  def addOrUpdateWithStringValue(name: String, value: String) = {
    fields.find(_.getName == name) match {
      case Some(field) =>
        val isInput = ReflectionUtils.hasAnnotation(field, classOf[Input])
        val isOutput = ReflectionUtils.hasAnnotation(field, classOf[Output])
        val isInternal = ReflectionUtils.hasAnnotation(field, classOf[Internal])
        val settable = isInput || isOutput || isInternal
        if (settable)
          ReflectionUtils.addOrUpdateWithStringValue(this, field, value)
        // Report success only when the value was actually applied; an
        // un-annotated field with a matching name is left untouched.
        settable
      case None => false
    }
  }

  /** Returns a clone of this function, typed as this function's own type. */
  def cloneFunction() = clone.asInstanceOf[this.type]
  // explicitly overridden so that trait function cloneFunction can use this.clone
  override protected def clone = super.clone

  /**
   * As the function is frozen, changes all fields to their canonical forms.
   */
  override def freeze = {
    for (field <- this.functionFields)
      mapField(field, canon)
    super.freeze
  }

  /**
   * Applies f to the current value of the field (element by element for the
   * collection types CollectionUtils.updated understands), stores the result
   * back into the field and returns the stored value.
   */
  def mapField(field: Field, f: Any => Any): Any = {
    var fieldValue = this.getFieldValue(field)
    fieldValue = CollectionUtils.updated(fieldValue, f).asInstanceOf[AnyRef]
    this.setFieldValue(field, fieldValue)
    fieldValue
  }

  /**
   * Set value to a uniform value across functions.
   * The biggest example is file paths relative to the command directory in DispatchFunction
   */
  protected def canon(value: Any): Any = value
}

View File

@ -1,6 +1,8 @@
package org.broadinstitute.sting.queue.function
trait IntervalFunction {
type Intervals = String
var intervals: Intervals
}
import java.io.File
trait IntervalFunction extends InputOutputFunction {
var referenceFile: File
var intervals: File
}

View File

@ -10,5 +10,5 @@ import scala.collection.immutable.ListMap
class MappingFunction(private val in: ListMap[String, Any], private val out: ListMap[String, Any]) extends QFunction {
def inputs = in
def outputs = out
def run(qGraph: QGraph) = null
override def toString = "<map>"
}

View File

@ -1,9 +0,0 @@
package org.broadinstitute.sting.queue.function
import org.broadinstitute.sting.queue.util.{Input, Optional}
trait MemoryLimitedFunction {
@Input
@Optional
var memoryLimit: Option[Int] = None
}

View File

@ -1,10 +1,28 @@
package org.broadinstitute.sting.queue.function
import org.broadinstitute.sting.queue.engine.QGraph
import scala.collection.immutable.ListMap
/**
* The base interface for all functions in Queue.
* Inputs and outputs are specified as ListMaps of name -> value.
* The names are used for debugging.
* Inputs are matched to other outputs by using .equals()
*/
trait QFunction {
/**
 * After a function is frozen no more updates are allowed by the user.
 * The function is allowed to make necessary updates internally to make sure
 * the inputs and outputs will be equal to other inputs and outputs.
 * The base implementation does nothing.
 */
def freeze = {}
/**
 * ListMap of name -> value inputs for this function.
 */
def inputs: ListMap[String, Any]
/**
 * ListMap of name -> value outputs for this function.
 */
def outputs: ListMap[String, Any]
/**
 * Names of values that still need to be provided before the function can be used.
 * The base implementation reports nothing missing (an empty set).
 */
def missingValues = Set.empty[String]
}

View File

@ -1,11 +1,12 @@
package org.broadinstitute.sting.queue.function.gatk
import java.io.File
import org.broadinstitute.sting.queue.util.{Input, Optional}
import org.broadinstitute.sting.queue.function.{MemoryLimitedFunction, IntervalFunction, CommandLineFunction}
import org.broadinstitute.sting.queue.function.IntervalFunction
import org.broadinstitute.sting.queue.util.{Scatter, Internal, Input, Optional}
import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, IntervalScatterFunction}
trait GatkFunction extends CommandLineFunction with MemoryLimitedFunction with IntervalFunction {
@Input
trait GatkFunction extends ScatterGatherableFunction with IntervalFunction {
@Internal
@Optional
var javaTmpDir: String = _
@ -13,7 +14,7 @@ trait GatkFunction extends CommandLineFunction with MemoryLimitedFunction with I
var gatkJar: String = _
@Input
var referenceFile: String = _
var referenceFile: File = _
@Input
@Optional
@ -21,14 +22,15 @@ trait GatkFunction extends CommandLineFunction with MemoryLimitedFunction with I
@Input
@Optional
var dbsnp: File = _
@Scatter(classOf[IntervalScatterFunction])
var intervals: File = _
@Input
@Optional
var intervals: Intervals = new Intervals("all")
var dbsnp: File = _
protected def gatkCommandLine(walker: String) =
"java%s%s -jar %s -T %s -R %s%s%s "
"java%s%s -jar %s -T %s -R %s%s%s%s "
.format(optional(" -Xmx", memoryLimit, "g"), optional(" -Djava.io.tmpdir=", javaTmpDir),
gatkJar, walker, referenceFile, repeat(" -I ", bamFiles), optional(" -D ", dbsnp), optional(" -L ", intervals))
}

View File

@ -0,0 +1,9 @@
package org.broadinstitute.sting.queue.function.scattergather
import java.io.File
/**
 * Gathers file parts by running "samtools merge" with the original output
 * followed by each gathered part.
 */
class BamGatherFunction extends GatherFunction {
  type GatherType = File

  /** Command line: samtools merge, the original output, then every part separated by a space. */
  def commandLine = {
    val partArguments = repeat(" ", gatherParts)
    "samtools merge %s%s".format(originalOutput, partArguments)
  }
}

View File

@ -0,0 +1,15 @@
package org.broadinstitute.sting.queue.function.scattergather
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util.Input
import java.io.File
/**
 * Removes the temporary directories with "rm -rf".
 */
class CleanupTempDirsFunction extends CommandLineFunction {
  /** Values recorded as inputs of this function; they do not appear on the command line. */
  @Input
  var originalOutputs: List[Any] = Nil

  /** Directories to remove. */
  @Input
  var tempDirectories: List[File] = Nil

  /** Command line: rm -rf followed by each directory wrapped in single quotes. */
  def commandLine = {
    val quotedDirectories = repeat(" '", tempDirectories, "'")
    "rm -rf%s".format(quotedDirectories)
  }
}

View File

@ -0,0 +1,15 @@
package org.broadinstitute.sting.queue.function.scattergather
import java.io.File
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util.{Output, Input}
/**
 * Creates the temporary directories with "mkdir".
 */
class CreateTempDirsFunction extends CommandLineFunction {
  /** Values recorded as inputs of this function; they do not appear on the command line. */
  @Input
  var originalInputs: List[Any] = Nil

  /** Directories to create; declared as the outputs of this function. */
  @Output
  var tempDirectories: List[File] = Nil

  /** Command line: mkdir followed by each directory wrapped in single quotes. */
  def commandLine = {
    val quotedDirectories = repeat(" '", tempDirectories, "'")
    "mkdir%s".format(quotedDirectories)
  }
}

View File

@ -0,0 +1,16 @@
package org.broadinstitute.sting.queue.function.scattergather
import org.broadinstitute.sting.queue.function.{CommandLineFunction}
import org.broadinstitute.sting.queue.util.{Input, Output}
/**
 * Base trait for command line functions that combine the parts produced by
 * scattered clone functions back into one original output.
 */
trait GatherFunction extends CommandLineFunction {
// The type of value being gathered; chosen by each implementation.
type GatherType
// The per-clone values to combine.
@Input
var gatherParts: List[GatherType] = Nil
// The output value of the original, un-scattered function.
@Output
var originalOutput: GatherType = _
// Hook for copying settings from the function being scattered; does nothing by default.
def setOriginalFunction(originalFunction: ScatterGatherableFunction) = {}
}

View File

@ -0,0 +1,21 @@
package org.broadinstitute.sting.queue.function.scattergather
import java.io.File
import org.broadinstitute.sting.queue.util.Input
import org.broadinstitute.sting.queue.function.IntervalFunction
/**
 * Scatters an interval file by running splitIntervals.sh.
 */
class IntervalScatterFunction extends ScatterFunction {
  type ScatterType = File

  /** Reference file copied from the original function; not yet used on the command line. */
  @Input
  var referenceFile: File = _

  /**
   * Copies the reference file from the original function, which must be an
   * IntervalFunction (the cast fails otherwise), then defers to the parent.
   */
  override def setOriginalFunction(originalFunction: ScatterGatherableFunction) = {
    referenceFile = originalFunction.asInstanceOf[IntervalFunction].referenceFile
    super.setOriginalFunction(originalFunction)
  }

  // TODO: Use the reference file for "all"
  /** Command line: splitIntervals.sh, the original input, then every scatter part separated by a space. */
  def commandLine = {
    val partArguments = repeat(" ", scatterParts)
    "splitIntervals.sh %s%s".format(originalInput, partArguments)
  }
}

View File

@ -0,0 +1,20 @@
package org.broadinstitute.sting.queue.function.scattergather
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.queue.util.{Input, Output}
import java.io.File
/**
 * Base trait for command line functions that split one original input into
 * the parts consumed by scattered clone functions.
 */
trait ScatterFunction extends CommandLineFunction {
// The type of value being scattered; chosen by each implementation.
type ScatterType
// The input value of the original, un-scattered function.
@Input
var originalInput: ScatterType = _
// Temporary directories used by the scatter/gather run.
// NOTE(review): presumably declared as an input so the scatter runs after the directories are created - confirm.
@Input
var tempDirectories: List[File] = Nil
// The per-clone pieces this function produces.
@Output
var scatterParts: List[ScatterType] = Nil
// Hook for copying settings from the function being scattered; does nothing by default.
def setOriginalFunction(originalFunction: ScatterGatherableFunction) = {}
}

View File

@ -0,0 +1,135 @@
package org.broadinstitute.sting.queue.function.scattergather
import org.broadinstitute.sting.queue.function.CommandLineFunction
import java.lang.reflect.Field
import java.io.File
import org.broadinstitute.sting.queue.util._
/**
 * A command line function that can be split into several parallel clones
 * whose outputs are gathered back together.
 */
trait ScatterGatherableFunction extends CommandLineFunction {
  /** Number of parallel parts to split into; values below 2 disable scattering. */
  @Internal
  var scatterCount: Int = 1

  /**
   * The first input field annotated with @Scatter.
   * Fails with NoSuchElementException when no input field carries the annotation.
   */
  def scatterField =
    this.inputFields.find(ReflectionUtils.hasAnnotation(_, classOf[Scatter])).get

  /**
   * True when at least two parts were requested and the scatter field has a value.
   * The scatter field is only looked up once the count check has passed.
   */
  def scatterGatherable = scatterCount >= 2 && hasFieldValue(scatterField)

  /** Builds the list of functions that replace this one when it is scattered. */
  def generateFunctions() = ScatterGatherableFunction.generateFunctions(this)
}
/**
 * Builds the set of functions that replace a scatter-gatherable function:
 * one to create temp directories, the parallel clones, one scatter function,
 * one gather function per output field, and one cleanup function.
 */
object ScatterGatherableFunction {
/**
 * Generates the replacement functions for the original function.
 * The returned list is ordered: create-temp-dirs, the clones, the scatter
 * function, the gather functions, then the cleanup function.
 */
private def generateFunctions(originalFunction: ScatterGatherableFunction) = {
var functions = List.empty[CommandLineFunction]
var tempDirectories = List.empty[File]
// Create a function that will remove any temporary items
var cleanupFunction = new CleanupTempDirsFunction
cleanupFunction.jobNamePrefix = originalFunction.jobNamePrefix
cleanupFunction.commandDirectory = originalFunction.commandDirectory
// Find the field with @Scatter and its value
var scatterField = originalFunction.scatterField
val originalValue = originalFunction.getFieldValue(scatterField)
// Create the scatter function based on @Scatter
val scatterFunction = getScatterFunction(scatterField)
scatterFunction.setOriginalFunction(originalFunction)
scatterFunction.jobNamePrefix = originalFunction.jobNamePrefix
// The scatter function runs in its own temp directory, named after the scattered field
scatterFunction.commandDirectory = originalFunction.temp("scatter-" + scatterField.getName)
scatterFunction.originalInput = originalValue.asInstanceOf[scatterFunction.ScatterType]
tempDirectories :+= scatterFunction.commandDirectory
functions :+= scatterFunction
// Create the gather functions for each output field
var gatherFunctions = Map.empty[Field, GatherFunction]
for (outputField <- originalFunction.outputFields) {
// Create the gather function based on @Gather
val gatherFunction = getGatherFunction(outputField)
gatherFunction.setOriginalFunction(originalFunction)
gatherFunction.jobNamePrefix = originalFunction.jobNamePrefix
// Each gather function runs in its own temp directory, named after the output field
gatherFunction.commandDirectory = originalFunction.temp("gather-" + outputField.getName)
val gatheredValue = originalFunction.getFieldValue(outputField).asInstanceOf[gatherFunction.GatherType]
gatherFunction.originalOutput = gatheredValue
tempDirectories :+= gatherFunction.commandDirectory
// The cleanup function lists every gathered output among its inputs
cleanupFunction.originalOutputs :+= gatheredValue
functions :+= gatherFunction
gatherFunctions += outputField -> gatherFunction
}
// Create the clone functions for running the parallel jobs
var cloneFunctions = List.empty[CommandLineFunction]
for (i <- 1 to originalFunction.scatterCount) {
val cloneFunction = newFunctionClone(originalFunction)
cloneFunctions :+= cloneFunction
val tempDir = originalFunction.temp("temp-"+i)
cloneFunction.commandDirectory = tempDir
tempDirectories :+= tempDir
// Reset the input of the clone to the temp dir and add it as an output of the scatter
var scatterPart = CollectionUtils.updated(originalValue, resetToTempDir(tempDir))
scatterFunction.scatterParts :+= scatterPart.asInstanceOf[scatterFunction.ScatterType]
cloneFunction.setFieldValue(scatterField, scatterPart)
// For each output field, change value to the temp dir and feed it into the gatherer
for (outputField <- originalFunction.outputFields) {
val gatherFunction = gatherFunctions(outputField)
val gatherPart = cloneFunction.mapField(outputField, resetToTempDir(tempDir))
gatherFunction.gatherParts :+= gatherPart.asInstanceOf[gatherFunction.GatherType]
}
}
// Clones go ahead of the scatter and gather functions in the returned list
functions = cloneFunctions ::: functions
// Create a function to create all of the temp directories.
// All of its inputs are the inputs of the original function.
val initializeFunction = new CreateTempDirsFunction
initializeFunction.jobNamePrefix = originalFunction.jobNamePrefix
initializeFunction.commandDirectory = originalFunction.commandDirectory
for (inputField <- originalFunction.inputFields)
initializeFunction.originalInputs :+= originalFunction.getFieldValue(inputField)
// The same full list of temp directories is shared by the create, scatter and cleanup functions
initializeFunction.tempDirectories = tempDirectories
scatterFunction.tempDirectories = tempDirectories
cleanupFunction.tempDirectories = tempDirectories
// Directory creation is placed first in the list and cleanup last
functions +:= initializeFunction
functions :+= cleanupFunction
// Return all the various functions we created
functions
}
/**
 * Returns a mapping that moves any File into tempDir, keeping only the file's
 * name, and leaves every other kind of value unchanged.
 */
private def resetToTempDir(tempDir: File): Any => Any = {
(any: Any) => {
any match {
case file: File => IOUtils.reset(tempDir, file)
case x => x
}
}
}
/**
 * Instantiates the class named by the field's @Scatter annotation.
 */
private def getScatterFunction(inputField: Field) =
ReflectionUtils.getAnnotation(inputField, classOf[Scatter]).value.newInstance.asInstanceOf[ScatterFunction]
/**
 * Instantiates the class named by the field's @Gather annotation.
 */
private def getGatherFunction(outputField: Field) =
ReflectionUtils.getAnnotation(outputField, classOf[Gather]).value.newInstance.asInstanceOf[GatherFunction]
/**
 * Clones the original function and sets the clone's scatter count to 1.
 */
private def newFunctionClone(originalFunction: ScatterGatherableFunction) = {
val cloneFunction = originalFunction.cloneFunction.asInstanceOf[ScatterGatherableFunction]
// Make sure clone doesn't get scattered
cloneFunction.scatterCount = 1
cloneFunction
}
}

View File

@ -0,0 +1,10 @@
package org.broadinstitute.sting.queue.function.scattergather
import java.io.File
/**
 * Gathers text file parts by running mergeText.sh.
 */
class SimpleTextGatherFunction extends GatherFunction {
  type GatherType = File

  // TODO: Write a text merging utility that takes into account headers.
  /** Command line: mergeText.sh, the original output, then every part separated by a space. */
  def commandLine = {
    val partArguments = repeat(" ", gatherParts)
    "mergeText.sh %s%s".format(originalOutput, partArguments)
  }
}

View File

@ -0,0 +1,45 @@
package org.broadinstitute.sting.queue.util
/**
* Utilities that try to deeply apply operations to collections
*/
object CollectionUtils {
  /**
   * Applies f to the value itself and to every nested item reached by foreach.
   * Every item is always visited, even after a match has been found.
   * @return true if f returned true for the value or for any nested item
   */
  def test(value: Any, f: Any => Boolean): Boolean = {
    var matched = f(value)
    foreach(value, (element, _) => matched |= f(element))
    matched
  }

  /**
   * Returns a copy of the value with f applied to each leaf.
   * Seq, Array and Option are rebuilt element by element, recursively;
   * anything else is treated as a leaf and passed directly to f.
   */
  def updated(value: Any, f: (Any) => Any): Any = value match {
    case seq: Seq[_] => seq.map(element => updated(element, f))
    case array: Array[_] => array.map(element => updated(element, f))
    case option: Option[_] => option.map(element => updated(element, f))
    case leaf => f(leaf)
  }

  /**
   * Walks into Seq, Product and Array values depth first, calling
   * f(item, container) for every item before descending into that item.
   * Seq is matched before Product. Values of any other kind are not visited.
   */
  def foreach(value: Any, f: (Any, Any) => Unit): Unit = {
    // Report one item with its container, then descend into the item.
    def visit(item: Any, container: Any): Unit = {
      f(item, container)
      foreach(item, f)
    }

    value match {
      case seq: Seq[_] => seq.foreach(item => visit(item, seq))
      case product: Product => product.productIterator.foreach(item => visit(item, product))
      case array: Array[_] => array.foreach(item => visit(item, array))
      case _ =>
    }
  }
}

View File

@ -0,0 +1,31 @@
package org.broadinstitute.sting.queue.util
import java.io.{IOException, File}
/**
 * File path helpers for resolving paths against a directory.
 */
object IOUtils {
  /** The relative path "." used to mean the current directory. */
  val CURRENT_DIR = new File(".")

  /**
   * Resolves subPath against parent.
   * - both are ".": the canonical current directory
   * - parent is "." or subPath is absolute: subPath on its own
   * - subPath is ".": the parent
   * - otherwise: subPath inside parent
   */
  def sub(parent: File, subPath: String) = {
    val file = new File(subPath)
    val parentIsCurrent = parent == CURRENT_DIR
    val fileIsCurrent = file == CURRENT_DIR

    if (parentIsCurrent && fileIsCurrent) {
      CURRENT_DIR.getCanonicalFile
    } else if (parentIsCurrent || file.isAbsolute) {
      file
    } else if (fileIsCurrent) {
      parent
    } else {
      new File(parent, subPath)
    }
  }

  /**
   * Creates a new temporary directory by creating a temp file named
   * "prefix-...suffix", deleting it, and making a directory at the same path.
   * Throws IOException if the file cannot be deleted or the directory cannot be created.
   */
  def temp(prefix: String, suffix: String = "") = {
    val placeholder = File.createTempFile(prefix + "-", suffix)
    if (!placeholder.delete) {
      throw new IOException("Could not delete sub file: " + placeholder.getAbsolutePath())
    }
    if (!placeholder.mkdir) {
      throw new IOException("Could not create sub directory: " + placeholder.getAbsolutePath())
    }
    placeholder
  }

  /** Absolute path of a file with the same name as the given file, placed in dir. */
  def reset(dir: File, file: File) = {
    val nameOnly = file.getName
    sub(dir, nameOnly).getAbsoluteFile
  }

  /** Absolute path of the file, resolving a relative path against dir. */
  def absolute(dir: File, file: File) = {
    val fullPath = file.getPath
    sub(dir, fullPath).getAbsoluteFile
  }
}

View File

@ -7,9 +7,7 @@ import org.apache.log4j._
*/
trait Logging {
private val className = this.getClass.getName
protected lazy val logger = configuredLogger
def configuredLogger = {
protected lazy val logger = {
Logging.configureLogging
Logger.getLogger(className)
}
@ -17,15 +15,17 @@ trait Logging {
object Logging {
private var configured = false
private var isDebug = false
private var level = Level.INFO
def configureLogging = {
if (!configured) {
var root = Logger.getRootLogger
root.addAppender(new ConsoleAppender(new PatternLayout("%-5p %d{HH:mm:ss,SSS} - %m %n")))
root.setLevel(if(isDebug) Level.DEBUG else Level.INFO)
root.setLevel(level)
configured = true
}
}
def enableDebug = {isDebug = true; Logger.getRootLogger.setLevel(Level.DEBUG)}
def setDebug = setLevel(Level.DEBUG)
def setTrace = setLevel(Level.TRACE)
private def setLevel(level: Level) = {this.level = level; Logger.getRootLogger.setLevel(level)}
}

View File

@ -8,13 +8,20 @@ import scala.collection.immutable.ListMap
import java.lang.reflect.{ParameterizedType, Field}
object ReflectionUtils {
def getField(obj: AnyRef, name: String) = getAllFields(obj.getClass).find(_.getName == name)
def hasAnnotation(field: Field, annotation: Class[_ <: Annotation]) = field.getAnnotation(annotation) != null
def getAnnotation[T <: Annotation](field: Field, annotation: Class[T]): T = {
if (!hasAnnotation(field, annotation))
throw new QException("Field %s is missing annotation %s".format(field, annotation))
field.getAnnotation(annotation).asInstanceOf[T]
}
def getFieldsAnnotatedWith(obj: AnyRef, fields: List[Field], annotation: Class[_ <: Annotation]) =
ListMap(fields.filter(field => hasAnnotation(field, annotation))
.map(field => (field.getName -> fieldGetter(field).invoke(obj))) :_*)
def getAllFields(clazz: Class[_]) = getAllTypes(clazz).map(_.getDeclaredFields).flatMap(_.toList)
def filterFields(fields: List[Field], annotation: Class[_ <: Annotation]) = fields.filter(field => hasAnnotation(field, annotation))
def getFieldNamesValues(obj: AnyRef, fields: List[Field]) =
ListMap(fields.map(field => (field.getName -> fieldGetter(field).invoke(obj))) :_*)
def getAllTypes(clazz: Class[_]) = {
var types = List.empty[Class[_]]
@ -26,19 +33,13 @@ object ReflectionUtils {
types
}
def getAllFields(clazz: Class[_]) = getAllTypes(clazz).map(_.getDeclaredFields).flatMap(_.toList)
def setValue(obj: AnyRef, field: Field, value: String) = {
def getValue(obj: AnyRef, field: Field) = fieldGetter(field).invoke(obj)
def setValue(obj: AnyRef, field: Field, value: Any) = fieldSetter(field).invoke(obj, value.asInstanceOf[AnyRef])
def addOrUpdateWithStringValue(obj: AnyRef, field: Field, value: String) = {
val getter = fieldGetter(field)
val setter = fieldSetter(field)
if (getter == null)
throw new QException("Field may be private? Unable to find getter for field: " + field)
if (getter == null)
throw new QException("Field may be a val instead of var? Unable to find setter for field: " + field)
if (classOf[Seq[_]].isAssignableFrom(field.getType)) {
val fieldType = getCollectionType(field)
@ -87,8 +88,19 @@ object ReflectionUtils {
else None
}
private[util] def fieldGetter(field: Field) = field.getDeclaringClass.getMethod(field.getName)
private[util] def fieldSetter(field: Field) = field.getDeclaringClass.getMethod(field.getName+"_$eq", field.getType)
private def fieldGetter(field: Field) =
try {
field.getDeclaringClass.getMethod(field.getName)
} catch {
case e: NoSuchMethodException => throw new QException("Field may be private? Unable to find getter for field: " + field)
}
private def fieldSetter(field: Field) =
try {
field.getDeclaringClass.getMethod(field.getName+"_$eq", field.getType)
} catch {
case e: NoSuchMethodException => throw new QException("Field may be a val instead of var? Unable to find setter for field: " + field)
}
private def coerce(clazz: Class[_], value: String) = {
if (classOf[String] == clazz) value

36
shell/mergeText.sh 100755
View File

@ -0,0 +1,36 @@
#!/bin/bash
# Merges a set of files, skipping over common headers.
#
# Usage: mergeText.sh <output> <input> [ .. <input> ]
#
# With a single input the file is simply copied to the output.
# With several inputs, the leading lines that are identical in the first two
# inputs are treated as the common header: they are written to the output
# once, and that many leading lines are skipped from every input when the
# remaining lines are appended.
#
# Requires bash: uses "read -u" and arithmetic evaluation.

if [ $# -lt 2 ]; then
    echo "Usage: $0 <output> <input> [ .. <input> ]"
    exit 1
elif [ $# -eq 2 ]; then
    cp "$2" "$1"
else
    outputFile=$1
    shift

    # Start from scratch so the appends below never extend a stale file.
    rm -f "$outputFile"

    # Compare the first two inputs line by line to find the common header.
    exec 3< "$1"
    exec 4< "$2"

    # 1-based number of the first line that is not part of the common header.
    startLine=1
    while true; do
        # IFS= and -r keep each line verbatim (no trimming, no backslash handling).
        IFS= read -r -u 3 header1 || break
        IFS= read -r -u 4 header2 || break
        if [[ "$header1" != "$header2" ]]; then break; fi
        printf '%s\n' "$header1" >> "$outputFile"
        ((startLine++))
    done

    exec 3<&-
    exec 4<&-

    # Append everything after the common header from every input.
    for inputFile in "$@"; do
        tail -n +$startLine "$inputFile" >> "$outputFile"
    done
fi

View File

@ -0,0 +1,55 @@
#!/bin/bash
# Splits an interval list into multiple files
#
# Usage: splitIntervals.sh <input> <output> [ .. <output> ]
#
# Leading lines that start with '@' are treated as headers and are copied to
# the top of every output file. The remaining lines are divided across the
# output files in order; when they do not divide evenly, the earlier files
# each receive one extra line. Fails if there are fewer interval lines than
# output files.
#
# Requires bash: uses [[ ]], arithmetic evaluation and "read -u".

if [ $# -lt 2 ]; then
    echo "Usage: $0 <input> <output> [ .. <output> ]"
    exit 1
else
    inputFile=$1
    shift

    totalLines=$(wc -l < "$inputFile")

    # Count the header lines. When the loop ends on a non-header line,
    # nextLine holds the first interval and fd 3 is positioned just after it.
    exec 3< "$inputFile"
    numHeaders=0
    while true; do
        # IFS= and -r keep each line verbatim (no trimming, no backslash handling).
        IFS= read -r -u 3 nextLine
        if [ $? -ne 0 ]; then break; fi
        if [[ $nextLine != @* ]]; then break; fi
        ((numHeaders++))
    done

    numFiles=$#
    ((numIntervals = totalLines - numHeaders))

    if [ $numIntervals -lt $numFiles ]; then
        echo "Error: Number of intervals $numIntervals is less than the number of files $numFiles."
        exec 3<&-
        exit 1
    fi

    ((linesPerFile = numIntervals / numFiles))
    ((remainder = numIntervals % numFiles))

    # Start one line above the even share; dropped back down once the
    # remainder has been handed out.
    ((linesPerFile++))
    fileNumber=0
    for outputFile in "$@"; do
        # Earlier files will get the remainder until it's no longer needed.
        if [ $fileNumber -eq $remainder ]; then ((linesPerFile--)); fi
        ((fileNumber++))

        # Every output starts with a copy of the headers.
        head -n $numHeaders "$inputFile" > "$outputFile"

        for ((line=0; line<linesPerFile; line++)); do
            # nextLine was read ahead, so write it before fetching the next one.
            printf '%s\n' "$nextLine" >> "$outputFile"
            IFS= read -r -u 3 nextLine
            if [ $? -ne 0 ]; then break; fi
        done
    done

    exec 3<&-
fi