diff --git a/scala/qscript/fullCallingPipeline.q b/scala/qscript/fullCallingPipeline.q index 416a5d57a..8a6cbc537 100755 --- a/scala/qscript/fullCallingPipeline.q +++ b/scala/qscript/fullCallingPipeline.q @@ -1,10 +1,13 @@ import net.sf.picard.reference.FastaSequenceFile +import org.broadinstitute.sting.commandline.ArgumentSource import org.broadinstitute.sting.datasources.pipeline.Pipeline import org.broadinstitute.sting.gatk.walkers.genotyper.GenotypeCalculationModel.Model -import org.broadinstitute.sting.gatk.{CommandLineGATK, DownsampleType} +import org.broadinstitute.sting.gatk.DownsampleType import org.broadinstitute.sting.queue.extensions.gatk._ import org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction import org.broadinstitute.sting.queue.extensions.samtools._ +import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, CloneFunction, ScatterFunction} +import org.broadinstitute.sting.queue.util.IOUtils import org.broadinstitute.sting.queue.{QException, QScript} import collection.JavaConversions._ import org.broadinstitute.sting.utils.yaml.YamlUtils @@ -61,7 +64,7 @@ class fullCallingPipeline extends QScript { this.intervals = qscript.pipeline.getProject.getIntervalList this.jarFile = qscript.gatkJar this.reference_sequence = qscript.pipeline.getProject.getReferenceFile - this.memoryLimit = Some(6) + this.memoryLimit = Some(4) } @@ -88,30 +91,36 @@ class fullCallingPipeline extends QScript { } for ( sample <- recalibratedSamples ) { + val sampleId = sample.getId // put unclean bams in unclean genotypers in advance, create the extension files val bam = sample.getBamFiles.get("recalibrated") if (!sample.getBamFiles.contains("cleaned")) { - sample.getBamFiles.put("cleaned", swapExt(bam,"bam","cleaned.bam")) + sample.getBamFiles.put("cleaned", swapExt("CleanedBams", bam,"bam","cleaned.bam")) } val cleaned_bam = sample.getBamFiles.get("cleaned") - val indel_targets = swapExt(bam,"bam","realigner_targets.interval_list") + val indel_targets = swapExt("CleanedBams/IntermediateFiles/"+sampleId, bam,"bam","realigner_targets.interval_list") // create the cleaning commands val targetCreator = new RealignerTargetCreator with CommandLineGATKArgs - targetCreator.analysisName = "CreateTargets_"+bam.getName + targetCreator.jobOutputFile = new File(".queue/logs/Cleaning/%s/RealignerTargetCreator.out".format(sampleId)) + targetCreator.jobName = "CreateTargets_"+sampleId + targetCreator.analysisName = "CreateTargets_"+sampleId targetCreator.input_file :+= bam targetCreator.out = indel_targets + targetCreator.memoryLimit = Some(2) val realigner = new IndelRealigner with CommandLineGATKArgs - realigner.analysisName = "RealignBam_"+bam.getName + realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId)) + realigner.analysisName = "RealignBam_"+sampleId realigner.input_file = targetCreator.input_file realigner.intervals = qscript.contigIntervals - realigner.targetIntervals = new java.io.File(targetCreator.out.getAbsolutePath) + realigner.targetIntervals = targetCreator.out realigner.scatterCount = contigCount // may need to explicitly run fix mates var fixMates = new PicardBamJarFunction { + jobOutputFile = new File(".queue/logs/Cleaning/%s/FixMates.out".format(sampleId)) // Declare inputs/outputs for dependency tracking. @Input(doc="unfixed bam") var unfixed: File = _ @Output(doc="fixed bam") var fixed: File = _ @@ -129,15 +138,30 @@ class fullCallingPipeline extends QScript { realigner.out = cleaned_bam // While gathering run fix mates. realigner.scatterClass = classOf[ContigScatterFunction] + realigner.setupScatterFunction = { + case (scatter: ScatterFunction, _) => + scatter.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather".format(sampleId)) + scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter.out".format(sampleId)) + } + realigner.setupCloneFunction = { + case (clone: CloneFunction, index: Int) => + clone.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Scatter_%s".format(sampleId, index)) + clone.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter_%s.out".format(sampleId, index)) + } realigner.setupGatherFunction = { - case (gather: BamGatherFunction, _) => + case (gather: BamGatherFunction, source: ArgumentSource) => + gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName)) + gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/FixMates.out".format(sampleId)) gather.memoryLimit = Some(6) gather.jarFile = qscript.picardFixMatesJar // Don't pass this AS=true to fix mates! gather.assumeSorted = None + case (gather: GatherFunction, source: ArgumentSource) => + gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName)) + gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Gather_%s.out".format(sampleId, source.field.getName)) } } else { - realigner.out = swapExt(bam,"bam","unfixed.cleaned.bam") + realigner.out = swapExt("CleanedBams/IntermediateFiles/"+sampleId,bam,"bam","unfixed.cleaned.bam") // Explicitly run fix mates if the function won't be scattered. @@ -145,13 +169,14 @@ class fullCallingPipeline extends QScript { fixMates.jarFile = qscript.picardFixMatesJar fixMates.unfixed = realigner.out fixMates.fixed = cleaned_bam - fixMates.analysisName = "FixMates_"+bam.getName + fixMates.analysisName = "FixMates_"+sampleId // Add the fix mates explicitly } var samtoolsindex = new SamtoolsIndexFunction + samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId)) samtoolsindex.bamFile = cleaned_bam - samtoolsindex.analysisName = "index_"+cleaned_bam.getName + samtoolsindex.analysisName = "index_cleaned_"+sampleId if (!qscript.skip_cleaning) { if ( realigner.scatterCount > 1 ) { @@ -162,35 +187,30 @@ class fullCallingPipeline extends QScript { } } - val recalibratedBamFiles = recalibratedSamples - .map(_.getBamFiles.get("recalibrated")) - .toList - - val cleanBamFiles = qscript.pipeline.getSamples - .filter(_.getBamFiles.contains("cleaned")) - .map(_.getBamFiles.get("cleaned")) - .toList - // actually make calls if (!qscript.skip_cleaning) { - //endToEnd(cleanedBase, cleanBamFiles, adprRscript, seq, expKind) - endToEnd(cleanedBase, cleanBamFiles) + //endToEnd(cleanedBase, "cleaned", adprRscript, seq, expKind) + endToEnd(cleanedBase, "cleaned") } else { - //endToEnd(uncleanedBase, recalibratedBamFiles, adprRscript, seq, expKind) - endToEnd(uncleanedBase, recalibratedBamFiles) + //endToEnd(uncleanedBase, "recalibrated", adprRscript, seq, expKind) + endToEnd(uncleanedBase, "recalibrated") } } - //def endToEnd(base: String, bamFiles: List[File], adprthing: File, seqinfo: String, exptype: String) = { - def endToEnd(base: String, bamFiles: List[File]) = { + //def endToEnd(base: String, bamType: String, adprthing: File, seqinfo: String, exptype: String) = { + def endToEnd(base: String, bamType: String) = { + + val samples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains(bamType)).toList + val bamFiles = samples.map(_.getBamFiles.get(bamType)) // step through the un-indel-cleaned graph: // 1a. call snps and indels val snps = new UnifiedGenotyper with CommandLineGATKArgs + snps.jobOutputFile = new File(".queue/logs/SNPCalling/UnifiedGenotyper.out") snps.analysisName = base+"_SNP_calls" snps.input_file = bamFiles snps.group :+= "Standard" - snps.out = new File(base+".vcf") + snps.out = new File("SnpCalls", base+".vcf") snps.standard_min_confidence_threshold_for_emitting = Some(10) snps.min_mapping_quality_score = Some(20) snps.min_base_quality_score = Some(20) @@ -210,17 +230,36 @@ class fullCallingPipeline extends QScript { //} snps.scatterCount = qscript.num_snp_scatter_jobs + snps.setupScatterFunction = { + case (scatter: ScatterFunction, _) => + scatter.commandDirectory = new File("SnpCalls/ScatterGather") + scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/SNPCalling/ScatterGather/Scatter.out") + } + snps.setupCloneFunction = { + case (clone: CloneFunction, index: Int) => + clone.commandDirectory = new File("SnpCalls/ScatterGather/Scatter_%s".format(index)) + clone.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/SNPCalling/ScatterGather/Scatter_%s.out".format(index)) + } + snps.setupGatherFunction = { + case (gather: GatherFunction, source: ArgumentSource) => + gather.commandDirectory = new File("SnpCalls/ScatterGather/Gather_%s".format(source.field.getName)) + gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/SNPCalling/ScatterGather/Gather_%s.out".format(source.field.getName)) + } // indel genotyper does one sample at a time var indelCallFiles = List.empty[RodBind] var indelGenotypers = List.empty[IndelGenotyperV2 with CommandLineGATKArgs] var loopNo = 0 var priority = "" - for ( bam <- bamFiles ) { + for ( sample <- samples ) { + val sampleId = sample.getId + val bam = sample.getBamFiles.get(bamType) + var indel = new IndelGenotyperV2 with CommandLineGATKArgs - indel.analysisName = "IndelGenotyper_"+bam.getName + indel.jobOutputFile = new File(".queue/logs/IndelCalling/%s/IndelGenotyperV2.out".format(sampleId)) + indel.analysisName = "IndelGenotyper_"+sampleId indel.input_file :+= bam - indel.out = swapExt(bam,".bam",".indels.vcf") + indel.out = swapExt("IndelCalls/IntermediateFiles/" + sampleId, bam,".bam",".indels.vcf") indel.downsample_to_coverage = Some(qscript.downsampling_coverage) indelCallFiles :+= RodBind("v"+loopNo.toString, "VCF", indel.out) //indel.scatterCount = qscript.num_indel_scatter_jobs @@ -235,49 +274,56 @@ class fullCallingPipeline extends QScript { loopNo += 1 } val mergeIndels = new CombineVariants with CommandLineGATKArgs - mergeIndels.out = new TaggedFile(qscript.pipeline.getProject.getName+".indels.vcf","vcf") + mergeIndels.jobOutputFile = new File(".queue/logs/IndelCalling/CombineVariants.out") + mergeIndels.out = new TaggedFile("IndelCalls/" + qscript.pipeline.getProject.getName+".indels.vcf","vcf") mergeIndels.genotypemergeoption = Some(org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.GenotypeMergeType.UNIQUIFY) mergeIndels.priority = priority mergeIndels.variantmergeoption = Some(org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION) mergeIndels.rodBind = indelCallFiles mergeIndels.analysisName = base+"_MergeIndels" + mergeIndels.memoryLimit = Some(16) + mergeIndels.jobQueue = "gsa" // 1b. genomically annotate SNPs -- no longer slow val annotated = new GenomicAnnotator with CommandLineGATKArgs + annotated.jobOutputFile = new File(".queue/logs/SNPCalling/GenomicAnnotator.out") annotated.rodBind :+= RodBind("variant", "VCF", snps.out) annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.refseqTable) //annotated.rodBind :+= RodBind("dbsnp", "AnnotatorInputTable", qscript.dbsnpTable) - annotated.out = swapExt(snps.out,".vcf",".annotated.vcf") + annotated.out = swapExt("SnpCalls",snps.out,".vcf",".annotated.vcf") //annotated.select :+= "dbsnp.name,dbsnp.refUCSC,dbsnp.strand,dbsnp.observed,dbsnp.avHet" annotated.rodToIntervalTrackName = "variant" annotated.analysisName = base+"_GenomicAnnotator" // 2.a filter on cluster and near indels val masker = new VariantFiltration with CommandLineGATKArgs + masker.jobOutputFile = new File(".queue/logs/SNPCalling/Masker.out") masker.variantVCF = annotated.out masker.rodBind :+= RodBind("mask", "VCF", mergeIndels.out) masker.maskName = "NearIndel" masker.clusterWindowSize = Some(10) masker.clusterSize = Some(3) - masker.out = swapExt(annotated.out,".vcf",".indel.masked.vcf") + masker.out = swapExt("SnpCalls",annotated.out,".vcf",".indel.masked.vcf") masker.analysisName = base+"_Cluster_and_Indel_filter" // 2.b hand filter with standard filter val handFilter = new VariantFiltration with CommandLineGATKArgs + handFilter.jobOutputFile = new File(".queue/logs/SNPCalling/HandFilter.out") handFilter.variantVCF = masker.out handFilter.rodBind :+= RodBind("mask", "VCF", mergeIndels.out) handFilter.filterName ++= List("StrandBias","AlleleBalance","QualByDepth","HomopolymerRun") handFilter.filterExpression ++= List("\"SB>=0.10\"","\"AB>=0.75\"","\"QD<5.0\"","\"HRun>=4\"") - handFilter.out = swapExt(annotated.out,".vcf",".handfiltered.vcf") + handFilter.out = swapExt("SnpCalls",annotated.out,".vcf",".handfiltered.vcf") handFilter.analysisName = base+"_HandFilter" // 3.i generate gaussian clusters on the masked vcf // todo -- args for annotations? // todo -- args for resources (properties file) val clusters = new GenerateVariantClusters with CommandLineGATKArgs + clusters.jobOutputFile = new File(".queue/logs/SNPCalling/Clusters.out") clusters.rodBind :+= RodBind("input", "VCF", masker.out) clusters.DBSNP = qscript.pipeline.getProject.getDbsnpFile - val clusters_clusterFile = swapExt(new File(snps.out.getAbsolutePath),".vcf",".cluster") + val clusters_clusterFile = swapExt("SnpCalls/IntermediateFiles",snps.out,".vcf",".cluster") clusters.clusterFile = clusters_clusterFile clusters.memoryLimit = Some(6) clusters.jobQueue = "gsa" @@ -287,19 +333,21 @@ class fullCallingPipeline extends QScript { // 3.ii apply gaussian clusters to the masked vcf val recalibrate = new VariantRecalibrator with CommandLineGATKArgs + recalibrate.jobOutputFile = new File(".queue/logs/SNPCalling/Recalibrator.out") recalibrate.clusterFile = clusters.clusterFile recalibrate.DBSNP = qscript.pipeline.getProject.getDbsnpFile recalibrate.rodBind :+= RodBind("input", "VCF", masker.out) - recalibrate.out = swapExt(masker.out,".vcf",".recalibrated.vcf") + recalibrate.out = swapExt("SnpCalls",masker.out,".vcf",".recalibrated.vcf") recalibrate.target_titv = qscript.target_titv - recalibrate.report_dat_file = swapExt(masker.out,".vcf",".recalibrate.dat") - recalibrate.tranches_file = swapExt(masker.out,".vcf",".recalibrate.tranches") + recalibrate.report_dat_file = swapExt("SnpCalls/IntermediateFiles", masker.out,".vcf",".recalibrate.dat") + recalibrate.tranches_file = swapExt("SnpCalls/IntermediateFiles", masker.out,".vcf",".recalibrate.tranches") recalibrate.analysisName = base+"_VariantRecalibrator" // 3.iii apply variant cuts to the clusters val cut = new ApplyVariantCuts with CommandLineGATKArgs + cut.jobOutputFile = new File(".queue/logs/SNPCalling/VariantCuts.out") cut.rodBind :+= RodBind("input", "VCF", recalibrate.out) - cut.out = swapExt(recalibrate.out,".vcf",".tranched.vcf") + cut.out = swapExt("SnpCalls",recalibrate.out,".vcf",".tranched.vcf") cut.tranches_file = recalibrate.tranches_file // todo -- fdr inputs, etc cut.fdr_filter_level = Some(1) @@ -307,13 +355,14 @@ class fullCallingPipeline extends QScript { // 4. Variant eval the cut and the hand-filtered vcf files val eval = new VariantEval with CommandLineGATKArgs + eval.jobOutputFile = new File(".queue/logs/SNPCalling/VariantEval.out") eval.rodBind :+= RodBind("evalOptimized", "VCF", cut.out) eval.rodBind :+= RodBind("evalHandFiltered", "VCF", handFilter.out) eval.evalModule ++= List("CountFunctionalClasses", "CompOverlap", "CountVariants", "TiTvVariantEvaluator") - eval.reportLocation = new File(base+".eval") + eval.reportLocation = new File("SnpCalls", base+".eval") eval.reportType = Option(org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType.R) eval.analysisName = base+"_VariantEval" - eval.DBSNP = qscript.pipeline.getProject.getDBSNP + eval.DBSNP = qscript.pipeline.getProject.getDbsnpFile add(snps) @@ -335,7 +384,8 @@ class fullCallingPipeline extends QScript { // adpr.sequencer = seqinfo // adpr.protocol = exptype // adpr.dependents = eval.reportLocation -// adpr.out = new File(base + "_adpr.pdf") +// adpr.jobOutputFile = new File(".queue/logs/Reporting/ADPR.out") +// adpr.out = new File("Reporting", base + "_adpr.pdf") // adpr.analysisName = base + "_ADPR" //In order for ADPR to finish successfully, a squid file for both the lane and sample level data needs to be // produced, reformatted and named _lanes.txt or _samps.txt, respectively. These files diff --git a/scala/qscript/kshakir/CleanBamFile.scala b/scala/qscript/kshakir/CleanBamFile.scala index 77da34e1a..7d9e92da1 100644 --- a/scala/qscript/kshakir/CleanBamFile.scala +++ b/scala/qscript/kshakir/CleanBamFile.scala @@ -86,7 +86,6 @@ class CleanBamFile extends QScript { this.reference_sequence = qscript.referenceFile this.intervals = qscript.intervals this.input_file :+= recalibratedBam - this.cleanupTempDirectories = true } def baseFile(suffix: String) = new File(baseName + suffix) diff --git a/scala/src/org/broadinstitute/sting/queue/QScript.scala b/scala/src/org/broadinstitute/sting/queue/QScript.scala index 792e1c5e2..1d3d828a6 100755 --- a/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -30,12 +30,39 @@ trait QScript extends Logging { /** * Exchanges the extension on a file. + * @param file File to look for the extension. + * @param oldExtension Old extension to strip off, if present. + * @param newExtension New extension to append. + * @return new File with the new extension in the current directory. */ protected def swapExt(file: File, oldExtension: String, newExtension: String) = new File(file.getName.stripSuffix(oldExtension) + newExtension) + /** + * Exchanges the extension on a file. + * @param dir New directory for the file. + * @param file File to look for the extension. + * @param oldExtension Old extension to strip off, if present. + * @param newExtension New extension to append. + * @return new File with the new extension in dir. + */ + protected def swapExt(dir: String, file: File, oldExtension: String, newExtension: String) = + new File(dir, file.getName.stripSuffix(oldExtension) + newExtension) + + /** + * Exchanges the extension on a file. + * @param dir New directory for the file. + * @param file File to look for the extension. + * @param oldExtension Old extension to strip off, if present. + * @param newExtension New extension to append. + * @return new File with the new extension in dir. + */ + protected def swapExt(dir: File, file: File, oldExtension: String, newExtension: String) = + new File(dir, file.getName.stripSuffix(oldExtension) + newExtension) + /** * Adds one or more command line functions to be run. + * @param functions Functions to add. */ def add(functions: CommandLineFunction*) = this.functions ++= List(functions:_*) diff --git a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala index ea372a439..73f4be6dd 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -1,27 +1,29 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.InProcessFunction -import org.broadinstitute.sting.queue.util.Logging +import org.broadinstitute.sting.queue.util.{IOUtils, Logging} /** * Runs a function that executes in process and does not fork out an external process. */ -class InProcessRunner(function: InProcessFunction) extends JobRunner with Logging { +class InProcessRunner(val function: InProcessFunction) extends JobRunner with Logging { private var runStatus: RunnerStatus.Value = _ def start() = { - if (logger.isDebugEnabled) { - logger.debug("Starting: " + function.commandDirectory + " > " + function.description) - } else { - logger.info("Starting: " + function.description) - } - - function.doneOutputs.foreach(_.delete()) - function.failOutputs.foreach(_.delete()) - runStatus = RunnerStatus.RUNNING try { + if (logger.isDebugEnabled) { + logger.debug("Starting: " + function.commandDirectory + " > " + function.description) + } else { + logger.info("Starting: " + function.description) + } + + function.doneOutputs.foreach(_.delete()) + function.failOutputs.foreach(_.delete()) + runStatus = RunnerStatus.RUNNING + function.mkOutputDirectories() function.run() function.doneOutputs.foreach(_.createNewFile()) + writeDone() runStatus = RunnerStatus.DONE logger.info("Done: " + function.description) } catch { @@ -29,6 +31,7 @@ class InProcessRunner(function: InProcessFunction) extends JobRunner with Loggin runStatus = RunnerStatus.FAILED try { function.failOutputs.foreach(_.createNewFile()) + writeStackTrace(e) } catch { case _ => /* ignore errors in the exception handler */ } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index 74625992f..3be4fc543 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -1,5 +1,9 @@ package org.broadinstitute.sting.queue.engine +import org.broadinstitute.sting.queue.util.IOUtils +import java.io.{PrintWriter, StringWriter} +import org.broadinstitute.sting.queue.function.QFunction + /** * Base interface for job runners. */ @@ -18,4 +22,24 @@ trait JobRunner { * @return RUNNING, DONE, or FAILED. */ def status: RunnerStatus.Value + + /** + * Returns the function to be run. + */ + def function: QFunction + + protected def writeDone() = { + val content = "%s%nDone.".format(function.description) + IOUtils.writeContents(function.jobOutputFile, content) + } + + protected def writeStackTrace(e: Throwable) = { + val stackTrace = new StringWriter + val printWriter = new PrintWriter(stackTrace) + printWriter.println(function.description) + e.printStackTrace(printWriter) + printWriter.close + val outputFile = if (function.jobErrorFile != null) function.jobErrorFile else function.jobOutputFile + IOUtils.writeContents(outputFile, stackTrace.toString) + } } diff --git a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala index 7c2910e69..30b73c412 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/LsfJobRunner.scala @@ -7,10 +7,10 @@ import org.broadinstitute.sting.queue.util._ /** * Runs jobs on an LSF compute cluster. */ -class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with Logging { +class LsfJobRunner(val function: CommandLineFunction) extends DispatchJobRunner with Logging { private var runStatus: RunnerStatus.Value = _ - var job: LsfJob = _ + var job: LsfJob = new LsfJob /** A file to look for to validate that the function ran to completion. */ private var jobStatusPath: String = _ @@ -35,59 +35,62 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with * @param function Command to run. */ def start() = { - job = new LsfJob - // job.name = function.jobName TODO: Make setting the job name optional. - job.outputFile = function.jobOutputFile - job.errorFile = function.jobErrorFile - job.project = function.jobProject - job.queue = function.jobQueue - - if (!IOUtils.CURRENT_DIR.getCanonicalFile.equals(function.commandDirectory)) - job.workingDir = function.commandDirectory - - job.extraBsubArgs ++= function.extraArgs - - if (function.jobRestartable) - job.extraBsubArgs :+= "-r" - - if (function.memoryLimit.isDefined) - job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]") - - job.name = function.commandLine.take(1000) - - // TODO: Look into passing in a single chained script as recommended by Doug instead of pre, exec, and post. - exec = writeExec() - job.command = "sh " + exec - - preExec = writePreExec() - job.preExecCommand = "sh " + preExec - - postExec = writePostExec() - job.postExecCommand = "sh " + postExec - - if (logger.isDebugEnabled) { - logger.debug("Starting: " + function.commandDirectory + " > " + job.bsubCommand.mkString(" ")) - } else { - logger.info("Starting: " + job.bsubCommand.mkString(" ")) - } - - function.jobOutputFile.delete() - if (function.jobErrorFile != null) - function.jobErrorFile.delete() - - runStatus = RunnerStatus.RUNNING try { + function.mkOutputDirectories() + + // job.name = function.jobName TODO: Make setting the job name optional. + job.outputFile = function.jobOutputFile + job.errorFile = function.jobErrorFile + job.project = function.jobProject + job.queue = function.jobQueue + + if (!IOUtils.CURRENT_DIR_ABS.equals(function.commandDirectory)) + job.workingDir = function.commandDirectory + + job.extraBsubArgs ++= function.extraArgs + + if (function.jobRestartable) + job.extraBsubArgs :+= "-r" + + if (function.memoryLimit.isDefined) + job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]") + + job.name = function.commandLine.take(1000) + + exec = writeExec() + job.command = "sh " + exec + + preExec = writePreExec() + job.preExecCommand = "sh " + preExec + + postExec = writePostExec() + job.postExecCommand = "sh " + postExec + + if (logger.isDebugEnabled) { + logger.debug("Starting: " + function.commandDirectory + " > " + job.bsubCommand.mkString(" ")) + } else { + logger.info("Starting: " + job.bsubCommand.mkString(" ")) + } + + function.jobOutputFile.delete() + if (function.jobErrorFile != null) + function.jobErrorFile.delete() + + runStatus = RunnerStatus.RUNNING Retry.attempt(() => job.run(), 1, 5, 10) jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." + job.bsubJobId)).toString logger.info("Submitted LSF job id: " + job.bsubJobId) } catch { - case re: RetryException => - removeTemporaryFiles() - runStatus = RunnerStatus.FAILED case e => - logger.error("Error trying to start job.", e) - removeTemporaryFiles() runStatus = RunnerStatus.FAILED + try { + removeTemporaryFiles() + function.failOutputs.foreach(_.createNewFile()) + writeStackTrace(e) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error: " + job.bsubCommand.mkString(" "), e) } } @@ -100,20 +103,33 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with * .done files used to determine if a file has been created successfully. */ def status = { - if (logger.isDebugEnabled) { - logger.debug("Done %s exists = %s".format(jobDoneFile, jobDoneFile.exists)) - logger.debug("Fail %s exists = %s".format(jobFailFile, jobFailFile.exists)) - } + try { + if (logger.isDebugEnabled) { + logger.debug("Done %s exists = %s".format(jobDoneFile, jobDoneFile.exists)) + logger.debug("Fail %s exists = %s".format(jobFailFile, jobFailFile.exists)) + } - if (jobFailFile.exists) { - removeTemporaryFiles() - runStatus = RunnerStatus.FAILED - logger.info("Error: " + job.bsubCommand.mkString(" ")) - tailError() - } else if (jobDoneFile.exists) { - removeTemporaryFiles() - runStatus = RunnerStatus.DONE - logger.info("Done: " + job.bsubCommand.mkString(" ")) + if (jobFailFile.exists) { + removeTemporaryFiles() + runStatus = RunnerStatus.FAILED + logger.info("Error: " + job.bsubCommand.mkString(" ")) + tailError() + } else if (jobDoneFile.exists) { + removeTemporaryFiles() + runStatus = RunnerStatus.DONE + logger.info("Done: " + job.bsubCommand.mkString(" ")) + } + } catch { + case e => + runStatus = RunnerStatus.FAILED + try { + removeTemporaryFiles() + function.failOutputs.foreach(_.createNewFile()) + writeStackTrace(e) + } catch { + case _ => /* ignore errors in the exception handler */ + } + logger.error("Error: " + job.bsubCommand.mkString(" "), e) } runStatus diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index fc2e9aeee..30cbb9121 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -29,6 +29,8 @@ class QGraph extends Logging { var statusEmailTo: List[String] = _ private val jobGraph = newGraph + private var shuttingDown = false + private val nl = "%n".format() /** * Adds a QScript created CommandLineFunction to the graph. @@ -86,7 +88,7 @@ class QGraph extends Logging { .function.asInstanceOf[ScatterGatherableFunction] .generateFunctions() if (this.debugMode) - logger.debug("Scattered into %d parts: %n%s".format(functions.size, functions.mkString("%n".format()))) + logger.debug("Scattered into %d parts: %n%s".format(functions.size, functions.mkString(nl))) addedFunctions ++= functions } @@ -240,23 +242,20 @@ class QGraph extends Logging { private def dryRunJobs() = { traverseFunctions(edge => { edge.function match { - case clf: CommandLineFunction => { + case qFunction => { if (logger.isDebugEnabled) { - logger.debug(clf.commandDirectory + " > " + clf.commandLine) + logger.debug(qFunction.commandDirectory + " > " + qFunction.description) } else { - logger.info(clf.commandLine) + logger.info(qFunction.description) } - logger.info("Output written to " + clf.jobOutputFile) - if (clf.jobErrorFile != null) { - logger.info("Errors written to " + clf.jobErrorFile) + logger.info("Output written to " + qFunction.jobOutputFile) + if (qFunction.jobErrorFile != null) { + logger.info("Errors written to " + qFunction.jobErrorFile) } else { if (logger.isDebugEnabled) - logger.info("Errors also written to " + clf.jobOutputFile) + logger.info("Errors also written to " + qFunction.jobOutputFile) } } - case qFunction => { - logger.info(qFunction.description) - } } }) } @@ -276,7 +275,7 @@ class QGraph extends Logging { var readyJobs = getReadyJobs var runningJobs = Set.empty[FunctionEdge] - while (readyJobs.size + runningJobs.size > 0) { + while (!shuttingDown && readyJobs.size + runningJobs.size > 0) { var exitedJobs = List.empty[FunctionEdge] var failedJobs = List.empty[FunctionEdge] @@ -304,12 +303,12 @@ class QGraph extends Logging { Thread.sleep(30000L) readyJobs = getReadyJobs } - - emailStatus() } catch { case e => logger.error("Uncaught error running jobs.", e) throw e + } finally { + emailStatus() } } @@ -327,54 +326,64 @@ class QGraph extends Logging { } } - private def emailFailedJobs(jobs: List[FunctionEdge]) = { + private def emailFailedJobs(failed: List[FunctionEdge]) = { if (statusEmailTo.size > 0) { val emailMessage = new EmailMessage emailMessage.from = statusEmailFrom emailMessage.to = statusEmailTo - emailMessage.body = getStatus emailMessage.subject = "Queue function: Failure" - emailMessage.body = "Failed functions: %n%n%s%n" - .format(jobs.map(_.function.description).mkString("%n%n".format())) - emailMessage.attachments = jobs.flatMap(edge => logFiles(edge)) + addFailedFunctions(emailMessage, failed) emailMessage.trySend(qSettings.emailSettings) } } private def emailStatus() = { if (statusEmailTo.size > 0) { - var failedFunctions = List.empty[String] - var failedOutputs = List.empty[File] + var failed = List.empty[FunctionEdge] foreachFunction(edge => { if (edge.status == RunnerStatus.FAILED) { - failedFunctions :+= edge.function.description - failedOutputs ++= logFiles(edge) + failed :+= edge } }) val emailMessage = new EmailMessage emailMessage.from = statusEmailFrom emailMessage.to = statusEmailTo - emailMessage.body = getStatus + "%n".format() - if (failedFunctions.size == 0) { + emailMessage.body = getStatus + nl + if (failed.size == 0) { emailMessage.subject = "Queue run: Success" } else { emailMessage.subject = "Queue run: Failure" - emailMessage.attachments = failedOutputs + addFailedFunctions(emailMessage, failed) } emailMessage.trySend(qSettings.emailSettings) } } + private def addFailedFunctions(emailMessage: EmailMessage, failed: List[FunctionEdge]) = { + val logs = failed.flatMap(edge => logFiles(edge)) + + if (emailMessage.body == null) + emailMessage.body = "" + emailMessage.body += """ + |Failed functions: + | + |%s + | + |Logs: + |%s%n + |""".stripMargin.trim.format( + failed.map(_.function.description).mkString(nl+nl), + logs.map(_.getAbsolutePath).mkString(nl)) + + emailMessage.attachments = logs + } + private def logFiles(edge: FunctionEdge) = { - // TODO: All functions should be writing error files, including InProcessFunctions var failedOutputs = List.empty[File] - if (edge.function.isInstanceOf[CommandLineFunction]) { - val clf = edge.function.asInstanceOf[CommandLineFunction] - failedOutputs :+= clf.jobOutputFile - if (clf.jobErrorFile != null) - failedOutputs :+= clf.jobErrorFile - } + failedOutputs :+= edge.function.jobOutputFile + if (edge.function.jobErrorFile != null) + failedOutputs :+= edge.function.jobErrorFile failedOutputs.filter(file => file != null && file.exists) } @@ -408,7 +417,6 @@ class QGraph extends Logging { */ private def getStatus = { val buffer = new StringBuilder - val nl = "%n".format() doStatus(status => buffer.append(status).append(nl)) buffer.toString } @@ -631,6 +639,7 @@ class QGraph extends Logging { * Kills any forked jobs still running. */ def shutdown() { + shuttingDown = true val lsfJobRunners = getRunningJobs.filter(_.runner.isInstanceOf[LsfJobRunner]).map(_.runner.asInstanceOf[LsfJobRunner]) if (lsfJobRunners.size > 0) { for (jobRunners <- lsfJobRunners.filterNot(_.job.bsubJobId == null).grouped(10)) { diff --git a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala index a1afd7e40..b5efdbe7d 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/ShellJobRunner.scala @@ -1,12 +1,12 @@ package org.broadinstitute.sting.queue.engine -import org.broadinstitute.sting.queue.util.{JobExitException, Logging, ShellJob} +import org.broadinstitute.sting.queue.util.{Logging, ShellJob} import org.broadinstitute.sting.queue.function.CommandLineFunction /** * Runs jobs one at a time locally */ -class ShellJobRunner(function: CommandLineFunction) extends JobRunner with Logging { +class ShellJobRunner(val function: CommandLineFunction) extends JobRunner with Logging { private var runStatus: RunnerStatus.Value = _ /** @@ -14,42 +14,44 @@ class ShellJobRunner(function: CommandLineFunction) extends JobRunner with Loggi * @param function Command to run. */ def start() = { - val job = new ShellJob - job.command = function.commandLine - job.workingDir = function.commandDirectory - job.outputFile = function.jobOutputFile - job.errorFile = function.jobErrorFile - - if (logger.isDebugEnabled) { - logger.debug("Starting: " + function.commandDirectory + " > " + function.commandLine) - } else { - logger.info("Starting: " + function.commandLine) - } - - logger.info("Output written to " + function.jobOutputFile) - if (function.jobErrorFile != null) { - logger.info("Errors written to " + function.jobErrorFile) - } else { - if (logger.isDebugEnabled) - logger.info("Errors also written to " + function.jobOutputFile) - } - - function.jobOutputFile.delete() - if (function.jobErrorFile != null) - function.jobErrorFile.delete() - function.doneOutputs.foreach(_.delete()) - function.failOutputs.foreach(_.delete()) - runStatus = RunnerStatus.RUNNING try { - job.run() - function.doneOutputs.foreach(_.createNewFile()) - runStatus = RunnerStatus.DONE - logger.info("Done: " + function.commandLine) + val job = new ShellJob + job.command = function.commandLine + job.workingDir = function.commandDirectory + job.outputFile = function.jobOutputFile + job.errorFile = function.jobErrorFile + + if (logger.isDebugEnabled) { + logger.debug("Starting: " + function.commandDirectory + " > " + function.commandLine) + } else { + logger.info("Starting: " + function.commandLine) + } + + logger.info("Output written to " + function.jobOutputFile) + if (function.jobErrorFile != null) { + logger.info("Errors written to " + function.jobErrorFile) + } else { + if (logger.isDebugEnabled) + logger.info("Errors also written to " + function.jobOutputFile) + } + + function.jobOutputFile.delete() + if (function.jobErrorFile != null) + function.jobErrorFile.delete() + function.doneOutputs.foreach(_.delete()) + function.failOutputs.foreach(_.delete()) + runStatus = RunnerStatus.RUNNING + function.mkOutputDirectories() + job.run() + function.doneOutputs.foreach(_.createNewFile()) + runStatus = RunnerStatus.DONE + logger.info("Done: " + function.commandLine) } catch { - case e: JobExitException => + case e => runStatus = RunnerStatus.FAILED try { function.failOutputs.foreach(_.createNewFile()) + writeStackTrace(e) } catch { case _ => /* ignore errors in the exception handler */ } diff --git a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala index 987e8bbba..af74c3b70 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/CommandLineFunction.scala @@ -1,10 +1,6 @@ package org.broadinstitute.sting.queue.function import org.broadinstitute.sting.queue.util._ -import org.broadinstitute.sting.commandline._ -import java.io.File -import collection.JavaConversions._ -import org.broadinstitute.sting.queue.function.scattergather.{SimpleTextGatherFunction, Gather} /** * A command line that will be run in a pipeline. @@ -21,12 +17,6 @@ trait CommandLineFunction extends QFunction with Logging { /** Whether a job is restartable */ var jobRestartable = true - /** Prefix for automatic job name creation */ - var jobNamePrefix: String = _ - - /** The name name of the job */ - var jobName: String = _ - /** Job project to run the command */ var jobProject: String = _ @@ -36,50 +26,18 @@ trait CommandLineFunction extends QFunction with Logging { /** Extra arguments to specify on the command line */ var extraArgs: List[String] = Nil - /** Temporary directory to write any files */ - var jobTempDir: File = IOUtils.javaTempDir - - /** File to redirect any output. Defaults to .out */ - @Output(doc="File to redirect any output", required=false) - @Gather(classOf[SimpleTextGatherFunction]) - var jobOutputFile: File = _ - - /** File to redirect any errors. Defaults to .out */ - @Output(doc="File to redirect any errors", required=false) - @Gather(classOf[SimpleTextGatherFunction]) - var jobErrorFile: File = _ - /** * Returns set of directories required to run the command. * @return Set of directories required to run the command. */ - def jobDirectories = { - var dirs = Set.empty[File] - dirs += commandDirectory - if (jobTempDir != null) - dirs += jobTempDir - dirs ++= inputs.map(_.getParentFile) - dirs ++= outputs.map(_.getParentFile) - dirs - } - - override def useStatusOutput(file: File) = - file != jobOutputFile && file != jobErrorFile + def jobDirectories = outputDirectories ++ inputs.map(_.getParentFile) override def description = commandLine - /** - * The function description in .dot files - */ - override def dotString = jobName + " => " + commandLine - /** * Sets all field values. */ override def freezeFieldValues = { - if (jobNamePrefix == null) - jobNamePrefix = qSettings.jobNamePrefix - if (jobQueue == null) jobQueue = qSettings.jobQueue @@ -89,12 +47,6 @@ trait CommandLineFunction extends QFunction with Logging { if (memoryLimit.isEmpty && qSettings.memoryLimit.isDefined) memoryLimit = qSettings.memoryLimit - if (jobName == null) - jobName = CommandLineFunction.nextJobName(jobNamePrefix) - - if (jobOutputFile == null) - jobOutputFile = new File(jobName + ".out") - super.freezeFieldValues } @@ -143,21 +95,3 @@ trait CommandLineFunction extends QFunction with Logging { }) + suffix } - -/** - * A command line that will be run in a pipeline. - */ -object CommandLineFunction { - /** Job index counter for this run of Queue. */ - private var jobIndex = 0 - - /** - * Returns the next job name using the prefix. - * @param prefix Prefix of the job name. - * @return the next job name. - */ - private def nextJobName(prefix: String) = { - jobIndex += 1 - prefix + "-" + jobIndex - } -} diff --git a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala index 078907386..078699e70 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/InProcessFunction.scala @@ -1,12 +1,9 @@ package org.broadinstitute.sting.queue.function -import java.io.File - /** * Runs a function in process. */ trait InProcessFunction extends QFunction { def run() - def useStatusOutput(file: File) = true - def description = (List(this.getClass.getSimpleName) ++ this.outputs.map(_.getAbsolutePath)).mkString(" ") + def description = (List(this.getClass.getSimpleName) ++ this.outputs.filter(file => useStatusOutput(file)).map(_.getAbsolutePath)).mkString(" ") } diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index ced1ffa7a..b089d7aa0 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -6,6 +6,7 @@ import org.broadinstitute.sting.commandline._ import org.broadinstitute.sting.queue.util.{CollectionUtils, IOUtils, ReflectionUtils} import org.broadinstitute.sting.queue.{QException, QSettings} import collection.JavaConversions._ +import org.broadinstitute.sting.queue.function.scattergather.{Gather, SimpleTextGatherFunction} /** * The base interface for all functions in Queue. @@ -18,12 +19,31 @@ trait QFunction { */ var analysisName: String = _ + /** Prefix for automatic job name creation */ + var jobNamePrefix: String = _ + + /** The name name of the job */ + var jobName: String = _ + /** Default settings */ var qSettings: QSettings = _ /** Directory to run the command in. */ var commandDirectory: File = IOUtils.CURRENT_DIR + /** Temporary directory to write any files */ + var jobTempDir: File = IOUtils.javaTempDir + + /** File to redirect any output. Defaults to .out */ + @Output(doc="File to redirect any output", required=false) + @Gather(classOf[SimpleTextGatherFunction]) + var jobOutputFile: File = _ + + /** File to redirect any errors. Defaults to .out */ + @Output(doc="File to redirect any errors", required=false) + @Gather(classOf[SimpleTextGatherFunction]) + var jobErrorFile: File = _ + /** * Description of this command line function. */ @@ -32,7 +52,7 @@ trait QFunction { /** * The function description in .dot files */ - def dotString = "" + def dotString = jobName + " => " + description /** * Returns true if the function is done, false if it's @@ -62,7 +82,8 @@ trait QFunction { * Returns true if the file should be used for status output. * @return true if the file should be used for status output. */ - def useStatusOutput(file: File): Boolean + def useStatusOutput(file: File) = + file != jobOutputFile && file != jobErrorFile /** * Returns the output files for this function. @@ -110,6 +131,28 @@ trait QFunction { */ def outputs = getFieldFiles(outputFields) + /** + * Returns the set of directories where files may be written. + */ + def outputDirectories = { + var dirs = Set.empty[File] + dirs += commandDirectory + if (jobTempDir != null) + dirs += jobTempDir + dirs ++= outputs.map(_.getParentFile) + dirs + } + + /** + * Creates the output directories for this function if it doesn't exist. + */ + def mkOutputDirectories() = { + outputDirectories.foreach(dir => { + if (!dir.exists && !dir.mkdirs) + throw new QException("Unable to create directory: " + dir) + }) + } + /** * Returns fields that do not have values which are required. * @return List[String] names of fields missing values. @@ -186,33 +229,6 @@ trait QFunction { case unknown => throw new QException("Non-file found. Try removing the annotation, change the annotation to @Argument, or extend File with FileExtension: %s: %s".format(field.field, unknown)) } - /** - * Resets the field to the temporary directory. - * @param field Field to get and set the file. - * @param tempDir new root for the file. - */ - def resetFieldFile(field: ArgumentSource, tempDir: File): File = { - getFieldValue(field) match { - case fileExtension: FileExtension => { - val newFile = IOUtils.resetParent(tempDir, fileExtension) - val newFileExtension = fileExtension.withPath(newFile.getPath) - setFieldValue(field, newFileExtension) - newFileExtension - } - case file: File => { - if (file.getClass != classOf[File]) - throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified."); - val newFile = IOUtils.resetParent(tempDir, file) - setFieldValue(field, newFile) - newFile - } - case null => null - case unknown => - throw new QException("Unable to set file from %s: %s".format(field, unknown)) - } - } - - /** * After a function is frozen no more updates are allowed by the user. * The function is allow to make necessary updates internally to make sure @@ -223,7 +239,19 @@ trait QFunction { canonFieldValues } + /** + * Sets all field values. + */ def freezeFieldValues = { + if (jobNamePrefix == null) + jobNamePrefix = qSettings.jobNamePrefix + + if (jobName == null) + jobName = QFunction.nextJobName(jobNamePrefix) + + if (jobOutputFile == null) + jobOutputFile = new File(jobName + ".out") + commandDirectory = IOUtils.subDir(IOUtils.CURRENT_DIR, commandDirectory) } @@ -346,6 +374,19 @@ trait QFunction { } object QFunction { + /** Job index counter for this run of Queue. */ + private var jobIndex = 0 + + /** + * Returns the next job name using the prefix. + * @param prefix Prefix of the job name. + * @return the next job name. + */ + private def nextJobName(prefix: String) = { + jobIndex += 1 + prefix + "-" + jobIndex + } + /** * The list of fields defined on a class * @param clazz The class to lookup fields. diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala deleted file mode 100644 index 0e0123b16..000000000 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CleanupTempDirsFunction.scala +++ /dev/null @@ -1,22 +0,0 @@ -package org.broadinstitute.sting.queue.function.scattergather - -import java.io.File -import org.broadinstitute.sting.commandline.Input -import org.broadinstitute.sting.queue.function.InProcessFunction -import org.apache.commons.io.FileUtils - -/** - * Removes the temporary directories for scatter / gather. - * The script can be changed by setting rmdirScript. - * By default uses rm -rf. - * The format of the call is [.. ] - */ -class CleanupTempDirsFunction extends InProcessFunction { - @Input(doc="Original outputs of the gather functions") - var originalOutputs: Set[File] = Set.empty[File] - - @Input(doc="Temporary directories to be deleted") - var tempDirectories: List[File] = Nil - - def run() = tempDirectories.foreach(FileUtils.deleteDirectory(_)) -} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala index 11ccff984..2963484ea 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala @@ -52,33 +52,31 @@ class CloneFunction extends CommandLineFunction { this.jobProject = originalFunction.jobProject if (this.jobName == null) this.jobName = originalFunction.jobName - if (this.jobOutputFile == null) - this.jobOutputFile = overriddenFile("jobOutputFile").get - if (this.jobErrorFile == null) - this.jobErrorFile = overriddenFile("jobErrorFile").getOrElse(null) super.freezeFieldValues } def commandLine = withScatterPart(() => originalFunction.commandLine) override def getFieldValue(source: ArgumentSource) = { - overriddenFields.get(source) match { - case Some(value) => value.asInstanceOf[AnyRef] - case None => { - val value = originalFunction.getFieldValue(source) - overriddenFields += source -> value - value + source.field.getName match { + case "jobOutputFile" => jobOutputFile + case "jobErrorFile" => jobErrorFile + case _ => overriddenFields.get(source) match { + case Some(value) => value.asInstanceOf[AnyRef] + case None => { + val value = originalFunction.getFieldValue(source) + overriddenFields += source -> value + value + } } } } override def setFieldValue(source: ArgumentSource, value: Any) = { - overriddenFields += source -> value - } - - private def overriddenFile(name: String) = { - overriddenFields - .find{case (key, _) => key.field.getName == name} - .map{case (_, value) => value.asInstanceOf[File]} + source.field.getName match { + case "jobOutputFile" => jobOutputFile = value.asInstanceOf[File] + case "jobErrorFile" => jobErrorFile = value.asInstanceOf[File] + case _ => overriddenFields += source -> value + } } } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala deleted file mode 100644 index 0d1842b52..000000000 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CreateTempDirsFunction.scala +++ /dev/null @@ -1,25 +0,0 @@ -package org.broadinstitute.sting.queue.function.scattergather - -import java.io.File -import org.broadinstitute.sting.commandline.{Output, Input} -import org.broadinstitute.sting.queue.function.InProcessFunction - -/** - * Creates the temporary directories for scatter / gather. - * The script can be changed by setting mkdirScript. - * By default uses mkdir -pv - * The format of the call is [.. ] - */ -class CreateTempDirsFunction extends InProcessFunction { - @Input(doc="Original inputs to the scattered function") - var originalInputs: Set[File] = Set.empty[File] - - @Output(doc="Temporary directories to create") - var tempDirectories: List[File] = Nil - - override def useStatusOutput(file: File) = false - - override def isDone = Some(tempDirectories.forall(_.exists)) - - def run() = tempDirectories.foreach(_.mkdirs) -} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala index 9203a693d..bf6d026b5 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala @@ -14,9 +14,6 @@ trait ScatterFunction extends QFunction { @Output(doc="Scattered parts of the original input, one per temp directory") var scatterParts: List[File] = Nil - @Input(doc="Temporary directories for each scatter part") - var tempDirectories: List[File] = Nil - /** * Sets the original function used to create this scatter function. * @param originalFunction The ScatterGatherableFunction. diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 01ba6fe95..fdf7c5e88 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -16,12 +16,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** scatter gather directory */ var scatterGatherDirectory: File = _ - /** cleanup temporary directories */ - var cleanupTempDirectories = false - - /** Class to use for creating temporary directories. Defaults to CreateTempDirsFunction. */ - var createTempDirsClass: Class[_ <: CreateTempDirsFunction] = _ - /** Class to use for scattering. Defaults to the annotation used in the @Scatter tag. */ var scatterClass: Class[_ <: ScatterFunction] = _ @@ -32,16 +26,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { */ var gatherClass: PartialFunction[ArgumentSource, Class[_ <: GatherFunction]] = _ - /** Class to use for removing temporary directories. Defaults to CleanupTempDirsFunction. */ - var cleanupTempDirsClass: Class[_ <: CleanupTempDirsFunction] = _ - - /** - * Allows external modification of the CreateTempDirsFunction that will create the temporary directories. - * @param initializeFunction The function that will create the temporary directories. - * @param inputFields The input fields that the original function was dependent on. - */ - var setupInitializeFunction: PartialFunction[(CreateTempDirsFunction, List[ArgumentSource]), Unit] = _ - /** * Allows external modification of the ScatterFunction that will create the scatter pieces in the temporary directories. * @param scatterFunction The function that will create the scatter pieces in the temporary directories. @@ -63,14 +47,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { */ var setupCloneFunction: PartialFunction[(CloneFunction, Int), Unit] = _ - /** - * Allows external modification of the CleanupTempDirsFunction that will remove the temporary directories. - * @param cleanupFunction The function that will remove the temporary directories. - * @param gatherFunctions The functions that will gather up the original output fields. - * @param outputFields The output fields that the original function was dependent on. - */ - var setupCleanupFunction: PartialFunction[(CleanupTempDirsFunction, Map[ArgumentSource, GatherFunction], List[ArgumentSource]), Unit] = _ - /** * Returns true if the function is ready to be scatter / gathered. * The base implementation checks if the scatter count is greater than one, @@ -93,57 +69,73 @@ trait ScatterGatherableFunction extends CommandLineFunction { val inputFieldsWithValues = this.inputFields.filter(hasFieldValue(_)) // Only gather up fields that will have a value val outputFieldsWithValues = this.outputFields.filter(hasFieldValue(_)) + // The field containing the file to split + val originalInput = getFieldFile(scatterField) // Create the scatter function based on @Scatter val scatterFunction = this.newScatterFunction(this.scatterField) + scatterFunction.analysisName = this.analysisName + scatterFunction.qSettings = this.qSettings + scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName) + scatterFunction.originalInput = originalInput + scatterFunction.setOriginalFunction(this, scatterField) initScatterFunction(scatterFunction, this.scatterField) tempDirectories :+= scatterFunction.commandDirectory functions :+= scatterFunction // Create the gather functions for each output field var gatherFunctions = Map.empty[ArgumentSource, GatherFunction] + var gatherOutputs = Map.empty[ArgumentSource, File] for (gatherField <- outputFieldsWithValues) { val gatherFunction = this.newGatherFunction(gatherField) + val gatherOutput = getFieldFile(gatherField) + gatherFunction.analysisName = this.analysisName + gatherFunction.qSettings = this.qSettings + gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) + gatherFunction.originalOutput = this.getFieldFile(gatherField) + gatherFunction.setOriginalFunction(this, gatherField) initGatherFunction(gatherFunction, gatherField) tempDirectories :+= gatherFunction.commandDirectory functions :+= gatherFunction gatherFunctions += gatherField -> gatherFunction + gatherOutputs += gatherField -> gatherOutput } // Create the clone functions for running the parallel jobs var cloneFunctions = List.empty[CloneFunction] for (i <- 1 to this.scatterCount) { val cloneFunction = this.newCloneFunction() + + cloneFunction.originalFunction = this + cloneFunction.index = i + + // Setup the fields on the clone function, outputing each as a relative file in the sg directory. + cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i) + var scatterPart = new File(originalInput.getName) + cloneFunction.setFieldValue(scatterField, scatterPart) + for (gatherField <- outputFieldsWithValues) { + val gatherPart = new File(gatherOutputs(gatherField).getName) + cloneFunction.setFieldValue(gatherField, gatherPart) + } + + // Allow the script writer to change the paths to the files. initCloneFunction(cloneFunction, i) + + // Get absolute paths to the files and bind the sg functions to the clone function via the absolute paths. + scatterPart = IOUtils.subDir(cloneFunction.commandDirectory, cloneFunction.getFieldFile(scatterField)) + cloneFunction.setFieldValue(scatterField, scatterPart) + scatterFunction.scatterParts :+= scatterPart + for (gatherField <- outputFieldsWithValues) { + val gatherPart = IOUtils.subDir(cloneFunction.commandDirectory, cloneFunction.getFieldFile(gatherField)) + cloneFunction.setFieldValue(gatherField, gatherPart) + gatherFunctions(gatherField).gatherParts :+= gatherPart + } + cloneFunctions :+= cloneFunction tempDirectories :+= cloneFunction.commandDirectory - - bindCloneFunctionScatter(scatterFunction, this.scatterField, cloneFunction, i) - // For each each output field, change value to the scatterGatherTempDir dir and feed it into the gatherer - for (gatherField <- outputFieldsWithValues) - bindCloneFunctionGather(gatherFunctions(gatherField), gatherField, cloneFunction, i) } functions ++= cloneFunctions - // Create a function to create all of the scatterGatherTempDir directories. - // All of its inputs are the inputs of the original function. - val initializeFunction = this.newInitializeFunction() - initInitializeFunction(initializeFunction, inputFieldsWithValues) - - // Create a function that will remove any temporary items - // All of its inputs are the outputs of the original function. - var cleanupFunction = newCleanupFunction() - initCleanupFunction(cleanupFunction, gatherFunctions, outputFieldsWithValues) - - // Set the temporary directories, for the initialize function as outputs for scatter and cleanup as inputs. - initializeFunction.tempDirectories = tempDirectories - scatterFunction.tempDirectories = tempDirectories - cleanupFunction.tempDirectories = tempDirectories - - functions +:= initializeFunction - if (this.cleanupTempDirectories) - functions :+= cleanupFunction - // Return all the various functions we created functions } @@ -167,41 +159,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { protected lazy val scatterField = this.inputFields.find(field => ReflectionUtils.hasAnnotation(field.field, classOf[Scatter])).get - /** - * Retrieves the original field value for the scatter field. - */ - protected lazy val originalInput = getFieldFile(scatterField) - - /** - * Creates a new initialize CreateTempDirsFunction that will create the temporary directories. - * @return A CreateTempDirsFunction that will create the temporary directories. - */ - protected def newInitializeFunction(): CreateTempDirsFunction = { - if (createTempDirsClass != null) - this.createTempDirsClass.newInstance - else - new CreateTempDirsFunction - } - - /** - * Initializes the CreateTempDirsFunction that will create the temporary directories. - * The initializeFunction qSettings is set so that the CreateTempDirsFunction runs with the same prefix, etc. as this ScatterGatherableFunction. - * The initializeFunction commandDirectory is set so that the function runs in the directory as this ScatterGatherableFunction. - * The initializeFunction is modified to become dependent on the input files for this ScatterGatherableFunction. - * Calls setupInitializeFunction with initializeFunction. - * @param initializeFunction The function that will create the temporary directories. - * @param inputFields The input fields that the original function was dependent on. - */ - protected def initInitializeFunction(initializeFunction: CreateTempDirsFunction, inputFields: List[ArgumentSource]) = { - initializeFunction.qSettings = this.qSettings - initializeFunction.commandDirectory = this.commandDirectory - for (inputField <- inputFields) - initializeFunction.originalInputs ++= this.getFieldFiles(inputField) - if (this.setupInitializeFunction != null) - if (this.setupInitializeFunction.isDefinedAt(initializeFunction, inputFields)) - this.setupInitializeFunction(initializeFunction, inputFields) - } - /** * Creates a new ScatterFunction for the scatterField. * @param scatterField Field that defined @Scatter. @@ -217,19 +174,11 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Initializes the ScatterFunction created by newScatterFunction() that will create the scatter pieces in the temporary directories. - * The scatterFunction qSettings is set so that the ScatterFunction runs with the same prefix, etc. as this ScatterGatherableFunction. - * The scatterFunction commandDirectory is set so that the function runs from a temporary directory under the scatterDirectory. - * The scatterFunction has it's originalInput set with the file to be scattered into scatterCount pieces. - * Calls scatterFunction.setOriginalFunction with this ScatterGatherableFunction. * Calls setupScatterFunction with scatterFunction. * @param scatterFunction The function that will create the scatter pieces in the temporary directories. * @param scatterField The input field being scattered. */ protected def initScatterFunction(scatterFunction: ScatterFunction, scatterField: ArgumentSource) = { - scatterFunction.qSettings = this.qSettings - scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName) - scatterFunction.originalInput = this.originalInput - scatterFunction.setOriginalFunction(this, scatterField) if (this.setupScatterFunction != null) if (this.setupScatterFunction.isDefinedAt(scatterFunction, scatterField)) this.setupScatterFunction(scatterFunction, scatterField) @@ -253,20 +202,12 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Initializes the GatherFunction created by newGatherFunction() that will collect the gather pieces in the temporary directories. - * The gatherFunction qSettings is set so that the GatherFunction runs with the same prefix, etc. as this ScatterGatherableFunction. - * The gatherFunction commandDirectory is set so that the function runs from a temporary directory under the scatterDirectory. - * The gatherFunction has it's originalOutput set with the file to be gathered from the scatterCount pieces. * Calls the gatherFunction.setOriginalFunction with this ScatterGatherableFunction. * Calls setupGatherFunction with gatherFunction. * @param gatherFunction The function that will merge the gather pieces from the temporary directories. * @param gatherField The output field being gathered. */ protected def initGatherFunction(gatherFunction: GatherFunction, gatherField: ArgumentSource) = { - gatherFunction.qSettings = this.qSettings - gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) - gatherFunction.originalOutput = this.getFieldFile(gatherField) - gatherFunction.setOriginalFunction(this, gatherField) - gatherFunction.analysisName = this.analysisName if (this.setupGatherFunction != null) if (this.setupGatherFunction.isDefinedAt(gatherFunction, gatherField)) this.setupGatherFunction(gatherFunction, gatherField) @@ -279,83 +220,16 @@ trait ScatterGatherableFunction extends CommandLineFunction { protected def newCloneFunction() = new CloneFunction /** - * Initializes the cloned function created by newCloneFunction() by setting it's commandDirectory to a temporary directory under scatterDirectory. * Calls setupCloneFunction with cloneFunction. * @param cloneFunction The clone of this ScatterGatherableFunction * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. */ protected def initCloneFunction(cloneFunction: CloneFunction, index: Int) = { - cloneFunction.originalFunction = this - cloneFunction.index = index - cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+index) if (this.setupCloneFunction != null) if (this.setupCloneFunction.isDefinedAt(cloneFunction, index)) this.setupCloneFunction(cloneFunction, index) } - /** - * Joins a piece of the ScatterFunction output to the cloned function's input. - * The input of the clone is changed to be in the output directory of the clone. - * The scatter function piece is added as an output of the scatterFunction. - * The clone function's original input is changed to use the piece from the output directory. - * Finally the scatterFunction.setCloneFunction is called with the clone of this ScatterGatherableFunction. - * @param scatterFunction Function that will create the pieces including the piece that will go to cloneFunction. - * @param scatterField The field to be scattered. - * @param cloneFunction Clone of this ScatterGatherableFunction. - * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. - */ - protected def bindCloneFunctionScatter(scatterFunction: ScatterFunction, scatterField: ArgumentSource, cloneFunction: CloneFunction, index: Int) = { - // Reset the input of the clone to the the scatterGatherTempDir dir and add it as an output of the scatter - val scatterPart = IOUtils.resetParent(cloneFunction.commandDirectory, scatterFunction.originalInput) - scatterFunction.scatterParts :+= scatterPart - cloneFunction.setFieldValue(scatterField, scatterPart) - scatterFunction.setCloneFunction(cloneFunction, index, scatterField) - } - - /** - * Joins the cloned function's output as a piece of the GatherFunction's input. - * Finally the scatterFunction.setCloneFunction is called with the clone of this ScatterGatherableFunction. - * @param cloneFunction Clone of this ScatterGatherableFunction. - * @param gatherFunction Function that will create the pieces including the piece that will go to cloneFunction. - * @param gatherField The field to be gathered. - */ - protected def bindCloneFunctionGather(gatherFunction: GatherFunction, gatherField: ArgumentSource, cloneFunction: CloneFunction, index: Int) = { - val gatherPart = cloneFunction.resetFieldFile(gatherField, cloneFunction.commandDirectory) - gatherFunction.gatherParts :+= gatherPart - gatherFunction.setCloneFunction(cloneFunction, index, gatherField) - } - - /** - * Creates a new function that will remove the temporary directories. - * @return A CleanupTempDirs function that will remove the temporary directories. - */ - protected def newCleanupFunction(): CleanupTempDirsFunction = { - if (cleanupTempDirsClass != null) - this.cleanupTempDirsClass.newInstance - else - new CleanupTempDirsFunction - } - - /** - * Initializes the CleanupTempDirsFunction created by newCleanupFunction() that will remove the temporary directories. - * The cleanupFunction qSettings is set so that the CleanupTempDirsFunction runs with the same prefix, etc. as this ScatterGatherableFunction. - * The cleanupFunction commandDirectory is set so that the function runs in the directory as this ScatterGatherableFunction. - * The initializeFunction is modified to become dependent on the output files for this ScatterGatherableFunction. - * Calls setupCleanupFunction with cleanupFunction. - * @param cleanupFunction The function that will remove the temporary directories. - * @param gatherFunctions The functions that will gather up the original output fields. - * @param outputFields The output fields that the original function was dependent on. - */ - protected def initCleanupFunction(cleanupFunction: CleanupTempDirsFunction, gatherFunctions: Map[ArgumentSource, GatherFunction], outputFields: List[ArgumentSource]) = { - cleanupFunction.qSettings = this.qSettings - cleanupFunction.commandDirectory = this.commandDirectory - for (gatherField <- outputFields) - cleanupFunction.originalOutputs += gatherFunctions(gatherField).originalOutput - if (this.setupCleanupFunction != null) - if (this.setupCleanupFunction.isDefinedAt(cleanupFunction, gatherFunctions, outputFields)) - this.setupCleanupFunction(cleanupFunction, gatherFunctions, outputFields) - } - /** * Returns a temporary directory under this scatter gather directory. * @param Sub directory under the scatter gather directory. diff --git a/scala/src/org/broadinstitute/sting/queue/util/EmailMessage.scala b/scala/src/org/broadinstitute/sting/queue/util/EmailMessage.scala index d950ad0ff..3ed8deb0e 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/EmailMessage.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/EmailMessage.scala @@ -95,7 +95,7 @@ class EmailMessage extends Logging { } override def toString = { - """| + """ |From: %s |To: %s |Cc: %s @@ -106,7 +106,7 @@ class EmailMessage extends Logging { | |Attachments: |%s - |""".stripMargin.trim.format( + |""".stripMargin.format( this.from, this.to.mkString(", "), this.cc.mkString(", "), this.bcc.mkString(", "), this.subject, this.body, diff --git a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala index e61a85677..56c6133a1 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/IOUtils.scala @@ -10,6 +10,8 @@ object IOUtils { /** The current directory "." */ val CURRENT_DIR = new File(".") + val CURRENT_DIR_ABS = absolute(CURRENT_DIR) + /** * Returns the sub path rooted at the parent. * If the sub path is already absolute, returns the sub path. @@ -20,8 +22,8 @@ object IOUtils { * @param path The sub path to append to the parent, if the path is not absolute. * @return The absolute path to the file in the parent dir if the path was not absolute, otherwise the original path. */ - def subDir(dir: File, path: String): File = - subDir(dir, new File(path)) + def subDir(parent: File, path: String): File = + subDir(parent, new File(path)) /** * Returns the sub path rooted at the parent. @@ -36,25 +38,16 @@ object IOUtils { def subDir(parent: File, file: File): File = { val parentAbs = absolute(parent) val fileAbs = absolute(file) - val currentAbs = absolute(CURRENT_DIR) - if (parentAbs == currentAbs && fileAbs == currentAbs) - absolute(CURRENT_DIR.getCanonicalFile) - else if (parentAbs == currentAbs || file.isAbsolute) + if (parentAbs == CURRENT_DIR_ABS && fileAbs == CURRENT_DIR_ABS) + CURRENT_DIR_ABS + else if (parentAbs == CURRENT_DIR_ABS || file.isAbsolute) fileAbs - else if (fileAbs == currentAbs) + else if (fileAbs == CURRENT_DIR_ABS) parentAbs else absolute(new File(parentAbs, file.getPath)) } - /** - * Resets the parent of the file to the directory. - * @param dir New parent directory. - * @param file Path to the file to be re-rooted. - * @return Absolute path to the new file. - */ - def resetParent(dir: File, file: File) = absolute(subDir(dir, file.getName)) - /** * Returns the temp directory as defined by java. * @return the temp directory as defined by java. diff --git a/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala index 6982f4978..8c0428a76 100644 --- a/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala +++ b/scala/test/org/broadinstitute/sting/queue/util/IOUtilsUnitTest.scala @@ -41,12 +41,6 @@ class IOUtilsUnitTest extends BaseTest { Assert.assertEquals(new File("/path/../to/file"), subDir) } - @Test - def testResetParent = { - val newFile = IOUtils.resetParent(new File("/new/parent/dir"), new File("/old/parent_dir/file.name")) - Assert.assertEquals(new File("/new/parent/dir/file.name"), newFile) - } - @Test def testTempDir = { val tempDir = IOUtils.tempDir("Q-Unit-Test")