From 8211cee0b2fdb189270d10e24d28213c82f8564a Mon Sep 17 00:00:00 2001 From: kshakir Date: Tue, 26 Oct 2010 19:49:08 +0000 Subject: [PATCH] Queue UI Improvements: - Forcing user to set the temp directory via -Djava.io.tmpdir to avoid filling up /tmp. - By default deleting job outputs tagged as intermediate. - Defaulting pipeline to scatter count 1 (no reads deleted). - Cleaning up temp classes even when scripting fails. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4573 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/utils/exceptions/UserException.java | 2 +- scala/qscript/fullCallingPipeline.q | 16 +++---- .../sting/queue/QCommandLine.scala | 23 +++++----- .../sting/queue/engine/QGraph.scala | 5 ++- .../sting/queue/engine/QGraphSettings.scala | 4 +- .../sting/queue/util/IOUtils.scala | 45 +++++++------------ .../sting/queue/util/IOUtilsUnitTest.scala | 23 ++++++++++ 7 files changed, 65 insertions(+), 53 deletions(-) diff --git a/java/src/org/broadinstitute/sting/utils/exceptions/UserException.java b/java/src/org/broadinstitute/sting/utils/exceptions/UserException.java index 146bf75bd..ac67bd94b 100755 --- a/java/src/org/broadinstitute/sting/utils/exceptions/UserException.java +++ b/java/src/org/broadinstitute/sting/utils/exceptions/UserException.java @@ -73,7 +73,7 @@ public class UserException extends ReviewedStingException { public static class BadTmpDir extends UserException { public BadTmpDir(String message) { - super(String.format("Failure working with the tmp directory %s. Override with -Djava.io.tmpdir=X on the command line to a bigger/better file system. Exact error was %s", System.getProperties().get("java.io. tmpdir"), message)); + super(String.format("Failure working with the tmp directory %s. Override with -Djava.io.tmpdir=X on the command line to a bigger/better file system. Exact error was %s", System.getProperties().get("java.io.tmpdir"), message)); } } diff --git a/scala/qscript/fullCallingPipeline.q b/scala/qscript/fullCallingPipeline.q index cc51cd09c..6f98e0547 100755 --- a/scala/qscript/fullCallingPipeline.q +++ b/scala/qscript/fullCallingPipeline.q @@ -36,10 +36,10 @@ class fullCallingPipeline extends QScript { @Input(doc="per-sample downsampling level",shortName="dcov",required=false) var downsampling_coverage = 300 - @Input(doc="level of parallelism for IndelRealigner. By default uses number of contigs.", shortName="cleanerScatter", required=false) - var num_cleaner_scatter_jobs: Option[Int] = None + @Input(doc="level of parallelism for IndelRealigner. By default is set to 1.", shortName="cleanerScatter", required=false) + var num_cleaner_scatter_jobs = 1 - @Input(doc="level of parallelism for UnifiedGenotyper", shortName="snpScatter", required=false) + @Input(doc="level of parallelism for UnifiedGenotyper. By default is set to 20.", shortName="snpScatter", required=false) var num_snp_scatter_jobs = 20 //@Input(doc="level of parallelism for IndelGenotyperV2", shortName="indelScatter", required=false) @@ -92,7 +92,8 @@ class fullCallingPipeline extends QScript { // get contigs (needed for indel cleaning parallelism) val contigs = IntervalScatterFunction.distinctContigs( - qscript.pipeline.getProject.getReferenceFile) + qscript.pipeline.getProject.getReferenceFile, + List(qscript.pipeline.getProject.getIntervalList.getAbsolutePath)) for ( sample <- recalibratedSamples ) { val sampleId = sample.getId @@ -122,12 +123,7 @@ class fullCallingPipeline extends QScript { realigner.targetIntervals = targetCreator.out realigner.intervals = Nil realigner.intervalsString = Nil - realigner.scatterCount = { - if (num_cleaner_scatter_jobs.isDefined) - num_cleaner_scatter_jobs.get min contigs.size - else - contigs.size - } + realigner.scatterCount = num_cleaner_scatter_jobs min contigs.size // if scatter count is > 1, do standard scatter gather, if not, explicitly set up fix mates if (realigner.scatterCount > 1) { diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index e10d5176e..35674afa3 100755 --- a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -23,7 +23,7 @@ class QCommandLine extends CommandLineProgram with Logging { */ def execute = { - val qGraph = new QGraph + val qGraph = QCommandLine.qGraph qGraph.settings = settings qGraph.debugMode = debugMode == true @@ -36,15 +36,6 @@ class QCommandLine extends CommandLineProgram with Logging { logger.info("Added " + script.functions.size + " functions") } - Runtime.getRuntime.addShutdownHook(new Thread { - /** Cleanup as the JVM shuts down. */ - override def run = { - qGraph.shutdown() - ProcessController.shutdown() - QScriptManager.deleteOutdir() - } - }) - qGraph.run if (qGraph.hasFailed) { @@ -98,11 +89,23 @@ class QCommandLine extends CommandLineProgram with Logging { * Entry point of Queue. Compiles and runs QScripts passed in to the command line. */ object QCommandLine { + private val qGraph = new QGraph + + /** * Main. * @param argv Arguments. */ def main(argv: Array[String]) { + Runtime.getRuntime.addShutdownHook(new Thread { + /** Cleanup as the JVM shuts down. */ + override def run = { + qGraph.shutdown() + ProcessController.shutdown() + QScriptManager.deleteOutdir() + } + }) + try { CommandLineProgram.start(new QCommandLine, argv); if (CommandLineProgram.result != 0) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 59f181da6..5479ef535 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -12,8 +12,8 @@ import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent} import org.broadinstitute.sting.queue.QException import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction} import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction} -import org.broadinstitute.sting.queue.util.{EmailMessage, JobExitException, LsfKillJob, Logging} import org.apache.commons.lang.StringUtils +import org.broadinstitute.sting.queue.util._ /** * The internal dependency tracker between sets of function input and output files. @@ -46,6 +46,7 @@ class QGraph extends Logging { * Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph. */ def run = { + IOUtils.checkTempDir val numMissingValues = fillGraph val isReady = numMissingValues == 0 @@ -698,7 +699,7 @@ class QGraph extends Logging { } private def deleteIntermediateOutputs() = { - if (settings.deleteIntermediates && !hasFailed) { + if (!settings.keepIntermediates && !hasFailed) { logger.info("Deleting intermediate files.") traverseFunctions(edge => { if (edge.function.isIntermediate) { diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index d2389eef3..0056e1a2b 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -36,8 +36,8 @@ class QGraphSettings { @Argument(fullName="status_email_to", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false) var statusEmailTo: List[String] = Nil - @Argument(fullName="delete_intermediate_outputs", shortName="deleteIntermediates", doc="After a successful run delete the outputs of any Function marked as intermediate.", required=false) - var deleteIntermediates = false + @Argument(fullName="keep_intermediate_outputs", shortName="keepIntermediates", doc="After a successful run keep the outputs of any Function marked as intermediate.", required=false) + var keepIntermediates = false @Argument(fullName="retry_failed", shortName="retry", doc="Retry the specified number of times after a command fails. Defaults to no retries.", required=false) var retries = 0 diff --git a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala index 56c6133a1..fcd8a3d0f 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala @@ -1,7 +1,8 @@ package org.broadinstitute.sting.queue.util import org.apache.commons.io.FileUtils -import java.io.{FileReader, IOException, File} +import java.io.{FileReader, File} +import org.broadinstitute.sting.utils.exceptions.UserException /** * A collection of utilities for modifying java.io. @@ -48,21 +49,23 @@ object IOUtils { absolute(new File(parentAbs, file.getPath)) } + def checkTempDir = { + val javaTemp = System.getProperty("java.io.tmpdir") + // Keeps the user from leaving the temp directory as the default, and on Macs from having pluses + // in the path which can cause problems with the Google Reflections library. + // see also: http://benjchristensen.com/2009/09/22/mac-osx-10-6-java-java-io-tmpdir/ + if (javaTemp.startsWith("/var/folders/") || (javaTemp == "/tmp") || (javaTemp == "/tmp/")) + throw new UserException.BadTmpDir("java.io.tmpdir must be explicitly set") + } + /** * Returns the temp directory as defined by java. * @return the temp directory as defined by java. */ def javaTempDir() = { - var javaTemp = System.getProperty("java.io.tmpdir") - // Keep the temp directory from having pluses in it, which can cause problems with the Google Reflections library. - // see also: http://benjchristensen.com/2009/09/22/mac-osx-10-6-java-java-io-tmpdir/ - if (javaTemp.startsWith("/var/folders/")) { - javaTemp = "/tmp/" - System.setProperty("java.io.tmpdir", javaTemp) - } - val tempDir = new File(javaTemp) + val tempDir = new File(System.getProperty("java.io.tmpdir")) if (!tempDir.exists && !tempDir.mkdirs) - throw new IOException("Could not create directory: " + tempDir.getAbsolutePath()) + throw new UserException.BadTmpDir("Could not create directory: " + tempDir.getAbsolutePath()) absolute(tempDir) } @@ -71,36 +74,22 @@ object IOUtils { * @param prefix Prefix for the directory name. * @param suffix Optional suffix for the directory name. Defaults to "". * @return The created temporary directory. - * @throws IOException if the directory could not be created. */ def tempDir(prefix: String, suffix: String = "") = { val tempDirParent = javaTempDir() if (!tempDirParent.exists && !tempDirParent.mkdirs) - throw new IOException("Could not create temp directory: " + tempDirParent) + throw new UserException.BadTmpDir("Could not create temp directory: " + tempDirParent) val temp = File.createTempFile(prefix + "-", suffix) if (!temp.delete) - throw new IOException("Could not delete sub file: " + temp.getAbsolutePath()) + throw new UserException.BadTmpDir("Could not delete sub file: " + temp.getAbsolutePath()) if (!temp.mkdir) - throw new IOException("Could not create sub directory: " + temp.getAbsolutePath()) + throw new UserException.BadTmpDir("Could not create sub directory: " + temp.getAbsolutePath()) absolute(temp) } - /** - * Returns a temp directory that should be accessible from any network location. - * @return a temp directory that should be accessible from any network location. - */ - def networkTempDir() = { - var tempDir = javaTempDir() - if (tempDir.getAbsolutePath.startsWith("/tmp")) - tempDir = new File(System.getProperty("user.home"), ".queue") - if (!tempDir.exists && !tempDir.mkdirs) - throw new IOException("Could not create directory: " + tempDir.getAbsolutePath()) - absolute(tempDir) - } - def writeContents(file: File, content: String) = FileUtils.writeStringToFile(file, content) - def writeTempFile(content: String, prefix: String, suffix: String = "", directory: File = networkTempDir) = { + def writeTempFile(content: String, prefix: String, suffix: String = "", directory: File) = { val tempFile = absolute(File.createTempFile(prefix, suffix, directory)) writeContents(tempFile, content) tempFile diff --git a/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala index 8c0428a76..99396954c 100644 --- a/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala @@ -3,8 +3,31 @@ package org.broadinstitute.sting.queue.util import org.broadinstitute.sting.BaseTest import org.junit.{Assert, Test} import java.io.File +import org.broadinstitute.sting.utils.exceptions.UserException class IOUtilsUnitTest extends BaseTest { + @Test + def testGoodTempDir = { + val tmpDir = System.getProperty("java.io.tmpdir") + try { + System.setProperty("java.io.tmpdir", "/tmp/queue") + IOUtils.checkTempDir + } finally { + System.setProperty("java.io.tmpdir", tmpDir) + } + } + + @Test(expected=classOf[UserException.BadTmpDir]) + def testBadTempDir = { + val tmpDir = System.getProperty("java.io.tmpdir") + try { + System.setProperty("java.io.tmpdir", "/tmp") + IOUtils.checkTempDir + } finally { + System.setProperty("java.io.tmpdir", tmpDir) + } + } + @Test def testAbsoluteSubDir = { var subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("/path/to/file"))