diff --git a/build.xml b/build.xml index 3daddf7a9..6c4906b96 100644 --- a/build.xml +++ b/build.xml @@ -60,10 +60,25 @@ + + + + + + + + + + + + + + + - + @@ -191,34 +206,29 @@ - - - - Building Queue... - + - + - - - - - - - Generating Queue GATK extensions... - + + + + + + + Building Queue GATK extensions... - + @@ -395,19 +405,19 @@ - + - + - + @@ -424,19 +434,35 @@ - + + + + + + + - - + + + + + + + + + + + + @@ -484,6 +510,8 @@ + + @@ -497,9 +525,11 @@ - + + + @@ -508,6 +538,10 @@ + + + + @@ -516,19 +550,19 @@ - + - + - + @@ -564,13 +598,33 @@ + + + - + + + + + + + + + + + + + + + + + + @@ -648,7 +702,7 @@ - + diff --git a/java/src/org/broadinstitute/sting/commandline/ArgumentSource.java b/java/src/org/broadinstitute/sting/commandline/ArgumentSource.java index 635780aa5..1ded083bf 100644 --- a/java/src/org/broadinstitute/sting/commandline/ArgumentSource.java +++ b/java/src/org/broadinstitute/sting/commandline/ArgumentSource.java @@ -25,8 +25,6 @@ package org.broadinstitute.sting.commandline; -import org.broadinstitute.sting.gatk.walkers.Hidden; - import java.lang.reflect.Field; import java.util.Arrays; import java.util.List; diff --git a/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java b/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java index 4993ebfe5..1e0e574a9 100644 --- a/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java +++ b/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java @@ -26,7 +26,6 @@ package org.broadinstitute.sting.commandline; import org.broadinstitute.sting.utils.StingException; -import org.broadinstitute.sting.gatk.walkers.Hidden; import org.apache.log4j.Logger; import java.lang.annotation.Annotation; diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/Hidden.java b/java/src/org/broadinstitute/sting/commandline/Hidden.java similarity index 96% rename from java/src/org/broadinstitute/sting/gatk/walkers/Hidden.java rename to java/src/org/broadinstitute/sting/commandline/Hidden.java index c4e0a4efe..07dfa987e 100644 --- a/java/src/org/broadinstitute/sting/gatk/walkers/Hidden.java +++ b/java/src/org/broadinstitute/sting/commandline/Hidden.java @@ -22,7 +22,7 @@ * OTHER DEALINGS IN THE SOFTWARE. */ -package org.broadinstitute.sting.gatk.walkers; +package org.broadinstitute.sting.commandline; import java.lang.annotation.*; diff --git a/java/src/org/broadinstitute/sting/gatk/WalkerManager.java b/java/src/org/broadinstitute/sting/gatk/WalkerManager.java index 066aebbd1..e2e32647b 100755 --- a/java/src/org/broadinstitute/sting/gatk/WalkerManager.java +++ b/java/src/org/broadinstitute/sting/gatk/WalkerManager.java @@ -26,7 +26,7 @@ package org.broadinstitute.sting.gatk; import net.sf.picard.filter.SamRecordFilter; -import org.apache.log4j.Logger; +import org.broadinstitute.sting.commandline.Hidden; import org.broadinstitute.sting.gatk.filters.FilterManager; import org.broadinstitute.sting.gatk.refdata.tracks.RMDTrack; import org.broadinstitute.sting.gatk.walkers.*; diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/indels/IndelRealigner.java b/java/src/org/broadinstitute/sting/gatk/walkers/indels/IndelRealigner.java index 9e6f766c1..153138768 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/indels/IndelRealigner.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/indels/IndelRealigner.java @@ -28,6 +28,7 @@ package org.broadinstitute.sting.gatk.walkers.indels; import net.sf.samtools.*; import net.sf.samtools.util.StringUtil; import org.broad.tribble.util.variantcontext.VariantContext; +import org.broadinstitute.sting.commandline.Hidden; import org.broadinstitute.sting.utils.interval.IntervalMergingRule; import org.broadinstitute.sting.utils.interval.IntervalUtils; import org.broadinstitute.sting.gatk.contexts.ReferenceContext; @@ -36,7 +37,6 @@ import org.broadinstitute.sting.gatk.refdata.utils.GATKFeature; import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.Reference; import org.broadinstitute.sting.gatk.walkers.Window; -import org.broadinstitute.sting.gatk.walkers.Hidden; import org.broadinstitute.sting.gatk.filters.BadMateFilter; import org.broadinstitute.sting.utils.*; import org.broadinstitute.sting.utils.interval.IntervalFileMergingIterator; diff --git a/java/src/org/broadinstitute/sting/playground/gatk/walkers/duplicates/CountDuplicatesWalker.java b/java/src/org/broadinstitute/sting/playground/gatk/walkers/duplicates/CountDuplicatesWalker.java index d95aaffa8..041b455bb 100644 --- a/java/src/org/broadinstitute/sting/playground/gatk/walkers/duplicates/CountDuplicatesWalker.java +++ b/java/src/org/broadinstitute/sting/playground/gatk/walkers/duplicates/CountDuplicatesWalker.java @@ -51,7 +51,7 @@ class DuplicateCount { * @author mark DePristo */ public class CountDuplicatesWalker extends DuplicateWalker { - @Argument(fullName="quiet", required=false, doc="If true, per locus information isn't printex") + @Argument(fullName="quietLocus", required=false, doc="If true, per locus information isn't printed") public boolean quiet = false; /** diff --git a/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentDefinitionField.java b/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentDefinitionField.java index f0816a315..e4523519e 100644 --- a/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentDefinitionField.java +++ b/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentDefinitionField.java @@ -47,10 +47,37 @@ public abstract class ArgumentDefinitionField extends ArgumentField { @Override protected String getExclusiveOf() { return escape(argumentDefinition.exclusiveOf); } @Override protected String getValidation() { return escape(argumentDefinition.validation); } + protected final String getShortFieldGetter() { return getFieldName(getRawShortFieldName()); } + protected final String getShortFieldSetter() { return getFieldName(getRawShortFieldName() + "_="); } + protected String getRawShortFieldName() { return argumentDefinition.shortName; } + @Override protected String getDefineAddition() { + if (argumentDefinition.shortName == null) + return ""; + else if(getShortFieldGetter().equals(getFieldName())) + return ""; + else + return String.format("%n" + + "/** %n" + + " * Short name of %1$s%n" + + " * @return Short name of %1$s%n" + + " */%n" + + "def %3$s = this.%1$s%n" + + "%n" + + "/** %n" + + " * Short name of %1$s%n" + + " * @param value Short name of %1$s%n" + + " */%n" + + "def %4$s(value: %2$s) = this.%1$s = value%n", + getFieldName(), + getFieldType(), + getShortFieldGetter(), + getShortFieldSetter()); + } + protected static final String REQUIRED_TEMPLATE = " + \" %1$s \" + %2$s.format(%3$s)"; protected static final String REPEAT_TEMPLATE = " + repeat(\" %1$s \", %3$s, format=%2$s)"; protected static final String OPTIONAL_TEMPLATE = " + optional(\" %1$s \", %3$s, format=%2$s)"; - protected static final String FLAG_TEMPLATE = " + (if (%3$s) \" %1$s \" else \"\")"; + protected static final String FLAG_TEMPLATE = " + (if (%3$s) \" %1$s\" else \"\")"; public final String getCommandLineAddition() { return String.format(getCommandLineTemplate(), getCommandLineParam(), getCommandLineFormat(), getFieldName()); @@ -169,6 +196,7 @@ public abstract class ArgumentDefinitionField extends ArgumentField { @Override protected Class getInnerType() { return String.class; } @Override protected String getRawFieldName() { return super.getRawFieldName() + "String"; } @Override protected String getFullName() { return super.getFullName() + "String"; } + @Override protected String getRawShortFieldName() { return super.getRawShortFieldName() + "String"; } @Override protected String getFieldType() { return "List[String]"; } @Override protected String getDefaultValue() { return "Nil"; } @Override public String getCommandLineTemplate() { return REPEAT_TEMPLATE; } diff --git a/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentField.java b/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentField.java index ef7f6f729..76a147a43 100644 --- a/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentField.java +++ b/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentField.java @@ -62,7 +62,8 @@ public abstract class ArgumentField { return String.format("%n" + "/** %s */%n" + "@%s(fullName=\"%s\", shortName=\"%s\", doc=\"%s\", required=%s, exclusiveOf=\"%s\", validation=\"%s\")%n" + - "%svar %s: %s = %s%n", + "%svar %s: %s = %s%n" + + "%s", getDoc(), getAnnotationIOClass().getSimpleName(), getFullName(), @@ -71,9 +72,13 @@ public abstract class ArgumentField { isRequired(), getExclusiveOf(), getValidation(), - getScatterGatherAnnotation(), getFieldName(), getFieldType(), getDefaultValue()); + getScatterGatherAnnotation(), getFieldName(), getFieldType(), getDefaultValue(), + getDefineAddition()); } + /** @return Scala code with defines to append to the argument definition. */ + protected String getDefineAddition() { return ""; } + /** @return Scala code to append to the command line. */ public abstract String getCommandLineAddition(); @@ -141,7 +146,7 @@ public abstract class ArgumentField { */ protected static String getFieldName(String rawFieldName) { String fieldName = rawFieldName; - if (!StringUtils.isAlpha(fieldName.substring(0,1))) + if (StringUtils.isNumeric(fieldName.substring(0,1))) fieldName = "_" + fieldName; if (isReserved(fieldName) || fieldName.contains("-")) fieldName = "`" + fieldName + "`"; diff --git a/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java b/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java index 207da8a1f..d4339eb14 100644 --- a/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java +++ b/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java @@ -117,7 +117,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram { argumentFields.addAll(ReadFilterField.getFilterArguments(walkerType)); writeClass(COMMANDLINE_PACKAGE_NAME + "." + clpClassName, WALKER_PACKAGE_NAME, - walkerName, String.format("analysis_type = \"%s\"%n%n", walkerName), argumentFields); + walkerName, String.format("analysis_type = \"%s\"%n", walkerName), argumentFields); } } } diff --git a/java/test/org/broadinstitute/sting/playground/gatk/walkers/duplicates/DuplicatesWalkersIntegrationTest.java b/java/test/org/broadinstitute/sting/playground/gatk/walkers/duplicates/DuplicatesWalkersIntegrationTest.java index 62f53c29a..98d2cd956 100755 --- a/java/test/org/broadinstitute/sting/playground/gatk/walkers/duplicates/DuplicatesWalkersIntegrationTest.java +++ b/java/test/org/broadinstitute/sting/playground/gatk/walkers/duplicates/DuplicatesWalkersIntegrationTest.java @@ -20,7 +20,7 @@ public class DuplicatesWalkersIntegrationTest extends WalkerTest { List result = executeTest(name, spec).getFirst(); } - @Test public void testChr110Mb() { testCounter("testChr1-10mb", "-L chr1:1-10,000,000 --quiet", "fa8bfdd0b62a13a543bae90f7c674db7"); } + @Test public void testChr110Mb() { testCounter("testChr1-10mb", "-L chr1:1-10,000,000 --quietLocus", "fa8bfdd0b62a13a543bae90f7c674db7"); } @Test public void testIntervalVerbose() { testCounter("testIntervalVerbose", "-L chr1:6,527,154-6,528,292", "1ebcc10b85af16805a54391721776657"); } public void testCombiner(String name, String args, String md51, String md52) { diff --git a/scala/qscript/UnifiedGenotyperExample.scala b/scala/qscript/UnifiedGenotyperExample.scala index 714d4a4fd..e44a3505f 100644 --- a/scala/qscript/UnifiedGenotyperExample.scala +++ b/scala/qscript/UnifiedGenotyperExample.scala @@ -38,15 +38,9 @@ class UnifiedGenotyperExample extends QScript { val vf = new VariantFiltration with UnifiedGenotyperArguments val ve = new VariantEval with UnifiedGenotyperArguments - val pr = new PrintReads with UnifiedGenotyperArguments - pr.input_file :+= bam - pr.outputBamFile = swapExt(bam, "bam", "new.bam") - pr.scatterCount = 2 - pr.setupGatherFunction = { case (f: BamGatherFunction, _) => f.jarFile = new File("/path/to/jar") } - add(pr) - // Make sure the Sting/shell folder is in your path to use mergeText.sh and splitIntervals.sh. ug.scatterCount = 3 + ug.cleanupTempDirectories = true ug.input_file :+= bam ug.out = swapExt(bam, "bam", "unfiltered.vcf") @@ -56,8 +50,7 @@ class UnifiedGenotyperExample extends QScript { ve.rodBind :+= RodBind("vcf", "VCF", vf.out) ve.out = swapExt(bam, "bam", "eval") - //add(ug, vf, ve) + add(ug, vf, ve) } - } } diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index 1e4a05cad..34e2f09bd 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -3,8 +3,8 @@ package org.broadinstitute.sting.queue import java.io.File import java.util.Arrays import org.broadinstitute.sting.queue.engine.QGraph -import org.broadinstitute.sting.commandline.{ClassType, Input, Argument, CommandLineProgram} import org.broadinstitute.sting.queue.util.{Logging, ScalaCompoundArgumentTypeDescriptor} +import org.broadinstitute.sting.commandline._ /** * Entry point of Queue. Compiles and runs QScripts passed in to the command line. @@ -20,11 +20,23 @@ class QCommandLine extends CommandLineProgram with Logging { @Argument(fullName="bsub_wait_jobs", shortName="bsubWait", doc="Wait for bsub submitted jobs before exiting", required=false) private var bsubWaitJobs = false - @Argument(fullName="run_scripts", shortName="run", doc="Run QScripts", required=false) + @Argument(fullName="run_scripts", shortName="run", doc="Run QScripts. Without this flag set only performs a dry run.", required=false) private var run = false @Argument(fullName="dot_graph", shortName="dot", doc="Outputs the queue graph to a .dot file. See: http://en.wikipedia.org/wiki/DOT_language", required=false) - private var queueDot: File = _ + private var dotFile: File = _ + + @Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false) + private var expandedDotFile: File = _ + + @Argument(fullName="skip_up_to_date", shortName="skipUpToDate", doc="Does not run command line functions that don't depend on other jobs if the outputs exist and are older than the inputs.", required=false) + private var skipUpToDate = false + + @Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden + private var runScripts = false + + @ArgumentCollection + private val qSettings = new QSettings /** * Takes the QScripts passed in, runs their script() methods, retrieves their generated @@ -32,9 +44,13 @@ class QCommandLine extends CommandLineProgram with Logging { */ def execute = { val qGraph = new QGraph - qGraph.dryRun = !run + qGraph.dryRun = !(run || runScripts) qGraph.bsubAllJobs = bsubAllJobs qGraph.bsubWaitJobs = bsubWaitJobs + qGraph.skipUpToDateJobs = skipUpToDate + qGraph.dotFile = dotFile + qGraph.expandedDotFile = expandedDotFile + qGraph.qSettings = qSettings val scripts = qScriptManager.createScripts() for (script <- scripts) { @@ -45,15 +61,9 @@ class QCommandLine extends CommandLineProgram with Logging { logger.info("Added " + script.functions.size + " functions") } - logger.info("Binding functions") - qGraph.fillIn - if (queueDot != null) { - logger.info("Generating " + queueDot) - qGraph.renderToDot(queueDot) - } - logger.info("Running generated graph") qGraph.run + logger.info("Done") 0 } diff --git a/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala b/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala index 1b8a00d91..1a8fdeff9 100644 --- a/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScriptManager.scala @@ -48,7 +48,7 @@ object QScriptManager extends Logging { if (scripts.size > 0) { val settings = new Settings((error: String) => logger.error(error)) - val outdir = IOUtils.tempDir("Q-classes").getAbsoluteFile + val outdir = IOUtils.tempDir("Q-classes") settings.outdir.value = outdir.getPath // Set the classpath to the current class path. diff --git a/scala/src/org/broadinstitute/sting/queue/QSettings.scala b/scala/src/org/broadinstitute/sting/queue/QSettings.scala new file mode 100644 index 000000000..1ed32f1c7 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/QSettings.scala @@ -0,0 +1,42 @@ +package org.broadinstitute.sting.queue + +import org.broadinstitute.sting.commandline.Argument +import java.io.File +import java.lang.management.ManagementFactory + +/** + * Default settings settable on the command line and passed to CommandLineFunctions. + */ +class QSettings { + @Argument(fullName="job_name_prefix", shortName="jobPrefix", doc="Default name prefix for compute farm jobs.", required=false) + var jobNamePrefix: String = QSettings.processNamePrefix + + @Argument(fullName="job_queue", shortName="jobQueue", doc="Default queue for compute farm jobs.", required=false) + var jobQueue: String = "broad" + + @Argument(fullName="job_project", shortName="jobProject", doc="Default project for compute farm jobs.", required=false) + var jobProject: String = "Queue" + + @Argument(fullName="job_scatter_gather_directory", shortName="jobSGDir", doc="Default directory to place scatter gather output for compute farm jobs.", required=false) + var jobScatterGatherDirectory: File = _ + + @Argument(fullName="default_memory_limit", shortName="memLimit", doc="Default memory limit for jobs, in gigabytes.", required=false) + var memoryLimit: Option[Int] = None + + @Argument(fullName="runJobsIfPrecedingFail", shortName="runIfFail", doc="If this flag is set then ALL jobs will run even if the previous jobs fail.", required=false) + var runJobsIfPrecedingFail = false +} + +/** + * Default settings settable on the command line and passed to CommandLineFunctions. + */ +object QSettings { + /** A semi-unique job prefix using the host name and the process id. */ + private val processNamePrefix = "Q-" + { + var prefix = ManagementFactory.getRuntimeMXBean.getName + val index = prefix.indexOf(".") + if (index >= 0) + prefix = prefix.substring(0, index) + prefix + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala index d1d80d99b..640596de9 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/DispatchJobRunner.scala @@ -1,13 +1,14 @@ package org.broadinstitute.sting.queue.engine -import collection.JavaConversions._ import org.broadinstitute.sting.queue.function.{CommandLineFunction, QFunction} import scala.collection.immutable.ListSet +import org.broadinstitute.sting.queue.util.IOUtils +import java.io.File /** * Dispatches jobs to a compute cluster. */ -trait DispatchJobRunner { +trait DispatchJobRunner extends JobRunner { /** Type of the job. */ type DispatchJobType /** An internal cache of all the jobs that have run by command line function. */ @@ -15,14 +16,6 @@ trait DispatchJobRunner { /** An internal list of functions that have no other dependencies. */ private var waitJobsByGraph = Map.empty[QGraph, ListSet[DispatchJobType]] - /** - * Dispatches a function to the queue and returns immediately, unless the function is a DispatchWaitFunction - * in which case it waits for all other terminal functions to complete. - * @param function Command to run. - * @param qGraph graph that holds the job, and if this is a dry run. - */ - def dispatch(function: CommandLineFunction, qGraph: QGraph) - /** * Adds the job to the internal cache of previous jobs and removes the previous jobs that * the job was dependent on from the list of function that have no dependencies. @@ -47,22 +40,8 @@ trait DispatchJobRunner { * @param qGraph The graph that contains the jobs. * @return A list of prior jobs. */ - protected def previousJobs(function: QFunction, qGraph: QGraph) : List[DispatchJobType] = { - var previous = List.empty[DispatchJobType] - - val source = qGraph.jobGraph.getEdgeSource(function) - for (incomingEdge <- qGraph.jobGraph.incomingEdgesOf(source)) { - incomingEdge match { - - // Stop recursing when we find a job along the edge and return its job id - case dispatchFunction: CommandLineFunction => previous :+= dispatchJobs(dispatchFunction) - - // For any other type of edge find the LSF jobs preceding the edge - case qFunction: QFunction => previous ++= previousJobs(qFunction, qGraph) - } - } - previous - } + protected def previousJobs(function: CommandLineFunction, qGraph: QGraph) : List[DispatchJobType] = + qGraph.previousJobs(function).map(dispatchJobs(_)) /** * Returns a set of jobs that have no following jobs in the graph. @@ -81,7 +60,9 @@ trait DispatchJobRunner { * @return A "cd [&& cd ]" command. */ protected def mountCommand(function: CommandLineFunction) = { - val dirs = function.jobDirectories + var dirs = Set.empty[File] + for (dir <- function.jobDirectories) + dirs += IOUtils.dirLevel(dir, 2) if (dirs.size > 0) Some("\'" + dirs.mkString("cd ", " && cd ", "") + "\'") else diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala new file mode 100644 index 000000000..93ff4cc1c --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -0,0 +1,16 @@ +package org.broadinstitute.sting.queue.engine + +import org.broadinstitute.sting.queue.function.CommandLineFunction + +/** + * Base interface for job runners. + */ +trait JobRunner { + /** + * Dispatches a function to the queue and returns immediately, unless the function is a DispatchWaitFunction + * in which case it waits for all other terminal functions to complete. + * @param function Command to run. + * @param qGraph graph that holds the job, and if this is a dry run. + */ + def run(function: CommandLineFunction, qGraph: QGraph) +} \ No newline at end of file diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index d49534a24..f281cb328 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -6,7 +6,7 @@ import org.broadinstitute.sting.queue.util.{IOUtils, LsfJob, Logging} /** * Runs jobs on an LSF compute cluster. */ -trait LsfJobRunner extends DispatchJobRunner with Logging { +class LsfJobRunner extends DispatchJobRunner with Logging { type DispatchJobType = LsfJob /** @@ -14,7 +14,7 @@ trait LsfJobRunner extends DispatchJobRunner with Logging { * @param function Command to run. * @param qGraph graph that holds the job, and if this is a dry run. */ - def dispatch(function: CommandLineFunction, qGraph: QGraph) = { + def run(function: CommandLineFunction, qGraph: QGraph) = { val job = new LsfJob job.name = function.jobName job.outputFile = function.jobOutputFile diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 9e1d68d86..c5e590c9e 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -4,14 +4,15 @@ import org.jgrapht.traverse.TopologicalOrderIterator import org.jgrapht.graph.SimpleDirectedGraph import scala.collection.JavaConversions import scala.collection.JavaConversions._ -import org.broadinstitute.sting.queue.function.{MappingFunction, CommandLineFunction, QFunction} import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction import org.broadinstitute.sting.queue.util.Logging -import org.broadinstitute.sting.queue.QException import org.jgrapht.alg.CycleDetector import org.jgrapht.EdgeFactory import org.jgrapht.ext.DOTExporter import java.io.File +import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} +import org.broadinstitute.sting.queue.{QSettings, QException} +import org.broadinstitute.sting.queue.function.{DispatchWaitFunction, MappingFunction, CommandLineFunction, QFunction} /** * The internal dependency tracker between sets of function input and output files. @@ -20,8 +21,11 @@ class QGraph extends Logging { var dryRun = true var bsubAllJobs = false var bsubWaitJobs = false - val jobGraph = newGraph - def numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size + var skipUpToDateJobs = false + var dotFile: File = _ + var expandedDotFile: File = _ + var qSettings: QSettings = _ + private val jobGraph = newGraph /** * Adds a QScript created CommandLineFunction to the graph. @@ -32,15 +36,130 @@ class QGraph extends Logging { } /** - * Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs. + * Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph. */ - def fillIn = { - // clone since edgeSet is backed by the graph - for (function <- JavaConversions.asSet(jobGraph.edgeSet).clone) { - addCollectionOutputs(function.outputs) - addCollectionInputs(function.inputs) + def run = { + fill + if (dotFile != null) + renderToDot(dotFile) + var numMissingValues = validate + + if (numMissingValues == 0 && bsubAllJobs) { + logger.debug("Scatter gathering jobs.") + var scatterGathers = List.empty[ScatterGatherableFunction] + loop({ + case scatterGather: ScatterGatherableFunction if (scatterGather.scatterGatherable) => + scatterGathers :+= scatterGather + }) + + var addedFunctions = List.empty[CommandLineFunction] + for (scatterGather <- scatterGathers) { + val functions = scatterGather.generateFunctions() + if (logger.isTraceEnabled) + logger.trace("Scattered into %d parts: %n%s".format(functions.size, functions.mkString("%n".format()))) + addedFunctions ++= functions + } + + this.jobGraph.removeAllEdges(scatterGathers) + prune + addedFunctions.foreach(this.addFunction(_)) + + fill + val scatterGatherDotFile = if (expandedDotFile != null) expandedDotFile else dotFile + if (scatterGatherDotFile != null) + renderToDot(scatterGatherDotFile) + numMissingValues = validate } + val isReady = numMissingValues == 0 + + if (isReady || this.dryRun) + runJobs + + if (numMissingValues > 0) { + logger.error("Total missing values: " + numMissingValues) + } + + if (isReady && this.dryRun) { + logger.info("Dry run completed successfully!") + logger.info("Re-run with \"-run\" to execute the functions.") + } + } + + /** + * Walks up the graph looking for the previous LsfJobs. + * @param function Function to examine for a previous command line job. + * @param qGraph The graph that contains the jobs. + * @return A list of prior jobs. + */ + def previousJobs(function: QFunction) : List[CommandLineFunction] = { + var previous = List.empty[CommandLineFunction] + + val source = this.jobGraph.getEdgeSource(function) + for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) { + incomingEdge match { + + // Stop recursing when we find a job along the edge and return its job id + case commandLineFunction: CommandLineFunction => previous :+= commandLineFunction + + // For any other type of edge find the LSF jobs preceding the edge + case qFunction: QFunction => previous ++= previousJobs(qFunction) + } + } + previous + } + + /** + * Fills in the graph using mapping functions, then removes out of date + * jobs, then cleans up mapping functions and nodes that aren't need. + */ + private def fill = { + fillIn + if (skipUpToDateJobs) + removeUpToDate + prune + } + + /** + * Looks through functions with multiple inputs and outputs and adds mapping functions for single inputs and outputs. + */ + private def fillIn = { + // clone since edgeSet is backed by the graph + JavaConversions.asSet(jobGraph.edgeSet).clone.foreach { + case cmd: CommandLineFunction => { + addCollectionOutputs(cmd.outputs) + addCollectionInputs(cmd.inputs) + } + case map: MappingFunction => /* do nothing for mapping functions */ + } + } + + /** + * Removes functions that are up to date. + */ + private def removeUpToDate = { + var upToDateJobs = Set.empty[CommandLineFunction] + loop({ + case f if (upToDate(f, upToDateJobs)) => { + logger.info("Skipping command because it is up to date: %n%s".format(f.commandLine)) + upToDateJobs += f + } + }) + for (upToDateJob <- upToDateJobs) + jobGraph.removeEdge(upToDateJob) + } + + /** + * Returns true if the all previous functions in the graph are up to date, and the function is up to date. + */ + private def upToDate(commandLineFunction: CommandLineFunction, upToDateJobs: Set[CommandLineFunction]) = { + this.previousJobs(commandLineFunction).forall(upToDateJobs.contains(_)) && commandLineFunction.upToDate + } + + /** + * Removes mapping edges that aren't being used, and nodes that don't belong to anything. + */ + private def prune = { var pruning = true while (pruning) { pruning = false @@ -55,27 +174,21 @@ class QGraph extends Logging { } /** - * Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph. + * Validates that the functions in the graph have no missing values and that there are no cycles. + * @return Number of missing values. */ - def run = { - var isReady = true - var totalMissingValues = 0 - for (function <- JavaConversions.asSet(jobGraph.edgeSet)) { - function match { - case cmd: CommandLineFunction => - val missingFieldValues = cmd.missingFields - if (missingFieldValues.size > 0) { - totalMissingValues += missingFieldValues.size - logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.commandLine)) - for (missing <- missingFieldValues) - logger.error(" " + missing) - } - case _ => - } - } - - if (totalMissingValues > 0) { - isReady = false + private def validate = { + var numMissingValues = 0 + JavaConversions.asSet(jobGraph.edgeSet).foreach { + case cmd: CommandLineFunction => + val missingFieldValues = cmd.missingFields + if (missingFieldValues.size > 0) { + numMissingValues += missingFieldValues.size + logger.error("Missing %s values for function: %s".format(missingFieldValues.size, cmd.commandLine)) + for (missing <- missingFieldValues) + logger.error(" " + missing) + } + case map: MappingFunction => /* do nothing for mapping functions */ } val detector = new CycleDetector(jobGraph) @@ -83,19 +196,46 @@ class QGraph extends Logging { logger.error("Cycles were detected in the graph:") for (cycle <- detector.findCycles) logger.error(" " + cycle) - isReady = false + throw new QException("Cycles were detected in the graph.") } - if (isReady || this.dryRun) - (new TopologicalJobScheduler(this) with LsfJobRunner).runJobs + numMissingValues + } - if (totalMissingValues > 0) { - logger.error("Total missing values: " + totalMissingValues) + /** + * Runs the jobs by traversing the graph. + */ + private def runJobs = { + val runner = if (bsubAllJobs) new LsfJobRunner else new ShellJobRunner + + val numJobs = JavaConversions.asSet(jobGraph.edgeSet).filter(_.isInstanceOf[CommandLineFunction]).size + + logger.info("Number of jobs: %s".format(numJobs)) + if (logger.isTraceEnabled) { + val numNodes = jobGraph.vertexSet.size + logger.trace("Number of nodes: %s".format(numNodes)) } + var numNodes = 0 - if (isReady && this.dryRun) { - logger.info("Dry run completed successfully!") - logger.info("Re-run with \"-run\" to execute the functions.") + loop( + edgeFunction = { case f => runner.run(f, this) }, + nodeFunction = { + case node => { + if (logger.isTraceEnabled) + logger.trace("Visiting: " + node) + numNodes += 1 + } + }) + + if (logger.isTraceEnabled) + logger.trace("Done walking %s nodes.".format(numNodes)) + + if (bsubAllJobs && bsubWaitJobs) { + logger.info("Waiting for jobs to complete.") + val wait = new DispatchWaitFunction + wait.qSettings = this.qSettings + wait.freeze + runner.run(wait, this) } } @@ -108,35 +248,29 @@ class QGraph extends Logging { /** * Adds a generic QFunction to the graph. - * If the function is scatterable and the jobs request bsub, splits the job into parts and adds the parts instead. * @param f Generic QFunction to add to the graph. */ private def addFunction(f: QFunction): Unit = { try { - f.freeze - f match { - case scatterGather: ScatterGatherableFunction if (bsubAllJobs && scatterGather.scatterGatherable) => - val functions = scatterGather.generateFunctions() - if (logger.isTraceEnabled) - logger.trace("Scattered into %d parts: %s".format(functions.size, functions)) - functions.foreach(addFunction(_)) - case _ => - val inputs = QNode(f.inputs) - val outputs = QNode(f.outputs) - val newSource = jobGraph.addVertex(inputs) - val newTarget = jobGraph.addVertex(outputs) - val removedEdges = jobGraph.removeAllEdges(inputs, outputs) - val added = jobGraph.addEdge(inputs, outputs, f) - if (logger.isTraceEnabled) { - logger.trace("Mapped from: " + inputs) - logger.trace("Mapped to: " + outputs) - logger.trace("Mapped via: " + f) - logger.trace("Removed edges: " + removedEdges) - logger.trace("New source?: " + newSource) - logger.trace("New target?: " + newTarget) - logger.trace("") - } + case cmd: CommandLineFunction => cmd.qSettings = this.qSettings + case map: MappingFunction => /* do nothing for mapping functions */ + } + f.freeze + val inputs = QNode(f.inputs) + val outputs = QNode(f.outputs) + val newSource = jobGraph.addVertex(inputs) + val newTarget = jobGraph.addVertex(outputs) + val removedEdges = jobGraph.removeAllEdges(inputs, outputs) + val added = jobGraph.addEdge(inputs, outputs, f) + if (logger.isTraceEnabled) { + logger.trace("Mapped from: " + inputs) + logger.trace("Mapped to: " + outputs) + logger.trace("Mapped via: " + f) + logger.trace("Removed edges: " + removedEdges) + logger.trace("New source?: " + newSource) + logger.trace("New target?: " + newTarget) + logger.trace("") } } catch { case e: Exception => @@ -209,12 +343,28 @@ class QGraph extends Logging { private def isOrphan(node: QNode) = (jobGraph.incomingEdgesOf(node).size + jobGraph.outgoingEdgesOf(node).size) == 0 + /** + * Utility function for looping over the internal graph and running functions. + * @param edgeFunction Optional function to run for each edge visited. + * @param nodeFunction Optional function to run for each node visited. + */ + private def loop(edgeFunction: PartialFunction[CommandLineFunction, Unit] = null, nodeFunction: PartialFunction[QNode, Unit] = null) = { + val iterator = new TopologicalOrderIterator(this.jobGraph) + iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] { + override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match { + case cmd: CommandLineFunction => if (edgeFunction != null && edgeFunction.isDefinedAt(cmd)) edgeFunction(cmd) + case map: MappingFunction => /* do nothing for mapping functions */ + } + }) + iterator.foreach(node => if (nodeFunction != null && nodeFunction.isDefinedAt(node)) nodeFunction(node)) + } + /** * Outputs the graph to a .dot file. * http://en.wikipedia.org/wiki/DOT_language * @param file Path to output the .dot file. */ - def renderToDot(file: java.io.File) = { + private def renderToDot(file: java.io.File) = { val out = new java.io.FileWriter(file) // todo -- we need a nice way to visualize the key pieces of information about commands. Perhaps a diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala index abffa3c08..834ae64a2 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala @@ -6,7 +6,7 @@ import org.broadinstitute.sting.queue.function.CommandLineFunction /** * Runs jobs one at a time locally */ -trait ShellJobRunner extends Logging { +class ShellJobRunner extends JobRunner with Logging { /** * Runs the function on the local shell. * @param function Command to run. diff --git a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala b/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala deleted file mode 100755 index 23a69846f..000000000 --- a/scala/src/org/broadinstitute/sting/queue/engine/TopologicalJobScheduler.scala +++ /dev/null @@ -1,54 +0,0 @@ -package org.broadinstitute.sting.queue.engine - -import org.jgrapht.traverse.TopologicalOrderIterator -import org.jgrapht.event.{EdgeTraversalEvent, TraversalListenerAdapter} -import collection.JavaConversions._ -import org.broadinstitute.sting.queue.util.Logging -import org.broadinstitute.sting.queue.function._ - -/** - * Loops over the job graph running jobs as the edges are traversed. - * @param val The graph that contains the jobs to be run. - */ -abstract class TopologicalJobScheduler(private val qGraph: QGraph) - extends ShellJobRunner with DispatchJobRunner with Logging { - - protected val iterator = new TopologicalOrderIterator(qGraph.jobGraph) - - iterator.addTraversalListener(new TraversalListenerAdapter[QNode, QFunction] { - /** - * As each edge is traversed, either dispatch the job or run it locally. - * @param event Event holding the edge that was passed. - */ - override def edgeTraversed(event: EdgeTraversalEvent[QNode, QFunction]) = event.getEdge match { - case f: CommandLineFunction if (qGraph.bsubAllJobs) => dispatch(f, qGraph) - case f: CommandLineFunction => run(f, qGraph) - case f: MappingFunction => /* do nothing for mapping functions */ - } - }) - - /** - * Runs the jobs by traversing the graph. - */ - def runJobs = { - logger.info("Number of jobs: %s".format(qGraph.numJobs)) - if (logger.isTraceEnabled) - logger.trace("Number of nodes: %s".format(qGraph.jobGraph.vertexSet.size)) - var numNodes = 0 - for (target <- iterator) { - if (logger.isTraceEnabled) - logger.trace("Visiting: " + target) - numNodes += 1 - // Do nothing for now, let event handler respond - } - if (logger.isTraceEnabled) - logger.trace("Done walking %s nodes.".format(numNodes)) - - if (qGraph.bsubAllJobs && qGraph.bsubWaitJobs) { - logger.info("Waiting for jobs to complete.") - val wait = new DispatchWaitFunction - wait.freeze - dispatch(wait, qGraph) - } - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 847103f10..4a97c5a49 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -6,8 +6,7 @@ import org.broadinstitute.sting.commandline._ import java.io.File import collection.JavaConversions._ import org.broadinstitute.sting.queue.function.scattergather.{SimpleTextGatherFunction, Gather} -import java.lang.management.ManagementFactory -import org.broadinstitute.sting.queue.QException +import org.broadinstitute.sting.queue.{QSettings, QException} /** * A command line that will be run in a pipeline. @@ -15,6 +14,9 @@ import org.broadinstitute.sting.queue.QException trait CommandLineFunction extends QFunction with Logging { def commandLine: String + /** Default settings */ + var qSettings: QSettings = _ + /** Upper memory limit */ var memoryLimit: Option[Int] = None @@ -25,16 +27,16 @@ trait CommandLineFunction extends QFunction with Logging { var commandDirectory: File = IOUtils.CURRENT_DIR /** Prefix for automatic job name creation */ - var jobNamePrefix: String = CommandLineFunction.processNamePrefix + var jobNamePrefix: String = _ /** The name name of the job */ var jobName: String = _ /** Job project to run the command */ - var jobProject = "Queue" + var jobProject: String = _ /** Job queue to run the command */ - var jobQueue = "broad" + var jobQueue: String = _ /** Temporary directory to write any files */ var jobTempDir: File = new File(System.getProperty("java.io.tmpdir")) @@ -99,6 +101,21 @@ trait CommandLineFunction extends QFunction with Logging { files } + /** + * Returns true if all outputs already exist and are older that the inputs. + * If there are no outputs then returns false. + * @return true if all outputs already exist and are older that the inputs. + */ + def upToDate = { + val inputFiles = inputs + val outputFiles = outputs.filterNot(file => (file == jobOutputFile || file == jobErrorFile)) + if (outputFiles.size > 0 && outputFiles.forall(_.exists)) { + val maxInput = inputFiles.foldLeft(Long.MinValue)((date, file) => date.max(file.lastModified)) + val minOutput = outputFiles.foldLeft(Long.MaxValue)((date, file) => date.min(file.lastModified)) + maxInput < minOutput + } else false + } + /** * Gets the files from the field. The field must be a File, a FileProvider, or a List or Set of either. * @param fields Field to get files. @@ -177,6 +194,21 @@ trait CommandLineFunction extends QFunction with Logging { * Sets all field values. */ def freezeFieldValues = { + if (jobNamePrefix == null) + jobNamePrefix = qSettings.jobNamePrefix + + if (jobQueue == null) + jobQueue = qSettings.jobQueue + + if (jobProject == null) + jobProject = qSettings.jobProject + + if (memoryLimit.isEmpty && qSettings.memoryLimit.isDefined) + memoryLimit = qSettings.memoryLimit + + if (qSettings.runJobsIfPrecedingFail) + jobRunOnlyIfPreviousSucceed = false + if (jobName == null) jobName = CommandLineFunction.nextJobName(jobNamePrefix) @@ -202,7 +234,7 @@ trait CommandLineFunction extends QFunction with Logging { * Set value to a uniform value across functions. * Base implementation changes any relative path to an absolute path. * @param value to be updated - * @returns the modified value, or a copy if the value is immutable + * @return the modified value, or a copy if the value is immutable */ protected def canon(value: Any) = { value match { @@ -276,20 +308,8 @@ trait CommandLineFunction extends QFunction with Logging { * Scala sugar type for checking annotation required and exclusiveOf. */ private type ArgumentAnnotation = { - /** - * Returns true if the field is required. - * @return true if the field is required. - */ def required(): Boolean - /** - * Returns the comma separated list of fields that may be set instead of this field. - * @return the comma separated list of fields that may be set instead of this field. - */ def exclusiveOf(): String - /** - * Returns the documentation for this field. - * @return the documentation for this field. - */ def doc(): String } @@ -378,15 +398,6 @@ trait CommandLineFunction extends QFunction with Logging { * A command line that will be run in a pipeline. */ object CommandLineFunction { - /** A semi-unique job prefix using the host name and the process id. */ - private val processNamePrefix = "Q-" + { - var prefix = ManagementFactory.getRuntimeMXBean.getName - val index = prefix.indexOf(".") - if (index >= 0) - prefix = prefix.substring(0, index) - prefix - } - /** Job index counter for this run of Queue. */ private var jobIndex = 0 diff --git a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala index a1d28df21..c43567809 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/MappingFunction.scala @@ -9,7 +9,7 @@ import java.io.File class MappingFunction(val inputs: Set[File], val outputs: Set[File]) extends QFunction { /** * For debugging purposes returns . - * @returns + * @return */ override def toString = "" } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala index cd6b9bf38..7b8e49501 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala @@ -20,5 +20,7 @@ class CleanupTempDirsFunction extends CommandLineFunction { @Argument(doc="rmdir script or command") var rmdirScript = "rm -rf" + override def upToDate = tempDirectories.forall(!_.exists) + def commandLine = "%s%s".format(rmdirScript, repeat(" '", tempDirectories, "'")) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala index 67161169b..2b2ae420f 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala @@ -20,6 +20,8 @@ class CreateTempDirsFunction extends CommandLineFunction { @Argument(doc="mkdir script or command") var mkdirScript = "mkdir -pv" + override def upToDate = tempDirectories.forall(_.exists) + def commandLine = "%s%s".format(mkdirScript, repeat(" '", tempDirectories, "'")) /** diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index aa54f5672..b7c787476 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -154,8 +154,12 @@ trait ScatterGatherableFunction extends CommandLineFunction { */ override def freezeFieldValues = { super.freezeFieldValues - if (this.scatterGatherDirectory == null) - this.scatterGatherDirectory = this.commandDirectory + + if (this.scatterGatherDirectory == null) { + this.scatterGatherDirectory = qSettings.jobScatterGatherDirectory + if (this.scatterGatherDirectory == null) + this.scatterGatherDirectory = this.commandDirectory + } } /** @@ -177,7 +181,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Initializes the CreateTempDirsFunction that will create the temporary directories. - * The initializeFunction jobNamePrefix is set so that the CreateTempDirsFunction runs with the same prefix as this ScatterGatherableFunction. + * The initializeFunction qSettings is set so that the CreateTempDirsFunction runs with the same prefix, etc. as this ScatterGatherableFunction. * The initializeFunction commandDirectory is set so that the function runs in the directory as this ScatterGatherableFunction. * The initializeFunction is modified to become dependent on the input files for this ScatterGatherableFunction. * Calls setupInitializeFunction with initializeFunction. @@ -185,7 +189,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param inputFields The input fields that the original function was dependent on. */ protected def initInitializeFunction(initializeFunction: CreateTempDirsFunction, inputFields: List[ArgumentSource]) = { - initializeFunction.jobNamePrefix = this.jobNamePrefix + initializeFunction.qSettings = this.qSettings initializeFunction.commandDirectory = this.commandDirectory for (inputField <- inputFields) initializeFunction.originalInputs ++= this.getFieldFiles(inputField) @@ -209,7 +213,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Initializes the ScatterFunction created by newScatterFunction() that will create the scatter pieces in the temporary directories. - * The scatterFunction jobNamePrefix is set so that the ScatterFunction runs with the same prefix as this ScatterGatherableFunction. + * The scatterFunction qSettings is set so that the ScatterFunction runs with the same prefix, etc. as this ScatterGatherableFunction. * The scatterFunction commandDirectory is set so that the function runs from a temporary directory under the scatterDirectory. * The scatterFunction has it's originalInput set with the file to be scattered into scatterCount pieces. * Calls scatterFunction.setOriginalFunction with this ScatterGatherableFunction. @@ -218,7 +222,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param scatterField The input field being scattered. */ protected def initScatterFunction(scatterFunction: ScatterFunction, scatterField: ArgumentSource) = { - scatterFunction.jobNamePrefix = this.jobNamePrefix + scatterFunction.qSettings = this.qSettings scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName) scatterFunction.originalInput = this.getFieldFile(scatterField) scatterFunction.setOriginalFunction(this, scatterField) @@ -245,7 +249,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Initializes the GatherFunction created by newGatherFunction() that will collect the gather pieces in the temporary directories. - * The gatherFunction jobNamePrefix is set so that the GatherFunction runs with the same prefix as this ScatterGatherableFunction. + * The gatherFunction qSettings is set so that the GatherFunction runs with the same prefix, etc. as this ScatterGatherableFunction. * The gatherFunction commandDirectory is set so that the function runs from a temporary directory under the scatterDirectory. * The gatherFunction has it's originalOutput set with the file to be gathered from the scatterCount pieces. * Calls the gatherFunction.setOriginalFunction with this ScatterGatherableFunction. @@ -254,7 +258,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param gatherField The output field being gathered. */ protected def initGatherFunction(gatherFunction: GatherFunction, gatherField: ArgumentSource) = { - gatherFunction.jobNamePrefix = this.jobNamePrefix + gatherFunction.qSettings = this.qSettings gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) gatherFunction.originalOutput = this.getFieldFile(gatherField) gatherFunction.setOriginalFunction(this, gatherField) @@ -332,7 +336,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Initializes the CleanupTempDirsFunction created by newCleanupFunction() that will remove the temporary directories. - * The cleanupFunction jobNamePrefix is set so that the CleanupTempDirsFunction runs with the same prefix as this ScatterGatherableFunction. + * The cleanupFunction qSettings is set so that the CleanupTempDirsFunction runs with the same prefix, etc. as this ScatterGatherableFunction. * The cleanupFunction commandDirectory is set so that the function runs in the directory as this ScatterGatherableFunction. * The initializeFunction is modified to become dependent on the output files for this ScatterGatherableFunction. * Calls setupCleanupFunction with cleanupFunction. @@ -341,7 +345,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param outputFields The output fields that the original function was dependent on. */ protected def initCleanupFunction(cleanupFunction: CleanupTempDirsFunction, gatherFunctions: Map[ArgumentSource, GatherFunction], outputFields: List[ArgumentSource]) = { - cleanupFunction.jobNamePrefix = this.jobNamePrefix + cleanupFunction.qSettings = this.qSettings cleanupFunction.commandDirectory = this.commandDirectory for (gatherField <- outputFields) cleanupFunction.originalOutputs += gatherFunctions(gatherField).originalOutput diff --git a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala index 4a9fa8f4a..8ba707e1b 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala @@ -9,7 +9,6 @@ object IOUtils { /** The current directory "." */ val CURRENT_DIR = new File(".") - /** * Returns the sub path rooted at the parent. * If the sub path is already absolute, returns the sub path. @@ -21,27 +20,30 @@ object IOUtils { * @return The absolute path to the file in the parent dir if the path was not absolute, otherwise the original path. */ def subDir(dir: File, path: String): File = - subDir(dir.getAbsoluteFile, new File(path)) + subDir(dir, new File(path)) /** * Returns the sub path rooted at the parent. * If the sub path is already absolute, returns the sub path. * If the parent is the current directory, returns the sub path. - * If the sub bath is the current directory, returns the parent. + * If the sub path is the current directory, returns the parent. * Else returns new File(parent, subPath) * @param parent The parent directory * @param file The sub path to append to the parent, if the path is not absolute. * @return The absolute path to the file in the parent dir if the path was not absolute, otherwise the original path. */ def subDir(parent: File, file: File): File = { - if (parent == CURRENT_DIR && file == CURRENT_DIR) - CURRENT_DIR.getCanonicalFile.getAbsoluteFile - else if (parent == CURRENT_DIR || file.isAbsolute) - file.getAbsoluteFile - else if (file == CURRENT_DIR) - parent.getAbsoluteFile + val parentAbs = absolute(parent) + val fileAbs = absolute(file) + val currentAbs = absolute(CURRENT_DIR) + if (parentAbs == currentAbs && fileAbs == currentAbs) + absolute(CURRENT_DIR.getCanonicalFile) + else if (parentAbs == currentAbs || file.isAbsolute) + fileAbs + else if (fileAbs == currentAbs) + parentAbs else - new File(parent, file.getPath).getAbsoluteFile + absolute(new File(parentAbs, file.getPath)) } /** @@ -50,7 +52,7 @@ object IOUtils { * @param file Path to the file to be re-rooted. * @return Absolute path to the new file. */ - def resetParent(dir: File, file: File) = subDir(dir.getAbsoluteFile, file.getName).getAbsoluteFile + def resetParent(dir: File, file: File) = absolute(subDir(dir, file.getName)) /** * Creates a scatterGatherTempDir directory with the prefix and optional suffix. @@ -65,6 +67,61 @@ object IOUtils { throw new IOException("Could not delete sub file: " + temp.getAbsolutePath()) if(!temp.mkdir) throw new IOException("Could not create sub directory: " + temp.getAbsolutePath()) - temp + absolute(temp) + } + + /** + * Returns the directory at the number of levels deep. + * For example 2 levels of /path/to/dir will return /path/to + * @param dir Directory path. + * @param level how many levels deep from the root. + * @return The path to the parent directory that is level-levels deep. + */ + def dirLevel(dir: File, level: Int): File = { + var directories = List.empty[File] + var parentDir = absolute(dir) + while (parentDir != null) { + directories +:= parentDir + parentDir = parentDir.getParentFile + } + if (directories.size <= level) + directories.last + else + directories(level) + } + + def absolute(file: File) = { + var fileAbs = file.getAbsoluteFile + var names = List.empty[String] + while (fileAbs != null) { + val name = fileAbs.getName + fileAbs = fileAbs.getParentFile + + if (name == ".") { + /* skip */ + + /* TODO: What do we do for ".."? + } else if (name == "..") { + + CentOS tcsh says use getCanonicalFile: + ~ $ mkdir -p test1/test2 + ~ $ ln -s test1/test2 test3 + ~ $ cd test3/.. + ~/test1 $ + + Mac bash says keep going with getAbsoluteFile: + ~ $ mkdir -p test1/test2 + ~ $ ln -s test1/test2 test3 + ~ $ cd test3/.. + ~ $ + + For now, leave it and let the shell figure it out. + */ + } else { + names +:= name + } + } + + new File(names.mkString("/", "/", "")) } } diff --git a/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala new file mode 100644 index 000000000..255ebe79b --- /dev/null +++ b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala @@ -0,0 +1,102 @@ +package org.broadinstitute.sting.queue.util + +import org.broadinstitute.sting.BaseTest +import org.junit.{Assert, Test} +import java.io.File + +class IOUtilsUnitTest extends BaseTest { + @Test + def testAbsoluteSubDir = { + var subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("/path/to/file")) + Assert.assertEquals(new File("/path/to/file"), subDir) + + subDir = IOUtils.subDir(new File("/different/path"), new File("/path/to/file")) + Assert.assertEquals(new File("/path/to/file"), subDir) + + subDir = IOUtils.subDir(new File("/different/path"), IOUtils.CURRENT_DIR) + Assert.assertEquals(new File("/different/path"), subDir) + } + + @Test + def testRelativeSubDir = { + var subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("path/to/file")) + Assert.assertEquals(new File("path/to/file").getCanonicalFile, subDir.getCanonicalFile) + + subDir = IOUtils.subDir(new File("/different/path"), new File("path/to/file")) + Assert.assertEquals(new File("/different/path/path/to/file"), subDir) + } + + @Test + def testDottedSubDir = { + var subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("path/../to/file")) + Assert.assertEquals(new File("path/../to/./file").getCanonicalFile, subDir.getCanonicalFile) + + subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("/path/../to/file")) + Assert.assertEquals(new File("/path/../to/file"), subDir) + + subDir = IOUtils.subDir(new File("/different/../path"), new File("path/to/file")) + Assert.assertEquals(new File("/different/../path/path/to/file"), subDir) + + subDir = IOUtils.subDir(new File("/different/./path"), new File("/path/../to/file")) + Assert.assertEquals(new File("/path/../to/file"), subDir) + } + + @Test + def testResetParent = { + val newFile = IOUtils.resetParent(new File("/new/parent/dir"), new File("/old/parent_dir/file.name")) + Assert.assertEquals(new File("/new/parent/dir/file.name"), newFile) + } + + @Test + def testTempDir = { + val tempDir = IOUtils.tempDir("Q-Unit-Test") + Assert.assertTrue(tempDir.exists) + Assert.assertFalse(tempDir.isFile) + Assert.assertTrue(tempDir.isDirectory) + val deleted = tempDir.delete + Assert.assertTrue(deleted) + Assert.assertFalse(tempDir.exists) + } + + @Test + def testDirLevel = { + var dir = IOUtils.dirLevel(new File("/path/to/directory"), 1) + Assert.assertEquals(new File("/path"), dir) + + dir = IOUtils.dirLevel(new File("/path/to/directory"), 2) + Assert.assertEquals(new File("/path/to"), dir) + + dir = IOUtils.dirLevel(new File("/path/to/directory"), 3) + Assert.assertEquals(new File("/path/to/directory"), dir) + + dir = IOUtils.dirLevel(new File("/path/to/directory"), 4) + Assert.assertEquals(new File("/path/to/directory"), dir) + } + + @Test + def testAbsolute = { + var dir = IOUtils.absolute(new File("/path/./to/./directory/.")) + Assert.assertEquals(new File("/path/to/directory"), dir) + + dir = IOUtils.absolute(new File("/")) + Assert.assertEquals(new File("/"), dir) + + dir = IOUtils.absolute(new File("/.")) + Assert.assertEquals(new File("/"), dir) + + dir = IOUtils.absolute(new File("/././.")) + Assert.assertEquals(new File("/"), dir) + + dir = IOUtils.absolute(new File("/./directory/.")) + Assert.assertEquals(new File("/directory"), dir) + + dir = IOUtils.absolute(new File("/./directory/./")) + Assert.assertEquals(new File("/directory"), dir) + + dir = IOUtils.absolute(new File("/./directory./")) + Assert.assertEquals(new File("/directory."), dir) + + dir = IOUtils.absolute(new File("/./.directory/")) + Assert.assertEquals(new File("/.directory"), dir) + } +}