Queue UI Improvements:

- Forcing user to set the temp directory via -Djava.io.tmpdir to avoid filling up /tmp.
- By default deleting job outputs tagged as intermediate.
- Defaulting pipeline to scatter count 1 (no reads deleted).
- Cleaning up temp classes even when scripting fails.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4573 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-10-26 19:49:08 +00:00
parent cedceb33cd
commit 8211cee0b2
7 changed files with 65 additions and 53 deletions

View File

@ -73,7 +73,7 @@ public class UserException extends ReviewedStingException {
public static class BadTmpDir extends UserException {
public BadTmpDir(String message) {
super(String.format("Failure working with the tmp directory %s. Override with -Djava.io.tmpdir=X on the command line to a bigger/better file system. Exact error was %s", System.getProperties().get("java.io. tmpdir"), message));
super(String.format("Failure working with the tmp directory %s. Override with -Djava.io.tmpdir=X on the command line to a bigger/better file system. Exact error was %s", System.getProperties().get("java.io.tmpdir"), message));
}
}

View File

@ -36,10 +36,10 @@ class fullCallingPipeline extends QScript {
@Input(doc="per-sample downsampling level",shortName="dcov",required=false)
var downsampling_coverage = 300
@Input(doc="level of parallelism for IndelRealigner. By default uses number of contigs.", shortName="cleanerScatter", required=false)
var num_cleaner_scatter_jobs: Option[Int] = None
@Input(doc="level of parallelism for IndelRealigner. By default is set to 1.", shortName="cleanerScatter", required=false)
var num_cleaner_scatter_jobs = 1
@Input(doc="level of parallelism for UnifiedGenotyper", shortName="snpScatter", required=false)
@Input(doc="level of parallelism for UnifiedGenotyper. By default is set to 20.", shortName="snpScatter", required=false)
var num_snp_scatter_jobs = 20
//@Input(doc="level of parallelism for IndelGenotyperV2", shortName="indelScatter", required=false)
@ -92,7 +92,8 @@ class fullCallingPipeline extends QScript {
// get contigs (needed for indel cleaning parallelism)
val contigs = IntervalScatterFunction.distinctContigs(
qscript.pipeline.getProject.getReferenceFile)
qscript.pipeline.getProject.getReferenceFile,
List(qscript.pipeline.getProject.getIntervalList.getAbsolutePath))
for ( sample <- recalibratedSamples ) {
val sampleId = sample.getId
@ -122,12 +123,7 @@ class fullCallingPipeline extends QScript {
realigner.targetIntervals = targetCreator.out
realigner.intervals = Nil
realigner.intervalsString = Nil
realigner.scatterCount = {
if (num_cleaner_scatter_jobs.isDefined)
num_cleaner_scatter_jobs.get min contigs.size
else
contigs.size
}
realigner.scatterCount = num_cleaner_scatter_jobs min contigs.size
// if scatter count is > 1, do standard scatter gather, if not, explicitly set up fix mates
if (realigner.scatterCount > 1) {

View File

@ -23,7 +23,7 @@ class QCommandLine extends CommandLineProgram with Logging {
*/
def execute = {
val qGraph = new QGraph
val qGraph = QCommandLine.qGraph
qGraph.settings = settings
qGraph.debugMode = debugMode == true
@ -36,15 +36,6 @@ class QCommandLine extends CommandLineProgram with Logging {
logger.info("Added " + script.functions.size + " functions")
}
Runtime.getRuntime.addShutdownHook(new Thread {
/** Cleanup as the JVM shuts down. */
override def run = {
qGraph.shutdown()
ProcessController.shutdown()
QScriptManager.deleteOutdir()
}
})
qGraph.run
if (qGraph.hasFailed) {
@ -98,11 +89,23 @@ class QCommandLine extends CommandLineProgram with Logging {
* Entry point of Queue. Compiles and runs QScripts passed in to the command line.
*/
object QCommandLine {
private val qGraph = new QGraph
/**
* Main.
* @param argv Arguments.
*/
def main(argv: Array[String]) {
Runtime.getRuntime.addShutdownHook(new Thread {
/** Cleanup as the JVM shuts down. */
override def run = {
qGraph.shutdown()
ProcessController.shutdown()
QScriptManager.deleteOutdir()
}
})
try {
CommandLineProgram.start(new QCommandLine, argv);
if (CommandLineProgram.result != 0)

View File

@ -12,8 +12,8 @@ import org.jgrapht.event.{TraversalListenerAdapter, EdgeTraversalEvent}
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFunction, QFunction}
import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction}
import org.broadinstitute.sting.queue.util.{EmailMessage, JobExitException, LsfKillJob, Logging}
import org.apache.commons.lang.StringUtils
import org.broadinstitute.sting.queue.util._
/**
* The internal dependency tracker between sets of function input and output files.
@ -46,6 +46,7 @@ class QGraph extends Logging {
* Checks the functions for missing values and the graph for cyclic dependencies and then runs the functions in the graph.
*/
def run = {
IOUtils.checkTempDir
val numMissingValues = fillGraph
val isReady = numMissingValues == 0
@ -698,7 +699,7 @@ class QGraph extends Logging {
}
private def deleteIntermediateOutputs() = {
if (settings.deleteIntermediates && !hasFailed) {
if (!settings.keepIntermediates && !hasFailed) {
logger.info("Deleting intermediate files.")
traverseFunctions(edge => {
if (edge.function.isIntermediate) {

View File

@ -36,8 +36,8 @@ class QGraphSettings {
@Argument(fullName="status_email_to", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false)
var statusEmailTo: List[String] = Nil
@Argument(fullName="delete_intermediate_outputs", shortName="deleteIntermediates", doc="After a successful run delete the outputs of any Function marked as intermediate.", required=false)
var deleteIntermediates = false
@Argument(fullName="keep_intermediate_outputs", shortName="keepIntermediates", doc="After a successful run keep the outputs of any Function marked as intermediate.", required=false)
var keepIntermediates = false
@Argument(fullName="retry_failed", shortName="retry", doc="Retry the specified number of times after a command fails. Defaults to no retries.", required=false)
var retries = 0

View File

@ -1,7 +1,8 @@
package org.broadinstitute.sting.queue.util
import org.apache.commons.io.FileUtils
import java.io.{FileReader, IOException, File}
import java.io.{FileReader, File}
import org.broadinstitute.sting.utils.exceptions.UserException
/**
* A collection of utilities for modifying java.io.
@ -48,21 +49,23 @@ object IOUtils {
absolute(new File(parentAbs, file.getPath))
}
def checkTempDir = {
val javaTemp = System.getProperty("java.io.tmpdir")
// Keeps the user from leaving the temp directory as the default, and on Macs from having pluses
// in the path which can cause problems with the Google Reflections library.
// see also: http://benjchristensen.com/2009/09/22/mac-osx-10-6-java-java-io-tmpdir/
if (javaTemp.startsWith("/var/folders/") || (javaTemp == "/tmp") || (javaTemp == "/tmp/"))
throw new UserException.BadTmpDir("java.io.tmpdir must be explicitly set")
}
/**
* Returns the temp directory as defined by java.
* @return the temp directory as defined by java.
*/
def javaTempDir() = {
var javaTemp = System.getProperty("java.io.tmpdir")
// Keep the temp directory from having pluses in it, which can cause problems with the Google Reflections library.
// see also: http://benjchristensen.com/2009/09/22/mac-osx-10-6-java-java-io-tmpdir/
if (javaTemp.startsWith("/var/folders/")) {
javaTemp = "/tmp/"
System.setProperty("java.io.tmpdir", javaTemp)
}
val tempDir = new File(javaTemp)
val tempDir = new File(System.getProperty("java.io.tmpdir"))
if (!tempDir.exists && !tempDir.mkdirs)
throw new IOException("Could not create directory: " + tempDir.getAbsolutePath())
throw new UserException.BadTmpDir("Could not create directory: " + tempDir.getAbsolutePath())
absolute(tempDir)
}
@ -71,36 +74,22 @@ object IOUtils {
* @param prefix Prefix for the directory name.
* @param suffix Optional suffix for the directory name. Defaults to "".
* @return The created temporary directory.
* @throws IOException if the directory could not be created.
*/
def tempDir(prefix: String, suffix: String = "") = {
val tempDirParent = javaTempDir()
if (!tempDirParent.exists && !tempDirParent.mkdirs)
throw new IOException("Could not create temp directory: " + tempDirParent)
throw new UserException.BadTmpDir("Could not create temp directory: " + tempDirParent)
val temp = File.createTempFile(prefix + "-", suffix)
if (!temp.delete)
throw new IOException("Could not delete sub file: " + temp.getAbsolutePath())
throw new UserException.BadTmpDir("Could not delete sub file: " + temp.getAbsolutePath())
if (!temp.mkdir)
throw new IOException("Could not create sub directory: " + temp.getAbsolutePath())
throw new UserException.BadTmpDir("Could not create sub directory: " + temp.getAbsolutePath())
absolute(temp)
}
/**
* Returns a temp directory that should be accessible from any network location.
* @return a temp directory that should be accessible from any network location.
*/
def networkTempDir() = {
var tempDir = javaTempDir()
if (tempDir.getAbsolutePath.startsWith("/tmp"))
tempDir = new File(System.getProperty("user.home"), ".queue")
if (!tempDir.exists && !tempDir.mkdirs)
throw new IOException("Could not create directory: " + tempDir.getAbsolutePath())
absolute(tempDir)
}
def writeContents(file: File, content: String) = FileUtils.writeStringToFile(file, content)
def writeTempFile(content: String, prefix: String, suffix: String = "", directory: File = networkTempDir) = {
def writeTempFile(content: String, prefix: String, suffix: String = "", directory: File) = {
val tempFile = absolute(File.createTempFile(prefix, suffix, directory))
writeContents(tempFile, content)
tempFile

View File

@ -3,8 +3,31 @@ package org.broadinstitute.sting.queue.util
import org.broadinstitute.sting.BaseTest
import org.junit.{Assert, Test}
import java.io.File
import org.broadinstitute.sting.utils.exceptions.UserException
class IOUtilsUnitTest extends BaseTest {
@Test
def testGoodTempDir = {
val tmpDir = System.getProperty("java.io.tmpdir")
try {
System.setProperty("java.io.tmpdir", "/tmp/queue")
IOUtils.checkTempDir
} finally {
System.setProperty("java.io.tmpdir", tmpDir)
}
}
@Test(expected=classOf[UserException.BadTmpDir])
def testBadTempDir = {
val tmpDir = System.getProperty("java.io.tmpdir")
try {
System.setProperty("java.io.tmpdir", "/tmp")
IOUtils.checkTempDir
} finally {
System.setProperty("java.io.tmpdir", tmpDir)
}
}
@Test
def testAbsoluteSubDir = {
var subDir = IOUtils.subDir(IOUtils.CURRENT_DIR, new File("/path/to/file"))