While bkill'ing on the shutdown thread, Queue will no longer try to submit more jobs on the original thread.

Updated pipeline output structure to current recommendations by Corin.
Directories are now automatically created before the function runs.
Fixed several bugs with scatter gather binding when the script author needs to change the directories.
Fixed bug with tracking of log files for CloneFunctions.
More error handling and logging of exceptions (good test environment while LSF was down this early AM!)
Removed cleanup utility for scatter gather.  SG Output structure has changed significantly.  Will need to discuss and find a better approach for Queue programmatically deleting files.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4504 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-10-15 17:01:36 +00:00
parent 42c3d74432
commit 7157cb9090
19 changed files with 449 additions and 538 deletions

View File

@ -1,10 +1,13 @@
import net.sf.picard.reference.FastaSequenceFile
import org.broadinstitute.sting.commandline.ArgumentSource
import org.broadinstitute.sting.datasources.pipeline.Pipeline
import org.broadinstitute.sting.gatk.walkers.genotyper.GenotypeCalculationModel.Model
import org.broadinstitute.sting.gatk.{CommandLineGATK, DownsampleType}
import org.broadinstitute.sting.gatk.DownsampleType
import org.broadinstitute.sting.queue.extensions.gatk._
import org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction
import org.broadinstitute.sting.queue.extensions.samtools._
import org.broadinstitute.sting.queue.function.scattergather.{GatherFunction, CloneFunction, ScatterFunction}
import org.broadinstitute.sting.queue.util.IOUtils
import org.broadinstitute.sting.queue.{QException, QScript}
import collection.JavaConversions._
import org.broadinstitute.sting.utils.yaml.YamlUtils
@ -61,7 +64,7 @@ class fullCallingPipeline extends QScript {
this.intervals = qscript.pipeline.getProject.getIntervalList
this.jarFile = qscript.gatkJar
this.reference_sequence = qscript.pipeline.getProject.getReferenceFile
this.memoryLimit = Some(6)
this.memoryLimit = Some(4)
}
@ -88,30 +91,36 @@ class fullCallingPipeline extends QScript {
}
for ( sample <- recalibratedSamples ) {
val sampleId = sample.getId
// put unclean bams in unclean genotypers in advance, create the extension files
val bam = sample.getBamFiles.get("recalibrated")
if (!sample.getBamFiles.contains("cleaned")) {
sample.getBamFiles.put("cleaned", swapExt(bam,"bam","cleaned.bam"))
sample.getBamFiles.put("cleaned", swapExt("CleanedBams", bam,"bam","cleaned.bam"))
}
val cleaned_bam = sample.getBamFiles.get("cleaned")
val indel_targets = swapExt(bam,"bam","realigner_targets.interval_list")
val indel_targets = swapExt("CleanedBams/IntermediateFiles/"+sampleId, bam,"bam","realigner_targets.interval_list")
// create the cleaning commands
val targetCreator = new RealignerTargetCreator with CommandLineGATKArgs
targetCreator.analysisName = "CreateTargets_"+bam.getName
targetCreator.jobOutputFile = new File(".queue/logs/Cleaning/%s/RealignerTargetCreator.out".format(sampleId))
targetCreator.jobName = "CreateTargets_"+sampleId
targetCreator.analysisName = "CreateTargets_"+sampleId
targetCreator.input_file :+= bam
targetCreator.out = indel_targets
targetCreator.memoryLimit = Some(2)
val realigner = new IndelRealigner with CommandLineGATKArgs
realigner.analysisName = "RealignBam_"+bam.getName
realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId))
realigner.analysisName = "RealignBam_"+sampleId
realigner.input_file = targetCreator.input_file
realigner.intervals = qscript.contigIntervals
realigner.targetIntervals = new java.io.File(targetCreator.out.getAbsolutePath)
realigner.targetIntervals = targetCreator.out
realigner.scatterCount = contigCount
// may need to explicitly run fix mates
var fixMates = new PicardBamJarFunction {
jobOutputFile = new File(".queue/logs/Cleaning/%s/FixMates.out".format(sampleId))
// Declare inputs/outputs for dependency tracking.
@Input(doc="unfixed bam") var unfixed: File = _
@Output(doc="fixed bam") var fixed: File = _
@ -129,15 +138,30 @@ class fullCallingPipeline extends QScript {
realigner.out = cleaned_bam
// While gathering run fix mates.
realigner.scatterClass = classOf[ContigScatterFunction]
realigner.setupScatterFunction = {
case (scatter: ScatterFunction, _) =>
scatter.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather".format(sampleId))
scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter.out".format(sampleId))
}
realigner.setupCloneFunction = {
case (clone: CloneFunction, index: Int) =>
clone.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Scatter_%s".format(sampleId, index))
clone.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter_%s.out".format(sampleId, index))
}
realigner.setupGatherFunction = {
case (gather: BamGatherFunction, _) =>
case (gather: BamGatherFunction, source: ArgumentSource) =>
gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName))
gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/FixMates.out".format(sampleId))
gather.memoryLimit = Some(6)
gather.jarFile = qscript.picardFixMatesJar
// Don't pass this AS=true to fix mates!
gather.assumeSorted = None
case (gather: GatherFunction, source: ArgumentSource) =>
gather.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather/Gather_%s".format(sampleId, source.field.getName))
gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Gather_%s.out".format(sampleId, source.field.getName))
}
} else {
realigner.out = swapExt(bam,"bam","unfixed.cleaned.bam")
realigner.out = swapExt("CleanedBams/IntermediateFiles/"+sampleId,bam,"bam","unfixed.cleaned.bam")
// Explicitly run fix mates if the function won't be scattered.
@ -145,13 +169,14 @@ class fullCallingPipeline extends QScript {
fixMates.jarFile = qscript.picardFixMatesJar
fixMates.unfixed = realigner.out
fixMates.fixed = cleaned_bam
fixMates.analysisName = "FixMates_"+bam.getName
fixMates.analysisName = "FixMates_"+sampleId
// Add the fix mates explicitly
}
var samtoolsindex = new SamtoolsIndexFunction
samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId))
samtoolsindex.bamFile = cleaned_bam
samtoolsindex.analysisName = "index_"+cleaned_bam.getName
samtoolsindex.analysisName = "index_cleaned_"+sampleId
if (!qscript.skip_cleaning) {
if ( realigner.scatterCount > 1 ) {
@ -162,35 +187,30 @@ class fullCallingPipeline extends QScript {
}
}
val recalibratedBamFiles = recalibratedSamples
.map(_.getBamFiles.get("recalibrated"))
.toList
val cleanBamFiles = qscript.pipeline.getSamples
.filter(_.getBamFiles.contains("cleaned"))
.map(_.getBamFiles.get("cleaned"))
.toList
// actually make calls
if (!qscript.skip_cleaning) {
//endToEnd(cleanedBase, cleanBamFiles, adprRscript, seq, expKind)
endToEnd(cleanedBase, cleanBamFiles)
//endToEnd(cleanedBase, "cleaned", adprRscript, seq, expKind)
endToEnd(cleanedBase, "cleaned")
} else {
//endToEnd(uncleanedBase, recalibratedBamFiles, adprRscript, seq, expKind)
endToEnd(uncleanedBase, recalibratedBamFiles)
//endToEnd(uncleanedBase, "recalibrated", adprRscript, seq, expKind)
endToEnd(uncleanedBase, "recalibrated")
}
}
//def endToEnd(base: String, bamFiles: List[File], adprthing: File, seqinfo: String, exptype: String) = {
def endToEnd(base: String, bamFiles: List[File]) = {
//def endToEnd(base: String, bamType: String, adprthing: File, seqinfo: String, exptype: String) = {
def endToEnd(base: String, bamType: String) = {
val samples = qscript.pipeline.getSamples.filter(_.getBamFiles.contains(bamType)).toList
val bamFiles = samples.map(_.getBamFiles.get(bamType))
// step through the un-indel-cleaned graph:
// 1a. call snps and indels
val snps = new UnifiedGenotyper with CommandLineGATKArgs
snps.jobOutputFile = new File(".queue/logs/SNPCalling/UnifiedGenotyper.out")
snps.analysisName = base+"_SNP_calls"
snps.input_file = bamFiles
snps.group :+= "Standard"
snps.out = new File(base+".vcf")
snps.out = new File("SnpCalls", base+".vcf")
snps.standard_min_confidence_threshold_for_emitting = Some(10)
snps.min_mapping_quality_score = Some(20)
snps.min_base_quality_score = Some(20)
@ -210,17 +230,36 @@ class fullCallingPipeline extends QScript {
//}
snps.scatterCount = qscript.num_snp_scatter_jobs
snps.setupScatterFunction = {
case (scatter: ScatterFunction, _) =>
scatter.commandDirectory = new File("SnpCalls/ScatterGather")
scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/SNPCalling/ScatterGather/Scatter.out")
}
snps.setupCloneFunction = {
case (clone: CloneFunction, index: Int) =>
clone.commandDirectory = new File("SnpCalls/ScatterGather/Scatter_%s".format(index))
clone.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/SNPCalling/ScatterGather/Scatter_%s.out".format(index))
}
snps.setupGatherFunction = {
case (gather: GatherFunction, source: ArgumentSource) =>
gather.commandDirectory = new File("SnpCalls/ScatterGather/Gather_%s".format(source.field.getName))
gather.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/SNPCalling/ScatterGather/Gather_%s.out".format(source.field.getName))
}
// indel genotyper does one sample at a time
var indelCallFiles = List.empty[RodBind]
var indelGenotypers = List.empty[IndelGenotyperV2 with CommandLineGATKArgs]
var loopNo = 0
var priority = ""
for ( bam <- bamFiles ) {
for ( sample <- samples ) {
val sampleId = sample.getId
val bam = sample.getBamFiles.get(bamType)
var indel = new IndelGenotyperV2 with CommandLineGATKArgs
indel.analysisName = "IndelGenotyper_"+bam.getName
indel.jobOutputFile = new File(".queue/logs/IndelCalling/%s/IndelGenotyperV2.out".format(sampleId))
indel.analysisName = "IndelGenotyper_"+sampleId
indel.input_file :+= bam
indel.out = swapExt(bam,".bam",".indels.vcf")
indel.out = swapExt("IndelCalls/IntermediateFiles/" + sampleId, bam,".bam",".indels.vcf")
indel.downsample_to_coverage = Some(qscript.downsampling_coverage)
indelCallFiles :+= RodBind("v"+loopNo.toString, "VCF", indel.out)
//indel.scatterCount = qscript.num_indel_scatter_jobs
@ -235,49 +274,56 @@ class fullCallingPipeline extends QScript {
loopNo += 1
}
val mergeIndels = new CombineVariants with CommandLineGATKArgs
mergeIndels.out = new TaggedFile(qscript.pipeline.getProject.getName+".indels.vcf","vcf")
mergeIndels.jobOutputFile = new File(".queue/logs/IndelCalling/CombineVariants.out")
mergeIndels.out = new TaggedFile("IndelCalls/" + qscript.pipeline.getProject.getName+".indels.vcf","vcf")
mergeIndels.genotypemergeoption = Some(org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.GenotypeMergeType.UNIQUIFY)
mergeIndels.priority = priority
mergeIndels.variantmergeoption = Some(org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.UNION)
mergeIndels.rodBind = indelCallFiles
mergeIndels.analysisName = base+"_MergeIndels"
mergeIndels.memoryLimit = Some(16)
mergeIndels.jobQueue = "gsa"
// 1b. genomically annotate SNPs -- no longer slow
val annotated = new GenomicAnnotator with CommandLineGATKArgs
annotated.jobOutputFile = new File(".queue/logs/SNPCalling/GenomicAnnotator.out")
annotated.rodBind :+= RodBind("variant", "VCF", snps.out)
annotated.rodBind :+= RodBind("refseq", "AnnotatorInputTable", qscript.refseqTable)
//annotated.rodBind :+= RodBind("dbsnp", "AnnotatorInputTable", qscript.dbsnpTable)
annotated.out = swapExt(snps.out,".vcf",".annotated.vcf")
annotated.out = swapExt("SnpCalls",snps.out,".vcf",".annotated.vcf")
//annotated.select :+= "dbsnp.name,dbsnp.refUCSC,dbsnp.strand,dbsnp.observed,dbsnp.avHet"
annotated.rodToIntervalTrackName = "variant"
annotated.analysisName = base+"_GenomicAnnotator"
// 2.a filter on cluster and near indels
val masker = new VariantFiltration with CommandLineGATKArgs
masker.jobOutputFile = new File(".queue/logs/SNPCalling/Masker.out")
masker.variantVCF = annotated.out
masker.rodBind :+= RodBind("mask", "VCF", mergeIndels.out)
masker.maskName = "NearIndel"
masker.clusterWindowSize = Some(10)
masker.clusterSize = Some(3)
masker.out = swapExt(annotated.out,".vcf",".indel.masked.vcf")
masker.out = swapExt("SnpCalls",annotated.out,".vcf",".indel.masked.vcf")
masker.analysisName = base+"_Cluster_and_Indel_filter"
// 2.b hand filter with standard filter
val handFilter = new VariantFiltration with CommandLineGATKArgs
handFilter.jobOutputFile = new File(".queue/logs/SNPCalling/HandFilter.out")
handFilter.variantVCF = masker.out
handFilter.rodBind :+= RodBind("mask", "VCF", mergeIndels.out)
handFilter.filterName ++= List("StrandBias","AlleleBalance","QualByDepth","HomopolymerRun")
handFilter.filterExpression ++= List("\"SB>=0.10\"","\"AB>=0.75\"","\"QD<5.0\"","\"HRun>=4\"")
handFilter.out = swapExt(annotated.out,".vcf",".handfiltered.vcf")
handFilter.out = swapExt("SnpCalls",annotated.out,".vcf",".handfiltered.vcf")
handFilter.analysisName = base+"_HandFilter"
// 3.i generate gaussian clusters on the masked vcf
// todo -- args for annotations?
// todo -- args for resources (properties file)
val clusters = new GenerateVariantClusters with CommandLineGATKArgs
clusters.jobOutputFile = new File(".queue/logs/SNPCalling/Clusters.out")
clusters.rodBind :+= RodBind("input", "VCF", masker.out)
clusters.DBSNP = qscript.pipeline.getProject.getDbsnpFile
val clusters_clusterFile = swapExt(new File(snps.out.getAbsolutePath),".vcf",".cluster")
val clusters_clusterFile = swapExt("SnpCalls/IntermediateFiles",snps.out,".vcf",".cluster")
clusters.clusterFile = clusters_clusterFile
clusters.memoryLimit = Some(6)
clusters.jobQueue = "gsa"
@ -287,19 +333,21 @@ class fullCallingPipeline extends QScript {
// 3.ii apply gaussian clusters to the masked vcf
val recalibrate = new VariantRecalibrator with CommandLineGATKArgs
recalibrate.jobOutputFile = new File(".queue/logs/SNPCalling/Recalibrator.out")
recalibrate.clusterFile = clusters.clusterFile
recalibrate.DBSNP = qscript.pipeline.getProject.getDbsnpFile
recalibrate.rodBind :+= RodBind("input", "VCF", masker.out)
recalibrate.out = swapExt(masker.out,".vcf",".recalibrated.vcf")
recalibrate.out = swapExt("SnpCalls",masker.out,".vcf",".recalibrated.vcf")
recalibrate.target_titv = qscript.target_titv
recalibrate.report_dat_file = swapExt(masker.out,".vcf",".recalibrate.dat")
recalibrate.tranches_file = swapExt(masker.out,".vcf",".recalibrate.tranches")
recalibrate.report_dat_file = swapExt("SnpCalls/IntermediateFiles", masker.out,".vcf",".recalibrate.dat")
recalibrate.tranches_file = swapExt("SnpCalls/IntermediateFiles", masker.out,".vcf",".recalibrate.tranches")
recalibrate.analysisName = base+"_VariantRecalibrator"
// 3.iii apply variant cuts to the clusters
val cut = new ApplyVariantCuts with CommandLineGATKArgs
cut.jobOutputFile = new File(".queue/logs/SNPCalling/VariantCuts.out")
cut.rodBind :+= RodBind("input", "VCF", recalibrate.out)
cut.out = swapExt(recalibrate.out,".vcf",".tranched.vcf")
cut.out = swapExt("SnpCalls",recalibrate.out,".vcf",".tranched.vcf")
cut.tranches_file = recalibrate.tranches_file
// todo -- fdr inputs, etc
cut.fdr_filter_level = Some(1)
@ -307,13 +355,14 @@ class fullCallingPipeline extends QScript {
// 4. Variant eval the cut and the hand-filtered vcf files
val eval = new VariantEval with CommandLineGATKArgs
eval.jobOutputFile = new File(".queue/logs/SNPCalling/VariantEval.out")
eval.rodBind :+= RodBind("evalOptimized", "VCF", cut.out)
eval.rodBind :+= RodBind("evalHandFiltered", "VCF", handFilter.out)
eval.evalModule ++= List("CountFunctionalClasses", "CompOverlap", "CountVariants", "TiTvVariantEvaluator")
eval.reportLocation = new File(base+".eval")
eval.reportLocation = new File("SnpCalls", base+".eval")
eval.reportType = Option(org.broadinstitute.sting.utils.report.VE2ReportFactory.VE2TemplateType.R)
eval.analysisName = base+"_VariantEval"
eval.DBSNP = qscript.pipeline.getProject.getDBSNP
eval.DBSNP = qscript.pipeline.getProject.getDbsnpFile
add(snps)
@ -335,7 +384,8 @@ class fullCallingPipeline extends QScript {
// adpr.sequencer = seqinfo
// adpr.protocol = exptype
// adpr.dependents = eval.reportLocation
// adpr.out = new File(base + "_adpr.pdf")
// adpr.jobOutputFile = new File(".queue/logs/Reporting/ADPR.out")
// adpr.out = new File("Reporting", base + "_adpr.pdf")
// adpr.analysisName = base + "_ADPR"
//In order for ADPR to finish successfully, a squid file for both the lane and sample level data needs to be
// produced, reformatted and named <projectBase>_lanes.txt or <projectBase>_samps.txt, respectively. These files

View File

@ -86,7 +86,6 @@ class CleanBamFile extends QScript {
this.reference_sequence = qscript.referenceFile
this.intervals = qscript.intervals
this.input_file :+= recalibratedBam
this.cleanupTempDirectories = true
}
def baseFile(suffix: String) = new File(baseName + suffix)

View File

@ -30,12 +30,39 @@ trait QScript extends Logging {
/**
* Exchanges the extension on a file.
* @param file File to look for the extension.
* @param oldExtension Old extension to strip off, if present.
* @param newExtension New extension to append.
* @return new File with the new extension in the current directory.
*/
protected def swapExt(file: File, oldExtension: String, newExtension: String) =
new File(file.getName.stripSuffix(oldExtension) + newExtension)
/**
* Exchanges the extension on a file.
* @param dir New directory for the file.
* @param file File to look for the extension.
* @param oldExtension Old extension to strip off, if present.
* @param newExtension New extension to append.
* @return new File with the new extension in dir.
*/
protected def swapExt(dir: String, file: File, oldExtension: String, newExtension: String) =
new File(dir, file.getName.stripSuffix(oldExtension) + newExtension)
/**
* Exchanges the extension on a file.
* @param dir New directory for the file.
* @param file File to look for the extension.
* @param oldExtension Old extension to strip off, if present.
* @param newExtension New extension to append.
* @return new File with the new extension in dir.
*/
protected def swapExt(dir: File, file: File, oldExtension: String, newExtension: String) =
new File(dir, file.getName.stripSuffix(oldExtension) + newExtension)
/**
* Adds one or more command line functions to be run.
* @param functions Functions to add.
*/
def add(functions: CommandLineFunction*) = this.functions ++= List(functions:_*)

View File

@ -1,27 +1,29 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.function.InProcessFunction
import org.broadinstitute.sting.queue.util.Logging
import org.broadinstitute.sting.queue.util.{IOUtils, Logging}
/**
* Runs a function that executes in process and does not fork out an external process.
*/
class InProcessRunner(function: InProcessFunction) extends JobRunner with Logging {
class InProcessRunner(val function: InProcessFunction) extends JobRunner with Logging {
private var runStatus: RunnerStatus.Value = _
def start() = {
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + function.description)
} else {
logger.info("Starting: " + function.description)
}
function.doneOutputs.foreach(_.delete())
function.failOutputs.foreach(_.delete())
runStatus = RunnerStatus.RUNNING
try {
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + function.description)
} else {
logger.info("Starting: " + function.description)
}
function.doneOutputs.foreach(_.delete())
function.failOutputs.foreach(_.delete())
runStatus = RunnerStatus.RUNNING
function.mkOutputDirectories()
function.run()
function.doneOutputs.foreach(_.createNewFile())
writeDone()
runStatus = RunnerStatus.DONE
logger.info("Done: " + function.description)
} catch {
@ -29,6 +31,7 @@ class InProcessRunner(function: InProcessFunction) extends JobRunner with Loggin
runStatus = RunnerStatus.FAILED
try {
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}

View File

@ -1,5 +1,9 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.util.IOUtils
import java.io.{PrintWriter, StringWriter}
import org.broadinstitute.sting.queue.function.QFunction
/**
* Base interface for job runners.
*/
@ -18,4 +22,24 @@ trait JobRunner {
* @return RUNNING, DONE, or FAILED.
*/
def status: RunnerStatus.Value
/**
* Returns the function to be run.
*/
def function: QFunction
protected def writeDone() = {
val content = "%s%nDone.".format(function.description)
IOUtils.writeContents(function.jobOutputFile, content)
}
protected def writeStackTrace(e: Throwable) = {
val stackTrace = new StringWriter
val printWriter = new PrintWriter(stackTrace)
printWriter.println(function.description)
e.printStackTrace(printWriter)
printWriter.close
val outputFile = if (function.jobErrorFile != null) function.jobErrorFile else function.jobOutputFile
IOUtils.writeContents(outputFile, stackTrace.toString)
}
}

View File

@ -7,10 +7,10 @@ import org.broadinstitute.sting.queue.util._
/**
* Runs jobs on an LSF compute cluster.
*/
class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with Logging {
class LsfJobRunner(val function: CommandLineFunction) extends DispatchJobRunner with Logging {
private var runStatus: RunnerStatus.Value = _
var job: LsfJob = _
var job: LsfJob = new LsfJob
/** A file to look for to validate that the function ran to completion. */
private var jobStatusPath: String = _
@ -35,59 +35,62 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with
* @param function Command to run.
*/
def start() = {
job = new LsfJob
// job.name = function.jobName TODO: Make setting the job name optional.
job.outputFile = function.jobOutputFile
job.errorFile = function.jobErrorFile
job.project = function.jobProject
job.queue = function.jobQueue
if (!IOUtils.CURRENT_DIR.getCanonicalFile.equals(function.commandDirectory))
job.workingDir = function.commandDirectory
job.extraBsubArgs ++= function.extraArgs
if (function.jobRestartable)
job.extraBsubArgs :+= "-r"
if (function.memoryLimit.isDefined)
job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]")
job.name = function.commandLine.take(1000)
// TODO: Look into passing in a single chained script as recommended by Doug instead of pre, exec, and post.
exec = writeExec()
job.command = "sh " + exec
preExec = writePreExec()
job.preExecCommand = "sh " + preExec
postExec = writePostExec()
job.postExecCommand = "sh " + postExec
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + job.bsubCommand.mkString(" "))
} else {
logger.info("Starting: " + job.bsubCommand.mkString(" "))
}
function.jobOutputFile.delete()
if (function.jobErrorFile != null)
function.jobErrorFile.delete()
runStatus = RunnerStatus.RUNNING
try {
function.mkOutputDirectories()
// job.name = function.jobName TODO: Make setting the job name optional.
job.outputFile = function.jobOutputFile
job.errorFile = function.jobErrorFile
job.project = function.jobProject
job.queue = function.jobQueue
if (!IOUtils.CURRENT_DIR_ABS.equals(function.commandDirectory))
job.workingDir = function.commandDirectory
job.extraBsubArgs ++= function.extraArgs
if (function.jobRestartable)
job.extraBsubArgs :+= "-r"
if (function.memoryLimit.isDefined)
job.extraBsubArgs ++= List("-R", "rusage[mem=" + function.memoryLimit.get + "]")
job.name = function.commandLine.take(1000)
exec = writeExec()
job.command = "sh " + exec
preExec = writePreExec()
job.preExecCommand = "sh " + preExec
postExec = writePostExec()
job.postExecCommand = "sh " + postExec
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + job.bsubCommand.mkString(" "))
} else {
logger.info("Starting: " + job.bsubCommand.mkString(" "))
}
function.jobOutputFile.delete()
if (function.jobErrorFile != null)
function.jobErrorFile.delete()
runStatus = RunnerStatus.RUNNING
Retry.attempt(() => job.run(), 1, 5, 10)
jobStatusPath = IOUtils.absolute(new File(function.commandDirectory, "." + job.bsubJobId)).toString
logger.info("Submitted LSF job id: " + job.bsubJobId)
} catch {
case re: RetryException =>
removeTemporaryFiles()
runStatus = RunnerStatus.FAILED
case e =>
logger.error("Error trying to start job.", e)
removeTemporaryFiles()
runStatus = RunnerStatus.FAILED
try {
removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + job.bsubCommand.mkString(" "), e)
}
}
@ -100,20 +103,33 @@ class LsfJobRunner(function: CommandLineFunction) extends DispatchJobRunner with
* .done files used to determine if a file has been created successfully.
*/
def status = {
if (logger.isDebugEnabled) {
logger.debug("Done %s exists = %s".format(jobDoneFile, jobDoneFile.exists))
logger.debug("Fail %s exists = %s".format(jobFailFile, jobFailFile.exists))
}
try {
if (logger.isDebugEnabled) {
logger.debug("Done %s exists = %s".format(jobDoneFile, jobDoneFile.exists))
logger.debug("Fail %s exists = %s".format(jobFailFile, jobFailFile.exists))
}
if (jobFailFile.exists) {
removeTemporaryFiles()
runStatus = RunnerStatus.FAILED
logger.info("Error: " + job.bsubCommand.mkString(" "))
tailError()
} else if (jobDoneFile.exists) {
removeTemporaryFiles()
runStatus = RunnerStatus.DONE
logger.info("Done: " + job.bsubCommand.mkString(" "))
if (jobFailFile.exists) {
removeTemporaryFiles()
runStatus = RunnerStatus.FAILED
logger.info("Error: " + job.bsubCommand.mkString(" "))
tailError()
} else if (jobDoneFile.exists) {
removeTemporaryFiles()
runStatus = RunnerStatus.DONE
logger.info("Done: " + job.bsubCommand.mkString(" "))
}
} catch {
case e =>
runStatus = RunnerStatus.FAILED
try {
removeTemporaryFiles()
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}
logger.error("Error: " + job.bsubCommand.mkString(" "), e)
}
runStatus

View File

@ -29,6 +29,8 @@ class QGraph extends Logging {
var statusEmailTo: List[String] = _
private val jobGraph = newGraph
private var shuttingDown = false
private val nl = "%n".format()
/**
* Adds a QScript created CommandLineFunction to the graph.
@ -86,7 +88,7 @@ class QGraph extends Logging {
.function.asInstanceOf[ScatterGatherableFunction]
.generateFunctions()
if (this.debugMode)
logger.debug("Scattered into %d parts: %n%s".format(functions.size, functions.mkString("%n".format())))
logger.debug("Scattered into %d parts: %n%s".format(functions.size, functions.mkString(nl)))
addedFunctions ++= functions
}
@ -240,23 +242,20 @@ class QGraph extends Logging {
private def dryRunJobs() = {
traverseFunctions(edge => {
edge.function match {
case clf: CommandLineFunction => {
case qFunction => {
if (logger.isDebugEnabled) {
logger.debug(clf.commandDirectory + " > " + clf.commandLine)
logger.debug(qFunction.commandDirectory + " > " + qFunction.description)
} else {
logger.info(clf.commandLine)
logger.info(qFunction.description)
}
logger.info("Output written to " + clf.jobOutputFile)
if (clf.jobErrorFile != null) {
logger.info("Errors written to " + clf.jobErrorFile)
logger.info("Output written to " + qFunction.jobOutputFile)
if (qFunction.jobErrorFile != null) {
logger.info("Errors written to " + qFunction.jobErrorFile)
} else {
if (logger.isDebugEnabled)
logger.info("Errors also written to " + clf.jobOutputFile)
logger.info("Errors also written to " + qFunction.jobOutputFile)
}
}
case qFunction => {
logger.info(qFunction.description)
}
}
})
}
@ -276,7 +275,7 @@ class QGraph extends Logging {
var readyJobs = getReadyJobs
var runningJobs = Set.empty[FunctionEdge]
while (readyJobs.size + runningJobs.size > 0) {
while (!shuttingDown && readyJobs.size + runningJobs.size > 0) {
var exitedJobs = List.empty[FunctionEdge]
var failedJobs = List.empty[FunctionEdge]
@ -304,12 +303,12 @@ class QGraph extends Logging {
Thread.sleep(30000L)
readyJobs = getReadyJobs
}
emailStatus()
} catch {
case e =>
logger.error("Uncaught error running jobs.", e)
throw e
} finally {
emailStatus()
}
}
@ -327,54 +326,64 @@ class QGraph extends Logging {
}
}
private def emailFailedJobs(jobs: List[FunctionEdge]) = {
private def emailFailedJobs(failed: List[FunctionEdge]) = {
if (statusEmailTo.size > 0) {
val emailMessage = new EmailMessage
emailMessage.from = statusEmailFrom
emailMessage.to = statusEmailTo
emailMessage.body = getStatus
emailMessage.subject = "Queue function: Failure"
emailMessage.body = "Failed functions: %n%n%s%n"
.format(jobs.map(_.function.description).mkString("%n%n".format()))
emailMessage.attachments = jobs.flatMap(edge => logFiles(edge))
addFailedFunctions(emailMessage, failed)
emailMessage.trySend(qSettings.emailSettings)
}
}
private def emailStatus() = {
if (statusEmailTo.size > 0) {
var failedFunctions = List.empty[String]
var failedOutputs = List.empty[File]
var failed = List.empty[FunctionEdge]
foreachFunction(edge => {
if (edge.status == RunnerStatus.FAILED) {
failedFunctions :+= edge.function.description
failedOutputs ++= logFiles(edge)
failed :+= edge
}
})
val emailMessage = new EmailMessage
emailMessage.from = statusEmailFrom
emailMessage.to = statusEmailTo
emailMessage.body = getStatus + "%n".format()
if (failedFunctions.size == 0) {
emailMessage.body = getStatus + nl
if (failed.size == 0) {
emailMessage.subject = "Queue run: Success"
} else {
emailMessage.subject = "Queue run: Failure"
emailMessage.attachments = failedOutputs
addFailedFunctions(emailMessage, failed)
}
emailMessage.trySend(qSettings.emailSettings)
}
}
private def addFailedFunctions(emailMessage: EmailMessage, failed: List[FunctionEdge]) = {
val logs = failed.flatMap(edge => logFiles(edge))
if (emailMessage.body == null)
emailMessage.body = ""
emailMessage.body += """
|Failed functions:
|
|%s
|
|Logs:
|%s%n
|""".stripMargin.trim.format(
failed.map(_.function.description).mkString(nl+nl),
logs.map(_.getAbsolutePath).mkString(nl))
emailMessage.attachments = logs
}
private def logFiles(edge: FunctionEdge) = {
// TODO: All functions should be writing error files, including InProcessFunctions
var failedOutputs = List.empty[File]
if (edge.function.isInstanceOf[CommandLineFunction]) {
val clf = edge.function.asInstanceOf[CommandLineFunction]
failedOutputs :+= clf.jobOutputFile
if (clf.jobErrorFile != null)
failedOutputs :+= clf.jobErrorFile
}
failedOutputs :+= edge.function.jobOutputFile
if (edge.function.jobErrorFile != null)
failedOutputs :+= edge.function.jobErrorFile
failedOutputs.filter(file => file != null && file.exists)
}
@ -408,7 +417,6 @@ class QGraph extends Logging {
*/
private def getStatus = {
val buffer = new StringBuilder
val nl = "%n".format()
doStatus(status => buffer.append(status).append(nl))
buffer.toString
}
@ -631,6 +639,7 @@ class QGraph extends Logging {
* Kills any forked jobs still running.
*/
def shutdown() {
shuttingDown = true
val lsfJobRunners = getRunningJobs.filter(_.runner.isInstanceOf[LsfJobRunner]).map(_.runner.asInstanceOf[LsfJobRunner])
if (lsfJobRunners.size > 0) {
for (jobRunners <- lsfJobRunners.filterNot(_.job.bsubJobId == null).grouped(10)) {

View File

@ -1,12 +1,12 @@
package org.broadinstitute.sting.queue.engine
import org.broadinstitute.sting.queue.util.{JobExitException, Logging, ShellJob}
import org.broadinstitute.sting.queue.util.{Logging, ShellJob}
import org.broadinstitute.sting.queue.function.CommandLineFunction
/**
* Runs jobs one at a time locally
*/
class ShellJobRunner(function: CommandLineFunction) extends JobRunner with Logging {
class ShellJobRunner(val function: CommandLineFunction) extends JobRunner with Logging {
private var runStatus: RunnerStatus.Value = _
/**
@ -14,42 +14,44 @@ class ShellJobRunner(function: CommandLineFunction) extends JobRunner with Loggi
* @param function Command to run.
*/
def start() = {
val job = new ShellJob
job.command = function.commandLine
job.workingDir = function.commandDirectory
job.outputFile = function.jobOutputFile
job.errorFile = function.jobErrorFile
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + function.commandLine)
} else {
logger.info("Starting: " + function.commandLine)
}
logger.info("Output written to " + function.jobOutputFile)
if (function.jobErrorFile != null) {
logger.info("Errors written to " + function.jobErrorFile)
} else {
if (logger.isDebugEnabled)
logger.info("Errors also written to " + function.jobOutputFile)
}
function.jobOutputFile.delete()
if (function.jobErrorFile != null)
function.jobErrorFile.delete()
function.doneOutputs.foreach(_.delete())
function.failOutputs.foreach(_.delete())
runStatus = RunnerStatus.RUNNING
try {
job.run()
function.doneOutputs.foreach(_.createNewFile())
runStatus = RunnerStatus.DONE
logger.info("Done: " + function.commandLine)
val job = new ShellJob
job.command = function.commandLine
job.workingDir = function.commandDirectory
job.outputFile = function.jobOutputFile
job.errorFile = function.jobErrorFile
if (logger.isDebugEnabled) {
logger.debug("Starting: " + function.commandDirectory + " > " + function.commandLine)
} else {
logger.info("Starting: " + function.commandLine)
}
logger.info("Output written to " + function.jobOutputFile)
if (function.jobErrorFile != null) {
logger.info("Errors written to " + function.jobErrorFile)
} else {
if (logger.isDebugEnabled)
logger.info("Errors also written to " + function.jobOutputFile)
}
function.jobOutputFile.delete()
if (function.jobErrorFile != null)
function.jobErrorFile.delete()
function.doneOutputs.foreach(_.delete())
function.failOutputs.foreach(_.delete())
runStatus = RunnerStatus.RUNNING
function.mkOutputDirectories()
job.run()
function.doneOutputs.foreach(_.createNewFile())
runStatus = RunnerStatus.DONE
logger.info("Done: " + function.commandLine)
} catch {
case e: JobExitException =>
case e =>
runStatus = RunnerStatus.FAILED
try {
function.failOutputs.foreach(_.createNewFile())
writeStackTrace(e)
} catch {
case _ => /* ignore errors in the exception handler */
}

View File

@ -1,10 +1,6 @@
package org.broadinstitute.sting.queue.function
import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.commandline._
import java.io.File
import collection.JavaConversions._
import org.broadinstitute.sting.queue.function.scattergather.{SimpleTextGatherFunction, Gather}
/**
* A command line that will be run in a pipeline.
@ -21,12 +17,6 @@ trait CommandLineFunction extends QFunction with Logging {
/** Whether a job is restartable */
var jobRestartable = true
/** Prefix for automatic job name creation */
var jobNamePrefix: String = _
/** The name name of the job */
var jobName: String = _
/** Job project to run the command */
var jobProject: String = _
@ -36,50 +26,18 @@ trait CommandLineFunction extends QFunction with Logging {
/** Extra arguments to specify on the command line */
var extraArgs: List[String] = Nil
/** Temporary directory to write any files */
var jobTempDir: File = IOUtils.javaTempDir
/** File to redirect any output. Defaults to <jobName>.out */
@Output(doc="File to redirect any output", required=false)
@Gather(classOf[SimpleTextGatherFunction])
var jobOutputFile: File = _
/** File to redirect any errors. Defaults to <jobName>.out */
@Output(doc="File to redirect any errors", required=false)
@Gather(classOf[SimpleTextGatherFunction])
var jobErrorFile: File = _
/**
* Returns set of directories required to run the command.
* @return Set of directories required to run the command.
*/
def jobDirectories = {
var dirs = Set.empty[File]
dirs += commandDirectory
if (jobTempDir != null)
dirs += jobTempDir
dirs ++= inputs.map(_.getParentFile)
dirs ++= outputs.map(_.getParentFile)
dirs
}
override def useStatusOutput(file: File) =
file != jobOutputFile && file != jobErrorFile
def jobDirectories = outputDirectories ++ inputs.map(_.getParentFile)
override def description = commandLine
/**
* The function description in .dot files
*/
override def dotString = jobName + " => " + commandLine
/**
* Sets all field values.
*/
override def freezeFieldValues = {
if (jobNamePrefix == null)
jobNamePrefix = qSettings.jobNamePrefix
if (jobQueue == null)
jobQueue = qSettings.jobQueue
@ -89,12 +47,6 @@ trait CommandLineFunction extends QFunction with Logging {
if (memoryLimit.isEmpty && qSettings.memoryLimit.isDefined)
memoryLimit = qSettings.memoryLimit
if (jobName == null)
jobName = CommandLineFunction.nextJobName(jobNamePrefix)
if (jobOutputFile == null)
jobOutputFile = new File(jobName + ".out")
super.freezeFieldValues
}
@ -143,21 +95,3 @@ trait CommandLineFunction extends QFunction with Logging {
}) + suffix
}
/**
* A command line that will be run in a pipeline.
*/
object CommandLineFunction {
  /** Counter used to hand out unique job names for this run of Queue. */
  private var jobIndex = 0

  /**
   * Generates the next sequential job name from the prefix.
   * @param prefix Prefix of the job name.
   * @return the next job name, formatted as "&lt;prefix&gt;-&lt;index&gt;".
   */
  private def nextJobName(prefix: String) = {
    jobIndex += 1
    "%s-%d".format(prefix, jobIndex)
  }
}

View File

@ -1,12 +1,9 @@
package org.broadinstitute.sting.queue.function
import java.io.File
/**
* Runs a function in process.
*/
trait InProcessFunction extends QFunction {
def run()
def useStatusOutput(file: File) = true
def description = (List(this.getClass.getSimpleName) ++ this.outputs.map(_.getAbsolutePath)).mkString(" ")
def description = (List(this.getClass.getSimpleName) ++ this.outputs.filter(file => useStatusOutput(file)).map(_.getAbsolutePath)).mkString(" ")
}

View File

@ -6,6 +6,7 @@ import org.broadinstitute.sting.commandline._
import org.broadinstitute.sting.queue.util.{CollectionUtils, IOUtils, ReflectionUtils}
import org.broadinstitute.sting.queue.{QException, QSettings}
import collection.JavaConversions._
import org.broadinstitute.sting.queue.function.scattergather.{Gather, SimpleTextGatherFunction}
/**
* The base interface for all functions in Queue.
@ -18,12 +19,31 @@ trait QFunction {
*/
var analysisName: String = _
/** Prefix for automatic job name creation */
var jobNamePrefix: String = _
/** The name name of the job */
var jobName: String = _
/** Default settings */
var qSettings: QSettings = _
/** Directory to run the command in. */
var commandDirectory: File = IOUtils.CURRENT_DIR
/** Temporary directory to write any files */
var jobTempDir: File = IOUtils.javaTempDir
/** File to redirect any output. Defaults to <jobName>.out */
@Output(doc="File to redirect any output", required=false)
@Gather(classOf[SimpleTextGatherFunction])
var jobOutputFile: File = _
/** File to redirect any errors. Defaults to <jobName>.out */
@Output(doc="File to redirect any errors", required=false)
@Gather(classOf[SimpleTextGatherFunction])
var jobErrorFile: File = _
/**
* Description of this command line function.
*/
@ -32,7 +52,7 @@ trait QFunction {
/**
* The function description in .dot files
*/
def dotString = ""
def dotString = jobName + " => " + description
/**
* Returns true if the function is done, false if it's
@ -62,7 +82,8 @@ trait QFunction {
* Returns true if the file should be used for status output.
* @return true if the file should be used for status output.
*/
def useStatusOutput(file: File): Boolean
def useStatusOutput(file: File) =
file != jobOutputFile && file != jobErrorFile
/**
* Returns the output files for this function.
@ -110,6 +131,28 @@ trait QFunction {
*/
def outputs = getFieldFiles(outputFields)
/**
 * Returns the set of directories where files may be written: the command
 * directory, the job temp directory (when one is set), and the parent
 * directory of every output file.
 */
def outputDirectories = {
  // jobTempDir may legitimately be null (no temp dir configured)
  val baseDirs =
    if (jobTempDir == null) Set(commandDirectory)
    else Set(commandDirectory, jobTempDir)
  baseDirs ++ outputs.map(_.getParentFile)
}
/**
 * Creates every directory in outputDirectories if it doesn't already exist,
 * failing fast when a directory cannot be created.
 * @throws QException if a directory does not exist and mkdirs() fails.
 */
def mkOutputDirectories() = {
  for (dir <- outputDirectories)
    if (!(dir.exists || dir.mkdirs))
      throw new QException("Unable to create directory: " + dir)
}
/**
* Returns fields that do not have values which are required.
* @return List[String] names of fields missing values.
@ -186,33 +229,6 @@ trait QFunction {
case unknown => throw new QException("Non-file found. Try removing the annotation, change the annotation to @Argument, or extend File with FileExtension: %s: %s".format(field.field, unknown))
}
/**
* Resets the field to the temporary directory.
* @param field Field to get and set the file.
* @param tempDir new root for the file.
*/
def resetFieldFile(field: ArgumentSource, tempDir: File): File = {
getFieldValue(field) match {
case fileExtension: FileExtension => {
val newFile = IOUtils.resetParent(tempDir, fileExtension)
val newFileExtension = fileExtension.withPath(newFile.getPath)
setFieldValue(field, newFileExtension)
newFileExtension
}
case file: File => {
if (file.getClass != classOf[File])
throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified.");
val newFile = IOUtils.resetParent(tempDir, file)
setFieldValue(field, newFile)
newFile
}
case null => null
case unknown =>
throw new QException("Unable to set file from %s: %s".format(field, unknown))
}
}
/**
* After a function is frozen no more updates are allowed by the user.
* The function is allow to make necessary updates internally to make sure
@ -223,7 +239,19 @@ trait QFunction {
canonFieldValues
}
/**
 * Sets all field values, filling in any unset defaults: the job name prefix,
 * the job name, the job output file (defaults to &lt;jobName&gt;.out), and
 * resolves commandDirectory against the current directory.
 */
def freezeFieldValues = {
// fall back to the globally configured prefix from QSettings
if (jobNamePrefix == null)
jobNamePrefix = qSettings.jobNamePrefix
if (jobName == null)
jobName = QFunction.nextJobName(jobNamePrefix)
if (jobOutputFile == null)
jobOutputFile = new File(jobName + ".out")
// subDir roots a relative commandDirectory under the current directory;
// absolute paths pass through unchanged
commandDirectory = IOUtils.subDir(IOUtils.CURRENT_DIR, commandDirectory)
}
@ -346,6 +374,19 @@ trait QFunction {
}
object QFunction {
/** Job index counter for this run of Queue. */
private var jobIndex = 0
/**
 * Generates the next sequential job name from the prefix.
 * @param prefix Prefix of the job name.
 * @return the next job name, formatted as "&lt;prefix&gt;-&lt;index&gt;".
 */
private def nextJobName(prefix: String) = {
  jobIndex += 1
  "%s-%d".format(prefix, jobIndex)
}
/**
* The list of fields defined on a class
* @param clazz The class to lookup fields.

View File

@ -1,22 +0,0 @@
package org.broadinstitute.sting.queue.function.scattergather
import java.io.File
import org.broadinstitute.sting.commandline.Input
import org.broadinstitute.sting.queue.function.InProcessFunction
import org.apache.commons.io.FileUtils
/**
 * Removes the temporary directories created for scatter / gather by
 * recursively deleting each directory in tempDirectories.
 * NOTE(review): earlier doc referenced an rmdirScript setting and "rm -rf",
 * but run() deletes via commons-io FileUtils.deleteDirectory; there is no
 * script hook on this class.
 */
class CleanupTempDirsFunction extends InProcessFunction {
// Dependency inputs: ensures this cleanup runs only after the gathers finish.
@Input(doc="Original outputs of the gather functions")
var originalOutputs: Set[File] = Set.empty[File]
@Input(doc="Temporary directories to be deleted")
var tempDirectories: List[File] = Nil
// Recursively deletes each temp directory (and its contents).
def run() = tempDirectories.foreach(FileUtils.deleteDirectory(_))
}

View File

@ -52,33 +52,31 @@ class CloneFunction extends CommandLineFunction {
this.jobProject = originalFunction.jobProject
if (this.jobName == null)
this.jobName = originalFunction.jobName
if (this.jobOutputFile == null)
this.jobOutputFile = overriddenFile("jobOutputFile").get
if (this.jobErrorFile == null)
this.jobErrorFile = overriddenFile("jobErrorFile").getOrElse(null)
super.freezeFieldValues
}
def commandLine = withScatterPart(() => originalFunction.commandLine)
override def getFieldValue(source: ArgumentSource) = {
overriddenFields.get(source) match {
case Some(value) => value.asInstanceOf[AnyRef]
case None => {
val value = originalFunction.getFieldValue(source)
overriddenFields += source -> value
value
source.field.getName match {
case "jobOutputFile" => jobOutputFile
case "jobErrorFile" => jobErrorFile
case _ => overriddenFields.get(source) match {
case Some(value) => value.asInstanceOf[AnyRef]
case None => {
val value = originalFunction.getFieldValue(source)
overriddenFields += source -> value
value
}
}
}
}
override def setFieldValue(source: ArgumentSource, value: Any) = {
overriddenFields += source -> value
}
private def overriddenFile(name: String) = {
overriddenFields
.find{case (key, _) => key.field.getName == name}
.map{case (_, value) => value.asInstanceOf[File]}
source.field.getName match {
case "jobOutputFile" => jobOutputFile = value.asInstanceOf[File]
case "jobErrorFile" => jobErrorFile = value.asInstanceOf[File]
case _ => overriddenFields += source -> value
}
}
}

View File

@ -1,25 +0,0 @@
package org.broadinstitute.sting.queue.function.scattergather
import java.io.File
import org.broadinstitute.sting.commandline.{Output, Input}
import org.broadinstitute.sting.queue.function.InProcessFunction
/**
 * Creates the temporary directories for scatter / gather by calling mkdirs
 * on each directory in tempDirectories.
 * NOTE(review): earlier doc referenced a mkdirScript setting (and
 * inconsistently "&lt;rmdirScript&gt;"), but run() calls File.mkdirs directly;
 * there is no script hook on this class.
 */
class CreateTempDirsFunction extends InProcessFunction {
// Dependency inputs: ties directory creation after the original inputs exist.
@Input(doc="Original inputs to the scattered function")
var originalInputs: Set[File] = Set.empty[File]
@Output(doc="Temporary directories to create")
var tempDirectories: List[File] = Nil
// The temp dirs are bookkeeping outputs, not real status outputs.
override def useStatusOutput(file: File) = false
// Considered done once every temp directory exists, so reruns skip this step.
override def isDone = Some(tempDirectories.forall(_.exists))
def run() = tempDirectories.foreach(_.mkdirs)
}

View File

@ -14,9 +14,6 @@ trait ScatterFunction extends QFunction {
@Output(doc="Scattered parts of the original input, one per temp directory")
var scatterParts: List[File] = Nil
@Input(doc="Temporary directories for each scatter part")
var tempDirectories: List[File] = Nil
/**
* Sets the original function used to create this scatter function.
* @param originalFunction The ScatterGatherableFunction.

View File

@ -16,12 +16,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
/** scatter gather directory */
var scatterGatherDirectory: File = _
/** cleanup temporary directories */
var cleanupTempDirectories = false
/** Class to use for creating temporary directories. Defaults to CreateTempDirsFunction. */
var createTempDirsClass: Class[_ <: CreateTempDirsFunction] = _
/** Class to use for scattering. Defaults to the annotation used in the @Scatter tag. */
var scatterClass: Class[_ <: ScatterFunction] = _
@ -32,16 +26,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
*/
var gatherClass: PartialFunction[ArgumentSource, Class[_ <: GatherFunction]] = _
/** Class to use for removing temporary directories. Defaults to CleanupTempDirsFunction. */
var cleanupTempDirsClass: Class[_ <: CleanupTempDirsFunction] = _
/**
* Allows external modification of the CreateTempDirsFunction that will create the temporary directories.
* @param initializeFunction The function that will create the temporary directories.
* @param inputFields The input fields that the original function was dependent on.
*/
var setupInitializeFunction: PartialFunction[(CreateTempDirsFunction, List[ArgumentSource]), Unit] = _
/**
* Allows external modification of the ScatterFunction that will create the scatter pieces in the temporary directories.
* @param scatterFunction The function that will create the scatter pieces in the temporary directories.
@ -63,14 +47,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
*/
var setupCloneFunction: PartialFunction[(CloneFunction, Int), Unit] = _
/**
* Allows external modification of the CleanupTempDirsFunction that will remove the temporary directories.
* @param cleanupFunction The function that will remove the temporary directories.
* @param gatherFunctions The functions that will gather up the original output fields.
* @param outputFields The output fields that the original function was dependent on.
*/
var setupCleanupFunction: PartialFunction[(CleanupTempDirsFunction, Map[ArgumentSource, GatherFunction], List[ArgumentSource]), Unit] = _
/**
* Returns true if the function is ready to be scatter / gathered.
* The base implementation checks if the scatter count is greater than one,
@ -93,57 +69,73 @@ trait ScatterGatherableFunction extends CommandLineFunction {
val inputFieldsWithValues = this.inputFields.filter(hasFieldValue(_))
// Only gather up fields that will have a value
val outputFieldsWithValues = this.outputFields.filter(hasFieldValue(_))
// The field containing the file to split
val originalInput = getFieldFile(scatterField)
// Create the scatter function based on @Scatter
val scatterFunction = this.newScatterFunction(this.scatterField)
scatterFunction.analysisName = this.analysisName
scatterFunction.qSettings = this.qSettings
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName)
scatterFunction.originalInput = originalInput
scatterFunction.setOriginalFunction(this, scatterField)
initScatterFunction(scatterFunction, this.scatterField)
tempDirectories :+= scatterFunction.commandDirectory
functions :+= scatterFunction
// Create the gather functions for each output field
var gatherFunctions = Map.empty[ArgumentSource, GatherFunction]
var gatherOutputs = Map.empty[ArgumentSource, File]
for (gatherField <- outputFieldsWithValues) {
val gatherFunction = this.newGatherFunction(gatherField)
val gatherOutput = getFieldFile(gatherField)
gatherFunction.analysisName = this.analysisName
gatherFunction.qSettings = this.qSettings
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
gatherFunction.originalOutput = this.getFieldFile(gatherField)
gatherFunction.setOriginalFunction(this, gatherField)
initGatherFunction(gatherFunction, gatherField)
tempDirectories :+= gatherFunction.commandDirectory
functions :+= gatherFunction
gatherFunctions += gatherField -> gatherFunction
gatherOutputs += gatherField -> gatherOutput
}
// Create the clone functions for running the parallel jobs
var cloneFunctions = List.empty[CloneFunction]
for (i <- 1 to this.scatterCount) {
val cloneFunction = this.newCloneFunction()
cloneFunction.originalFunction = this
cloneFunction.index = i
// Setup the fields on the clone function, outputing each as a relative file in the sg directory.
cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i)
var scatterPart = new File(originalInput.getName)
cloneFunction.setFieldValue(scatterField, scatterPart)
for (gatherField <- outputFieldsWithValues) {
val gatherPart = new File(gatherOutputs(gatherField).getName)
cloneFunction.setFieldValue(gatherField, gatherPart)
}
// Allow the script writer to change the paths to the files.
initCloneFunction(cloneFunction, i)
// Get absolute paths to the files and bind the sg functions to the clone function via the absolute paths.
scatterPart = IOUtils.subDir(cloneFunction.commandDirectory, cloneFunction.getFieldFile(scatterField))
cloneFunction.setFieldValue(scatterField, scatterPart)
scatterFunction.scatterParts :+= scatterPart
for (gatherField <- outputFieldsWithValues) {
val gatherPart = IOUtils.subDir(cloneFunction.commandDirectory, cloneFunction.getFieldFile(gatherField))
cloneFunction.setFieldValue(gatherField, gatherPart)
gatherFunctions(gatherField).gatherParts :+= gatherPart
}
cloneFunctions :+= cloneFunction
tempDirectories :+= cloneFunction.commandDirectory
bindCloneFunctionScatter(scatterFunction, this.scatterField, cloneFunction, i)
// For each each output field, change value to the scatterGatherTempDir dir and feed it into the gatherer
for (gatherField <- outputFieldsWithValues)
bindCloneFunctionGather(gatherFunctions(gatherField), gatherField, cloneFunction, i)
}
functions ++= cloneFunctions
// Create a function to create all of the scatterGatherTempDir directories.
// All of its inputs are the inputs of the original function.
val initializeFunction = this.newInitializeFunction()
initInitializeFunction(initializeFunction, inputFieldsWithValues)
// Create a function that will remove any temporary items
// All of its inputs are the outputs of the original function.
var cleanupFunction = newCleanupFunction()
initCleanupFunction(cleanupFunction, gatherFunctions, outputFieldsWithValues)
// Set the temporary directories, for the initialize function as outputs for scatter and cleanup as inputs.
initializeFunction.tempDirectories = tempDirectories
scatterFunction.tempDirectories = tempDirectories
cleanupFunction.tempDirectories = tempDirectories
functions +:= initializeFunction
if (this.cleanupTempDirectories)
functions :+= cleanupFunction
// Return all the various functions we created
functions
}
@ -167,41 +159,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
protected lazy val scatterField =
this.inputFields.find(field => ReflectionUtils.hasAnnotation(field.field, classOf[Scatter])).get
/**
* Retrieves the original field value for the scatter field.
*/
protected lazy val originalInput = getFieldFile(scatterField)
/**
 * Creates the function that will create the temporary directories,
 * honoring createTempDirsClass when the script author supplied one.
 * @return A CreateTempDirsFunction that will create the temporary directories.
 */
protected def newInitializeFunction(): CreateTempDirsFunction =
  if (createTempDirsClass == null)
    new CreateTempDirsFunction
  else
    this.createTempDirsClass.newInstance
/**
* Initializes the CreateTempDirsFunction that will create the temporary directories.
* The initializeFunction qSettings is set so that the CreateTempDirsFunction runs with the same prefix, etc. as this ScatterGatherableFunction.
* The initializeFunction commandDirectory is set so that the function runs in the directory as this ScatterGatherableFunction.
* The initializeFunction is modified to become dependent on the input files for this ScatterGatherableFunction.
* Calls setupInitializeFunction with initializeFunction.
* @param initializeFunction The function that will create the temporary directories.
* @param inputFields The input fields that the original function was dependent on.
*/
protected def initInitializeFunction(initializeFunction: CreateTempDirsFunction, inputFields: List[ArgumentSource]) = {
initializeFunction.qSettings = this.qSettings
// run in the same directory as this ScatterGatherableFunction
initializeFunction.commandDirectory = this.commandDirectory
// depend on every original input so the temp dirs are created only after
// the inputs are available
for (inputField <- inputFields)
initializeFunction.originalInputs ++= this.getFieldFiles(inputField)
// give the script author a final chance to customize the function
if (this.setupInitializeFunction != null)
if (this.setupInitializeFunction.isDefinedAt(initializeFunction, inputFields))
this.setupInitializeFunction(initializeFunction, inputFields)
}
/**
* Creates a new ScatterFunction for the scatterField.
* @param scatterField Field that defined @Scatter.
@ -217,19 +174,11 @@ trait ScatterGatherableFunction extends CommandLineFunction {
/**
* Initializes the ScatterFunction created by newScatterFunction() that will create the scatter pieces in the temporary directories.
* The scatterFunction qSettings is set so that the ScatterFunction runs with the same prefix, etc. as this ScatterGatherableFunction.
* The scatterFunction commandDirectory is set so that the function runs from a temporary directory under the scatterDirectory.
* The scatterFunction has it's originalInput set with the file to be scattered into scatterCount pieces.
* Calls scatterFunction.setOriginalFunction with this ScatterGatherableFunction.
* Calls setupScatterFunction with scatterFunction.
* @param scatterFunction The function that will create the scatter pieces in the temporary directories.
* @param scatterField The input field being scattered.
*/
protected def initScatterFunction(scatterFunction: ScatterFunction, scatterField: ArgumentSource) = {
scatterFunction.qSettings = this.qSettings
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName)
scatterFunction.originalInput = this.originalInput
scatterFunction.setOriginalFunction(this, scatterField)
if (this.setupScatterFunction != null)
if (this.setupScatterFunction.isDefinedAt(scatterFunction, scatterField))
this.setupScatterFunction(scatterFunction, scatterField)
@ -253,20 +202,12 @@ trait ScatterGatherableFunction extends CommandLineFunction {
/**
* Initializes the GatherFunction created by newGatherFunction() that will collect the gather pieces in the temporary directories.
* The gatherFunction qSettings is set so that the GatherFunction runs with the same prefix, etc. as this ScatterGatherableFunction.
* The gatherFunction commandDirectory is set so that the function runs from a temporary directory under the scatterDirectory.
* The gatherFunction has it's originalOutput set with the file to be gathered from the scatterCount pieces.
* Calls the gatherFunction.setOriginalFunction with this ScatterGatherableFunction.
* Calls setupGatherFunction with gatherFunction.
* @param gatherFunction The function that will merge the gather pieces from the temporary directories.
* @param gatherField The output field being gathered.
*/
protected def initGatherFunction(gatherFunction: GatherFunction, gatherField: ArgumentSource) = {
gatherFunction.qSettings = this.qSettings
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
gatherFunction.originalOutput = this.getFieldFile(gatherField)
gatherFunction.setOriginalFunction(this, gatherField)
gatherFunction.analysisName = this.analysisName
if (this.setupGatherFunction != null)
if (this.setupGatherFunction.isDefinedAt(gatherFunction, gatherField))
this.setupGatherFunction(gatherFunction, gatherField)
@ -279,83 +220,16 @@ trait ScatterGatherableFunction extends CommandLineFunction {
protected def newCloneFunction() = new CloneFunction
/**
* Initializes the cloned function created by newCloneFunction() by setting it's commandDirectory to a temporary directory under scatterDirectory.
* Calls setupCloneFunction with cloneFunction.
* @param cloneFunction The clone of this ScatterGatherableFunction
* @param index The one based index (from 1..scatterCount inclusive) of the scatter piece.
*/
// Configures a clone of this function for scatter piece `index`: links it back
// to the original function, records its one-based index, points its command
// directory at a per-piece temp dir, then lets setupCloneFunction customize it.
protected def initCloneFunction(cloneFunction: CloneFunction, index: Int) = {
cloneFunction.originalFunction = this
cloneFunction.index = index
cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+index)
if (this.setupCloneFunction != null)
if (this.setupCloneFunction.isDefinedAt(cloneFunction, index))
this.setupCloneFunction(cloneFunction, index)
}
/**
* Joins a piece of the ScatterFunction output to the cloned function's input.
* The input of the clone is changed to be in the output directory of the clone.
* The scatter function piece is added as an output of the scatterFunction.
* The clone function's original input is changed to use the piece from the output directory.
* Finally the scatterFunction.setCloneFunction is called with the clone of this ScatterGatherableFunction.
* @param scatterFunction Function that will create the pieces including the piece that will go to cloneFunction.
* @param scatterField The field to be scattered.
* @param cloneFunction Clone of this ScatterGatherableFunction.
* @param index The one based index (from 1..scatterCount inclusive) of the scatter piece.
*/
protected def bindCloneFunctionScatter(scatterFunction: ScatterFunction, scatterField: ArgumentSource, cloneFunction: CloneFunction, index: Int) = {
// Reset the input of the clone to the scatterGatherTempDir dir and add it as an output of the scatter.
// NOTE(review): relies on IOUtils.resetParent — confirm that helper still
// exists; it appears to be removed elsewhere in this change set.
val scatterPart = IOUtils.resetParent(cloneFunction.commandDirectory, scatterFunction.originalInput)
scatterFunction.scatterParts :+= scatterPart
cloneFunction.setFieldValue(scatterField, scatterPart)
scatterFunction.setCloneFunction(cloneFunction, index, scatterField)
}
/**
* Joins the cloned function's output as a piece of the GatherFunction's input.
* Finally the scatterFunction.setCloneFunction is called with the clone of this ScatterGatherableFunction.
* @param cloneFunction Clone of this ScatterGatherableFunction.
* @param gatherFunction Function that will create the pieces including the piece that will go to cloneFunction.
* @param gatherField The field to be gathered.
*/
protected def bindCloneFunctionGather(gatherFunction: GatherFunction, gatherField: ArgumentSource, cloneFunction: CloneFunction, index: Int) = {
// Re-root the clone's output file under the clone's command directory and
// register it as one piece of the gather's input.
val gatherPart = cloneFunction.resetFieldFile(gatherField, cloneFunction.commandDirectory)
gatherFunction.gatherParts :+= gatherPart
gatherFunction.setCloneFunction(cloneFunction, index, gatherField)
}
/**
 * Creates the function that will remove the temporary directories,
 * honoring cleanupTempDirsClass when the script author supplied one.
 * @return A CleanupTempDirs function that will remove the temporary directories.
 */
protected def newCleanupFunction(): CleanupTempDirsFunction =
  if (cleanupTempDirsClass == null)
    new CleanupTempDirsFunction
  else
    this.cleanupTempDirsClass.newInstance
/**
* Initializes the CleanupTempDirsFunction created by newCleanupFunction() that will remove the temporary directories.
* The cleanupFunction qSettings is set so that the CleanupTempDirsFunction runs with the same prefix, etc. as this ScatterGatherableFunction.
* The cleanupFunction commandDirectory is set so that the function runs in the directory as this ScatterGatherableFunction.
* The initializeFunction is modified to become dependent on the output files for this ScatterGatherableFunction.
* Calls setupCleanupFunction with cleanupFunction.
* @param cleanupFunction The function that will remove the temporary directories.
* @param gatherFunctions The functions that will gather up the original output fields.
* @param outputFields The output fields that the original function was dependent on.
*/
protected def initCleanupFunction(cleanupFunction: CleanupTempDirsFunction, gatherFunctions: Map[ArgumentSource, GatherFunction], outputFields: List[ArgumentSource]) = {
cleanupFunction.qSettings = this.qSettings
// run in the same directory as this ScatterGatherableFunction
cleanupFunction.commandDirectory = this.commandDirectory
// depend on every gathered output so cleanup only runs after all gathers finish
for (gatherField <- outputFields)
cleanupFunction.originalOutputs += gatherFunctions(gatherField).originalOutput
// give the script author a final chance to customize the function
if (this.setupCleanupFunction != null)
if (this.setupCleanupFunction.isDefinedAt(cleanupFunction, gatherFunctions, outputFields))
this.setupCleanupFunction(cleanupFunction, gatherFunctions, outputFields)
}
/**
* Returns a temporary directory under this scatter gather directory.
* @param Sub directory under the scatter gather directory.

View File

@ -95,7 +95,7 @@ class EmailMessage extends Logging {
}
override def toString = {
"""|
"""
|From: %s
|To: %s
|Cc: %s
@ -106,7 +106,7 @@ class EmailMessage extends Logging {
|
|Attachments:
|%s
|""".stripMargin.trim.format(
|""".stripMargin.format(
this.from, this.to.mkString(", "),
this.cc.mkString(", "), this.bcc.mkString(", "),
this.subject, this.body,

View File

@ -10,6 +10,8 @@ object IOUtils {
/** The current directory "." */
val CURRENT_DIR = new File(".")
val CURRENT_DIR_ABS = absolute(CURRENT_DIR)
/**
* Returns the sub path rooted at the parent.
* If the sub path is already absolute, returns the sub path.
@ -20,8 +22,8 @@ object IOUtils {
* @param path The sub path to append to the parent, if the path is not absolute.
* @return The absolute path to the file in the parent dir if the path was not absolute, otherwise the original path.
*/
def subDir(dir: File, path: String): File =
subDir(dir, new File(path))
def subDir(parent: File, path: String): File =
subDir(parent, new File(path))
/**
* Returns the sub path rooted at the parent.
@ -36,25 +38,16 @@ object IOUtils {
def subDir(parent: File, file: File): File = {
val parentAbs = absolute(parent)
val fileAbs = absolute(file)
val currentAbs = absolute(CURRENT_DIR)
if (parentAbs == currentAbs && fileAbs == currentAbs)
absolute(CURRENT_DIR.getCanonicalFile)
else if (parentAbs == currentAbs || file.isAbsolute)
if (parentAbs == CURRENT_DIR_ABS && fileAbs == CURRENT_DIR_ABS)
CURRENT_DIR_ABS
else if (parentAbs == CURRENT_DIR_ABS || file.isAbsolute)
fileAbs
else if (fileAbs == currentAbs)
else if (fileAbs == CURRENT_DIR_ABS)
parentAbs
else
absolute(new File(parentAbs, file.getPath))
}
/**
 * Resets the parent of the file to the directory, keeping only the file's
 * name component (any original parent path on `file` is discarded).
 * @param dir New parent directory.
 * @param file Path to the file to be re-rooted.
 * @return Absolute path to the new file.
 */
def resetParent(dir: File, file: File) = absolute(subDir(dir, file.getName))
/**
* Returns the temp directory as defined by java.
* @return the temp directory as defined by java.

View File

@ -41,12 +41,6 @@ class IOUtilsUnitTest extends BaseTest {
Assert.assertEquals(new File("/path/../to/file"), subDir)
}
@Test
def testResetParent = {
// re-rooting keeps only the file name, discarding the old parent path
val newFile = IOUtils.resetParent(new File("/new/parent/dir"), new File("/old/parent_dir/file.name"))
Assert.assertEquals(new File("/new/parent/dir/file.name"), newFile)
}
@Test
def testTempDir = {
val tempDir = IOUtils.tempDir("Q-Unit-Test")