Exposed CommandLineFunction defaults to the Queue.jar command line (see -help).

Added ability to skip up-to-date jobs where the outputs are newer than the inputs.
Changed -T CountDuplicates --quiet to --quietLocus so that Queue GATK extensions can use both short and full argument names.
Short names can be used to set values on Queue GATK extensions, for example: vf.XL :+= myFile
Moved Hidden from the GATK to StingUtils.
Updated ivy from 2.0.0 to 2.2.0-rc1 to fix sha1 issue: http://bit.ly/aX72w7
Added Queue to javadoc and testing build targets.
Added first Queue unit test.
Another pass at avoiding cycles in the DAG thanks to all function I/O being files.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4017 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-08-11 21:58:26 +00:00
parent 8c08f47923
commit f39dce1082
28 changed files with 659 additions and 259 deletions

110
build.xml
View File

@ -60,10 +60,25 @@
</fileset>
</path>
<!-- Path to queue dependencies. -->
<path id="queue.dependencies">
<path refid="runtime.dependencies" />
<pathelement location="${java.classes}" />
</path>
<!-- Path to queue-gatk-extensions dependencies. -->
<path id="queue-gatk-extensions.dependencies">
<path refid="runtime.dependencies" />
<pathelement location="${java.classes}" />
<pathelement location="${queue.classes}" />
<!-- Need the resources as we will be running a command line program which needs the help text. -->
<pathelement location="${resource.path}" />
</path>
<target name="resolve" depends="init"
description="locate and download library dependencies">
<!-- ivy properties -->
<property name="ivy.install.version" value="2.0.0"/>
<property name="ivy.install.version" value="2.2.0-rc1"/>
<property name="ivy.home" value="${user.home}/.ant"/>
<property name="ivy.jar.dir" value="${ivy.home}/lib"/>
<property name="ivy.jar.file" value="ivy-${ivy.install.version}.jar"/>
@ -191,34 +206,29 @@
<!-- Queue depends on the gatk since it contains the StingUtils (including CommandLine) -->
<target name="queue.compile" depends="init,resolve,gatk.compile,init.scalatasks" if="queue.include" description="build Queue">
<path id="queue.classpath">
<path refid="runtime.dependencies" />
<pathelement location="${java.classes}" />
</path>
<mkdir dir="${queue.classes}"/>
<echo>Building Queue...</echo>
<scalac srcdir="${queue.source.dir}" destdir="${queue.classes}" classpathref="queue.classpath" deprecation="yes" unchecked="yes">
<scalac srcdir="${queue.source.dir}" destdir="${queue.classes}" classpathref="queue.dependencies" deprecation="yes" unchecked="yes">
<include name="org/broadinstitute/sting/queue/**/*.scala"/>
</scalac>
</target>
<!-- NOTE: Extracting help first to avoid "Unable to load help text. Help output will be sparse." warning message. -->
<target name="queue-gatk-extensions.compile" depends="gatk.compile, queue.compile, extracthelp" if="queue-gatk-extensions.include" description="generate GATK modules for Queue">
<target name="queue-gatk-extensions.generate" depends="gatk.compile, queue.compile, extracthelp" if="queue-gatk-extensions.include" description="generate GATK modules for Queue">
<mkdir dir="${queue-gatk-extensions.source.dir}"/>
<mkdir dir="${queue-gatk-extensions.classes}"/>
<path id="queue-gatk-extensions.classpath">
<path refid="runtime.dependencies" />
<pathelement location="${resource.path}" />
<pathelement location="${java.classes}" />
<pathelement location="${queue.classes}" />
</path>
<echo>Generating Queue GATK extensions...</echo>
<java fork="true" classname="org.broadinstitute.sting.queue.extensions.gatk.GATKExtensionsGenerator" classpathref="queue-gatk-extensions.classpath">
<java fork="true" failonerror="true" classname="org.broadinstitute.sting.queue.extensions.gatk.GATKExtensionsGenerator" classpathref="queue-gatk-extensions.dependencies">
<arg value="-outDir" />
<arg path="${queue-gatk-extensions.source.dir}" />
<arg value="-l" />
<arg value="WARN" />
</java>
</target>
<target name="queue-gatk-extensions.compile" depends="queue-gatk-extensions.generate" if="queue-gatk-extensions.include" description="compile GATK modules for Queue">
<mkdir dir="${queue-gatk-extensions.classes}"/>
<echo>Building Queue GATK extensions...</echo>
<scalac srcdir="${queue-gatk-extensions.source.dir}" destdir="${queue-gatk-extensions.classes}" classpathref="queue-gatk-extensions.classpath" deprecation="yes" unchecked="yes">
<scalac srcdir="${queue-gatk-extensions.source.dir}" destdir="${queue-gatk-extensions.classes}" classpathref="queue-gatk-extensions.dependencies" deprecation="yes" unchecked="yes">
<include name="**/*.scala"/>
</scalac>
</target>
@ -395,19 +405,19 @@
<target name="dist" depends="sting.manifests" />
<target name="core" description="force a build of the Sting core code">
<antcall target="dist" inheritAll="true">
<antcall target="dist" inheritAll="false">
<param name="sting.target" value="core" />
</antcall>
</target>
<target name="playground" description="force a build of the Sting experimental code">
<antcall target="dist" inheritAll="true">
<antcall target="dist" inheritAll="false">
<param name="sting.target" value="playground" />
</antcall>
</target>
<target name="oneoffs" description="force a build of the Sting experimental code and one-offs">
<antcall target="dist" inheritAll="true">
<antcall target="dist" inheritAll="false">
<param name="sting.target" value="oneoffs" />
</antcall>
</target>
@ -424,19 +434,35 @@
</antcall>
</target>
<target name="java.test.compile" depends="oneoffs">
<target name="init.test.compile">
<property name="gatk.target" value="oneoffs"/>
<property name="queue.target" value="core"/>
<property name="queue-gatk-extensions.target" value="core"/>
</target>
<target name="test.compile" depends="init.test.compile,dist">
<echo message="Sting: Compiling test cases!"/>
<mkdir dir="${java.test.classes}"/>
<javac destdir="${java.test.classes}" debug="true" optimize="on">
<src path="${java.test.sources}"/>
<exclude name="**/playground/**" unless="include.playground"/>
<exclude name="**/oneoffprojects/**" unless="include.oneoffs"/>
<classpath>
<path refid="runtime.dependencies" />
<pathelement location="${java.classes}"/>
<pathelement location="lib/junit-4.4.jar"/>
</classpath>
</javac>
<echo message="Queue: Compiling test cases!"/>
<mkdir dir="${queue.test.classes}"/>
<scalac srcdir="${queue.test.sources}" destdir="${queue.test.classes}" deprecation="yes" unchecked="yes">
<include name="org/broadinstitute/sting/queue/**/*.scala"/>
<classpath>
<path refid="queue.dependencies"/>
<pathelement location="${queue.classes}"/>
<pathelement location="${java.test.classes}"/>
<pathelement location="lib/junit-4.4.jar"/>
</classpath>
</scalac>
</target>
<!-- new scala target -->
@ -484,6 +510,8 @@
<property name="java.test.classes" value="${java.classes}/testclasses"/>
<property name="test.output" value="${dist.dir}/test"/>
<property name="java.test.sources" value="java/test"/>
<property name="queue.test.classes" value="${queue.classes}/testclasses"/>
<property name="queue.test.sources" value="scala/test"/>
<!-- provide a ceiling on the memory that unit/integration tests can consume. -->
<property name="test.maxmemory" value="2g"/>
@ -497,9 +525,11 @@
<formatter type="brief" usefile="false" />
<formatter type="xml"/>
<classpath>
<pathelement location="${java.classes}"/>
<path refid="runtime.dependencies"/>
<pathelement location="${java.classes}"/>
<pathelement location="${java.test.classes}"/>
<pathelement location="${queue.classes}"/>
<pathelement location="${queue.test.classes}"/>
<pathelement location="lib/junit-4.4.jar"/>
</classpath>
@ -508,6 +538,10 @@
<include name="**/@{testtype}.class"/>
<exclude name="**/BaseTest.class"/>
</fileset>
<fileset dir="${queue.test.classes}">
<include name="**/@{testtype}.class"/>
<exclude name="**/BaseTest.class"/>
</fileset>
</batchtest>
</junit>
<fail message="test failed" if="test.failure" />
@ -516,19 +550,19 @@
<!-- our three different test conditions: Test, IntegrationTest, PerformanceTest -->
<target name="test" depends="java.test.compile,tribble.test" description="Run unit tests">
<target name="test" depends="test.compile,tribble.test" description="Run unit tests">
<condition property="ttype" value="*UnitTest" else="${single}">
<not><isset property="single"/></not>
</condition>
<run-test testtype="${ttype}"/>
</target>
<target name="integrationtest" depends="java.test.compile" description="Run integration tests">
<target name="integrationtest" depends="test.compile" description="Run integration tests">
<condition property="itype" value="*IntegrationTest" else="${single}">
<not><isset property="single"/></not>
</condition>
<run-test testtype="${itype}"/>
</target>
<target name="performancetest" depends="java.test.compile" description="Run performance tests">
<target name="performancetest" depends="test.compile" description="Run performance tests">
<condition property="ptype" value="*PerformanceTest" else="${single}">
<not><isset property="single"/></not>
</condition>
@ -564,13 +598,33 @@
</target>
<!-- ***************************************************************************** -->
<target name="clean.javadoc">
<delete dir="javadoc"/>
</target>
<target name="javadoc" depends="init,resolve" description="generates javadoc">
<target name="init.javadoc">
<!-- Set the properties needed to build Queue and the Queue GATK Extensions -->
<property name="gatk.target" value="oneoffs"/>
<property name="queue.target" value="core"/>
<property name="queue-gatk-extensions.target" value="core"/>
</target>
<target name="javadoc" depends="init.javadoc,resolve,queue-gatk-extensions.generate" description="generates javadoc">
<mkdir dir="javadoc"/>
<javadoc destdir="javadoc"
classpathref="runtime.dependencies">
<packageset refid="java.source.files"/>
</javadoc>
<mkdir dir="javadoc/queue"/>
<scaladoc srcdir="${queue.source.dir}" destdir="javadoc/queue" classpathref="queue.dependencies" deprecation="yes" unchecked="yes">
<include name="org/broadinstitute/sting/queue/**/*.scala"/>
</scaladoc>
<mkdir dir="javadoc/queue-gatk-extensions"/>
<scaladoc srcdir="${queue-gatk-extensions.source.dir}" destdir="javadoc/queue-gatk-extensions" classpathref="queue-gatk-extensions.dependencies" deprecation="yes" unchecked="yes">
<include name="org/broadinstitute/sting/queue/extensions/**/*.scala"/>
</scaladoc>
</target>
<!-- Unzip all classes from their current locations and assemble them in a staging directory -->
@ -648,7 +702,7 @@
<target name="clean" description="clean up" depends="tribble.clean">
<target name="clean" description="clean up" depends="tribble.clean,clean.javadoc">
<delete dir="out"/>
<delete dir="${build.dir}"/>
<delete dir="lib"/>

View File

@ -25,8 +25,6 @@
package org.broadinstitute.sting.commandline;
import org.broadinstitute.sting.gatk.walkers.Hidden;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;

View File

@ -26,7 +26,6 @@
package org.broadinstitute.sting.commandline;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.gatk.walkers.Hidden;
import org.apache.log4j.Logger;
import java.lang.annotation.Annotation;

View File

@ -22,7 +22,7 @@
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.walkers;
package org.broadinstitute.sting.commandline;
import java.lang.annotation.*;

View File

@ -26,7 +26,7 @@
package org.broadinstitute.sting.gatk;
import net.sf.picard.filter.SamRecordFilter;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.commandline.Hidden;
import org.broadinstitute.sting.gatk.filters.FilterManager;
import org.broadinstitute.sting.gatk.refdata.tracks.RMDTrack;
import org.broadinstitute.sting.gatk.walkers.*;

View File

@ -28,6 +28,7 @@ package org.broadinstitute.sting.gatk.walkers.indels;
import net.sf.samtools.*;
import net.sf.samtools.util.StringUtil;
import org.broad.tribble.util.variantcontext.VariantContext;
import org.broadinstitute.sting.commandline.Hidden;
import org.broadinstitute.sting.utils.interval.IntervalMergingRule;
import org.broadinstitute.sting.utils.interval.IntervalUtils;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
@ -36,7 +37,6 @@ import org.broadinstitute.sting.gatk.refdata.utils.GATKFeature;
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.gatk.walkers.Reference;
import org.broadinstitute.sting.gatk.walkers.Window;
import org.broadinstitute.sting.gatk.walkers.Hidden;
import org.broadinstitute.sting.gatk.filters.BadMateFilter;
import org.broadinstitute.sting.utils.*;
import org.broadinstitute.sting.utils.interval.IntervalFileMergingIterator;

View File

@ -51,7 +51,7 @@ class DuplicateCount {
* @author mark DePristo
*/
public class CountDuplicatesWalker extends DuplicateWalker<DuplicateCount, DuplicateCount> {
@Argument(fullName="quiet", required=false, doc="If true, per locus information isn't printex")
@Argument(fullName="quietLocus", required=false, doc="If true, per locus information isn't printed")
public boolean quiet = false;
/**

View File

@ -47,10 +47,37 @@ public abstract class ArgumentDefinitionField extends ArgumentField {
@Override protected String getExclusiveOf() { return escape(argumentDefinition.exclusiveOf); }
@Override protected String getValidation() { return escape(argumentDefinition.validation); }
protected final String getShortFieldGetter() { return getFieldName(getRawShortFieldName()); }
protected final String getShortFieldSetter() { return getFieldName(getRawShortFieldName() + "_="); }
protected String getRawShortFieldName() { return argumentDefinition.shortName; }
@Override protected String getDefineAddition() {
if (argumentDefinition.shortName == null)
return "";
else if(getShortFieldGetter().equals(getFieldName()))
return "";
else
return String.format("%n" +
"/** %n" +
" * Short name of %1$s%n" +
" * @return Short name of %1$s%n" +
" */%n" +
"def %3$s = this.%1$s%n" +
"%n" +
"/** %n" +
" * Short name of %1$s%n" +
" * @param value Short name of %1$s%n" +
" */%n" +
"def %4$s(value: %2$s) = this.%1$s = value%n",
getFieldName(),
getFieldType(),
getShortFieldGetter(),
getShortFieldSetter());
}
protected static final String REQUIRED_TEMPLATE = " + \" %1$s \" + %2$s.format(%3$s)";
protected static final String REPEAT_TEMPLATE = " + repeat(\" %1$s \", %3$s, format=%2$s)";
protected static final String OPTIONAL_TEMPLATE = " + optional(\" %1$s \", %3$s, format=%2$s)";
protected static final String FLAG_TEMPLATE = " + (if (%3$s) \" %1$s \" else \"\")";
protected static final String FLAG_TEMPLATE = " + (if (%3$s) \" %1$s\" else \"\")";
public final String getCommandLineAddition() {
return String.format(getCommandLineTemplate(), getCommandLineParam(), getCommandLineFormat(), getFieldName());
@ -169,6 +196,7 @@ public abstract class ArgumentDefinitionField extends ArgumentField {
@Override protected Class<?> getInnerType() { return String.class; }
@Override protected String getRawFieldName() { return super.getRawFieldName() + "String"; }
@Override protected String getFullName() { return super.getFullName() + "String"; }
@Override protected String getRawShortFieldName() { return super.getRawShortFieldName() + "String"; }
@Override protected String getFieldType() { return "List[String]"; }
@Override protected String getDefaultValue() { return "Nil"; }
@Override public String getCommandLineTemplate() { return REPEAT_TEMPLATE; }

View File

@ -62,7 +62,8 @@ public abstract class ArgumentField {
return String.format("%n" +
"/** %s */%n" +
"@%s(fullName=\"%s\", shortName=\"%s\", doc=\"%s\", required=%s, exclusiveOf=\"%s\", validation=\"%s\")%n" +
"%svar %s: %s = %s%n",
"%svar %s: %s = %s%n" +
"%s",
getDoc(),
getAnnotationIOClass().getSimpleName(),
getFullName(),
@ -71,9 +72,13 @@ public abstract class ArgumentField {
isRequired(),
getExclusiveOf(),
getValidation(),
getScatterGatherAnnotation(), getFieldName(), getFieldType(), getDefaultValue());
getScatterGatherAnnotation(), getFieldName(), getFieldType(), getDefaultValue(),
getDefineAddition());
}
/** @return Scala code with defines to append to the argument definition. */
protected String getDefineAddition() { return ""; }
/** @return Scala code to append to the command line. */
public abstract String getCommandLineAddition();
@ -141,7 +146,7 @@ public abstract class ArgumentField {
*/
protected static String getFieldName(String rawFieldName) {
String fieldName = rawFieldName;
if (!StringUtils.isAlpha(fieldName.substring(0,1)))
if (StringUtils.isNumeric(fieldName.substring(0,1)))
fieldName = "_" + fieldName;
if (isReserved(fieldName) || fieldName.contains("-"))
fieldName = "`" + fieldName + "`";

View File

@ -117,7 +117,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
argumentFields.addAll(ReadFilterField.getFilterArguments(walkerType));
writeClass(COMMANDLINE_PACKAGE_NAME + "." + clpClassName, WALKER_PACKAGE_NAME,
walkerName, String.format("analysis_type = \"%s\"%n%n", walkerName), argumentFields);
walkerName, String.format("analysis_type = \"%s\"%n", walkerName), argumentFields);
}
}
}

View File

@ -20,7 +20,7 @@ public class DuplicatesWalkersIntegrationTest extends WalkerTest {
List<File> result = executeTest(name, spec).getFirst();
}
@Test public void testChr110Mb() { testCounter("testChr1-10mb", "-L chr1:1-10,000,000 --quiet", "fa8bfdd0b62a13a543bae90f7c674db7"); }
@Test public void testChr110Mb() { testCounter("testChr1-10mb", "-L chr1:1-10,000,000 --quietLocus", "fa8bfdd0b62a13a543bae90f7c674db7"); }
@Test public void testIntervalVerbose() { testCounter("testIntervalVerbose", "-L chr1:6,527,154-6,528,292", "1ebcc10b85af16805a54391721776657"); }
public void testCombiner(String name, String args, String md51, String md52) {

View File

@ -38,15 +38,9 @@ class UnifiedGenotyperExample extends QScript {
val vf = new VariantFiltration with UnifiedGenotyperArguments
val ve = new VariantEval with UnifiedGenotyperArguments
val pr = new PrintReads with UnifiedGenotyperArguments
pr.input_file :+= bam
pr.outputBamFile = swapExt(bam, "bam", "new.bam")
pr.scatterCount = 2
pr.setupGatherFunction = { case (f: BamGatherFunction, _) => f.jarFile = new File("/path/to/jar") }
add(pr)
// Make sure the Sting/shell folder is in your path to use mergeText.sh and splitIntervals.sh.
ug.scatterCount = 3
ug.cleanupTempDirectories = true
ug.input_file :+= bam
ug.out = swapExt(bam, "bam", "unfiltered.vcf")
@ -56,8 +50,7 @@ class UnifiedGenotyperExample extends QScript {
ve.rodBind :+= RodBind("vcf", "VCF", vf.out)
ve.out = swapExt(bam, "bam", "eval")
//add(ug, vf, ve)
add(ug, vf, ve)
}
}
}

View File

@ -3,8 +3,8 @@ package org.broadinstitute.sting.queue
import java.io.File
import java.util.Arrays
import org.broadinstitute.sting.queue.engine.QGraph
import org.broadinstitute.sting.commandline.{ClassType, Input, Argument, CommandLineProgram}
import org.broadinstitute.sting.queue.util.{Logging, ScalaCompoundArgumentTypeDescriptor}
import org.broadinstitute.sting.commandline._
/**
* Entry point of Queue. Compiles and runs QScripts passed in to the command line.
@ -20,11 +20,23 @@ class QCommandLine extends CommandLineProgram with Logging {
@Argument(fullName="bsub_wait_jobs", shortName="bsubWait", doc="Wait for bsub submitted jobs before exiting", required=false)
private var bsubWaitJobs = false
@Argument(fullName="run_scripts", shortName="run", doc="Run QScripts", required=false)
@Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. Without this flag set only performs a dry run.", required=false)
private var run = false
@Argument(fullName="dot_graph", shortName="dot", doc="Outputs the queue graph to a .dot file. See: http://en.wikipedia.org/wiki/DOT_language", required=false)
private var queueDot: File = _
private var dotFile: File = _
@Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false)
private var expandedDotFile: File = _
@Argument(fullName="skip_up_to_date", shortName="skipUpToDate", doc="Does not run command line functions that don't depend on other jobs if the outputs exist and are older than the inputs.", required=false)
private var skipUpToDate = false
@Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden
private var runScripts = false
@ArgumentCollection
private val qSettings = new QSettings
/**
* Takes the QScripts passed in, runs their script() methods, retrieves their generated
@ -32,9 +44,13 @@ class QCommandLine extends CommandLineProgram with Logging {
*/
def execute = {
val qGraph = new QGraph
qGraph.dryRun = !run
qGraph.dryRun = !(run || runScripts)
qGraph.bsubAllJobs = bsubAllJobs
qGraph.bsubWaitJobs = bsubWaitJobs
qGraph.skipUpToDateJobs = skipUpToDate
qGraph.dotFile = dotFile
qGraph.expandedDotFile = expandedDotFile
qGraph.qSettings = qSettings
val scripts = qScriptManager.createScripts()
for (script <- scripts) {
@ -45,15 +61,9 @@ class QCommandLine extends CommandLineProgram with Logging {
logger.info("Added " + script.functions.size + " functions")
}
logger.info("Binding functions")
qGraph.fillIn
if (queueDot != null) {
logger.info("Generating " + queueDot)
qGraph.renderToDot(queueDot)
}
logger.info("Running generated graph")
qGraph.run
logger.info("Done")
0
}

View File

@ -48,7 +48,7 @@ object QScriptManager extends Logging {
if (scripts.size > 0) {
val settings = new Settings((error: String) => logger.error(error))
val outdir = IOUtils.tempDir("Q-classes").getAbsoluteFile
val outdir = IOUtils.tempDir("Q-classes")
settings.outdir.value = outdir.getPath
// Set the classpath to the current class path.

View File

@ -0,0 +1,42 @@
package org.broadinstitute.sting.queue
import org.broadinstitute.sting.commandline.Argument
import java.io.File
import java.lang.management.ManagementFactory
/**
* Default settings settable on the command line and passed to CommandLineFunctions.
*/
class QSettings {
// Seeded from the companion object's semi-unique per-process prefix.
@Argument(fullName="job_name_prefix", shortName="jobPrefix", doc="Default name prefix for compute farm jobs.", required=false)
var jobNamePrefix: String = QSettings.processNamePrefix
@Argument(fullName="job_queue", shortName="jobQueue", doc="Default queue for compute farm jobs.", required=false)
var jobQueue: String = "broad"
@Argument(fullName="job_project", shortName="jobProject", doc="Default project for compute farm jobs.", required=false)
var jobProject: String = "Queue"
// `_` initializes to null: no scatter/gather directory until set on the command line.
@Argument(fullName="job_scatter_gather_directory", shortName="jobSGDir", doc="Default directory to place scatter gather output for compute farm jobs.", required=false)
var jobScatterGatherDirectory: File = _
// None when no default memory limit was supplied on the command line.
@Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false)
var memoryLimit: Option[Int] = None
@Argument(fullName="runJobsIfPrecedingFail", shortName="runIfFail", doc="If this flag is set then ALL jobs will run even if the previous jobs fail.", required=false)
var runJobsIfPrecedingFail = false
}
/**
* Default settings settable on the command line and passed to CommandLineFunctions.
*/
object QSettings {
  /**
   * A semi-unique job prefix built from the JVM runtime name
   * (which carries the process id and host name).
   */
  private val processNamePrefix = {
    val runtimeName = ManagementFactory.getRuntimeMXBean.getName
    val dotIndex = runtimeName.indexOf(".")
    // Drop everything from the first dot onward (e.g. a fully-qualified host's domain part).
    val shortName = if (dotIndex >= 0) runtimeName.substring(0, dotIndex) else runtimeName
    "Q-" + shortName
  }
}

View File

@ -1,13 +1,14 @@
package org.broadinstitute.sting.queue.engine
import collection.JavaConversions._
import org.broadinstitute.sting.queue.function.{CommandLineFunction, QFunction}
import scala.collection.immutable.ListSet
import org.broadinstitute.sting.queue.util.IOUtils
import java.io.File
/**
* Dispatches jobs to a compute cluster.
*/
trait DispatchJobRunner {
trait DispatchJobRunner extends JobRunner {
/** Type of the job. */
type DispatchJobType
/** An internal cache of all the jobs that have run by command line function. */
@ -15,14 +16,6 @@ trait DispatchJobRunner {
/** An internal list of functions that have no other dependencies. */
private var waitJobsByGraph = Map.empty[QGraph, ListSet[DispatchJobType]]
/**
* Dispatches a function to the queue and returns immediately, unless the function is a DispatchWaitFunction
* in which case it waits for all other terminal functions to complete.
* @param function Command to run.
* @param qGraph graph that holds the job, and if this is a dry run.
*/
def dispatch(function: CommandLineFunction, qGraph: QGraph)
/**
* Adds the job to the internal cache of previous jobs and removes the previous jobs that
* the job was dependent on from the list of function that have no dependencies.
@ -47,22 +40,8 @@ trait DispatchJobRunner {
* @param qGraph The graph that contains the jobs.
* @return A list of prior jobs.
*/
protected def previousJobs(function: QFunction, qGraph: QGraph) : List[DispatchJobType] = {
var previous = List.empty[DispatchJobType]
val source = qGraph.jobGraph.getEdgeSource(function)
for (incomingEdge <- qGraph.jobGraph.incomingEdgesOf(source)) {
incomingEdge match {
// Stop recursing when we find a job along the edge and return its job id
case dispatchFunction: CommandLineFunction => previous :+= dispatchJobs(dispatchFunction)
// For any other type of edge find the LSF jobs preceding the edge
case qFunction: QFunction => previous ++= previousJobs(qFunction, qGraph)
}
}
previous
}
protected def previousJobs(function: CommandLineFunction, qGraph: QGraph) : List[DispatchJobType] =
qGraph.previousJobs(function).map(dispatchJobs(_))
/**
* Returns a set of jobs that have no following jobs in the graph.
@ -81,7 +60,9 @@ trait DispatchJobRunner {
* @return A "cd <dir_1> [&& cd <dir_n>]" command.
*/
protected def mountCommand(function: CommandLineFunction) = {
val dirs = function.jobDirectories
var dirs = Set.empty[File]
for (dir <- function.jobDirectories)
dirs += IOUtils.dirLevel(dir, 2)
if (dirs.size > 0)
Some("\'" + dirs.mkString("cd ", " && cd ", "") + "\'")
else

View File

@ -0,0 +1,16 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.CommandLineFunction
/**
* Base interface for job runners.
*/
trait JobRunner {
/**
 * Dispatches a function to the queue and returns immediately, unless the function is a DispatchWaitFunction,
 * in which case it waits for all other terminal functions to complete.
 * @param function Command to run.
 * @param qGraph The graph that holds the job and indicates whether this is a dry run.
 */
def run(function: CommandLineFunction, qGraph: QGraph)
}

View File

@ -6,7 +6,7 @@ import org.broadinstitute.sting.queue.util.{IOUtils, LsfJob, Logging}
/**
* Runs jobs on an LSF compute cluster.
*/
trait LsfJobRunner extends DispatchJobRunner with Logging {
class LsfJobRunner extends DispatchJobRunner with Logging {
type DispatchJobType = LsfJob
/**
@ -14,7 +14,7 @@ trait LsfJobRunner extends DispatchJobRunner with Logging {
* @param function Command to run.
* @param qGraph graph that holds the job, and if this is a dry run.
*/
def dispatch(function: CommandLineFunction, qGraph: QGraph) = {
def run(function: CommandLineFunction, qGraph: QGraph) = {
val job = new LsfJob
job.name = function.jobName
job.outputFile = function.jobOutputFile

View File

@ -4,14 +4,15 @@ import org.jgrapht.traverse.TopologicalOrderIterator
import org.jgrapht.graph.SimpleDirectedGraph
import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, QFunction}
import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.QException
import org.jgrapht.alg.CycleDetector
import org.jgrapht.EdgeFactory
import org.jgrapht.ext.DOTExporter
import java.io.File
import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent}
import org.broadinstitute.sting.queue.{QSettings, QException}
import org.broadinstitute.sting.queue.function.{DispatchWaitFunction, MappingFunction, CommandLineFunction, QFunction}
/**
* The internal dependency tracker between sets of function input and output files.
@ -20,8 +21,11 @@ class QGraph extends Logging {
var dryRun = true
var bsubAllJobs = false
var bsubWaitJobs = false
val jobGraph = newGraph
def numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size
var skipUpToDateJobs = false
var dotFile: File = _
var expandedDotFile: File = _
var qSettings: QSettings = _
private val jobGraph = newGraph
/**
* Adds a QScript created CommandLineFunction to the graph.
@ -32,15 +36,130 @@ class QGraph extends Logging {
}
/**
* Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs.
* Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph.
*/
def fillIn = {
// clone since edgeSet is backed by the graph
for (function <- JavaConversions.asSet(jobGraph.edgeSet).clone) {
addCollectionOutputs(function.outputs)
addCollectionInputs(function.inputs)
def run = {
fill
if (dotFile != null)
renderToDot(dotFile)
var numMissingValues = validate
if (numMissingValues == 0 && bsubAllJobs) {
logger.debug("Scatter gathering jobs.")
var scatterGathers = List.empty[ScatterGatherableFunction]
loop({
case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) =>
scatterGathers :+= scatterGather
})
var addedFunctions = List.empty[CommandLineFunction]
for (scatterGather <- scatterGathers) {
val functions = scatterGather.generateFunctions()
if (logger.isTraceEnabled)
logger.trace("Scattered into %d parts: %n%s".format(functions.size, functions.mkString("%n".format())))
addedFunctions ++= functions
}
this.jobGraph.removeAllEdges(scatterGathers)
prune
addedFunctions.foreach(this.addFunction(_))
fill
val scatterGatherDotFile = if (expandedDotFile != null) expandedDotFile else dotFile
if (scatterGatherDotFile != null)
renderToDot(scatterGatherDotFile)
numMissingValues = validate
}
val isReady = numMissingValues == 0
if (isReady || this.dryRun)
runJobs
if (numMissingValues > 0) {
logger.error("Total missing values: " + numMissingValues)
}
if (isReady && this.dryRun) {
logger.info("Dry run completed successfully!")
logger.info("Re-run with \"-run\" to execute the functions.")
}
}
/**
* Walks up the graph looking for the previous LsfJobs.
* @param function Function to examine for a previous command line job.
* @param qGraph The graph that contains the jobs.
* @return A list of prior jobs.
*/
def previousJobs(function: QFunction) : List[CommandLineFunction] = {
var previous = List.empty[CommandLineFunction]
val source = this.jobGraph.getEdgeSource(function)
for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) {
incomingEdge match {
// Stop recursing when we find a job along the edge and return its job id
case commandLineFunction: CommandLineFunction => previous :+= commandLineFunction
// For any other type of edge find the LSF jobs preceding the edge
case qFunction: QFunction => previous ++= previousJobs(qFunction)
}
}
previous
}
/**
* Fills in the graph using mapping functions, then removes out of date
* jobs, then cleans up mapping functions and nodes that aren't need.
*/
private def fill = {
fillIn
if (skipUpToDateJobs)
removeUpToDate
prune
}
/**
* Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs.
*/
private def fillIn = {
// clone since edgeSet is backed by the graph
JavaConversions.asSet(jobGraph.edgeSet).clone.foreach {
case cmd: CommandLineFunction => {
addCollectionOutputs(cmd.outputs)
addCollectionInputs(cmd.inputs)
}
case map: MappingFunction => /* do nothing for mapping functions */
}
}
/**
 * Removes functions that are up to date, i.e. whose outputs are newer
 * than their inputs and whose predecessors are themselves up to date.
 */
private def removeUpToDate = {
var skipped = Set.empty[CommandLineFunction]
// Walk edges in topological order so predecessors are classified first.
loop({
case function if (upToDate(function, skipped)) =>
logger.info("Skipping command because it is up to date: %n%s".format(function.commandLine))
skipped += function
})
skipped.foreach(jobGraph.removeEdge(_))
}
/**
 * Returns true if all previous functions in the graph are up to date,
 * and the function itself is up to date.
 */
private def upToDate(commandLineFunction: CommandLineFunction, upToDateJobs: Set[CommandLineFunction]) = {
// A Set is a predicate, so it can be passed to forall directly.
val predecessorsCurrent = previousJobs(commandLineFunction).forall(upToDateJobs)
predecessorsCurrent && commandLineFunction.upToDate
}
/**
* Removes mapping edges that aren't being used, and nodes that don't belong to anything.
*/
private def prune = {
var pruning = true
while (pruning) {
pruning = false
@ -55,27 +174,21 @@ class QGraph extends Logging {
}
/**
* Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph.
* Validates that the functions in the graph have no missing values and that there are no cycles.
* @return Number of missing values.
*/
def run = {
var isReady = true
var totalMissingValues = 0
for (function <- JavaConversions.asSet(jobGraph.edgeSet)) {
function match {
case cmd: CommandLineFunction =>
val missingFieldValues = cmd.missingFields
if (missingFieldValues.size > 0) {
totalMissingValues += missingFieldValues.size
logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.commandLine))
for (missing <- missingFieldValues)
logger.error(" " + missing)
}
case _ =>
}
}
if (totalMissingValues > 0) {
isReady = false
private def validate = {
var numMissingValues = 0
JavaConversions.asSet(jobGraph.edgeSet).foreach {
case cmd: CommandLineFunction =>
val missingFieldValues = cmd.missingFields
if (missingFieldValues.size > 0) {
numMissingValues += missingFieldValues.size
logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.commandLine))
for (missing <- missingFieldValues)
logger.error(" " + missing)
}
case map: MappingFunction => /* do nothing for mapping functions */
}
val detector = new CycleDetector(jobGraph)
@ -83,19 +196,46 @@ class QGraph extends Logging {
logger.error("Cycles were detected in the graph:")
for (cycle <- detector.findCycles)
logger.error(" " + cycle)
isReady = false
throw new QException("Cycles were detected in the graph.")
}
if (isReady || this.dryRun)
(new TopologicalJobScheduler(this) with LsfJobRunner).runJobs
numMissingValues
}
if (totalMissingValues > 0) {
logger.error("Total missing values: " + totalMissingValues)
/**
* Runs the jobs by traversing the graph.
*/
private def runJobs = {
val runner = if (bsubAllJobs) new LsfJobRunner else new ShellJobRunner
val numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size
logger.info("Number of jobs: %s".format(numJobs))
if (logger.isTraceEnabled) {
val numNodes = jobGraph.vertexSet.size
logger.trace("Number of nodes: %s".format(numNodes))
}
var numNodes = 0
if (isReady && this.dryRun) {
logger.info("Dry run completed successfully!")
logger.info("Re-run with \"-run\" to execute the functions.")
loop(
edgeFunction = { case f => runner.run(f, this) },
nodeFunction = {
case node => {
if (logger.isTraceEnabled)
logger.trace("Visiting: " + node)
numNodes += 1
}
})
if (logger.isTraceEnabled)
logger.trace("Done walking %s nodes.".format(numNodes))
if (bsubAllJobs && bsubWaitJobs) {
logger.info("Waiting for jobs to complete.")
val wait = new DispatchWaitFunction
wait.qSettings = this.qSettings
wait.freeze
runner.run(wait, this)
}
}
@ -108,35 +248,29 @@ class QGraph extends Logging {
/**
* Adds a generic QFunction to the graph.
* If the function is scatterable and the jobs request bsub, splits the job into parts and adds the parts instead.
* @param f Generic QFunction to add to the graph.
*/
private def addFunction(f: QFunction): Unit = {
try {
f.freeze
f match {
case scatterGather: ScatterGatherableFunction if (bsubAllJobs && scatterGather.scatterGatherable) =>
val functions = scatterGather.generateFunctions()
if (logger.isTraceEnabled)
logger.trace("Scattered into %d parts: %s".format(functions.size, functions))
functions.foreach(addFunction(_))
case _ =>
val inputs = QNode(f.inputs)
val outputs = QNode(f.outputs)
val newSource = jobGraph.addVertex(inputs)
val newTarget = jobGraph.addVertex(outputs)
val removedEdges = jobGraph.removeAllEdges(inputs, outputs)
val added = jobGraph.addEdge(inputs, outputs, f)
if (logger.isTraceEnabled) {
logger.trace("Mapped from: " + inputs)
logger.trace("Mapped to: " + outputs)
logger.trace("Mapped via: " + f)
logger.trace("Removed edges: " + removedEdges)
logger.trace("New source?: " + newSource)
logger.trace("New target?: " + newTarget)
logger.trace("")
}
case cmd: CommandLineFunction => cmd.qSettings = this.qSettings
case map: MappingFunction => /* do nothing for mapping functions */
}
f.freeze
val inputs = QNode(f.inputs)
val outputs = QNode(f.outputs)
val newSource = jobGraph.addVertex(inputs)
val newTarget = jobGraph.addVertex(outputs)
val removedEdges = jobGraph.removeAllEdges(inputs, outputs)
val added = jobGraph.addEdge(inputs, outputs, f)
if (logger.isTraceEnabled) {
logger.trace("Mapped from: " + inputs)
logger.trace("Mapped to: " + outputs)
logger.trace("Mapped via: " + f)
logger.trace("Removed edges: " + removedEdges)
logger.trace("New source?: " + newSource)
logger.trace("New target?: " + newTarget)
logger.trace("")
}
} catch {
case e: Exception =>
@ -209,12 +343,28 @@ class QGraph extends Logging {
/** Returns true when the node has no incoming and no outgoing edges at all. */
private def isOrphan(node: QNode) =
jobGraph.incomingEdgesOf(node).isEmpty && jobGraph.outgoingEdgesOf(node).isEmpty
/**
 * Utility function for looping over the internal graph in topological
 * order and running the optional callbacks.
 * @param edgeFunction Optional function to run for each edge visited.
 * @param nodeFunction Optional function to run for each node visited.
 */
private def loop(edgeFunction: PartialFunction[CommandLineFunction, Unit] = null, nodeFunction: PartialFunction[QNode, Unit] = null) = {
// Callbacks may be null or partial; only invoke them when defined.
def visitEdge(cmd: CommandLineFunction) =
if (edgeFunction != null && edgeFunction.isDefinedAt(cmd)) edgeFunction(cmd)
def visitNode(node: QNode) =
if (nodeFunction != null && nodeFunction.isDefinedAt(node)) nodeFunction(node)
val iterator = new TopologicalOrderIterator(this.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] {
override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match {
case cmd: CommandLineFunction => visitEdge(cmd)
case map: MappingFunction => /* do nothing for mapping functions */
}
})
iterator.foreach(visitNode)
}
/**
* Outputs the graph to a .dot file.
* http://en.wikipedia.org/wiki/DOT_language
* @param file Path to output the .dot file.
*/
def renderToDot(file: java.io.File) = {
private def renderToDot(file: java.io.File) = {
val out = new java.io.FileWriter(file)
// todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a

View File

@ -6,7 +6,7 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction
/**
* Runs jobs one at a time locally
*/
trait ShellJobRunner extends Logging {
class ShellJobRunner extends JobRunner with Logging {
/**
* Runs the function on the local shell.
* @param function Command to run.

View File

@ -1,54 +0,0 @@
package org.broadinstitute.sting.queue.engine
import org.jgrapht.traverse.TopologicalOrderIterator
import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter}
import collection.JavaConversions._
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.function._
/**
 * Loops over the job graph running jobs as the edges are traversed.
 * Jobs are dispatched via LSF when bsubAllJobs is set, otherwise run locally.
 * @param qGraph The graph that contains the jobs to be run.
 */
abstract class TopologicalJobScheduler(private val qGraph: QGraph)
extends ShellJobRunner with DispatchJobRunner with Logging {
// Topological order guarantees a job's dependencies are traversed before the job itself.
protected val iterator = new TopologicalOrderIterator(qGraph.jobGraph)
iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] {
/**
 * As each edge is traversed, either dispatch the job or run it locally.
 * @param event Event holding the edge that was passed.
 */
override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match {
case f: CommandLineFunction if (qGraph.bsubAllJobs) => dispatch(f, qGraph)
case f: CommandLineFunction => run(f, qGraph)
case f: MappingFunction => /* do nothing for mapping functions */
}
})
/**
 * Runs the jobs by traversing the graph.
 * The loop body only advances the iterator; the traversal listener above
 * reacts to each edge. Optionally waits for dispatched jobs to complete.
 */
def runJobs = {
logger.info("Number of jobs: %s".format(qGraph.numJobs))
if (logger.isTraceEnabled)
logger.trace("Number of nodes: %s".format(qGraph.jobGraph.vertexSet.size))
var numNodes = 0
for (target <- iterator) {
if (logger.isTraceEnabled)
logger.trace("Visiting: " + target)
numNodes += 1
// Do nothing for now, let event handler respond
}
if (logger.isTraceEnabled)
logger.trace("Done walking %s nodes.".format(numNodes))
if (qGraph.bsubAllJobs && qGraph.bsubWaitJobs) {
logger.info("Waiting for jobs to complete.")
val wait = new DispatchWaitFunction
wait.freeze
dispatch(wait, qGraph)
}
}
}

View File

@ -6,8 +6,7 @@ import org.broadinstitute.sting.commandline._
import java.io.File
import collection.JavaConversions._
import org.broadinstitute.sting.queue.function.scattergather.{SimpleTextGatherFunction, Gather}
import java.lang.management.ManagementFactory
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.{QSettings, QException}
/**
* A command line that will be run in a pipeline.
@ -15,6 +14,9 @@ import org.broadinstitute.sting.queue.QException
trait CommandLineFunction extends QFunction with Logging {
def commandLine: String
/** Default settings */
var qSettings: QSettings = _
/** Upper memory limit */
var memoryLimit: Option[Int] = None
@ -25,16 +27,16 @@ trait CommandLineFunction extends QFunction with Logging {
var commandDirectory: File = IOUtils.CURRENT_DIR
/** Prefix for automatic job name creation */
var jobNamePrefix: String = CommandLineFunction.processNamePrefix
var jobNamePrefix: String = _
/** The name name of the job */
var jobName: String = _
/** Job project to run the command */
var jobProject = "Queue"
var jobProject: String = _
/** Job queue to run the command */
var jobQueue = "broad"
var jobQueue: String = _
/** Temporary directory to write any files */
var jobTempDir: File = new File(System.getProperty("java.io.tmpdir"))
@ -99,6 +101,21 @@ trait CommandLineFunction extends QFunction with Logging {
files
}
/**
 * Returns true if all outputs already exist and are newer than all of the inputs.
 * If there are no outputs (other than the job log files) then returns false.
 * @return true if all outputs already exist and are newer than the inputs.
 */
def upToDate = {
val inputFiles = inputs
// The job's own stdout/stderr logs are not treated as real outputs.
val outputFiles = outputs.filterNot(file => (file == jobOutputFile || file == jobErrorFile))
if (outputFiles.size > 0 && outputFiles.forall(_.exists)) {
// Compare the newest input timestamp against the oldest output timestamp.
val maxInput = inputFiles.foldLeft(Long.MinValue)((date, file) => date.max(file.lastModified))
val minOutput = outputFiles.foldLeft(Long.MaxValue)((date, file) => date.min(file.lastModified))
maxInput < minOutput
} else false
}
/**
* Gets the files from the field. The field must be a File, a FileProvider, or a List or Set of either.
* @param fields Field to get files.
@ -177,6 +194,21 @@ trait CommandLineFunction extends QFunction with Logging {
* Sets all field values.
*/
def freezeFieldValues = {
if (jobNamePrefix == null)
jobNamePrefix = qSettings.jobNamePrefix
if (jobQueue == null)
jobQueue = qSettings.jobQueue
if (jobProject == null)
jobProject = qSettings.jobProject
if (memoryLimit.isEmpty && qSettings.memoryLimit.isDefined)
memoryLimit = qSettings.memoryLimit
if (qSettings.runJobsIfPrecedingFail)
jobRunOnlyIfPreviousSucceed = false
if (jobName == null)
jobName = CommandLineFunction.nextJobName(jobNamePrefix)
@ -202,7 +234,7 @@ trait CommandLineFunction extends QFunction with Logging {
* Set value to a uniform value across functions.
* Base implementation changes any relative path to an absolute path.
* @param value to be updated
* @returns the modified value, or a copy if the value is immutable
* @return the modified value, or a copy if the value is immutable
*/
protected def canon(value: Any) = {
value match {
@ -276,20 +308,8 @@ trait CommandLineFunction extends QFunction with Logging {
* Scala sugar type for checking annotation required and exclusiveOf.
*/
private type ArgumentAnnotation = {
/**
* Returns true if the field is required.
* @return true if the field is required.
*/
def required(): Boolean
/**
* Returns the comma separated list of fields that may be set instead of this field.
* @return the comma separated list of fields that may be set instead of this field.
*/
def exclusiveOf(): String
/**
* Returns the documentation for this field.
* @return the documentation for this field.
*/
def doc(): String
}
@ -378,15 +398,6 @@ trait CommandLineFunction extends QFunction with Logging {
* A command line that will be run in a pipeline.
*/
object CommandLineFunction {
/** A semi-unique job prefix using the host name and the process id. */
private val processNamePrefix = "Q-" + {
var prefix = ManagementFactory.getRuntimeMXBean.getName
val index = prefix.indexOf(".")
if (index >= 0)
prefix = prefix.substring(0, index)
prefix
}
/** Job index counter for this run of Queue. */
private var jobIndex = 0

View File

@ -9,7 +9,7 @@ import java.io.File
class MappingFunction(val inputs: Set[File], val outputs: Set[File]) extends QFunction {
/**
 * For debugging purposes returns <map>.
 * @return <map>
 */
override def toString = "<map>"
}

View File

@ -20,5 +20,7 @@ class CleanupTempDirsFunction extends CommandLineFunction {
@Argument(doc="rmdir script or command")
var rmdirScript = "rm -rf"
override def upToDate = tempDirectories.forall(!_.exists)
def commandLine = "%s%s".format(rmdirScript, repeat(" '", tempDirectories, "'"))
}

View File

@ -20,6 +20,8 @@ class CreateTempDirsFunction extends CommandLineFunction {
@Argument(doc="mkdir script or command")
var mkdirScript = "mkdir -pv"
override def upToDate = tempDirectories.forall(_.exists)
def commandLine = "%s%s".format(mkdirScript, repeat(" '", tempDirectories, "'"))
/**

View File

@ -154,8 +154,12 @@ trait ScatterGatherableFunction extends CommandLineFunction {
*/
override def freezeFieldValues = {
super.freezeFieldValues
if (this.scatterGatherDirectory == null)
this.scatterGatherDirectory = this.commandDirectory
if (this.scatterGatherDirectory == null) {
this.scatterGatherDirectory = qSettings.jobScatterGatherDirectory
if (this.scatterGatherDirectory == null)
this.scatterGatherDirectory = this.commandDirectory
}
}
/**
@ -177,7 +181,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
/**
* Initializes the CreateTempDirsFunction that will create the temporary directories.
* The initializeFunction jobNamePrefix is set so that the CreateTempDirsFunction runs with the same prefix as this ScatterGatherableFunction.
* The initializeFunction qSettings is set so that the CreateTempDirsFunction runs with the same prefix, etc. as this ScatterGatherableFunction.
* The initializeFunction commandDirectory is set so that the function runs in the directory as this ScatterGatherableFunction.
* The initializeFunction is modified to become dependent on the input files for this ScatterGatherableFunction.
* Calls setupInitializeFunction with initializeFunction.
@ -185,7 +189,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
* @param inputFields The input fields that the original function was dependent on.
*/
protected def initInitializeFunction(initializeFunction: CreateTempDirsFunction, inputFields: List[ArgumentSource]) = {
initializeFunction.jobNamePrefix = this.jobNamePrefix
initializeFunction.qSettings = this.qSettings
initializeFunction.commandDirectory = this.commandDirectory
for (inputField <- inputFields)
initializeFunction.originalInputs ++= this.getFieldFiles(inputField)
@ -209,7 +213,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
/**
* Initializes the ScatterFunction created by newScatterFunction() that will create the scatter pieces in the temporary directories.
* The scatterFunction jobNamePrefix is set so that the ScatterFunction runs with the same prefix as this ScatterGatherableFunction.
* The scatterFunction qSettings is set so that the ScatterFunction runs with the same prefix, etc. as this ScatterGatherableFunction.
* The scatterFunction commandDirectory is set so that the function runs from a temporary directory under the scatterDirectory.
* The scatterFunction has it's originalInput set with the file to be scattered into scatterCount pieces.
* Calls scatterFunction.setOriginalFunction with this ScatterGatherableFunction.
@ -218,7 +222,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
* @param scatterField The input field being scattered.
*/
protected def initScatterFunction(scatterFunction: ScatterFunction, scatterField: ArgumentSource) = {
scatterFunction.jobNamePrefix = this.jobNamePrefix
scatterFunction.qSettings = this.qSettings
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName)
scatterFunction.originalInput = this.getFieldFile(scatterField)
scatterFunction.setOriginalFunction(this, scatterField)
@ -245,7 +249,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
/**
* Initializes the GatherFunction created by newGatherFunction() that will collect the gather pieces in the temporary directories.
* The gatherFunction jobNamePrefix is set so that the GatherFunction runs with the same prefix as this ScatterGatherableFunction.
* The gatherFunction qSettings is set so that the GatherFunction runs with the same prefix, etc. as this ScatterGatherableFunction.
* The gatherFunction commandDirectory is set so that the function runs from a temporary directory under the scatterDirectory.
* The gatherFunction has it's originalOutput set with the file to be gathered from the scatterCount pieces.
* Calls the gatherFunction.setOriginalFunction with this ScatterGatherableFunction.
@ -254,7 +258,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
* @param gatherField The output field being gathered.
*/
protected def initGatherFunction(gatherFunction: GatherFunction, gatherField: ArgumentSource) = {
gatherFunction.jobNamePrefix = this.jobNamePrefix
gatherFunction.qSettings = this.qSettings
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
gatherFunction.originalOutput = this.getFieldFile(gatherField)
gatherFunction.setOriginalFunction(this, gatherField)
@ -332,7 +336,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
/**
* Initializes the CleanupTempDirsFunction created by newCleanupFunction() that will remove the temporary directories.
* The cleanupFunction jobNamePrefix is set so that the CleanupTempDirsFunction runs with the same prefix as this ScatterGatherableFunction.
* The cleanupFunction qSettings is set so that the CleanupTempDirsFunction runs with the same prefix, etc. as this ScatterGatherableFunction.
* The cleanupFunction commandDirectory is set so that the function runs in the directory as this ScatterGatherableFunction.
* The initializeFunction is modified to become dependent on the output files for this ScatterGatherableFunction.
* Calls setupCleanupFunction with cleanupFunction.
@ -341,7 +345,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
* @param outputFields The output fields that the original function was dependent on.
*/
protected def initCleanupFunction(cleanupFunction: CleanupTempDirsFunction, gatherFunctions: Map[ArgumentSource, GatherFunction], outputFields: List[ArgumentSource]) = {
cleanupFunction.jobNamePrefix = this.jobNamePrefix
cleanupFunction.qSettings = this.qSettings
cleanupFunction.commandDirectory = this.commandDirectory
for (gatherField <- outputFields)
cleanupFunction.originalOutputs += gatherFunctions(gatherField).originalOutput

View File

@ -9,7 +9,6 @@ object IOUtils {
/** The current directory "." */
val CURRENT_DIR = new File(".")
/**
* Returns the sub path rooted at the parent.
* If the sub path is already absolute, returns the sub path.
@ -21,27 +20,30 @@ object IOUtils {
* @return The absolute path to the file in the parent dir if the path was not absolute, otherwise the original path.
*/
def subDir(dir: File, path: String): File =
subDir(dir.getAbsoluteFile, new File(path))
subDir(dir, new File(path))
/**
* Returns the sub path rooted at the parent.
* If the sub path is already absolute, returns the sub path.
* If the parent is the current directory, returns the sub path.
* If the sub path is the current directory, returns the parent.
* Else returns new File(parent, subPath)
* @param parent The parent directory
* @param file The sub path to append to the parent, if the path is not absolute.
* @return The absolute path to the file in the parent dir if the path was not absolute, otherwise the original path.
*/
def subDir(parent: File, file: File): File = {
if (parent == CURRENT_DIR && file == CURRENT_DIR)
CURRENT_DIR.getCanonicalFile.getAbsoluteFile
else if (parent == CURRENT_DIR || file.isAbsolute)
file.getAbsoluteFile
else if (file == CURRENT_DIR)
parent.getAbsoluteFile
val parentAbs = absolute(parent)
val fileAbs = absolute(file)
val currentAbs = absolute(CURRENT_DIR)
if (parentAbs == currentAbs && fileAbs == currentAbs)
absolute(CURRENT_DIR.getCanonicalFile)
else if (parentAbs == currentAbs || file.isAbsolute)
fileAbs
else if (fileAbs == currentAbs)
parentAbs
else
new File(parent, file.getPath).getAbsoluteFile
absolute(new File(parentAbs, file.getPath))
}
/**
@ -50,7 +52,7 @@ object IOUtils {
* @param file Path to the file to be re-rooted.
* @return Absolute path to the new file.
*/
def resetParent(dir: File, file: File) = subDir(dir.getAbsoluteFile, file.getName).getAbsoluteFile
def resetParent(dir: File, file: File) = absolute(subDir(dir, file.getName))
/**
* Creates a scatterGatherTempDir directory with the prefix and optional suffix.
@ -65,6 +67,61 @@ object IOUtils {
throw new IOException("Could not delete sub file: " + temp.getAbsolutePath())
if(!temp.mkdir)
throw new IOException("Could not create sub directory: " + temp.getAbsolutePath())
temp
absolute(temp)
}
/**
 * Returns the directory at the number of levels deep from the root.
 * For example 2 levels of /path/to/dir will return /path/to.
 * If the path is shallower than the requested level, the deepest
 * directory (the path itself) is returned.
 * @param dir Directory path.
 * @param level how many levels deep from the root.
 * @return The path to the parent directory that is level-levels deep.
 */
def dirLevel(dir: File, level: Int): File = {
// Collect the chain of parents ordered from the root down to dir.
var chain = List.empty[File]
var current = absolute(dir)
while (current != null) {
chain +:= current
current = current.getParentFile
}
// Index into the chain, clamping to the deepest directory available.
chain.drop(level).headOption.getOrElse(chain.last)
}
/**
 * Returns an absolute form of the file with any "." path segments removed.
 * ".." segments are intentionally left in place; see the note below.
 * @param file File to normalize.
 * @return Absolute file without "." segments.
 */
def absolute(file: File) = {
var remaining = file.getAbsoluteFile
var names = List.empty[String]
// Walk from the leaf back to the root, keeping every segment except ".".
while (remaining != null) {
val name = remaining.getName
remaining = remaining.getParentFile
/*
 * ".." is deliberately NOT collapsed here. Shells disagree about what
 * "dir/symlink/.." means: CentOS tcsh resolves the symlink first
 * (getCanonicalFile behavior) while Mac bash drops the last component
 * (getAbsoluteFile behavior). For now, leave ".." alone and let the
 * shell figure it out.
 */
if (name != ".")
names +:= name
}
new File(names.mkString("/", "/", ""))
}
}

View File

@ -0,0 +1,102 @@
package org.broadinstitute.sting.queue.util
import org.broadinstitute.sting.BaseTest
import org.junit.{Assert, Test}
import java.io.File
/**
 * Unit tests for the IOUtils path helpers:
 * subDir, resetParent, tempDir, dirLevel and absolute.
 */
class IOUtilsUnitTest extends BaseTest {
// An absolute sub path ignores the parent directory entirely.
@Test
def testAbsoluteSubDir = {
var subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("/path/to/file"))
Assert.assertEquals(new File("/path/to/file"), subDir)
subDir = IOUtils.subDir(new File("/different/path"), new File("/path/to/file"))
Assert.assertEquals(new File("/path/to/file"), subDir)
// A "." sub path resolves to the parent itself.
subDir = IOUtils.subDir(new File("/different/path"), IOUtils.CURRENT_DIR)
Assert.assertEquals(new File("/different/path"), subDir)
}
// A relative sub path is rooted at the parent directory.
@Test
def testRelativeSubDir = {
var subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("path/to/file"))
Assert.assertEquals(new File("path/to/file").getCanonicalFile, subDir.getCanonicalFile)
subDir = IOUtils.subDir(new File("/different/path"), new File("path/to/file"))
Assert.assertEquals(new File("/different/path/path/to/file"), subDir)
}
// "." segments are stripped while ".." segments are preserved untouched.
@Test
def testDottedSubDir = {
var subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("path/../to/file"))
Assert.assertEquals(new File("path/../to/./file").getCanonicalFile, subDir.getCanonicalFile)
subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("/path/../to/file"))
Assert.assertEquals(new File("/path/../to/file"), subDir)
subDir = IOUtils.subDir(new File("/different/../path"), new File("path/to/file"))
Assert.assertEquals(new File("/different/../path/path/to/file"), subDir)
subDir = IOUtils.subDir(new File("/different/./path"), new File("/path/../to/file"))
Assert.assertEquals(new File("/path/../to/file"), subDir)
}
// resetParent keeps the file name but swaps in a new parent directory.
@Test
def testResetParent = {
val newFile = IOUtils.resetParent(new File("/new/parent/dir"), new File("/old/parent_dir/file.name"))
Assert.assertEquals(new File("/new/parent/dir/file.name"), newFile)
}
// tempDir creates a real, empty, deletable directory on disk.
@Test
def testTempDir = {
val tempDir = IOUtils.tempDir("Q-Unit-Test")
Assert.assertTrue(tempDir.exists)
Assert.assertFalse(tempDir.isFile)
Assert.assertTrue(tempDir.isDirectory)
val deleted = tempDir.delete
Assert.assertTrue(deleted)
Assert.assertFalse(tempDir.exists)
}
// dirLevel returns the ancestor at the given depth, clamped to the full path.
@Test
def testDirLevel = {
var dir = IOUtils.dirLevel(new File("/path/to/directory"), 1)
Assert.assertEquals(new File("/path"), dir)
dir = IOUtils.dirLevel(new File("/path/to/directory"), 2)
Assert.assertEquals(new File("/path/to"), dir)
dir = IOUtils.dirLevel(new File("/path/to/directory"), 3)
Assert.assertEquals(new File("/path/to/directory"), dir)
dir = IOUtils.dirLevel(new File("/path/to/directory"), 4)
Assert.assertEquals(new File("/path/to/directory"), dir)
}
// absolute strips "." segments and normalizes trailing separators.
@Test
def testAbsolute = {
var dir = IOUtils.absolute(new File("/path/./to/./directory/."))
Assert.assertEquals(new File("/path/to/directory"), dir)
dir = IOUtils.absolute(new File("/"))
Assert.assertEquals(new File("/"), dir)
dir = IOUtils.absolute(new File("/."))
Assert.assertEquals(new File("/"), dir)
dir = IOUtils.absolute(new File("/././."))
Assert.assertEquals(new File("/"), dir)
dir = IOUtils.absolute(new File("/./directory/."))
Assert.assertEquals(new File("/directory"), dir)
dir = IOUtils.absolute(new File("/./directory/./"))
Assert.assertEquals(new File("/directory"), dir)
dir = IOUtils.absolute(new File("/./directory./"))
Assert.assertEquals(new File("/directory."), dir)
dir = IOUtils.absolute(new File("/./.directory/"))
Assert.assertEquals(new File("/.directory"), dir)
}
}
}