diff --git a/java/src/org/broadinstitute/sting/commandline/ParsingEngine.java b/java/src/org/broadinstitute/sting/commandline/ParsingEngine.java index 05088f186..5c48db670 100755 --- a/java/src/org/broadinstitute/sting/commandline/ParsingEngine.java +++ b/java/src/org/broadinstitute/sting/commandline/ParsingEngine.java @@ -377,11 +377,11 @@ public class ParsingEngine { * Extract all the argument sources from a given object, along with their bindings if obj != null . * @param obj the object corresponding to the sourceClass * @param sourceClass class to act as sources for other arguments. - * @param parentFields + * @param parentFields Parent Fields * @return A map of sources associated with this object and its aggregated objects and bindings to their bindings values */ private static Map extractArgumentBindings(Object obj, Class sourceClass, Field[] parentFields) { - Map bindings = new HashMap(); + Map bindings = new LinkedHashMap(); while( sourceClass != null ) { Field[] fields = sourceClass.getDeclaredFields(); diff --git a/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentDefinitionField.java b/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentDefinitionField.java index 8e1324ebe..f059471a4 100644 --- a/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentDefinitionField.java +++ b/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentDefinitionField.java @@ -25,6 +25,7 @@ package org.broadinstitute.sting.queue.extensions.gatk; import net.sf.samtools.SAMFileWriter; +import org.broad.tribble.vcf.VCFWriter; import org.broadinstitute.sting.commandline.*; import java.io.File; @@ -59,15 +60,15 @@ public abstract class ArgumentDefinitionField extends ArgumentField { return ""; else return String.format("%n" + - "/** %n" + - " * Short name of %1$s%n" + - " * @return Short name of %1$s%n" + + "/**%n" + + " * Short name of %1$s%n" + + " * @return Short name of %1$s%n" + " */%n" + "def %3$s = this.%1$s%n" + "%n" + - "/** %n" 
+ - " * Short name of %1$s%n" + - " * @param value Short name of %1$s%n" + + "/**%n" + + " * Short name of %1$s%n" + + " * @param value Short name of %1$s%n" + " */%n" + "def %4$s(value: %2$s) = this.%1$s = value%n", getFieldName(), @@ -96,7 +97,7 @@ public abstract class ArgumentDefinitionField extends ArgumentField { } @Override - protected String getScatterGatherAnnotation() { + protected String getGatherAnnotation() { return ""; } @@ -120,9 +121,8 @@ public abstract class ArgumentDefinitionField extends ArgumentField { private static List getArgumentFields(ArgumentDefinition argumentDefinition) { if (intervalFields.contains(argumentDefinition.fullName) && argumentDefinition.ioType == ArgumentIOType.INPUT) { - boolean scatter = "intervals".equals(argumentDefinition.fullName); return Arrays.asList( - new IntervalFileArgumentField(argumentDefinition, scatter), + new IntervalFileArgumentField(argumentDefinition), new IntervalStringArgumentField(argumentDefinition)); // ROD Bindings are set by the RodBindField @@ -166,18 +166,10 @@ public abstract class ArgumentDefinitionField extends ArgumentField { } // if (intervalFields.contains(argumentDefinition.fullName) && argumentDefinition.ioType == ArgumentIOType.INPUT) - // Change intervals to an input file, and optionally scatter it. + // Change intervals exclusive of intervalsString. private static class IntervalFileArgumentField extends InputArgumentField { - private final boolean scatter; - public IntervalFileArgumentField(ArgumentDefinition argumentDefinition, boolean scatter) { + public IntervalFileArgumentField(ArgumentDefinition argumentDefinition) { super(argumentDefinition); - this.scatter = scatter; - } - - @Override protected boolean isMultiValued() { return !this.scatter && super.isMultiValued(); } - @Override public boolean isScatter() { return this.scatter; } - @Override protected String getScatterGatherAnnotation() { - return scatter ? 
String.format("@Scatter(classOf[IntervalScatterFunction])%n") : super.getScatterGatherAnnotation(); } @Override @@ -241,10 +233,15 @@ public abstract class ArgumentDefinitionField extends ArgumentField { @Override protected String getDefaultValue() { return "_"; } @Override public boolean isGather() { return true; } - @Override protected String getScatterGatherAnnotation() { - return String.format(SAMFileWriter.class.isAssignableFrom(argumentDefinition.argumentType) - ? "@Gather(classOf[BamGatherFunction])%n" - : "@Gather(classOf[org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction])%n"); + @Override protected String getGatherAnnotation() { + String gather; + if (SAMFileWriter.class.isAssignableFrom(argumentDefinition.argumentType)) + gather = "@Gather(classOf[BamGatherFunction])%n"; + else if (VCFWriter.class.isAssignableFrom(argumentDefinition.argumentType)) + gather = "@Gather(classOf[VcfGatherFunction])%n"; + else + gather = "@Gather(classOf[org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction])%n"; + return String.format(gather); } } diff --git a/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentField.java b/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentField.java index 88ad2a02b..e5a6e86c2 100644 --- a/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentField.java +++ b/java/src/org/broadinstitute/sting/queue/extensions/gatk/ArgumentField.java @@ -72,7 +72,7 @@ public abstract class ArgumentField { isRequired(), getExclusiveOf(), getValidation(), - getScatterGatherAnnotation(), getFieldName(), getFieldType(), getDefaultValue(), + getGatherAnnotation(), getFieldName(), getFieldType(), getDefaultValue(), getDefineAddition()); } @@ -105,8 +105,8 @@ public abstract class ArgumentField { /** @return A validation string for the argument. */ protected String getValidation() { return ""; } - /** @return A scatter or gather annotation with a line feed, or "". 
*/ - protected String getScatterGatherAnnotation() { return ""; } + /** @return A gather annotation with a line feed, or "". */ + protected String getGatherAnnotation() { return ""; } // Scala @@ -136,9 +136,6 @@ public abstract class ArgumentField { return importClasses; } - /** @return True if this field uses @Scatter. */ - public boolean isScatter() { return false; } - /** @return True if this field uses @Gather. */ public boolean isGather() { return false; } diff --git a/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java b/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java index a6893636e..6c77cc7f5 100644 --- a/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java +++ b/java/src/org/broadinstitute/sting/queue/extensions/gatk/GATKExtensionsGenerator.java @@ -38,6 +38,7 @@ import org.broadinstitute.sting.gatk.io.stubs.OutputStreamArgumentTypeDescriptor import org.broadinstitute.sting.gatk.io.stubs.SAMFileReaderArgumentTypeDescriptor; import org.broadinstitute.sting.gatk.io.stubs.SAMFileWriterArgumentTypeDescriptor; import org.broadinstitute.sting.gatk.refdata.tracks.builders.RMDTrackBuilder; +import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; @@ -104,7 +105,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram { String clpClassName = clpManager.getName(clp); writeClass("org.broadinstitute.sting.queue.function.JarCommandLineFunction", COMMANDLINE_PACKAGE_NAME, clpClassName, - "", ArgumentDefinitionField.getArgumentFields(clp)); + false, "", ArgumentDefinitionField.getArgumentFields(clp)); if (clp == CommandLineGATK.class) { for (Entry>> walkersByPackage: walkerManager.getWalkerNamesByPackage(false).entrySet()) { @@ -116,9 +117,16 @@ public class GATKExtensionsGenerator extends CommandLineProgram { 
argumentFields.addAll(RodBindField.getRodArguments(walkerType, trackBuilder)); argumentFields.addAll(ReadFilterField.getFilterArguments(walkerType)); + String constructor = String.format("analysisName = \"%1$s\"%nanalysis_type = \"%1$s\"%n", walkerName); + String scatterClass = getScatterClass(walkerType); + boolean isScatter = false; + if (scatterClass != null) { + isScatter = true; + constructor += String.format("scatterClass = classOf[%s]%n", scatterClass); + } + writeClass(COMMANDLINE_PACKAGE_NAME + "." + clpClassName, WALKER_PACKAGE_NAME, walkerName, - String.format("analysisName = \"%1$s\"%nanalysis_type = \"%1$s\"%n", walkerName), - argumentFields); + isScatter, constructor, argumentFields); } } } @@ -149,15 +157,22 @@ public class GATKExtensionsGenerator extends CommandLineProgram { return false; } - private void writeClass(String baseClass, String packageName, String className, String constructor, - List argumentFields) throws IOException { - String content = getContent(CLASS_TEMPLATE, baseClass, packageName, className, constructor, "", argumentFields); + private String getScatterClass(Class walkerType) { + if (ReadWalker.class.isAssignableFrom(walkerType)) + return "ContigScatterFunction"; + else + return "IntervalScatterFunction"; + } + + private void writeClass(String baseClass, String packageName, String className, boolean isScatter, + String constructor, List argumentFields) throws IOException { + String content = getContent(CLASS_TEMPLATE, baseClass, packageName, className, constructor, isScatter, "", argumentFields); writeFile(packageName + "." 
+ className, content); } private void writeFilter(String packageName, String className, List argumentFields) throws IOException { String content = getContent(TRAIT_TEMPLATE, "org.broadinstitute.sting.queue.function.CommandLineFunction", - packageName, className, "", String.format(" + \" -read_filter %s\"", className), argumentFields); + packageName, className, "", false, String.format(" + \" -read_filter %s\"", className), argumentFields); writeFile(packageName + "." + className, content); } @@ -172,12 +187,12 @@ public class GATKExtensionsGenerator extends CommandLineProgram { } private static String getContent(String scalaTemplate, String baseClass, String packageName, String className, - String constructor, String commandLinePrefix, List argumentFields) { + String constructor, boolean isScatter, + String commandLinePrefix, List argumentFields) { StringBuilder arguments = new StringBuilder(); StringBuilder commandLine = new StringBuilder(commandLinePrefix); Set importSet = new HashSet(); - boolean isScatter = false; boolean isGather = false; List freezeFields = new ArrayList(); for(ArgumentField argumentField: argumentFields) { @@ -186,13 +201,11 @@ public class GATKExtensionsGenerator extends CommandLineProgram { importSet.addAll(argumentField.getImportStatements()); freezeFields.add(argumentField.getFreezeFields()); - isScatter |= argumentField.isScatter(); isGather |= argumentField.isGather(); } if (isScatter) { importSet.add("import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction"); - importSet.add("import org.broadinstitute.sting.queue.function.scattergather.Scatter"); baseClass += " with ScatterGatherableFunction"; } if (isGather) @@ -210,8 +223,10 @@ public class GATKExtensionsGenerator extends CommandLineProgram { freezeFieldOverride.append(String.format("}%n%n")); } + String importText = sortedImports.size() == 0 ? 
"" : NEWLINE + StringUtils.join(sortedImports, NEWLINE) + NEWLINE; + // see CLASS_TEMPLATE and TRAIT_TEMPLATE below - return String.format(scalaTemplate, packageName, StringUtils.join(sortedImports, NEWLINE), + return String.format(scalaTemplate, packageName, importText, className, baseClass, constructor, arguments, freezeFieldOverride, commandLine); } diff --git a/java/src/org/broadinstitute/sting/queue/function/scattergather/Scatter.java b/java/src/org/broadinstitute/sting/queue/function/scattergather/Scatter.java deleted file mode 100644 index 471f7fe29..000000000 --- a/java/src/org/broadinstitute/sting/queue/function/scattergather/Scatter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2010, The Broad Institute - * - * Permission is hereby granted, free of charge, to any person - * obtaining a copy of this software and associated documentation - * files (the "Software"), to deal in the Software without - * restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following - * conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES - * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. 
- */ - -package org.broadinstitute.sting.queue.function.scattergather; - -import java.lang.annotation.*; - -/** - * Specifies the class type of the CommandLineFunction to scatter an @Input - * Written in java because scala doesn't support RetentionPolicy.RUNTIME - */ -@Documented -@Inherited -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.FIELD}) -public @interface Scatter { - Class value(); -} diff --git a/scala/qscript/depristo/manySampleUGPerformance.scala b/scala/qscript/depristo/manySampleUGPerformance.scala index e265b6961..117763a79 100755 --- a/scala/qscript/depristo/manySampleUGPerformance.scala +++ b/scala/qscript/depristo/manySampleUGPerformance.scala @@ -16,7 +16,7 @@ val MERGED_DIR = new File("/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/ma trait UNIVERSAL_GATK_ARGS extends CommandLineGATK { this.logging_level = "INFO"; this.jarFile = gatkJarFile; - this.intervals = new File(TARGET_INTERVAL); + this.intervals :+= new File(TARGET_INTERVAL); this.reference_sequence = referenceFile; this.jobQueue = "gsa"; this.et = Option(org.broadinstitute.sting.gatk.phonehome.GATKRunReport.PhoneHomeOption.STANDARD); diff --git a/scala/qscript/depristo/resequencingSamples1KG.scala b/scala/qscript/depristo/resequencingSamples1KG.scala index 61a3838a7..24def504f 100644 --- a/scala/qscript/depristo/resequencingSamples1KG.scala +++ b/scala/qscript/depristo/resequencingSamples1KG.scala @@ -20,7 +20,7 @@ class ManySampleUGPerformanceTesting extends QScript { trait UNIVERSAL_GATK_ARGS extends CommandLineGATK { this.logging_level = "INFO"; this.jarFile = gatkJarFile; - this.intervals = new File(TARGET_INTERVAL); + this.intervals :+= new File(TARGET_INTERVAL); this.reference_sequence = referenceFile; this.jobQueue = "gsa"; this.et = Option(org.broadinstitute.sting.gatk.phonehome.GATKRunReport.PhoneHomeOption.STANDARD); diff --git a/scala/qscript/examples/ExampleUnifiedGenotyper.scala b/scala/qscript/examples/ExampleUnifiedGenotyper.scala index 
daf208359..3397d03a4 100644 --- a/scala/qscript/examples/ExampleUnifiedGenotyper.scala +++ b/scala/qscript/examples/ExampleUnifiedGenotyper.scala @@ -41,7 +41,7 @@ class ExampleUnifiedGenotyper extends QScript { trait UnifiedGenotyperArguments extends CommandLineGATK { this.jarFile = qscript.gatkJar this.reference_sequence = qscript.referenceFile - this.intervals = qscript.intervals + this.intervals :+= qscript.intervals // Some() is how you set the value for an scala Option. // Set the memory limit to 2 gigabytes on each command. this.memoryLimit = Some(2) diff --git a/scala/qscript/fullCallingPipeline.q b/scala/qscript/fullCallingPipeline.q index 38dd24994..bc73c23eb 100755 --- a/scala/qscript/fullCallingPipeline.q +++ b/scala/qscript/fullCallingPipeline.q @@ -64,7 +64,7 @@ class fullCallingPipeline extends QScript { private var pipeline: Pipeline = _ trait CommandLineGATKArgs extends CommandLineGATK { - this.intervals = qscript.pipeline.getProject.getIntervalList + this.intervals :+= qscript.pipeline.getProject.getIntervalList this.jarFile = qscript.gatkJar this.reference_sequence = qscript.pipeline.getProject.getReferenceFile this.memoryLimit = Some(4) @@ -118,10 +118,9 @@ class fullCallingPipeline extends QScript { realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId)) realigner.analysisName = "RealignBam_"+sampleId realigner.input_file = targetCreator.input_file - realigner.intervals = qscript.contigIntervals + realigner.intervals :+= qscript.contigIntervals realigner.targetIntervals = targetCreator.out realigner.scatterCount = contigCount - realigner.isIntermediate = true // may need to explicitly run fix mates var fixMates = new PicardBamJarFunction { @@ -134,7 +133,6 @@ class fullCallingPipeline extends QScript { } // realigner.out = cleaned_bam - // realigner.scatterClass = classOf[ContigScatterFunction] // realigner.setupGatherFunction = { case (f: BamGatherFunction, _) => f.jarFile = qscript.picardFixMatesJar } 
// realigner.jobQueue = "week" @@ -142,9 +140,8 @@ class fullCallingPipeline extends QScript { if (realigner.scatterCount > 1) { realigner.out = cleaned_bam // While gathering run fix mates. - realigner.scatterClass = classOf[ContigScatterFunction] realigner.setupScatterFunction = { - case (scatter: ScatterFunction, _) => + case scatter: ScatterFunction => scatter.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather".format(sampleId)) scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter.out".format(sampleId)) } @@ -167,6 +164,7 @@ class fullCallingPipeline extends QScript { } } else { realigner.out = swapExt("CleanedBams/IntermediateFiles/"+sampleId,bam,"bam","unfixed.cleaned.bam") + realigner.isIntermediate = true // Explicitly run fix mates if the function won't be scattered. @@ -175,7 +173,6 @@ class fullCallingPipeline extends QScript { fixMates.unfixed = realigner.out fixMates.fixed = cleaned_bam fixMates.analysisName = "FixMates_"+sampleId - fixMates.isIntermediate = true // Add the fix mates explicitly } @@ -183,7 +180,6 @@ class fullCallingPipeline extends QScript { samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId)) samtoolsindex.bamFile = cleaned_bam samtoolsindex.analysisName = "index_cleaned_"+sampleId - samtoolsindex.isIntermediate = true if (!qscript.skip_cleaning) { if ( realigner.scatterCount > 1 ) { @@ -238,7 +234,7 @@ class fullCallingPipeline extends QScript { snps.scatterCount = qscript.num_snp_scatter_jobs snps.setupScatterFunction = { - case (scatter: ScatterFunction, _) => + case scatter: ScatterFunction => scatter.commandDirectory = new File("SnpCalls/ScatterGather") scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/SNPCalling/ScatterGather/Scatter.out") } diff --git a/scala/qscript/kshakir/CleanBamFile.scala b/scala/qscript/kshakir/CleanBamFile.scala deleted file mode 100644 index 7d9e92da1..000000000 --- 
a/scala/qscript/kshakir/CleanBamFile.scala +++ /dev/null @@ -1,196 +0,0 @@ -import org.broadinstitute.sting.queue.extensions.firehose.ImportSingleValueFunction -import org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction -import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction -import org.broadinstitute.sting.queue.QScript -import org.broadinstitute.sting.queue.extensions.gatk._ - -class CleanBamFile extends QScript { - qscript => - - @Argument(doc="gatk jar", shortName="gatk") - var gatkJar: File = _ - - @Argument(doc="samtools binary", shortName="samtools") - var samtoolsBinary: String = _ - - @Argument(doc="fix mates jar", shortName="fixMates") - var fixMatesJar: File = _ - - @Input(doc="Script that can merge text files, for example Sting/shell/mergeText.sh.", shortName="MTS") - var mergeTextScript: String = _ - - @Argument(doc="base name for output files", shortName="base") - var baseName: String = _ - - @Input(doc="reference genome", shortName="R") - var referenceFile: File = _ - - @Input(doc="recalibrated bam", shortName="I") - var recalibratedBam: File = _ - - @Argument(doc="read group blacklist conversion script that can convert firehose outputs to a GATK blacklist file.", shortName="RGBLS") - var readGroupBlackListScript: String = _ - - @Argument(doc="read group blacklist", shortName="RGBL", required=false) - var readGroupBlackList: String = _ - - @Argument(doc="intervals", shortName="L") - var intervals: File = _ - - @Argument(doc="Script that can split the interval file by contig, for example Sting/python/splitIntervalsByContig.py.", shortName="RTCSS") - var targetCreatorScatterScript: String = _ - - @Argument(doc="RealignerTargetCreator scatter count. " + - "Best if it is either 1 or the number of contigs in the interval list. 
" + - "If used the compute farm must also be used.", shortName="RTCSC") - var targetCreatorScatterCount = 0 - - @Argument(doc="Script that can split the intervals evenly, for example Sting/shell/splitIntervals.sh.", shortName="IRSS") - var indelRealignerScatterScript: String = _ - - @Argument(doc="IndelRealigner scatter count.", shortName="IRSC") - var indelRealignerScatterCount = 0 - - @Input(doc="dbsnp file", shortName="D") - var dbsnpFile: File = _ - - @Argument(doc="firehose import jar", shortName="importJar") - var firehoseImportJar: File = _ - - @Argument(doc="short job queue", shortName="shortQueue", required=false) - var shortJobQueue: String = _ - - @Argument(doc="firehose host", shortName="FHHost") - var firehoseHost: String = _ - - @Argument(doc="firehose port", shortName="FHPort") - var firehosePort: Int = _ - - @Argument(doc="firehose domain", shortName="FHDomain") - var firehoseDomain: String = _ - - @Argument(doc="clean bam firehose security token", shortName="FHToken") - var firehoseSecurityToken: String = _ - - @Argument(doc="clean bam firehose entity type", shortName="bamFHEType") - var bamFirehoseEntityType: String = _ - - @Argument(doc="clean bam firehose entity id", shortName="bamFHEID") - var bamFirehoseEntityID: String = _ - - @Argument(doc="clean bam firehose annotation type name", shortName="bamFHAnn") - var bamFirehoseAnnotationTypeName: String = _ - - trait GATKCommonArgs extends CommandLineGATK { - this.jarFile = qscript.gatkJar - this.reference_sequence = qscript.referenceFile - this.intervals = qscript.intervals - this.input_file :+= recalibratedBam - } - - def baseFile(suffix: String) = new File(baseName + suffix) - - def script = { - val blacklistConverter = new CommandLineFunction { - @Output(doc="blacklist file") var blacklistFile: File = _ - def commandLine = readGroupBlackListScript + " " + blacklistFile + " \"" + readGroupBlackList + "\"" - } - - if (readGroupBlackList != null) { - blacklistConverter.blacklistFile = 
baseFile(".blacklist.txt") - add(blacklistConverter) - } - - // -T RealignerTargetCreator -I -R -o .merged.intervals - val targetCreator = new RealignerTargetCreator with GATKCommonArgs - targetCreator.memoryLimit = Some(2) - targetCreator.read_group_black_list :+= blacklistConverter.blacklistFile - targetCreator.out = baseFile(".merged.intervals") - targetCreator.scatterCount = targetCreatorScatterCount - targetCreator.setupScatterFunction = { - case (scatter: IntervalScatterFunction, _) => - scatter.splitIntervalsScript = targetCreatorScatterScript - } - targetCreator.setupGatherFunction = { - case (gather: SimpleTextGatherFunction, _) => - gather.mergeTextScript = mergeTextScript - } - - // -T IndelRealigner -I -R -stats .indel.stats - // -O .unfixed.cleaned.bam -maxInRam 200000 -targetIntervals -D - val realigner = new IndelRealigner with GATKCommonArgs - realigner.memoryLimit = Some(4) - realigner.read_group_black_list :+= blacklistConverter.blacklistFile - realigner.statisticsFileForDebugging = baseFile(".indel.stats") - realigner.maxReadsInRam = Some(200000) - realigner.targetIntervals = targetCreator.out - realigner.DBSNP = dbsnpFile - realigner.scatterCount = indelRealignerScatterCount - - var fixedBam: File = null - - if (realigner.scatterCount > 1) { - realigner.out = baseFile(".cleaned.bam") - // While gathering run fix mates. - realigner.setupScatterFunction = { - case (scatter: IntervalScatterFunction, _) => - scatter.splitIntervalsScript = indelRealignerScatterScript - } - realigner.setupGatherFunction = { - case (gather: BamGatherFunction, _) => - gather.memoryLimit = Some(4) - gather.jarFile = fixMatesJar - // Don't pass this AS=true to fix mates! - gather.assumeSorted = None - case (gather: SimpleTextGatherFunction, _) => - gather.mergeTextScript = mergeTextScript - } - - fixedBam = realigner.out - } else { - realigner.out = baseFile(".unfixed.cleaned.bam") - - // Explicitly run fix mates if the function won't be scattered. 
- var fixMates = new PicardBamJarFunction { - // Declare inputs/outputs for dependency tracking. - @Input(doc="unfixed bam") var unfixed: File = _ - @Output(doc="fixed bam") var fixed: File = _ - def inputBams = List(unfixed) - def outputBam = fixed - } - fixMates.memoryLimit = Some(4) - fixMates.jarFile = fixMatesJar - fixMates.unfixed = realigner.out - fixMates.fixed = baseFile(".cleaned.bam") - - fixedBam = fixMates.fixed - - // Add the fix mates explicitly - add(fixMates) - } - - val bamIndex = new SamtoolsIndexFunction - bamIndex.samtools = samtoolsBinary - bamIndex.bamFile = fixedBam - bamIndex.bamFileIndex = swapExt(fixedBam, "bam", "bam.bai") - - val importer = new ImportSingleValueFunction { - /** Files that this job should wait on before running. */ - @Input(doc="Explicit job dependencies", required=false) - var jobDependencies: List[File] = Nil - } - importer.jobQueue = shortJobQueue - importer.jarFile = firehoseImportJar - importer.host = firehoseHost - importer.port = firehosePort - importer.domain = firehoseDomain - importer.securityToken = firehoseSecurityToken - importer.entityType = bamFirehoseEntityType - importer.entityID = bamFirehoseEntityID - importer.annotationTypeName = bamFirehoseAnnotationTypeName - importer.importValue = fixedBam - importer.jobDependencies :+= bamIndex.bamFileIndex - - add(targetCreator, realigner, bamIndex, importer) - } -} diff --git a/scala/qscript/kshakir/CleanBamFile.sh b/scala/qscript/kshakir/CleanBamFile.sh deleted file mode 100755 index 98def9efe..000000000 --- a/scala/qscript/kshakir/CleanBamFile.sh +++ /dev/null @@ -1,84 +0,0 @@ -#!/bin/sh - -ENTITY_TYPE=$1 -ENTITY_ID=$2 -FIREHOSE_TOKEN=$3 - -FIREHOSE_WORKSPACE=trunk -FIREHOSE_HOST=firehose -FIREHOSE_PORT=8080 -FIREHOSE_DOMAIN=gsa -CLEAN_BAM_ANNOTATION=clean_bam_file -DEFAULT_QUEUE=gsa -SHORT_QUEUE=hour -FIREHOSE_SOURCE_HOME=/humgen/gsa-firehose/firehose/source -STING_HOME=$FIREHOSE_SOURCE_HOME/Sting -CGA_HOME=$FIREHOSE_SOURCE_HOME/CancerGenomeAnalysis -RUN=-run 
-TMP_DIR=/broad/shptmp/$USER -JOB_QUEUE=gsa -INTERVAL_COUNT=15 -REALIGNER_COUNT=50 -QUEUE_JAR=$STING_HOME/dist/Queue.jar -GATK_JAR=$STING_HOME/dist/GenomeAnalysisTK.jar -FIX_MATES_JAR=/seq/software/picard/1.194/bin/FixMateInformation.jar -SAMTOOLS=/seq/dirseq/samtools/samtools-0.1.7-5/samtools -FIREHOSE_IMPORT_JAR=$CGA_HOME/analysis_pipeline/tools/dist/ImportSingleValue.jar - -CLEAN_BAM_FILE_SCRIPT=$STING_HOME/scala/qscript/kshakir/CleanBamFile.scala -FIREHOSE_TEST_HARNESS="python $CGA_HOME/analysis_pipeline/scripts/firehose_test_harness.py" -SPLIT_BY_CONTIG="python $STING_HOME/python/splitIntervalsByContig.py" -SPLIT_BY_INTERVALS=$STING_HOME/shell/splitIntervals.sh -MERGE_TEXT=$STING_HOME/shell/mergeText.sh -CONVERT_BLACKLIST=$CGA_HOME/analysis_pipeline/genepattern/modules/ConvertDependentsList/convert_dependents_list.sh - -# Record the Sting version number. -svnversion $STING_HOME > stingversion.txt -# If Sting has been modified, record the differences. -grep -E '^[0-9]+$' stingversion.txt || svn diff $STING_HOME > stingdiff.txt - -# Record the CGA version number. -svnversion $CGA_HOME > cgaversion.txt -# If CGA has been modified, record the differences. -grep -E '^[0-9]+$' cgaversion.txt || svn diff $CGA_HOME > cgadiff.txt - -# Try to retrieve the blacklist. If it fails then set it to "". -$FIREHOSE_TEST_HARNESS -d $FIREHOSE_DOMAIN -w $FIREHOSE_WORKSPACE -n $ENTITY_ID -t $ENTITY_TYPE 'BLACKLIST="${read_group_blacklist}"' && . firehose-populated-commands.sh || BLACKLIST="" - -# Retrieve all the required variables and run the pipeline in Queue. -$FIREHOSE_TEST_HARNESS -d $FIREHOSE_DOMAIN -w $FIREHOSE_WORKSPACE -n $ENTITY_ID -t $ENTITY_TYPE 'REFERENCE_FILE="${reference_file}";BAM_FILE="${recalibrated_bam_file}";DBSNP_FILE="${dbsnp_file}";INTERVAL_FILE="${interval_list}";DATABASE_ID="${database_id}"' && . 
firehose-populated-commands.sh && \ -\ -JOB_PREFIX=Q-$ENTITY_ID && \ -\ -java \ --Djava.io.tmpdir="$TMP_DIR" \ --jar "$QUEUE_JAR" \ --S "$CLEAN_BAM_FILE_SCRIPT" \ --bsub -bsubWait -skipUpToDate \ --jobQueue "$JOB_QUEUE" \ --jobPrefix "$JOB_PREFIX" \ --gatk "$GATK_JAR" \ --base "$ENTITY_ID" \ --R "$REFERENCE_FILE" \ --I "$BAM_FILE" \ --D "$DBSNP_FILE" \ --L "$INTERVAL_FILE" \ --RTCSS "$SPLIT_BY_CONTIG" \ --RTCSC "$INTERVAL_COUNT" \ --IRSS "$SPLIT_BY_INTERVALS" \ --IRSC "$REALIGNER_COUNT" \ --MTS "$MERGE_TEXT" \ --fixMates "$FIX_MATES_JAR" \ --samtools "$SAMTOOLS" \ --RGBLS "$CONVERT_BLACKLIST" \ --RGBL "$BLACKLIST" \ --importJar "$FIREHOSE_IMPORT_JAR" \ --shortQueue "$SHORT_QUEUE" \ --FHHost "$FIREHOSE_HOST" \ --FHPort "$FIREHOSE_PORT" \ --FHDomain "$FIREHOSE_DOMAIN" \ --FHToken "$FIREHOSE_TOKEN" \ --bamFHEType "$ENTITY_TYPE" \ --bamFHEID "$DATABASE_ID" \ --bamFHAnn "$CLEAN_BAM_ANNOTATION" \ -$RUN || ( echo Job failed. Check stdout.txt for more info. >&2; exit 1; ) diff --git a/scala/qscript/recalibrate.scala b/scala/qscript/recalibrate.scala index 25c6b2422..6d9464f4a 100755 --- a/scala/qscript/recalibrate.scala +++ b/scala/qscript/recalibrate.scala @@ -39,7 +39,7 @@ def script = { add(new CountCovariates(bamIn, recalData) { useOriginalQualities = true } ) val tableRecal = new TableRecalibrate(bamIn, recalData, recalBam) { useOriginalQualities = true } if ( scatter ) { - tableRecal.intervals = new File("/humgen/gsa-hpprojects/GATK/data/chromosomes.hg18.interval_list") + tableRecal.intervals :+= new File("/humgen/gsa-hpprojects/GATK/data/chromosomes.hg18.interval_list") //tableRecal.scatterClass = classOf[ContigScatterFunction] tableRecal.setupGatherFunction = { case (f: PicardBamJarFunction, _) => f.jarFile = picardMergeSamFilesJar; f.memoryLimit = Some(4) } tableRecal.scatterCount = 25 diff --git a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index ac8885f5a..c86a40f52 100755 --- 
a/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -26,8 +26,8 @@ class QCommandLine extends CommandLineProgram with Logging { @Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false) private var expandedDotFile: File = _ - @Argument(fullName="start_clean", shortName="clean", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false) - private var startClean = false + @Argument(fullName="start_from_scratch", shortName="startFromScratch", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false) + private var startFromScratch = false @Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden private var runScripts = false @@ -38,12 +38,15 @@ class QCommandLine extends CommandLineProgram with Logging { @ArgumentCollection private val qSettings = new QSettings - @Argument(fullName="statusEmailFrom", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false) + @Argument(fullName="status_email_from", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false) private var statusEmailFrom: String = System.getProperty("user.name") + "@" + SystemUtils.domainName - @Argument(fullName="statusEmailTo", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false) + @Argument(fullName="status_email_to", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false) private var statusEmailTo: List[String] = Nil + @Argument(fullName="delete_intermediate_outputs", shortName="deleteIntermediates", doc="After a successful run delete the outputs of any 
Function marked as intermediate.", required=false) + private var deleteIntermediates = false + /** * Takes the QScripts passed in, runs their script() methods, retrieves their generated * functions, and then builds and runs a QGraph based on the dependencies. @@ -53,13 +56,14 @@ class QCommandLine extends CommandLineProgram with Logging { val qGraph = new QGraph qGraph.dryRun = !(run || runScripts) qGraph.bsubAllJobs = bsubAllJobs - qGraph.startClean = startClean + qGraph.startFromScratch = startFromScratch qGraph.dotFile = dotFile qGraph.expandedDotFile = expandedDotFile qGraph.qSettings = qSettings qGraph.debugMode = debugMode == true qGraph.statusEmailFrom = statusEmailFrom qGraph.statusEmailTo = statusEmailTo + qGraph.deleteIntermediates = deleteIntermediates val scripts = qScriptManager.createScripts() for (script <- scripts) { diff --git a/scala/src/org/broadinstitute/sting/queue/QScript.scala b/scala/src/org/broadinstitute/sting/queue/QScript.scala index 363e56831..3736abdaa 100755 --- a/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ b/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -16,7 +16,6 @@ trait QScript extends Logging { type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction type InProcessFunction = org.broadinstitute.sting.queue.function.InProcessFunction type ScatterGatherableFunction = org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction - type Scatter = org.broadinstitute.sting.queue.function.scattergather.Scatter type Gather = org.broadinstitute.sting.queue.function.scattergather.Gather type SimpleTextGatherFunction = org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction diff --git a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index 84d9b7e2b..b3bb52d98 100644 --- a/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ 
b/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -33,10 +33,10 @@ class FunctionEdge(var function: QFunction) extends QEdge { currentStatus = RunnerStatus.SKIPPED } - def resetToPending() = { + def resetToPending(cleanOutputs: Boolean) = { currentStatus = RunnerStatus.PENDING - function.doneOutputs.foreach(_.delete()) - function.failOutputs.foreach(_.delete()) + if (cleanOutputs) + function.deleteOutputs() } def inputs = function.inputs diff --git a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 7aa0920bb..1d7dda969 100755 --- a/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -21,11 +21,12 @@ import org.apache.commons.lang.StringUtils class QGraph extends Logging { var dryRun = true var bsubAllJobs = false - var startClean = false + var startFromScratch = false var dotFile: File = _ var expandedDotFile: File = _ var qSettings: QSettings = _ var debugMode = false + var deleteIntermediates = false var statusEmailFrom: String = _ var statusEmailTo: List[String] = _ @@ -127,6 +128,7 @@ class QGraph extends Logging { // build up the full DAG with scatter-gather jobs fillGraph logger.info("Checking pipeline status.") + updateGraphStatus(false) logStatus } @@ -241,24 +243,18 @@ class QGraph extends Logging { * Dry-runs the jobs by traversing the graph. 
*/ private def dryRunJobs() = { - traverseFunctions(edge => { - edge.function match { - case qFunction => { - if (logger.isDebugEnabled) { - logger.debug(qFunction.commandDirectory + " > " + qFunction.description) - } else { - logger.info(qFunction.description) - } - logger.info("Output written to " + qFunction.jobOutputFile) - if (qFunction.jobErrorFile != null) { - logger.info("Errors written to " + qFunction.jobErrorFile) - } else { - if (logger.isDebugEnabled) - logger.info("Errors also written to " + qFunction.jobOutputFile) - } - } - } - }) + updateGraphStatus(false) + traverseFunctions(edge => logEdge(edge)) + } + + private def logEdge(edge: FunctionEdge) = { + logger.info("-------") + logger.info(StringUtils.capitalize(edge.status.toString) + ": " + edge.function.description) + if (logger.isDebugEnabled) + logger.debug(edge.function.commandDirectory + " > " + edge.function.description) + logger.info("Log: " + edge.function.jobOutputFile.getAbsolutePath) + if (edge.function.jobErrorFile != null) + logger.info("Error: " + edge.function.jobErrorFile.getAbsolutePath) } /** @@ -266,12 +262,11 @@ class QGraph extends Logging { */ private def runJobs() = { try { - traverseFunctions(edge => { - if (startClean) - edge.resetToPending() - else - checkDone(edge) - }) + if (startFromScratch) { + logger.info("Removing outputs from previous runs.") + foreachFunction(_.resetToPending(true)) + } else + updateGraphStatus(true) var readyJobs = getReadyJobs var runningJobs = Set.empty[FunctionEdge] @@ -303,6 +298,8 @@ class QGraph extends Logging { Thread.sleep(30000L) readyJobs = getReadyJobs } + + deleteIntermediateOutputs() } catch { case e => logger.error("Uncaught error running jobs.", e) @@ -312,13 +309,22 @@ class QGraph extends Logging { } } + /** + * Updates the status of edges in the graph. + * @param cleanOutputs If true will delete outputs when setting edges to pending. 
+ */ + private def updateGraphStatus(cleanOutputs: Boolean) = { + traverseFunctions(edge => checkDone(edge, cleanOutputs)) + } + /** * Checks if an edge is done or if it's an intermediate edge if it can be skipped. * This function may modify previous edges if it discovers that the edge passed in * is dependent jobs that were previously marked as skipped. * @param edge Edge to check to see if it's done or can be skipped. + * @param cleanOutputs If true will delete outputs when setting edges to pending. */ - private def checkDone(edge: FunctionEdge) = { + private def checkDone(edge: FunctionEdge, cleanOutputs: Boolean) = { if (edge.function.isIntermediate) { // By default we do not need to run intermediate edges. // Mark any intermediate edges as skipped, if they're not already done. @@ -329,8 +335,8 @@ class QGraph extends Logging { val isDone = edge.status == RunnerStatus.DONE && previous.forall(edge => edge.status == RunnerStatus.DONE || edge.status == RunnerStatus.SKIPPED) if (!isDone) { - edge.resetToPending() - resetPreviousSkipped(edge, previous) + edge.resetToPending(cleanOutputs) + resetPreviousSkipped(edge, previous, cleanOutputs) } } } @@ -341,11 +347,12 @@ class QGraph extends Logging { * to pending. * @param edge Dependent edge. * @param previous Previous edges that provide inputs to edge. + * @param cleanOutputs If true will clean up the output files when resetting skipped jobs to pending. 
*/ - private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge]): Unit = { + private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge], cleanOutputs: Boolean): Unit = { for (previousEdge <- previous.filter(_.status == RunnerStatus.SKIPPED)) { - previousEdge.resetToPending() - resetPreviousSkipped(previousEdge, this.previousFunctions(previousEdge)) + previousEdge.resetToPending(cleanOutputs) + resetPreviousSkipped(previousEdge, this.previousFunctions(previousEdge), cleanOutputs) } } @@ -468,7 +475,7 @@ class QGraph extends Logging { foreachFunction(edge => { val name = edge.function.analysisName if (name != null) { - updateStatus(statuses.find(_.analysisName == name) match { + updateAnalysisStatus(statuses.find(_.analysisName == name) match { case Some(status) => status case None => val status = new AnalysisStatus(name) @@ -484,11 +491,13 @@ class QGraph extends Logging { val sgDone = status.scatter.done + status.gather.done val sgFailed = status.scatter.failed + status.gather.failed val sgSkipped = status.scatter.skipped + status.gather.skipped + val gatherTotal = status.gather.total + val gatherDone = status.gather.done if (sgTotal > 0) { var sgStatus = RunnerStatus.PENDING if (sgFailed > 0) sgStatus = RunnerStatus.FAILED - else if (sgDone == sgTotal) + else if (gatherDone == gatherTotal) sgStatus = RunnerStatus.DONE else if (sgDone + sgSkipped == sgTotal) sgStatus = RunnerStatus.SKIPPED @@ -510,7 +519,7 @@ class QGraph extends Logging { /** * Updates a status map with scatter/gather status information (e.g. 
counts) */ - private def updateStatus(stats: AnalysisStatus, edge: FunctionEdge) = { + private def updateAnalysisStatus(stats: AnalysisStatus, edge: FunctionEdge) = { if (edge.function.isInstanceOf[GatherFunction]) { updateSGStatus(stats.gather, edge) } else if (edge.function.isInstanceOf[CloneFunction]) { @@ -671,7 +680,19 @@ class QGraph extends Logging { } }) iterator.foreach(_ => {}) - } + } + + private def deleteIntermediateOutputs() = { + if (this.deleteIntermediates && !hasFailed) { + logger.info("Deleting intermediate files.") + traverseFunctions(edge => { + if (edge.function.isIntermediate) { + logger.debug("Deleting intermediates:" + edge.function.description) + edge.function.deleteOutputs() + } + }) + } + } /** * Outputs the graph to a .dot file. @@ -705,13 +726,8 @@ class QGraph extends Logging { def logFailed = { foreachFunction(edge => { - if (edge.status == RunnerStatus.FAILED) { - logger.error("-----") - logger.error("Failed: " + edge.function.description) - logger.error("Log: " + edge.function.jobOutputFile.getAbsolutePath) - if (edge.function.jobErrorFile != null) - logger.error("Error: " + edge.function.jobErrorFile.getAbsolutePath) - } + if (edge.status == RunnerStatus.FAILED) + logEdge(edge) }) } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala index ff8cc4c7c..2fb327e3f 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunction.scala @@ -1,8 +1,6 @@ package org.broadinstitute.sting.queue.extensions.gatk -import org.broadinstitute.sting.queue.function.InProcessFunction import org.broadinstitute.sting.commandline.ArgumentSource -import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, ScatterFunction} import 
org.broadinstitute.sting.utils.interval.IntervalUtils import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceDataSource import java.io.File @@ -10,6 +8,10 @@ import net.sf.picard.util.IntervalList import net.sf.samtools.SAMFileHeader import collection.JavaConversions._ import org.broadinstitute.sting.utils.{GenomeLoc, GenomeLocSortedSet, GenomeLocParser} +import org.broadinstitute.sting.queue.util.IOUtils +import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, ScatterGatherableFunction, ScatterFunction} +import org.broadinstitute.sting.queue.function.{QFunction, InProcessFunction} +import org.broadinstitute.sting.queue.QException /** * An interval scatter function. @@ -18,19 +20,45 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction { var splitByContig = false private var referenceSequence: File = _ + private var intervalField: ArgumentSource = _ private var intervals: List[String] = Nil - override def setOriginalFunction(originalFunction: ScatterGatherableFunction, scatterField: ArgumentSource) = { + def isScatterGatherable(originalFunction: ScatterGatherableFunction) = { + if (originalFunction.isInstanceOf[CommandLineGATK]) { + val gatk = originalFunction.asInstanceOf[CommandLineGATK] + gatk.reference_sequence != null + } else false + } + + def setScatterGatherable(originalFunction: ScatterGatherableFunction) = { val gatk = originalFunction.asInstanceOf[CommandLineGATK] - referenceSequence = gatk.reference_sequence - intervals = gatk.intervalsString - if (gatk.intervals != null) - intervals ::= gatk.intervals.toString + this.referenceSequence = gatk.reference_sequence + this.intervals ++= gatk.intervalsString + this.intervals ++= gatk.intervals.map(_.toString) + this.intervalField = QFunction.findField(originalFunction.getClass, "intervals") + } + + def initCloneInputs(cloneFunction: CloneFunction, index: Int) = { + cloneFunction.setFieldValue(this.intervalField, List(new 
File("scatter.intervals"))) + } + + def bindCloneInputs(cloneFunction: CloneFunction, index: Int) = { + val scatterPart = cloneFunction.getFieldValue(this.intervalField) + .asInstanceOf[List[File]] + .map(file => IOUtils.subDir(cloneFunction.commandDirectory, file)) + cloneFunction.setFieldValue(this.intervalField, scatterPart) + this.scatterParts ++= scatterPart } def run() = { - val referenceSource = new ReferenceDataSource(referenceSequence) - GenomeLocParser.setupRefContigOrdering(referenceSource.getReference); + IntervalScatterFunction.scatter(this.intervals, this.scatterParts, this.referenceSequence, this.splitByContig) + } +} + +object IntervalScatterFunction { + def scatter(intervals: List[String], scatterParts: List[File], reference: File, splitByContig: Boolean) = { + val referenceSource = new ReferenceDataSource(reference) + GenomeLocParser.setupRefContigOrdering(referenceSource.getReference) val locs = { // TODO: Abstract genome analysis engine has richer logic for parsing. We need to use it! 
if (intervals.size == 0) { @@ -47,6 +75,9 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction { var fileIndex = -1 var locIndex = 0 + if (locs == null || locs.size == 0) + throw new QException("Locs produced an empty interval list: " + intervals.mkString(", ")) + if (splitByContig) { var contig: String = null for (loc <- locs) { @@ -54,29 +85,48 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction { if (fileIndex >= 0) intervalList.write(scatterParts(fileIndex)) fileIndex += 1 + contig = loc.getContig intervalList = new IntervalList(fileHeader) } locIndex += 1 intervalList.add(toInterval(loc, locIndex)) } intervalList.write(scatterParts(fileIndex)) + if ((fileIndex + 1) != scatterParts.size) + throw new QException("Only able to write contigs into %d of %d files.".format(fileIndex + 1, scatterParts.size)) } else { - var locsPerFile = locs.size / this.scatterParts.size - if (locs.size % this.scatterParts.size != 0) locsPerFile += 1 + var locsPerFile = locs.size / scatterParts.size + val locRemainder = locs.size % scatterParts.size + + // At the start, put an extra loc per file + locsPerFile += 1 + var locsLeftFile = 0 + for (loc <- locs) { - if (locIndex % locsPerFile == 0) { + if (locsLeftFile == 0) { if (fileIndex >= 0) intervalList.write(scatterParts(fileIndex)) + fileIndex += 1 intervalList = new IntervalList(fileHeader) + + // When we have put enough locs into each file, + // reduce the number of locs per file back + // to the original calculated value. 
+ if (fileIndex == locRemainder) + locsPerFile -= 1 + locsLeftFile = locsPerFile } + locsLeftFile -= 1 locIndex += 1 intervalList.add(toInterval(loc, locIndex)) } intervalList.write(scatterParts(fileIndex)) + if ((fileIndex + 1) != scatterParts.size) + throw new QException("Only able to write intervals into %d of %d files.".format(fileIndex + 1, scatterParts.size)) } } private def toInterval(loc: GenomeLoc, locIndex: Int) = - new net.sf.picard.util.Interval(loc.getContig, loc.getStart.toInt, loc.getStop.toInt, true, "interval_" + locIndex) + new net.sf.picard.util.Interval(loc.getContig, loc.getStart.toInt, loc.getStop.toInt, false, "interval_" + locIndex) } diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala new file mode 100644 index 000000000..c271d5eca --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala @@ -0,0 +1,46 @@ +package org.broadinstitute.sting.queue.extensions.gatk + +import org.broadinstitute.sting.queue.function.InProcessFunction +import org.broadinstitute.sting.queue.QException +import org.broadinstitute.sting.queue.function.scattergather.GatherFunction +import java.io.{FileReader, PrintWriter} +import org.apache.commons.io.{LineIterator, IOUtils, FileUtils} + +/** + * Merges a vcf text file. 
+ */ +class VcfGatherFunction extends GatherFunction with InProcessFunction { + def run() = { + if (gatherParts.size < 1) { + throw new QException("No files to gather to output: " + originalOutput) + } else { + val writer = new PrintWriter(originalOutput) + try { + var reader = new FileReader(gatherParts(0)) + try { + IOUtils.copy(reader, writer) + } finally { + IOUtils.closeQuietly(reader) + } + + for (file <- gatherParts.tail) { + var inHeaders = true + val itor = FileUtils.lineIterator(file) + try { + while (itor.hasNext) { + val nextLine = itor.nextLine + if (inHeaders && nextLine(0) != '#') + inHeaders = false + if (!inHeaders) + writer.println(nextLine) + } + } finally { + LineIterator.closeQuietly(itor) + } + } + } finally { + IOUtils.closeQuietly(writer) + } + } + } +} \ No newline at end of file diff --git a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 695fe629b..4888749e9 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -152,6 +152,15 @@ trait QFunction { dirs } + /** + * Deletes the output files and all the status files for this function. + */ + def deleteOutputs() = { + outputs.foreach(_.delete()) + doneOutputs.foreach(_.delete()) + failOutputs.foreach(_.delete()) + } + /** * Creates the output directories for this function if it doesn't exist. */ @@ -416,6 +425,19 @@ object QFunction { */ private var classFieldsMap = Map.empty[Class[_], ClassFields] + /** + * Returns the field on clazz. + * @param clazz Class to search. + * @param name Name of the field to return. + * @return Argument source for the field. 
+ */ + def findField(clazz: Class[_], name: String) = { + classFields(clazz).functionFields.find(_.field.getName == name) match { + case Some(source) => source + case None => throw new QException("Could not find a field on class %s with name %s".format(clazz, name)) + } + } + /** * Returns the fields for a class. * @param clazz Class to retrieve fields for. diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala index 16748bf6a..b7451f69e 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala @@ -3,6 +3,7 @@ package org.broadinstitute.sting.queue.function.scattergather import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.commandline.ArgumentSource import java.io.File +import org.broadinstitute.sting.queue.QException /** * Shadow clones another command line function. 
@@ -61,7 +62,7 @@ class CloneFunction extends CommandLineFunction { } } - override def setFieldValue(source: ArgumentSource, value: Any) = { + override def setFieldValue(source: ArgumentSource, value: Any): Unit = { source.field.getName match { case "jobOutputFile" => jobOutputFile = value.asInstanceOf[File] case "jobErrorFile" => jobErrorFile = value.asInstanceOf[File] diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index e9649adc8..8bf5d7082 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -1,7 +1,7 @@ package org.broadinstitute.sting.queue.function.scattergather import java.io.File -import org.broadinstitute.sting.commandline.{ArgumentSource, Input, Output} +import org.broadinstitute.sting.commandline.{Input, Output} import org.broadinstitute.sting.queue.function.QFunction /** @@ -13,19 +13,4 @@ trait GatherFunction extends QFunction { @Output(doc="The original output of the scattered function") var originalOutput: File = _ - - /** - * Sets the original function used to create this scatter function. - * @param originalFunction The ScatterGatherableFunction. - * @param gatherField The field being gathered. - */ - def setOriginalFunction(originalFunction: ScatterGatherableFunction, gatherField: ArgumentSource) = {} - - /** - * Sets the clone function creating one of the inputs for this gather function. - * @param cloneFunction The clone wrapper for the original ScatterGatherableFunction. - * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. - * @param gatherField The field to be gathered. 
- */ - def setCloneFunction(cloneFunction: CloneFunction, index: Int, gatherField: ArgumentSource) = {} } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala index bf6d026b5..86e9ab921 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala @@ -1,31 +1,47 @@ package org.broadinstitute.sting.queue.function.scattergather import java.io.File -import org.broadinstitute.sting.commandline.{ArgumentSource, Input, Output} +import org.broadinstitute.sting.commandline.{Input, Output} import org.broadinstitute.sting.queue.function.QFunction /** - * Base class for Scatter command line functions. + * Base class for Scatter functions. */ trait ScatterFunction extends QFunction { - @Input(doc="Original input to scatter") - var originalInput: File = _ + @Input(doc="Original inputs to scatter") + var originalInputs: Set[File] = _ - @Output(doc="Scattered parts of the original input, one per temp directory") + @Output(doc="Scattered parts of the original inputs, one set per temp directory") var scatterParts: List[File] = Nil /** - * Sets the original function used to create this scatter function. - * @param originalFunction The ScatterGatherableFunction. - * @param scatterField The field being scattered. + * Returns true if the scatter function can scatter this original function. + * @param originalFunction The original function to check. + * @return true if the scatter function can scatter this original function. */ - def setOriginalFunction(originalFunction: ScatterGatherableFunction, scatterField: ArgumentSource) = {} + def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean /** - * Sets the clone function using one of the outputs of this scatter function. 
- * @param cloneFunction The clone wrapper for the original ScatterGatherableFunction. - * @param index The one based index (from 1..scatterCount inclusive) of the scatter piece. - * @param scatterField The field being scattered. + * Sets the original ScatterGatherableFunction to be scattered. + * @param originalFunction The original function to with inputs bind to this scatter function. */ - def setCloneFunction(cloneFunction: CloneFunction, index: Int, scatterField: ArgumentSource) = {} + def setScatterGatherable(originalFunction: ScatterGatherableFunction) + + /** + * Initializes the input fields for the clone function. + * The input values should be set to their defaults + * and may be changed by the user. + * @param cloneFunction CloneFunction to initialize. + * @param index The one based scatter index. + */ + def initCloneInputs(cloneFunction: CloneFunction, index: Int) + + /** + * Binds the input fields for the clone function to this scatter function. + * The input values should be set to their absolute values and added + * to scatter parts. + * @param cloneFunction CloneFunction to bind. + * @param index The one based scatter index. 
+ */ + def bindCloneInputs(cloneFunction: CloneFunction, index: Int) } diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 2aebfef3d..4bc270436 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -4,6 +4,7 @@ import java.io.File import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.commandline.ArgumentSource import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} +import org.broadinstitute.sting.queue.QException /** * A function that can be run faster by splitting it up into pieces and then joining together the results. @@ -29,9 +30,8 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Allows external modification of the ScatterFunction that will create the scatter pieces in the temporary directories. * @param scatterFunction The function that will create the scatter pieces in the temporary directories. - * @param scatterField The input field being scattered. */ - var setupScatterFunction: PartialFunction[(ScatterFunction, ArgumentSource), Unit] = _ + var setupScatterFunction: PartialFunction[ScatterFunction, Unit] = _ /** * Allows external modification of the GatherFunction that will collect the gather pieces in the temporary directories. @@ -50,10 +50,10 @@ trait ScatterGatherableFunction extends CommandLineFunction { /** * Returns true if the function is ready to be scatter / gathered. * The base implementation checks if the scatter count is greater than one, - * and that the scatter field has a value. + * and that the scatter function can scatter this instance. * @return true if the function is ready to be scatter / gathered. 
*/ - def scatterGatherable = this.scatterCount > 1 && hasFieldValue(this.scatterField) + def scatterGatherable = this.scatterCount > 1 && scatterFunction.isScatterGatherable(this) /** * Returns a list of scatter / gather and clones of this function @@ -68,17 +68,15 @@ trait ScatterGatherableFunction extends CommandLineFunction { val inputFieldsWithValues = this.inputFields.filter(hasFieldValue(_)) // Only gather up fields that will have a value val outputFieldsWithValues = this.outputFields.filter(hasFieldValue(_)) - // The field containing the file to split - val originalInput = getFieldFile(scatterField) // Create the scatter function based on @Scatter - val scatterFunction = this.newScatterFunction(this.scatterField) syncFunction(scatterFunction) scatterFunction.addOrder = this.addOrder :+ 1 - scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName) - scatterFunction.originalInput = originalInput - scatterFunction.setOriginalFunction(this, scatterField) - initScatterFunction(scatterFunction, this.scatterField) + scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter") + scatterFunction.originalInputs = this.inputs + scatterFunction.isIntermediate = true + scatterFunction.setScatterGatherable(this) + initScatterFunction(scatterFunction) functions :+= scatterFunction // Create the gather functions for each output field @@ -92,7 +90,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { gatherFunction.addOrder = this.addOrder :+ gatherAddOrder gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) gatherFunction.originalOutput = this.getFieldFile(gatherField) - gatherFunction.setOriginalFunction(this, gatherField) + gatherFunction.isIntermediate = this.isIntermediate initGatherFunction(gatherFunction, gatherField) functions :+= gatherFunction gatherFunctions += gatherField -> gatherFunction @@ -110,11 +108,11 @@ trait ScatterGatherableFunction extends 
CommandLineFunction { cloneFunction.index = i cloneFunction.addOrder = this.addOrder :+ (i+1) cloneFunction.memoryLimit = this.memoryLimit + cloneFunction.isIntermediate = true // Setup the fields on the clone function, outputting each as a relative file in the sg directory. cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i) - var scatterPart = new File(originalInput.getName) - cloneFunction.setFieldValue(scatterField, scatterPart) + scatterFunction.initCloneInputs(cloneFunction, i) for (gatherField <- outputFieldsWithValues) { val gatherPart = new File(gatherOutputs(gatherField).getName) cloneFunction.setFieldValue(gatherField, gatherPart) @@ -124,9 +122,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { initCloneFunction(cloneFunction, i) // Get absolute paths to the files and bind the sg functions to the clone function via the absolute paths. - scatterPart = IOUtils.subDir(cloneFunction.commandDirectory, cloneFunction.getFieldFile(scatterField)) - cloneFunction.setFieldValue(scatterField, scatterPart) - scatterFunction.scatterParts :+= scatterPart + scatterFunction.bindCloneInputs(cloneFunction, i) for (gatherField <- outputFieldsWithValues) { val gatherPart = IOUtils.subDir(cloneFunction.commandDirectory, cloneFunction.getFieldFile(gatherField)) cloneFunction.setFieldValue(gatherField, gatherPart) @@ -137,7 +133,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { } functions ++= cloneFunctions - // Return all the various functions we created + // Return all the various created functions. functions } @@ -155,34 +151,24 @@ trait ScatterGatherableFunction extends CommandLineFunction { } /** - * Retrieves the scatter field from the first field that has the annotation @Scatter. + * Creates a new ScatterFunction. 
+ * @return A ScatterFunction instantiated scatterClass */ - protected lazy val scatterField = - this.inputFields.find(field => ReflectionUtils.hasAnnotation(field.field, classOf[Scatter])).get - - /** - * Creates a new ScatterFunction for the scatterField. - * @param scatterField Field that defined @Scatter. - * @return A ScatterFunction instantiated from @Scatter or scatterClass if scatterClass was set on this ScatterGatherableFunction. - */ - protected def newScatterFunction(scatterField: ArgumentSource): ScatterFunction = { - var scatterClass = this.scatterClass - if (scatterClass == null) - scatterClass = ReflectionUtils.getAnnotation(scatterField.field, classOf[Scatter]) - .value.asSubclass(classOf[ScatterFunction]) - scatterClass.newInstance.asInstanceOf[ScatterFunction] + protected def newScatterFunction(): ScatterFunction = { + if (this.scatterClass == null) + throw new QException("scatterClass is null.") + this.scatterClass.newInstance.asInstanceOf[ScatterFunction] } /** * Initializes the ScatterFunction created by newScatterFunction() that will create the scatter pieces in the temporary directories. * Calls setupScatterFunction with scatterFunction. * @param scatterFunction The function that will create the scatter pieces in the temporary directories. - * @param scatterField The input field being scattered. */ - protected def initScatterFunction(scatterFunction: ScatterFunction, scatterField: ArgumentSource) = { + protected def initScatterFunction(scatterFunction: ScatterFunction) = { if (this.setupScatterFunction != null) - if (this.setupScatterFunction.isDefinedAt(scatterFunction, scatterField)) - this.setupScatterFunction(scatterFunction, scatterField) + if (this.setupScatterFunction.isDefinedAt(scatterFunction)) + this.setupScatterFunction(scatterFunction) } /** @@ -236,7 +222,6 @@ trait ScatterGatherableFunction extends CommandLineFunction { * @param newFunction newly created function. 
*/ protected def syncFunction(newFunction: QFunction) = { - newFunction.isIntermediate = this.isIntermediate newFunction.analysisName = this.analysisName newFunction.qSettings = this.qSettings newFunction.jobTempDir = this.jobTempDir @@ -249,6 +234,11 @@ trait ScatterGatherableFunction extends CommandLineFunction { } } + /** + * The scatter function. + */ + private lazy val scatterFunction = newScatterFunction() + /** * Returns a temporary directory under this scatter gather directory. * @param Sub directory under the scatter gather directory. diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala index 6e35221a5..cdd3c2ab2 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/SimpleTextGatherFunction.scala @@ -1,23 +1,14 @@ package org.broadinstitute.sting.queue.function.scattergather -import org.broadinstitute.sting.commandline.Argument import org.broadinstitute.sting.queue.function.InProcessFunction -import org.apache.commons.io.FileUtils import org.broadinstitute.sting.queue.QException -import collection.JavaConversions._ import java.io.PrintWriter +import org.apache.commons.io.{LineIterator, IOUtils, FileUtils} /** * Merges a text file. - * The script can be changed by setting rmdirScript. - * By default uses mergeText.sh in Sting/shell. - * The format of the call is [.. 
] */ class SimpleTextGatherFunction extends GatherFunction with InProcessFunction { - @Argument(doc="merge text script") - var mergeTextScript = "mergeText.sh" - - def run() = { if (gatherParts.size < 1) { throw new QException("No files to gather to output: " + originalOutput) @@ -25,38 +16,48 @@ class SimpleTextGatherFunction extends GatherFunction with InProcessFunction { FileUtils.copyFile(gatherParts(0), originalOutput) } else { val writer = new PrintWriter(originalOutput) - var startLine = 0 + try { + var startLine = 0 - val readerA = FileUtils.lineIterator(gatherParts(0)) - val readerB = FileUtils.lineIterator(gatherParts(1)) - var headersMatch = true - while (headersMatch) { - if (readerA.hasNext && readerB.hasNext) { - val headerA = readerA.nextLine - val headerB = readerB.nextLine - headersMatch = headerA == headerB - if (headersMatch) { - startLine += 1 - writer.println(headerA) + val readerA = FileUtils.lineIterator(gatherParts(0)) + val readerB = FileUtils.lineIterator(gatherParts(1)) + try { + var headersMatch = true + while (headersMatch) { + if (readerA.hasNext && readerB.hasNext) { + val headerA = readerA.nextLine + val headerB = readerB.nextLine + headersMatch = headerA == headerB + if (headersMatch) { + startLine += 1 + writer.println(headerA) + } + } else { + headersMatch = false + } } - } else { - headersMatch = false + } finally { + LineIterator.closeQuietly(readerA) + LineIterator.closeQuietly(readerB) } - } - readerA.close - readerB.close - for (file <- gatherParts) { - val reader = FileUtils.lineIterator(file) - var lineNum = 0 - while (reader.hasNext && lineNum < startLine) { - reader.nextLine - lineNum += 1 + for (file <- gatherParts) { + val reader = FileUtils.lineIterator(file) + try { + var lineNum = 0 + while (reader.hasNext && lineNum < startLine) { + reader.nextLine + lineNum += 1 + } + while (reader.hasNext) + writer.println(reader.nextLine) + } finally { + LineIterator.closeQuietly(reader) + } } - while (reader.hasNext) - 
writer.println(reader.nextLine) + } finally { + IOUtils.closeQuietly(writer) } - writer.close } } } diff --git a/scala/src/org/broadinstitute/sting/queue/pipeline/BamProcessing.scala b/scala/src/org/broadinstitute/sting/queue/pipeline/BamProcessing.scala index bbee47f44..41506b825 100755 --- a/scala/src/org/broadinstitute/sting/queue/pipeline/BamProcessing.scala +++ b/scala/src/org/broadinstitute/sting/queue/pipeline/BamProcessing.scala @@ -23,7 +23,7 @@ class BamProcessing(yaml: File, gatkJar: File, fixMatesJar: File) { trait StandardCommandLineGATK extends CommandLineGATK { this.reference_sequence = library.attributes.getProject.getReferenceFile - this.intervals = library.attributes.getProject.getIntervalList + this.intervals :+= library.attributes.getProject.getIntervalList this.DBSNP = library.attributes.getProject.getDbsnpFile this.memoryLimit = Some(2) this.jarFile = library.gatkJar @@ -35,7 +35,7 @@ class BamProcessing(yaml: File, gatkJar: File, fixMatesJar: File) { */ def StandardRealignerTargetCreator(bam: File, contigs: List[String], output: File) : RealignerTargetCreator = { var rtc = new RealignerTargetCreator with StandardCommandLineGATK - rtc.intervals = null + rtc.intervals = Nil rtc.intervalsString = contigs rtc.input_file :+= bam rtc.out = output @@ -51,7 +51,7 @@ class BamProcessing(yaml: File, gatkJar: File, fixMatesJar: File) { def StandardIndelCleaner(bam: File, contigs: List[String], targets: File, outBam: File) : IndelRealigner = { var realigner = new IndelRealigner with StandardCommandLineGATK realigner.intervalsString = contigs - realigner.intervals = null + realigner.intervals = Nil realigner.input_file :+= bam realigner.out = outBam realigner.targetIntervals = targets diff --git a/scala/src/org/broadinstitute/sting/queue/pipeline/VariantCalling.scala b/scala/src/org/broadinstitute/sting/queue/pipeline/VariantCalling.scala index 81d1ee1ab..a0fd91351 100755 --- a/scala/src/org/broadinstitute/sting/queue/pipeline/VariantCalling.scala +++ 
b/scala/src/org/broadinstitute/sting/queue/pipeline/VariantCalling.scala @@ -23,7 +23,7 @@ class VariantCalling(yaml: File,gatkJar: File) { */ trait StandardCommandLineGATK extends CommandLineGATK { this.reference_sequence = vc.attributes.getProject.getReferenceFile - this.intervals = vc.attributes.getProject.getIntervalList + this.intervals :+= vc.attributes.getProject.getIntervalList this.DBSNP = vc.attributes.getProject.getDbsnpFile // set global memory limit on the low side. Additional input bams will affect it. this.memoryLimit = Some(2) diff --git a/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala b/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala index 6f6ffdcc7..f6a174dd6 100644 --- a/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala +++ b/scala/src/org/broadinstitute/sting/queue/util/ReflectionUtils.scala @@ -19,6 +19,19 @@ object ReflectionUtils { */ def hasAnnotation(field: Field, annotation: Class[_ <: Annotation]) = field.getAnnotation(annotation) != null + /** + * Returns true if clazz or one of its superclasses has the annotation. + * @param clazz Class to check. + * @param annotation Class of the annotation to look for. + * @return true if clazz has the annotation. + */ + def hasAnnotation(clazz: Class[_], annotation: Class[_ <: Annotation]) = { + var c: Class[_] = clazz + while (c != null && c.getAnnotation(annotation) == null) + c = c.getSuperclass + c != null + } + + /** + * Gets the annotation or throws an exception if the annotation is not found. + * @param field Field to check. + * @param annotation Class of the annotation to look for. + * @return The annotation.
*/ def getAnnotation[T <: Annotation](field: Field, annotation: Class[T]): T = { - if (!hasAnnotation(field, annotation)) - throw new QException("Field %s is missing annotation %s".format(field, annotation)) - field.getAnnotation(annotation).asInstanceOf[T] + field.getAnnotation(annotation) match { + case null => + throw new QException("Field %s is missing annotation %s".format(field, annotation)) + case fieldAnnotation => fieldAnnotation.asInstanceOf[T] + } + } + + /** + * Gets the annotation or throws an exception if the annotation is not found. + * @param clazz Class to check. + * @param annotation Class of the annotation to look for. + * @return The annotation. + */ + def getAnnotation[T <: Annotation](clazz: Class[_], annotation: Class[T]): T = { + var c: Class[_] = clazz + while (c != null && c.getAnnotation(annotation) == null) + c = c.getSuperclass + if (c == null) + throw new QException("Class %s is missing annotation %s".format(clazz, annotation)) + c.getAnnotation(annotation) } /** diff --git a/scala/test/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunctionUnitTest.scala b/scala/test/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunctionUnitTest.scala new file mode 100644 index 000000000..d541f1aa0 --- /dev/null +++ b/scala/test/org/broadinstitute/sting/queue/extensions/gatk/IntervalScatterFunctionUnitTest.scala @@ -0,0 +1,234 @@ +package org.broadinstitute.sting.queue.extensions.gatk + +import collection.JavaConversions._ +import java.io.File +import org.junit.{Before, Assert, Test} +import org.broadinstitute.sting.BaseTest +import org.broadinstitute.sting.utils.interval.IntervalUtils +import org.broadinstitute.sting.utils.GenomeLocParser +import org.broadinstitute.sting.queue.QException +import net.sf.picard.reference.IndexedFastaSequenceFile + +class IntervalScatterFunctionUnitTest extends BaseTest { + private def reference = new File(BaseTest.b36KGReference) + private var header: IndexedFastaSequenceFile = _ + + 
@Before + def setup() { + header = new IndexedFastaSequenceFile(reference) + GenomeLocParser.setupRefContigOrdering(header.getSequenceDictionary()) + } + + @Test + def testBasicScatter = { + val chr1 = GenomeLocParser.parseGenomeInterval("1") + val chr2 = GenomeLocParser.parseGenomeInterval("2") + val chr3 = GenomeLocParser.parseGenomeInterval("3") + + val files = (1 to 3).toList.map(index => new File(testDir + "basic." + index + ".intervals")) + + IntervalScatterFunction.scatter(List("1", "2", "3"), files, reference, false) + + val locs1 = IntervalUtils.parseIntervalArguments(List(files(0).toString), false) + val locs2 = IntervalUtils.parseIntervalArguments(List(files(1).toString), false) + val locs3 = IntervalUtils.parseIntervalArguments(List(files(2).toString), false) + + Assert.assertEquals(1, locs1.size) + Assert.assertEquals(1, locs2.size) + Assert.assertEquals(1, locs3.size) + + Assert.assertEquals(chr1, locs1.get(0)) + Assert.assertEquals(chr2, locs2.get(0)) + Assert.assertEquals(chr3, locs3.get(0)) + } + + @Test + def testScatterLessFiles = { + val chr1 = GenomeLocParser.parseGenomeInterval("1") + val chr2 = GenomeLocParser.parseGenomeInterval("2") + val chr3 = GenomeLocParser.parseGenomeInterval("3") + val chr4 = GenomeLocParser.parseGenomeInterval("4") + + val files = (1 to 3).toList.map(index => new File(testDir + "less." 
+ index + ".intervals")) + + IntervalScatterFunction.scatter(List("1", "2", "3", "4"), files, reference, false) + + val locs1 = IntervalUtils.parseIntervalArguments(List(files(0).toString), false) + val locs2 = IntervalUtils.parseIntervalArguments(List(files(1).toString), false) + val locs3 = IntervalUtils.parseIntervalArguments(List(files(2).toString), false) + + Assert.assertEquals(2, locs1.size) + Assert.assertEquals(1, locs2.size) + Assert.assertEquals(1, locs3.size) + + Assert.assertEquals(chr1, locs1.get(0)) + Assert.assertEquals(chr2, locs1.get(1)) + Assert.assertEquals(chr3, locs2.get(0)) + Assert.assertEquals(chr4, locs3.get(0)) + } + + @Test(expected=classOf[QException]) + def testScatterMoreFiles = { + val files = (1 to 3).toList.map(index => new File(testDir + "more." + index + ".intervals")) + IntervalScatterFunction.scatter(List("1", "2"), files, reference, false) + } + + @Test + def testScatterIntervals = { + val intervals = List("1:1-2", "1:4-5", "2:1-1", "3:2-2") + val chr1a = GenomeLocParser.parseGenomeInterval("1:1-2") + val chr1b = GenomeLocParser.parseGenomeInterval("1:4-5") + val chr2 = GenomeLocParser.parseGenomeInterval("2:1-1") + val chr3 = GenomeLocParser.parseGenomeInterval("3:2-2") + + val files = (1 to 3).toList.map(index => new File(testDir + "split." 
+ index + ".intervals")) + + IntervalScatterFunction.scatter(intervals, files, reference, true) + + val locs1 = IntervalUtils.parseIntervalArguments(List(files(0).toString), false) + val locs2 = IntervalUtils.parseIntervalArguments(List(files(1).toString), false) + val locs3 = IntervalUtils.parseIntervalArguments(List(files(2).toString), false) + + Assert.assertEquals(2, locs1.size) + Assert.assertEquals(1, locs2.size) + Assert.assertEquals(1, locs3.size) + + Assert.assertEquals(chr1a, locs1.get(0)) + Assert.assertEquals(chr1b, locs1.get(1)) + Assert.assertEquals(chr2, locs2.get(0)) + Assert.assertEquals(chr3, locs3.get(0)) + } + + + @Test + def testBasicScatterByContig = { + val chr1 = GenomeLocParser.parseGenomeInterval("1") + val chr2 = GenomeLocParser.parseGenomeInterval("2") + val chr3 = GenomeLocParser.parseGenomeInterval("3") + + val files = (1 to 3).toList.map(index => new File(testDir + "contig_basic." + index + ".intervals")) + + IntervalScatterFunction.scatter(List("1", "2", "3"), files, reference, true) + + val locs1 = IntervalUtils.parseIntervalArguments(List(files(0).toString), false) + val locs2 = IntervalUtils.parseIntervalArguments(List(files(1).toString), false) + val locs3 = IntervalUtils.parseIntervalArguments(List(files(2).toString), false) + + Assert.assertEquals(1, locs1.size) + Assert.assertEquals(1, locs2.size) + Assert.assertEquals(1, locs3.size) + + Assert.assertEquals(chr1, locs1.get(0)) + Assert.assertEquals(chr2, locs2.get(0)) + Assert.assertEquals(chr3, locs3.get(0)) + } + + @Test + def testScatterByContigLessFiles = { + val chr1 = GenomeLocParser.parseGenomeInterval("1") + val chr2 = GenomeLocParser.parseGenomeInterval("2") + val chr3 = GenomeLocParser.parseGenomeInterval("3") + val chr4 = GenomeLocParser.parseGenomeInterval("4") + + val files = (1 to 3).toList.map(index => new File(testDir + "contig_less." 
+ index + ".intervals")) + + IntervalScatterFunction.scatter(List("1", "2", "3", "4"), files, reference, true) + + val locs1 = IntervalUtils.parseIntervalArguments(List(files(0).toString), false) + val locs2 = IntervalUtils.parseIntervalArguments(List(files(1).toString), false) + val locs3 = IntervalUtils.parseIntervalArguments(List(files(2).toString), false) + + Assert.assertEquals(1, locs1.size) + Assert.assertEquals(1, locs2.size) + Assert.assertEquals(2, locs3.size) + + Assert.assertEquals(chr1, locs1.get(0)) + Assert.assertEquals(chr2, locs2.get(0)) + Assert.assertEquals(chr3, locs3.get(0)) + Assert.assertEquals(chr4, locs3.get(1)) + } + + @Test(expected=classOf[QException]) + def testScatterByContigMoreFiles = { + val files = (1 to 3).toList.map(index => new File(testDir + "contig_more." + index + ".intervals")) + IntervalScatterFunction.scatter(List("1", "2"), files, reference, true) + } + + @Test + def testScatterByContigIntervalsStart = { + val intervals = List("1:1-2", "1:4-5", "2:1-1", "3:2-2") + val chr1a = GenomeLocParser.parseGenomeInterval("1:1-2") + val chr1b = GenomeLocParser.parseGenomeInterval("1:4-5") + val chr2 = GenomeLocParser.parseGenomeInterval("2:1-1") + val chr3 = GenomeLocParser.parseGenomeInterval("3:2-2") + + val files = (1 to 3).toList.map(index => new File(testDir + "contig_split_start." 
+ index + ".intervals")) + + IntervalScatterFunction.scatter(intervals, files, reference, true) + + val locs1 = IntervalUtils.parseIntervalArguments(List(files(0).toString), false) + val locs2 = IntervalUtils.parseIntervalArguments(List(files(1).toString), false) + val locs3 = IntervalUtils.parseIntervalArguments(List(files(2).toString), false) + + Assert.assertEquals(2, locs1.size) + Assert.assertEquals(1, locs2.size) + Assert.assertEquals(1, locs3.size) + + Assert.assertEquals(chr1a, locs1.get(0)) + Assert.assertEquals(chr1b, locs1.get(1)) + Assert.assertEquals(chr2, locs2.get(0)) + Assert.assertEquals(chr3, locs3.get(0)) + } + + @Test + def testScatterByContigIntervalsMiddle = { + val intervals = List("1:1-1", "2:1-2", "2:4-5", "3:2-2") + val chr1 = GenomeLocParser.parseGenomeInterval("1:1-1") + val chr2a = GenomeLocParser.parseGenomeInterval("2:1-2") + val chr2b = GenomeLocParser.parseGenomeInterval("2:4-5") + val chr3 = GenomeLocParser.parseGenomeInterval("3:2-2") + + val files = (1 to 3).toList.map(index => new File(testDir + "contig_split_middle." 
+ index + ".intervals")) + + IntervalScatterFunction.scatter(intervals, files, reference, true) + + val locs1 = IntervalUtils.parseIntervalArguments(List(files(0).toString), false) + val locs2 = IntervalUtils.parseIntervalArguments(List(files(1).toString), false) + val locs3 = IntervalUtils.parseIntervalArguments(List(files(2).toString), false) + + Assert.assertEquals(1, locs1.size) + Assert.assertEquals(2, locs2.size) + Assert.assertEquals(1, locs3.size) + + Assert.assertEquals(chr1, locs1.get(0)) + Assert.assertEquals(chr2a, locs2.get(0)) + Assert.assertEquals(chr2b, locs2.get(1)) + Assert.assertEquals(chr3, locs3.get(0)) + } + + @Test + def testScatterByContigIntervalsEnd = { + val intervals = List("1:1-1", "2:2-2", "3:1-2", "3:4-5") + val chr1 = GenomeLocParser.parseGenomeInterval("1:1-1") + val chr2 = GenomeLocParser.parseGenomeInterval("2:2-2") + val chr3a = GenomeLocParser.parseGenomeInterval("3:1-2") + val chr3b = GenomeLocParser.parseGenomeInterval("3:4-5") + + val files = (1 to 3).toList.map(index => new File(testDir + "contig_split_end." + index + ".intervals")) + + IntervalScatterFunction.scatter(intervals, files, reference, true) + + val locs1 = IntervalUtils.parseIntervalArguments(List(files(0).toString), false) + val locs2 = IntervalUtils.parseIntervalArguments(List(files(1).toString), false) + val locs3 = IntervalUtils.parseIntervalArguments(List(files(2).toString), false) + + Assert.assertEquals(1, locs1.size) + Assert.assertEquals(1, locs2.size) + Assert.assertEquals(2, locs3.size) + + Assert.assertEquals(chr1, locs1.get(0)) + Assert.assertEquals(chr2, locs2.get(0)) + Assert.assertEquals(chr3a, locs3.get(0)) + Assert.assertEquals(chr3b, locs3.get(1)) + } +}