Changed parsing engine to store the argument bindings in the order of their definition in the class, moving "-T" to the front of Queue command lines.

Queue GATK generated .intervals is now a List(File) again, removing special-case handling in the generator.
Instead of using @Scatter annotation, using ScatterFunction instance to determine if a job can be scattered.
Implemented special VcfGatherFunction which only uses the header from the first file, even if the other files differ in their headers.
Added a -deleteIntermediates to Queue to delete the outputs from intermediate commands after a successful run.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4536 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2010-10-20 21:43:52 +00:00
parent 91049269c2
commit 88a0d77433
28 changed files with 631 additions and 551 deletions

View File

@ -377,11 +377,11 @@ public class ParsingEngine {
* Extract all the argument sources from a given object, along with their bindings if obj != null.
* @param obj the object corresponding to the sourceClass
* @param sourceClass class to act as sources for other arguments.
* @param parentFields
* @param parentFields Parent Fields
* @return A map of sources associated with this object and its aggregated objects and bindings to their bindings values
*/
private static Map<ArgumentSource, Object> extractArgumentBindings(Object obj, Class sourceClass, Field[] parentFields) {
Map<ArgumentSource, Object> bindings = new HashMap<ArgumentSource, Object>();
Map<ArgumentSource, Object> bindings = new LinkedHashMap<ArgumentSource, Object>();
while( sourceClass != null ) {
Field[] fields = sourceClass.getDeclaredFields();

View File

@ -25,6 +25,7 @@
package org.broadinstitute.sting.queue.extensions.gatk;
import net.sf.samtools.SAMFileWriter;
import org.broad.tribble.vcf.VCFWriter;
import org.broadinstitute.sting.commandline.*;
import java.io.File;
@ -59,15 +60,15 @@ public abstract class ArgumentDefinitionField extends ArgumentField {
return "";
else
return String.format("%n" +
"/** %n" +
" * Short name of %1$s%n" +
" * @return Short name of %1$s%n" +
"/**%n" +
" * Short name of %1$s%n" +
" * @return Short name of %1$s%n" +
" */%n" +
"def %3$s = this.%1$s%n" +
"%n" +
"/** %n" +
" * Short name of %1$s%n" +
" * @param value Short name of %1$s%n" +
"/**%n" +
" * Short name of %1$s%n" +
" * @param value Short name of %1$s%n" +
" */%n" +
"def %4$s(value: %2$s) = this.%1$s = value%n",
getFieldName(),
@ -96,7 +97,7 @@ public abstract class ArgumentDefinitionField extends ArgumentField {
}
@Override
protected String getScatterGatherAnnotation() {
protected String getGatherAnnotation() {
return "";
}
@ -120,9 +121,8 @@ public abstract class ArgumentDefinitionField extends ArgumentField {
private static List<? extends ArgumentField> getArgumentFields(ArgumentDefinition argumentDefinition) {
if (intervalFields.contains(argumentDefinition.fullName) && argumentDefinition.ioType == ArgumentIOType.INPUT) {
boolean scatter = "intervals".equals(argumentDefinition.fullName);
return Arrays.asList(
new IntervalFileArgumentField(argumentDefinition, scatter),
new IntervalFileArgumentField(argumentDefinition),
new IntervalStringArgumentField(argumentDefinition));
// ROD Bindings are set by the RodBindField
@ -166,18 +166,10 @@ public abstract class ArgumentDefinitionField extends ArgumentField {
}
// if (intervalFields.contains(argumentDefinition.fullName) && argumentDefinition.ioType == ArgumentIOType.INPUT)
// Change intervals to an input file, and optionally scatter it.
// Change intervals to be exclusive of intervalsString.
private static class IntervalFileArgumentField extends InputArgumentField {
private final boolean scatter;
public IntervalFileArgumentField(ArgumentDefinition argumentDefinition, boolean scatter) {
public IntervalFileArgumentField(ArgumentDefinition argumentDefinition) {
super(argumentDefinition);
this.scatter = scatter;
}
@Override protected boolean isMultiValued() { return !this.scatter && super.isMultiValued(); }
@Override public boolean isScatter() { return this.scatter; }
@Override protected String getScatterGatherAnnotation() {
return scatter ? String.format("@Scatter(classOf[IntervalScatterFunction])%n") : super.getScatterGatherAnnotation();
}
@Override
@ -241,10 +233,15 @@ public abstract class ArgumentDefinitionField extends ArgumentField {
@Override protected String getDefaultValue() { return "_"; }
@Override public boolean isGather() { return true; }
@Override protected String getScatterGatherAnnotation() {
return String.format(SAMFileWriter.class.isAssignableFrom(argumentDefinition.argumentType)
? "@Gather(classOf[BamGatherFunction])%n"
: "@Gather(classOf[org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction])%n");
@Override protected String getGatherAnnotation() {
String gather;
if (SAMFileWriter.class.isAssignableFrom(argumentDefinition.argumentType))
gather = "@Gather(classOf[BamGatherFunction])%n";
else if (VCFWriter.class.isAssignableFrom(argumentDefinition.argumentType))
gather = "@Gather(classOf[VcfGatherFunction])%n";
else
gather = "@Gather(classOf[org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction])%n";
return String.format(gather);
}
}

View File

@ -72,7 +72,7 @@ public abstract class ArgumentField {
isRequired(),
getExclusiveOf(),
getValidation(),
getScatterGatherAnnotation(), getFieldName(), getFieldType(), getDefaultValue(),
getGatherAnnotation(), getFieldName(), getFieldType(), getDefaultValue(),
getDefineAddition());
}
@ -105,8 +105,8 @@ public abstract class ArgumentField {
/** @return A validation string for the argument. */
protected String getValidation() { return ""; }
/** @return A scatter or gather annotation with a line feed, or "". */
protected String getScatterGatherAnnotation() { return ""; }
/** @return A gather annotation with a line feed, or "". */
protected String getGatherAnnotation() { return ""; }
// Scala
@ -136,9 +136,6 @@ public abstract class ArgumentField {
return importClasses;
}
/** @return True if this field uses @Scatter. */
public boolean isScatter() { return false; }
/** @return True if this field uses @Gather. */
public boolean isGather() { return false; }

View File

@ -38,6 +38,7 @@ import org.broadinstitute.sting.gatk.io.stubs.OutputStreamArgumentTypeDescriptor
import org.broadinstitute.sting.gatk.io.stubs.SAMFileReaderArgumentTypeDescriptor;
import org.broadinstitute.sting.gatk.io.stubs.SAMFileWriterArgumentTypeDescriptor;
import org.broadinstitute.sting.gatk.refdata.tracks.builders.RMDTrackBuilder;
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
@ -104,7 +105,7 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
String clpClassName = clpManager.getName(clp);
writeClass("org.broadinstitute.sting.queue.function.JarCommandLineFunction", COMMANDLINE_PACKAGE_NAME, clpClassName,
"", ArgumentDefinitionField.getArgumentFields(clp));
false, "", ArgumentDefinitionField.getArgumentFields(clp));
if (clp == CommandLineGATK.class) {
for (Entry<String, Collection<Class<? extends Walker>>> walkersByPackage: walkerManager.getWalkerNamesByPackage(false).entrySet()) {
@ -116,9 +117,16 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
argumentFields.addAll(RodBindField.getRodArguments(walkerType, trackBuilder));
argumentFields.addAll(ReadFilterField.getFilterArguments(walkerType));
String constructor = String.format("analysisName = \"%1$s\"%nanalysis_type = \"%1$s\"%n", walkerName);
String scatterClass = getScatterClass(walkerType);
boolean isScatter = false;
if (scatterClass != null) {
isScatter = true;
constructor += String.format("scatterClass = classOf[%s]%n", scatterClass);
}
writeClass(COMMANDLINE_PACKAGE_NAME + "." + clpClassName, WALKER_PACKAGE_NAME, walkerName,
String.format("analysisName = \"%1$s\"%nanalysis_type = \"%1$s\"%n", walkerName),
argumentFields);
isScatter, constructor, argumentFields);
}
}
}
@ -149,15 +157,22 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
return false;
}
private void writeClass(String baseClass, String packageName, String className, String constructor,
List<? extends ArgumentField> argumentFields) throws IOException {
String content = getContent(CLASS_TEMPLATE, baseClass, packageName, className, constructor, "", argumentFields);
private String getScatterClass(Class<? extends Walker> walkerType) {
if (ReadWalker.class.isAssignableFrom(walkerType))
return "ContigScatterFunction";
else
return "IntervalScatterFunction";
}
private void writeClass(String baseClass, String packageName, String className, boolean isScatter,
String constructor, List<? extends ArgumentField> argumentFields) throws IOException {
String content = getContent(CLASS_TEMPLATE, baseClass, packageName, className, constructor, isScatter, "", argumentFields);
writeFile(packageName + "." + className, content);
}
private void writeFilter(String packageName, String className, List<? extends ArgumentField> argumentFields) throws IOException {
String content = getContent(TRAIT_TEMPLATE, "org.broadinstitute.sting.queue.function.CommandLineFunction",
packageName, className, "", String.format(" + \" -read_filter %s\"", className), argumentFields);
packageName, className, "", false, String.format(" + \" -read_filter %s\"", className), argumentFields);
writeFile(packageName + "." + className, content);
}
@ -172,12 +187,12 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
}
private static String getContent(String scalaTemplate, String baseClass, String packageName, String className,
String constructor, String commandLinePrefix, List<? extends ArgumentField> argumentFields) {
String constructor, boolean isScatter,
String commandLinePrefix, List<? extends ArgumentField> argumentFields) {
StringBuilder arguments = new StringBuilder();
StringBuilder commandLine = new StringBuilder(commandLinePrefix);
Set<String> importSet = new HashSet<String>();
boolean isScatter = false;
boolean isGather = false;
List<String> freezeFields = new ArrayList<String>();
for(ArgumentField argumentField: argumentFields) {
@ -186,13 +201,11 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
importSet.addAll(argumentField.getImportStatements());
freezeFields.add(argumentField.getFreezeFields());
isScatter |= argumentField.isScatter();
isGather |= argumentField.isGather();
}
if (isScatter) {
importSet.add("import org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction");
importSet.add("import org.broadinstitute.sting.queue.function.scattergather.Scatter");
baseClass += " with ScatterGatherableFunction";
}
if (isGather)
@ -210,8 +223,10 @@ public class GATKExtensionsGenerator extends CommandLineProgram {
freezeFieldOverride.append(String.format("}%n%n"));
}
String importText = sortedImports.size() == 0 ? "" : NEWLINE + StringUtils.join(sortedImports, NEWLINE) + NEWLINE;
// see CLASS_TEMPLATE and TRAIT_TEMPLATE below
return String.format(scalaTemplate, packageName, StringUtils.join(sortedImports, NEWLINE),
return String.format(scalaTemplate, packageName, importText,
className, baseClass, constructor, arguments, freezeFieldOverride, commandLine);
}

View File

@ -1,39 +0,0 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.function.scattergather;
import java.lang.annotation.*;
/**
* Specifies the class type of the CommandLineFunction to scatter an @Input
* Written in java because scala doesn't support RetentionPolicy.RUNTIME
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Scatter {
Class value();
}

View File

@ -16,7 +16,7 @@ val MERGED_DIR = new File("/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/ma
trait UNIVERSAL_GATK_ARGS extends CommandLineGATK {
this.logging_level = "INFO";
this.jarFile = gatkJarFile;
this.intervals = new File(TARGET_INTERVAL);
this.intervals :+= new File(TARGET_INTERVAL);
this.reference_sequence = referenceFile;
this.jobQueue = "gsa";
this.et = Option(org.broadinstitute.sting.gatk.phonehome.GATKRunReport.PhoneHomeOption.STANDARD);

View File

@ -20,7 +20,7 @@ class ManySampleUGPerformanceTesting extends QScript {
trait UNIVERSAL_GATK_ARGS extends CommandLineGATK {
this.logging_level = "INFO";
this.jarFile = gatkJarFile;
this.intervals = new File(TARGET_INTERVAL);
this.intervals :+= new File(TARGET_INTERVAL);
this.reference_sequence = referenceFile;
this.jobQueue = "gsa";
this.et = Option(org.broadinstitute.sting.gatk.phonehome.GATKRunReport.PhoneHomeOption.STANDARD);

View File

@ -41,7 +41,7 @@ class ExampleUnifiedGenotyper extends QScript {
trait UnifiedGenotyperArguments extends CommandLineGATK {
this.jarFile = qscript.gatkJar
this.reference_sequence = qscript.referenceFile
this.intervals = qscript.intervals
this.intervals :+= qscript.intervals
// Some() is how you set the value for an scala Option.
// Set the memory limit to 2 gigabytes on each command.
this.memoryLimit = Some(2)

View File

@ -64,7 +64,7 @@ class fullCallingPipeline extends QScript {
private var pipeline: Pipeline = _
trait CommandLineGATKArgs extends CommandLineGATK {
this.intervals = qscript.pipeline.getProject.getIntervalList
this.intervals :+= qscript.pipeline.getProject.getIntervalList
this.jarFile = qscript.gatkJar
this.reference_sequence = qscript.pipeline.getProject.getReferenceFile
this.memoryLimit = Some(4)
@ -118,10 +118,9 @@ class fullCallingPipeline extends QScript {
realigner.jobOutputFile = new File(".queue/logs/Cleaning/%s/IndelRealigner.out".format(sampleId))
realigner.analysisName = "RealignBam_"+sampleId
realigner.input_file = targetCreator.input_file
realigner.intervals = qscript.contigIntervals
realigner.intervals :+= qscript.contigIntervals
realigner.targetIntervals = targetCreator.out
realigner.scatterCount = contigCount
realigner.isIntermediate = true
// may need to explicitly run fix mates
var fixMates = new PicardBamJarFunction {
@ -134,7 +133,6 @@ class fullCallingPipeline extends QScript {
}
// realigner.out = cleaned_bam
// realigner.scatterClass = classOf[ContigScatterFunction]
// realigner.setupGatherFunction = { case (f: BamGatherFunction, _) => f.jarFile = qscript.picardFixMatesJar }
// realigner.jobQueue = "week"
@ -142,9 +140,8 @@ class fullCallingPipeline extends QScript {
if (realigner.scatterCount > 1) {
realigner.out = cleaned_bam
// While gathering run fix mates.
realigner.scatterClass = classOf[ContigScatterFunction]
realigner.setupScatterFunction = {
case (scatter: ScatterFunction, _) =>
case scatter: ScatterFunction =>
scatter.commandDirectory = new File("CleanedBams/IntermediateFiles/%s/ScatterGather".format(sampleId))
scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/Cleaning/%s/Scatter.out".format(sampleId))
}
@ -167,6 +164,7 @@ class fullCallingPipeline extends QScript {
}
} else {
realigner.out = swapExt("CleanedBams/IntermediateFiles/"+sampleId,bam,"bam","unfixed.cleaned.bam")
realigner.isIntermediate = true
// Explicitly run fix mates if the function won't be scattered.
@ -175,7 +173,6 @@ class fullCallingPipeline extends QScript {
fixMates.unfixed = realigner.out
fixMates.fixed = cleaned_bam
fixMates.analysisName = "FixMates_"+sampleId
fixMates.isIntermediate = true
// Add the fix mates explicitly
}
@ -183,7 +180,6 @@ class fullCallingPipeline extends QScript {
samtoolsindex.jobOutputFile = new File(".queue/logs/Cleaning/%s/SamtoolsIndex.out".format(sampleId))
samtoolsindex.bamFile = cleaned_bam
samtoolsindex.analysisName = "index_cleaned_"+sampleId
samtoolsindex.isIntermediate = true
if (!qscript.skip_cleaning) {
if ( realigner.scatterCount > 1 ) {
@ -238,7 +234,7 @@ class fullCallingPipeline extends QScript {
snps.scatterCount = qscript.num_snp_scatter_jobs
snps.setupScatterFunction = {
case (scatter: ScatterFunction, _) =>
case scatter: ScatterFunction =>
scatter.commandDirectory = new File("SnpCalls/ScatterGather")
scatter.jobOutputFile = new File(IOUtils.CURRENT_DIR_ABS, ".queue/logs/SNPCalling/ScatterGather/Scatter.out")
}

View File

@ -1,196 +0,0 @@
import org.broadinstitute.sting.queue.extensions.firehose.ImportSingleValueFunction
import org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction
import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction
import org.broadinstitute.sting.queue.QScript
import org.broadinstitute.sting.queue.extensions.gatk._
class CleanBamFile extends QScript {
qscript =>
@Argument(doc="gatk jar", shortName="gatk")
var gatkJar: File = _
@Argument(doc="samtools binary", shortName="samtools")
var samtoolsBinary: String = _
@Argument(doc="fix mates jar", shortName="fixMates")
var fixMatesJar: File = _
@Input(doc="Script that can merge text files, for example Sting/shell/mergeText.sh.", shortName="MTS")
var mergeTextScript: String = _
@Argument(doc="base name for output files", shortName="base")
var baseName: String = _
@Input(doc="reference genome", shortName="R")
var referenceFile: File = _
@Input(doc="recalibrated bam", shortName="I")
var recalibratedBam: File = _
@Argument(doc="read group blacklist conversion script that can convert firehose outputs to a GATK blacklist file.", shortName="RGBLS")
var readGroupBlackListScript: String = _
@Argument(doc="read group blacklist", shortName="RGBL", required=false)
var readGroupBlackList: String = _
@Argument(doc="intervals", shortName="L")
var intervals: File = _
@Argument(doc="Script that can split the interval file by contig, for example Sting/python/splitIntervalsByContig.py.", shortName="RTCSS")
var targetCreatorScatterScript: String = _
@Argument(doc="RealignerTargetCreator scatter count. " +
"Best if it is either 1 or the number of contigs in the interval list. " +
"If used the compute farm must also be used.", shortName="RTCSC")
var targetCreatorScatterCount = 0
@Argument(doc="Script that can split the intervals evenly, for example Sting/shell/splitIntervals.sh.", shortName="IRSS")
var indelRealignerScatterScript: String = _
@Argument(doc="IndelRealigner scatter count.", shortName="IRSC")
var indelRealignerScatterCount = 0
@Input(doc="dbsnp file", shortName="D")
var dbsnpFile: File = _
@Argument(doc="firehose import jar", shortName="importJar")
var firehoseImportJar: File = _
@Argument(doc="short job queue", shortName="shortQueue", required=false)
var shortJobQueue: String = _
@Argument(doc="firehose host", shortName="FHHost")
var firehoseHost: String = _
@Argument(doc="firehose port", shortName="FHPort")
var firehosePort: Int = _
@Argument(doc="firehose domain", shortName="FHDomain")
var firehoseDomain: String = _
@Argument(doc="clean bam firehose security token", shortName="FHToken")
var firehoseSecurityToken: String = _
@Argument(doc="clean bam firehose entity type", shortName="bamFHEType")
var bamFirehoseEntityType: String = _
@Argument(doc="clean bam firehose entity id", shortName="bamFHEID")
var bamFirehoseEntityID: String = _
@Argument(doc="clean bam firehose annotation type name", shortName="bamFHAnn")
var bamFirehoseAnnotationTypeName: String = _
trait GATKCommonArgs extends CommandLineGATK {
this.jarFile = qscript.gatkJar
this.reference_sequence = qscript.referenceFile
this.intervals = qscript.intervals
this.input_file :+= recalibratedBam
}
def baseFile(suffix: String) = new File(baseName + suffix)
def script = {
val blacklistConverter = new CommandLineFunction {
@Output(doc="blacklist file") var blacklistFile: File = _
def commandLine = readGroupBlackListScript + " " + blacklistFile + " \"" + readGroupBlackList + "\""
}
if (readGroupBlackList != null) {
blacklistConverter.blacklistFile = baseFile(".blacklist.txt")
add(blacklistConverter)
}
// -T RealignerTargetCreator -I <input.bam> -R <reference.genome> <interval.list> <blacklist.file> -o <base.name>.merged.intervals
val targetCreator = new RealignerTargetCreator with GATKCommonArgs
targetCreator.memoryLimit = Some(2)
targetCreator.read_group_black_list :+= blacklistConverter.blacklistFile
targetCreator.out = baseFile(".merged.intervals")
targetCreator.scatterCount = targetCreatorScatterCount
targetCreator.setupScatterFunction = {
case (scatter: IntervalScatterFunction, _) =>
scatter.splitIntervalsScript = targetCreatorScatterScript
}
targetCreator.setupGatherFunction = {
case (gather: SimpleTextGatherFunction, _) =>
gather.mergeTextScript = mergeTextScript
}
// -T IndelRealigner -I <input.bam> -R <reference.genome> <blacklist.file> -stats <base.name>.indel.stats
// -O <base.name>.unfixed.cleaned.bam -maxInRam 200000 -targetIntervals <merged.intervals> -D <dbsnp.file>
val realigner = new IndelRealigner with GATKCommonArgs
realigner.memoryLimit = Some(4)
realigner.read_group_black_list :+= blacklistConverter.blacklistFile
realigner.statisticsFileForDebugging = baseFile(".indel.stats")
realigner.maxReadsInRam = Some(200000)
realigner.targetIntervals = targetCreator.out
realigner.DBSNP = dbsnpFile
realigner.scatterCount = indelRealignerScatterCount
var fixedBam: File = null
if (realigner.scatterCount > 1) {
realigner.out = baseFile(".cleaned.bam")
// While gathering run fix mates.
realigner.setupScatterFunction = {
case (scatter: IntervalScatterFunction, _) =>
scatter.splitIntervalsScript = indelRealignerScatterScript
}
realigner.setupGatherFunction = {
case (gather: BamGatherFunction, _) =>
gather.memoryLimit = Some(4)
gather.jarFile = fixMatesJar
// Don't pass this AS=true to fix mates!
gather.assumeSorted = None
case (gather: SimpleTextGatherFunction, _) =>
gather.mergeTextScript = mergeTextScript
}
fixedBam = realigner.out
} else {
realigner.out = baseFile(".unfixed.cleaned.bam")
// Explicitly run fix mates if the function won't be scattered.
var fixMates = new PicardBamJarFunction {
// Declare inputs/outputs for dependency tracking.
@Input(doc="unfixed bam") var unfixed: File = _
@Output(doc="fixed bam") var fixed: File = _
def inputBams = List(unfixed)
def outputBam = fixed
}
fixMates.memoryLimit = Some(4)
fixMates.jarFile = fixMatesJar
fixMates.unfixed = realigner.out
fixMates.fixed = baseFile(".cleaned.bam")
fixedBam = fixMates.fixed
// Add the fix mates explicitly
add(fixMates)
}
val bamIndex = new SamtoolsIndexFunction
bamIndex.samtools = samtoolsBinary
bamIndex.bamFile = fixedBam
bamIndex.bamFileIndex = swapExt(fixedBam, "bam", "bam.bai")
val importer = new ImportSingleValueFunction {
/** Files that this job should wait on before running. */
@Input(doc="Explicit job dependencies", required=false)
var jobDependencies: List[File] = Nil
}
importer.jobQueue = shortJobQueue
importer.jarFile = firehoseImportJar
importer.host = firehoseHost
importer.port = firehosePort
importer.domain = firehoseDomain
importer.securityToken = firehoseSecurityToken
importer.entityType = bamFirehoseEntityType
importer.entityID = bamFirehoseEntityID
importer.annotationTypeName = bamFirehoseAnnotationTypeName
importer.importValue = fixedBam
importer.jobDependencies :+= bamIndex.bamFileIndex
add(targetCreator, realigner, bamIndex, importer)
}
}

View File

@ -1,84 +0,0 @@
#!/bin/sh
ENTITY_TYPE=$1
ENTITY_ID=$2
FIREHOSE_TOKEN=$3
FIREHOSE_WORKSPACE=trunk
FIREHOSE_HOST=firehose
FIREHOSE_PORT=8080
FIREHOSE_DOMAIN=gsa
CLEAN_BAM_ANNOTATION=clean_bam_file
DEFAULT_QUEUE=gsa
SHORT_QUEUE=hour
FIREHOSE_SOURCE_HOME=/humgen/gsa-firehose/firehose/source
STING_HOME=$FIREHOSE_SOURCE_HOME/Sting
CGA_HOME=$FIREHOSE_SOURCE_HOME/CancerGenomeAnalysis
RUN=-run
TMP_DIR=/broad/shptmp/$USER
JOB_QUEUE=gsa
INTERVAL_COUNT=15
REALIGNER_COUNT=50
QUEUE_JAR=$STING_HOME/dist/Queue.jar
GATK_JAR=$STING_HOME/dist/GenomeAnalysisTK.jar
FIX_MATES_JAR=/seq/software/picard/1.194/bin/FixMateInformation.jar
SAMTOOLS=/seq/dirseq/samtools/samtools-0.1.7-5/samtools
FIREHOSE_IMPORT_JAR=$CGA_HOME/analysis_pipeline/tools/dist/ImportSingleValue.jar
CLEAN_BAM_FILE_SCRIPT=$STING_HOME/scala/qscript/kshakir/CleanBamFile.scala
FIREHOSE_TEST_HARNESS="python $CGA_HOME/analysis_pipeline/scripts/firehose_test_harness.py"
SPLIT_BY_CONTIG="python $STING_HOME/python/splitIntervalsByContig.py"
SPLIT_BY_INTERVALS=$STING_HOME/shell/splitIntervals.sh
MERGE_TEXT=$STING_HOME/shell/mergeText.sh
CONVERT_BLACKLIST=$CGA_HOME/analysis_pipeline/genepattern/modules/ConvertDependentsList/convert_dependents_list.sh
# Record the Sting version number.
svnversion $STING_HOME > stingversion.txt
# If Sting has been modified, record the differences.
grep -E '^[0-9]+$' stingversion.txt || svn diff $STING_HOME > stingdiff.txt
# Record the CGA version number.
svnversion $CGA_HOME > cgaversion.txt
# If CGA has been modified, record the differences.
grep -E '^[0-9]+$' cgaversion.txt || svn diff $CGA_HOME > cgadiff.txt
# Try to retrieve the blacklist. If it fails then set it to "".
$FIREHOSE_TEST_HARNESS -d $FIREHOSE_DOMAIN -w $FIREHOSE_WORKSPACE -n $ENTITY_ID -t $ENTITY_TYPE 'BLACKLIST="${read_group_blacklist}"' && . firehose-populated-commands.sh || BLACKLIST=""
# Retrieve all the required variables and run the pipeline in Queue.
$FIREHOSE_TEST_HARNESS -d $FIREHOSE_DOMAIN -w $FIREHOSE_WORKSPACE -n $ENTITY_ID -t $ENTITY_TYPE 'REFERENCE_FILE="${reference_file}";BAM_FILE="${recalibrated_bam_file}";DBSNP_FILE="${dbsnp_file}";INTERVAL_FILE="${interval_list}";DATABASE_ID="${database_id}"' && . firehose-populated-commands.sh && \
\
JOB_PREFIX=Q-$ENTITY_ID && \
\
java \
-Djava.io.tmpdir="$TMP_DIR" \
-jar "$QUEUE_JAR" \
-S "$CLEAN_BAM_FILE_SCRIPT" \
-bsub -bsubWait -skipUpToDate \
-jobQueue "$JOB_QUEUE" \
-jobPrefix "$JOB_PREFIX" \
-gatk "$GATK_JAR" \
-base "$ENTITY_ID" \
-R "$REFERENCE_FILE" \
-I "$BAM_FILE" \
-D "$DBSNP_FILE" \
-L "$INTERVAL_FILE" \
-RTCSS "$SPLIT_BY_CONTIG" \
-RTCSC "$INTERVAL_COUNT" \
-IRSS "$SPLIT_BY_INTERVALS" \
-IRSC "$REALIGNER_COUNT" \
-MTS "$MERGE_TEXT" \
-fixMates "$FIX_MATES_JAR" \
-samtools "$SAMTOOLS" \
-RGBLS "$CONVERT_BLACKLIST" \
-RGBL "$BLACKLIST" \
-importJar "$FIREHOSE_IMPORT_JAR" \
-shortQueue "$SHORT_QUEUE" \
-FHHost "$FIREHOSE_HOST" \
-FHPort "$FIREHOSE_PORT" \
-FHDomain "$FIREHOSE_DOMAIN" \
-FHToken "$FIREHOSE_TOKEN" \
-bamFHEType "$ENTITY_TYPE" \
-bamFHEID "$DATABASE_ID" \
-bamFHAnn "$CLEAN_BAM_ANNOTATION" \
$RUN || ( echo Job failed. Check stdout.txt for more info. >&2; exit 1; )

View File

@ -39,7 +39,7 @@ def script = {
add(new CountCovariates(bamIn, recalData) { useOriginalQualities = true } )
val tableRecal = new TableRecalibrate(bamIn, recalData, recalBam) { useOriginalQualities = true }
if ( scatter ) {
tableRecal.intervals = new File("/humgen/gsa-hpprojects/GATK/data/chromosomes.hg18.interval_list")
tableRecal.intervals :+= new File("/humgen/gsa-hpprojects/GATK/data/chromosomes.hg18.interval_list")
//tableRecal.scatterClass = classOf[ContigScatterFunction]
tableRecal.setupGatherFunction = { case (f: PicardBamJarFunction, _) => f.jarFile = picardMergeSamFilesJar; f.memoryLimit = Some(4) }
tableRecal.scatterCount = 25

View File

@ -26,8 +26,8 @@ class QCommandLine extends CommandLineProgram with Logging {
@Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false)
private var expandedDotFile: File = _
@Argument(fullName="start_clean", shortName="clean", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false)
private var startClean = false
@Argument(fullName="start_from_scratch", shortName="startFromScratch", doc="Runs all command line functions even if the outputs were previously output successfully.", required=false)
private var startFromScratch = false
@Argument(fullName="for_reals", shortName="forReals", doc="Run QScripts", required=false) @Hidden
private var runScripts = false
@ -38,12 +38,15 @@ class QCommandLine extends CommandLineProgram with Logging {
@ArgumentCollection
private val qSettings = new QSettings
@Argument(fullName="statusEmailFrom", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false)
@Argument(fullName="status_email_from", shortName="statusFrom", doc="Email address to send emails from upon completion or on error.", required=false)
private var statusEmailFrom: String = System.getProperty("user.name") + "@" + SystemUtils.domainName
@Argument(fullName="statusEmailTo", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false)
@Argument(fullName="status_email_to", shortName="statusTo", doc="Email address to send emails to upon completion or on error.", required=false)
private var statusEmailTo: List[String] = Nil
@Argument(fullName="delete_intermediate_outputs", shortName="deleteIntermediates", doc="After a successful run delete the outputs of any Function marked as intermediate.", required=false)
private var deleteIntermediates = false
/**
* Takes the QScripts passed in, runs their script() methods, retrieves their generated
* functions, and then builds and runs a QGraph based on the dependencies.
@ -53,13 +56,14 @@ class QCommandLine extends CommandLineProgram with Logging {
val qGraph = new QGraph
qGraph.dryRun = !(run || runScripts)
qGraph.bsubAllJobs = bsubAllJobs
qGraph.startClean = startClean
qGraph.startFromScratch = startFromScratch
qGraph.dotFile = dotFile
qGraph.expandedDotFile = expandedDotFile
qGraph.qSettings = qSettings
qGraph.debugMode = debugMode == true
qGraph.statusEmailFrom = statusEmailFrom
qGraph.statusEmailTo = statusEmailTo
qGraph.deleteIntermediates = deleteIntermediates
val scripts = qScriptManager.createScripts()
for (script <- scripts) {

View File

@ -16,7 +16,6 @@ trait QScript extends Logging {
type CommandLineFunction = org.broadinstitute.sting.queue.function.CommandLineFunction
type InProcessFunction = org.broadinstitute.sting.queue.function.InProcessFunction
type ScatterGatherableFunction = org.broadinstitute.sting.queue.function.scattergather.ScatterGatherableFunction
type Scatter = org.broadinstitute.sting.queue.function.scattergather.Scatter
type Gather = org.broadinstitute.sting.queue.function.scattergather.Gather
type SimpleTextGatherFunction = org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction

View File

@ -33,10 +33,10 @@ class FunctionEdge(var function: QFunction) extends QEdge {
currentStatus = RunnerStatus.SKIPPED
}
def resetToPending() = {
def resetToPending(cleanOutputs: Boolean) = {
currentStatus = RunnerStatus.PENDING
function.doneOutputs.foreach(_.delete())
function.failOutputs.foreach(_.delete())
if (cleanOutputs)
function.deleteOutputs()
}
def inputs = function.inputs

View File

@ -21,11 +21,12 @@ import org.apache.commons.lang.StringUtils
class QGraph extends Logging {
var dryRun = true
var bsubAllJobs = false
var startClean = false
var startFromScratch = false
var dotFile: File = _
var expandedDotFile: File = _
var qSettings: QSettings = _
var debugMode = false
var deleteIntermediates = false
var statusEmailFrom: String = _
var statusEmailTo: List[String] = _
@ -127,6 +128,7 @@ class QGraph extends Logging {
// build up the full DAG with scatter-gather jobs
fillGraph
logger.info("Checking pipeline status.")
updateGraphStatus(false)
logStatus
}
@ -241,24 +243,18 @@ class QGraph extends Logging {
* Dry-runs the jobs by traversing the graph.
*/
private def dryRunJobs() = {
traverseFunctions(edge => {
edge.function match {
case qFunction => {
if (logger.isDebugEnabled) {
logger.debug(qFunction.commandDirectory + " > " + qFunction.description)
} else {
logger.info(qFunction.description)
}
logger.info("Output written to " + qFunction.jobOutputFile)
if (qFunction.jobErrorFile != null) {
logger.info("Errors written to " + qFunction.jobErrorFile)
} else {
if (logger.isDebugEnabled)
logger.info("Errors also written to " + qFunction.jobOutputFile)
}
}
}
})
updateGraphStatus(false)
traverseFunctions(edge => logEdge(edge))
}
/**
 * Logs a one-edge summary: capitalized status plus the function description,
 * the job's log file, and (when one exists) its separate error file.
 * Used by both the dry-run report and the failed-job report.
 * NOTE(review): assumes jobOutputFile is non-null for every edge — confirm.
 */
private def logEdge(edge: FunctionEdge) = {
logger.info("-------")
logger.info(StringUtils.capitalize(edge.status.toString) + ": " + edge.function.description)
// At debug level additionally show the directory the command runs in.
if (logger.isDebugEnabled)
logger.debug(edge.function.commandDirectory + " > " + edge.function.description)
logger.info("Log: " + edge.function.jobOutputFile.getAbsolutePath)
// The error file is optional; when null, stderr goes to the output log.
if (edge.function.jobErrorFile != null)
logger.info("Error: " + edge.function.jobErrorFile.getAbsolutePath)
}
/**
@ -266,12 +262,11 @@ class QGraph extends Logging {
*/
private def runJobs() = {
try {
traverseFunctions(edge => {
if (startClean)
edge.resetToPending()
else
checkDone(edge)
})
if (startFromScratch) {
logger.info("Removing outputs from previous runs.")
foreachFunction(_.resetToPending(true))
} else
updateGraphStatus(true)
var readyJobs = getReadyJobs
var runningJobs = Set.empty[FunctionEdge]
@ -303,6 +298,8 @@ class QGraph extends Logging {
Thread.sleep(30000L)
readyJobs = getReadyJobs
}
deleteIntermediateOutputs()
} catch {
case e =>
logger.error("Uncaught error running jobs.", e)
@ -312,13 +309,22 @@ class QGraph extends Logging {
}
}
/**
* Updates the status of edges in the graph.
* @param cleanOutputs If true will delete outputs when setting edges to pending.
*/
private def updateGraphStatus(cleanOutputs: Boolean) = {
traverseFunctions(edge => checkDone(edge, cleanOutputs))
}
/**
* Checks if an edge is done or if it's an intermediate edge if it can be skipped.
* This function may modify previous edges if it discovers that the edge passed in
* is dependent jobs that were previously marked as skipped.
* @param edge Edge to check to see if it's done or can be skipped.
* @param cleanOutputs If true will delete outputs when setting edges to pending.
*/
private def checkDone(edge: FunctionEdge) = {
private def checkDone(edge: FunctionEdge, cleanOutputs: Boolean) = {
if (edge.function.isIntermediate) {
// By default we do not need to run intermediate edges.
// Mark any intermediate edges as skipped, if they're not already done.
@ -329,8 +335,8 @@ class QGraph extends Logging {
val isDone = edge.status == RunnerStatus.DONE &&
previous.forall(edge => edge.status == RunnerStatus.DONE || edge.status == RunnerStatus.SKIPPED)
if (!isDone) {
edge.resetToPending()
resetPreviousSkipped(edge, previous)
edge.resetToPending(cleanOutputs)
resetPreviousSkipped(edge, previous, cleanOutputs)
}
}
}
@ -341,11 +347,12 @@ class QGraph extends Logging {
* to pending.
* @param edge Dependent edge.
* @param previous Previous edges that provide inputs to edge.
* @param cleanOutputs If true will clean up the output files when resetting skipped jobs to pending.
*/
private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge]): Unit = {
private def resetPreviousSkipped(edge: FunctionEdge, previous: List[FunctionEdge], cleanOutputs: Boolean): Unit = {
for (previousEdge <- previous.filter(_.status == RunnerStatus.SKIPPED)) {
previousEdge.resetToPending()
resetPreviousSkipped(previousEdge, this.previousFunctions(previousEdge))
previousEdge.resetToPending(cleanOutputs)
resetPreviousSkipped(previousEdge, this.previousFunctions(previousEdge), cleanOutputs)
}
}
@ -468,7 +475,7 @@ class QGraph extends Logging {
foreachFunction(edge => {
val name = edge.function.analysisName
if (name != null) {
updateStatus(statuses.find(_.analysisName == name) match {
updateAnalysisStatus(statuses.find(_.analysisName == name) match {
case Some(status) => status
case None =>
val status = new AnalysisStatus(name)
@ -484,11 +491,13 @@ class QGraph extends Logging {
val sgDone = status.scatter.done + status.gather.done
val sgFailed = status.scatter.failed + status.gather.failed
val sgSkipped = status.scatter.skipped + status.gather.skipped
val gatherTotal = status.gather.total
val gatherDone = status.gather.done
if (sgTotal > 0) {
var sgStatus = RunnerStatus.PENDING
if (sgFailed > 0)
sgStatus = RunnerStatus.FAILED
else if (sgDone == sgTotal)
else if (gatherDone == gatherTotal)
sgStatus = RunnerStatus.DONE
else if (sgDone + sgSkipped == sgTotal)
sgStatus = RunnerStatus.SKIPPED
@ -510,7 +519,7 @@ class QGraph extends Logging {
/**
* Updates a status map with scatter/gather status information (e.g. counts)
*/
private def updateStatus(stats: AnalysisStatus, edge: FunctionEdge) = {
private def updateAnalysisStatus(stats: AnalysisStatus, edge: FunctionEdge) = {
if (edge.function.isInstanceOf[GatherFunction]) {
updateSGStatus(stats.gather, edge)
} else if (edge.function.isInstanceOf[CloneFunction]) {
@ -671,7 +680,19 @@ class QGraph extends Logging {
}
})
iterator.foreach(_ => {})
}
}
/**
 * After a run, deletes the outputs of every intermediate function, but only
 * when the user requested it (-deleteIntermediates) and nothing failed.
 * NOTE(review): hasFailed is defined elsewhere in QGraph — presumably true
 * when any edge ended in FAILED; confirm before relying on this description.
 */
private def deleteIntermediateOutputs() = {
if (this.deleteIntermediates && !hasFailed) {
logger.info("Deleting intermediate files.")
traverseFunctions(edge => {
// Only intermediate functions are cleaned; final outputs are kept.
if (edge.function.isIntermediate) {
logger.debug("Deleting intermediates:" + edge.function.description)
edge.function.deleteOutputs()
}
})
}
}
/**
* Outputs the graph to a .dot file.
@ -705,13 +726,8 @@ class QGraph extends Logging {
def logFailed = {
foreachFunction(edge => {
if (edge.status == RunnerStatus.FAILED) {
logger.error("-----")
logger.error("Failed: " + edge.function.description)
logger.error("Log: " + edge.function.jobOutputFile.getAbsolutePath)
if (edge.function.jobErrorFile != null)
logger.error("Error: " + edge.function.jobErrorFile.getAbsolutePath)
}
if (edge.status == RunnerStatus.FAILED)
logEdge(edge)
})
}

View File

@ -1,8 +1,6 @@
package org.broadinstitute.sting.queue.extensions.gatk
import org.broadinstitute.sting.queue.function.InProcessFunction
import org.broadinstitute.sting.commandline.ArgumentSource
import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, ScatterFunction}
import org.broadinstitute.sting.utils.interval.IntervalUtils
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceDataSource
import java.io.File
@ -10,6 +8,10 @@ import net.sf.picard.util.IntervalList
import net.sf.samtools.SAMFileHeader
import collection.JavaConversions._
import org.broadinstitute.sting.utils.{GenomeLoc, GenomeLocSortedSet, GenomeLocParser}
import org.broadinstitute.sting.queue.util.IOUtils
import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, ScatterGatherableFunction, ScatterFunction}
import org.broadinstitute.sting.queue.function.{QFunction, InProcessFunction}
import org.broadinstitute.sting.queue.QException
/**
* An interval scatter function.
@ -18,19 +20,45 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction {
var splitByContig = false
private var referenceSequence: File = _
private var intervalField: ArgumentSource = _
private var intervals: List[String] = Nil
override def setOriginalFunction(originalFunction: ScatterGatherableFunction, scatterField: ArgumentSource) = {
/**
 * A function can be scattered by intervals only when it is a GATK command
 * line that has a reference sequence set; anything else is not scatterable.
 * @param originalFunction The original function to check.
 * @return true if this scatter function can scatter the original function.
 */
def isScatterGatherable(originalFunction: ScatterGatherableFunction) = {
  originalFunction match {
    case gatk: CommandLineGATK => gatk.reference_sequence != null
    case _ => false
  }
}
def setScatterGatherable(originalFunction: ScatterGatherableFunction) = {
val gatk = originalFunction.asInstanceOf[CommandLineGATK]
referenceSequence = gatk.reference_sequence
intervals = gatk.intervalsString
if (gatk.intervals != null)
intervals ::= gatk.intervals.toString
this.referenceSequence = gatk.reference_sequence
this.intervals ++= gatk.intervalsString
this.intervals ++= gatk.intervals.map(_.toString)
this.intervalField = QFunction.findField(originalFunction.getClass, "intervals")
}
/**
 * Seeds the clone's intervals field with a placeholder file name, relative to
 * the clone's eventual command directory; bindCloneInputs later rewrites the
 * value to an absolute path. The scatter index parameter is unused here.
 */
def initCloneInputs(cloneFunction: CloneFunction, index: Int) = {
cloneFunction.setFieldValue(this.intervalField, List(new File("scatter.intervals")))
}
/**
 * Rewrites the clone's interval files to absolute paths under the clone's
 * command directory and records them as scatter parts this function must
 * produce. The scatter index parameter is unused here.
 */
def bindCloneInputs(cloneFunction: CloneFunction, index: Int) = {
// The value was seeded by initCloneInputs (and possibly changed by the user).
val scatterPart = cloneFunction.getFieldValue(this.intervalField)
.asInstanceOf[List[File]]
.map(file => IOUtils.subDir(cloneFunction.commandDirectory, file))
cloneFunction.setFieldValue(this.intervalField, scatterPart)
this.scatterParts ++= scatterPart
}
def run() = {
val referenceSource = new ReferenceDataSource(referenceSequence)
GenomeLocParser.setupRefContigOrdering(referenceSource.getReference);
IntervalScatterFunction.scatter(this.intervals, this.scatterParts, this.referenceSequence, this.splitByContig)
}
}
object IntervalScatterFunction {
def scatter(intervals: List[String], scatterParts: List[File], reference: File, splitByContig: Boolean) = {
val referenceSource = new ReferenceDataSource(reference)
GenomeLocParser.setupRefContigOrdering(referenceSource.getReference)
val locs = {
// TODO: Abstract genome analysis engine has richer logic for parsing. We need to use it!
if (intervals.size == 0) {
@ -47,6 +75,9 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction {
var fileIndex = -1
var locIndex = 0
if (locs == null || locs.size == 0)
throw new QException("Locs produced an empty interval list: " + intervals.mkString(", "))
if (splitByContig) {
var contig: String = null
for (loc <- locs) {
@ -54,29 +85,48 @@ class IntervalScatterFunction extends ScatterFunction with InProcessFunction {
if (fileIndex >= 0)
intervalList.write(scatterParts(fileIndex))
fileIndex += 1
contig = loc.getContig
intervalList = new IntervalList(fileHeader)
}
locIndex += 1
intervalList.add(toInterval(loc, locIndex))
}
intervalList.write(scatterParts(fileIndex))
if ((fileIndex + 1) != scatterParts.size)
throw new QException("Only able to write contigs into %d of %d files.".format(fileIndex + 1, scatterParts.size))
} else {
var locsPerFile = locs.size / this.scatterParts.size
if (locs.size % this.scatterParts.size != 0) locsPerFile += 1
var locsPerFile = locs.size / scatterParts.size
val locRemainder = locs.size % scatterParts.size
// At the start, put an extra loc per file
locsPerFile += 1
var locsLeftFile = 0
for (loc <- locs) {
if (locIndex % locsPerFile == 0) {
if (locsLeftFile == 0) {
if (fileIndex >= 0)
intervalList.write(scatterParts(fileIndex))
fileIndex += 1
intervalList = new IntervalList(fileHeader)
// When we have put enough locs into each file,
// reduce the number of locs per file back
// to the original calculated value.
if (fileIndex == locRemainder)
locsPerFile -= 1
locsLeftFile = locsPerFile
}
locsLeftFile -= 1
locIndex += 1
intervalList.add(toInterval(loc, locIndex))
}
intervalList.write(scatterParts(fileIndex))
if ((fileIndex + 1) != scatterParts.size)
throw new QException("Only able to write intervals into %d of %d files.".format(fileIndex + 1, scatterParts.size))
}
}
private def toInterval(loc: GenomeLoc, locIndex: Int) =
new net.sf.picard.util.Interval(loc.getContig, loc.getStart.toInt, loc.getStop.toInt, true, "interval_" + locIndex)
new net.sf.picard.util.Interval(loc.getContig, loc.getStart.toInt, loc.getStop.toInt, false, "interval_" + locIndex)
}

View File

@ -0,0 +1,46 @@
package org.broadinstitute.sting.queue.extensions.gatk
import org.broadinstitute.sting.queue.function.InProcessFunction
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
import java.io.{FileReader, PrintWriter}
import org.apache.commons.io.{LineIterator, IOUtils, FileUtils}
/**
* Merges a vcf text file.
*/
/**
 * Merges VCF text files produced by scattered jobs into the original output.
 * The first part is copied verbatim (its header becomes the output header);
 * for each subsequent part the leading header lines (those starting with '#')
 * are skipped, even when they differ from the first file's header.
 */
class VcfGatherFunction extends GatherFunction with InProcessFunction {
  def run() = {
    if (gatherParts.size < 1) {
      throw new QException("No files to gather to output: " + originalOutput)
    } else {
      val writer = new PrintWriter(originalOutput)
      try {
        // First part contributes both header and records unchanged.
        val reader = new FileReader(gatherParts(0))
        try {
          IOUtils.copy(reader, writer)
        } finally {
          IOUtils.closeQuietly(reader)
        }
        // Remaining parts: drop the leading header block, keep the records.
        for (file <- gatherParts.tail) {
          var inHeaders = true
          val itor = FileUtils.lineIterator(file)
          try {
            while (itor.hasNext) {
              val nextLine = itor.nextLine
              // startsWith instead of nextLine(0): indexing an empty line
              // throws StringIndexOutOfBoundsException, and a blank line
              // also legitimately ends the header block.
              if (inHeaders && !nextLine.startsWith("#"))
                inHeaders = false
              if (!inHeaders)
                writer.println(nextLine)
            }
          } finally {
            LineIterator.closeQuietly(itor)
          }
        }
      } finally {
        IOUtils.closeQuietly(writer)
      }
    }
  }
}

View File

@ -152,6 +152,15 @@ trait QFunction {
dirs
}
/**
 * Deletes the output files and all the status files for this function:
 * the declared outputs themselves plus their .done and .fail marker files.
 */
def deleteOutputs() = {
outputs.foreach(_.delete())
doneOutputs.foreach(_.delete())
failOutputs.foreach(_.delete())
}
/**
* Creates the output directories for this function if it doesn't exist.
*/
@ -416,6 +425,19 @@ object QFunction {
*/
private var classFieldsMap = Map.empty[Class[_], ClassFields]
/**
 * Looks up the argument source for a named field declared on a class.
 * @param clazz Class to search.
 * @param name Name of the field to return.
 * @return Argument source for the field.
 */
def findField(clazz: Class[_], name: String) = {
  val candidates = classFields(clazz).functionFields
  candidates
    .find(source => source.field.getName == name)
    .getOrElse(throw new QException("Could not find a field on class %s with name %s".format(clazz, name)))
}
/**
* Returns the fields for a class.
* @param clazz Class to retrieve fields for.

View File

@ -3,6 +3,7 @@ package org.broadinstitute.sting.queue.function.scattergather
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.commandline.ArgumentSource
import java.io.File
import org.broadinstitute.sting.queue.QException
/**
* Shadow clones another command line function.
@ -61,7 +62,7 @@ class CloneFunction extends CommandLineFunction {
}
}
override def setFieldValue(source: ArgumentSource, value: Any) = {
override def setFieldValue(source: ArgumentSource, value: Any): Unit = {
source.field.getName match {
case "jobOutputFile" => jobOutputFile = value.asInstanceOf[File]
case "jobErrorFile" => jobErrorFile = value.asInstanceOf[File]

View File

@ -1,7 +1,7 @@
package org.broadinstitute.sting.queue.function.scattergather
import java.io.File
import org.broadinstitute.sting.commandline.{ArgumentSource, Input, Output}
import org.broadinstitute.sting.commandline.{Input, Output}
import org.broadinstitute.sting.queue.function.QFunction
/**
@ -13,19 +13,4 @@ trait GatherFunction extends QFunction {
@Output(doc="The original output of the scattered function")
var originalOutput: File = _
/**
* Sets the original function used to create this scatter function.
* @param originalFunction The ScatterGatherableFunction.
* @param gatherField The field being gathered.
*/
def setOriginalFunction(originalFunction: ScatterGatherableFunction, gatherField: ArgumentSource) = {}
/**
* Sets the clone function creating one of the inputs for this gather function.
* @param cloneFunction The clone wrapper for the original ScatterGatherableFunction.
* @param index The one based index (from 1..scatterCount inclusive) of the scatter piece.
* @param gatherField The field to be gathered.
*/
def setCloneFunction(cloneFunction: CloneFunction, index: Int, gatherField: ArgumentSource) = {}
}

View File

@ -1,31 +1,47 @@
package org.broadinstitute.sting.queue.function.scattergather
import java.io.File
import org.broadinstitute.sting.commandline.{ArgumentSource, Input, Output}
import org.broadinstitute.sting.commandline.{Input, Output}
import org.broadinstitute.sting.queue.function.QFunction
/**
* Base class for Scatter command line functions.
* Base class for Scatter functions.
*/
trait ScatterFunction extends QFunction {
@Input(doc="Original input to scatter")
var originalInput: File = _
@Input(doc="Original inputs to scatter")
var originalInputs: Set[File] = _
@Output(doc="Scattered parts of the original input, one per temp directory")
@Output(doc="Scattered parts of the original inputs, one set per temp directory")
var scatterParts: List[File] = Nil
/**
* Sets the original function used to create this scatter function.
* @param originalFunction The ScatterGatherableFunction.
* @param scatterField The field being scattered.
* Returns true if the scatter function can scatter this original function.
* @param originalFunction The original function to check.
* @return true if the scatter function can scatter this original function.
*/
def setOriginalFunction(originalFunction: ScatterGatherableFunction, scatterField: ArgumentSource) = {}
def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean
/**
* Sets the clone function using one of the outputs of this scatter function.
* @param cloneFunction The clone wrapper for the original ScatterGatherableFunction.
* @param index The one based index (from 1..scatterCount inclusive) of the scatter piece.
* @param scatterField The field being scattered.
* Sets the original ScatterGatherableFunction to be scattered.
* @param originalFunction The original function to with inputs bind to this scatter function.
*/
def setCloneFunction(cloneFunction: CloneFunction, index: Int, scatterField: ArgumentSource) = {}
def setScatterGatherable(originalFunction: ScatterGatherableFunction)
/**
* Initializes the input fields for the clone function.
* The input values should be set to their defaults
* and may be changed by the user.
* @param cloneFunction CloneFunction to initialize.
* @param index The one based scatter index.
*/
def initCloneInputs(cloneFunction: CloneFunction, index: Int)
/**
* Binds the input fields for the clone function to this scatter function.
* The input values should be set to their absolute values and added
* to scatter parts.
* @param cloneFunction CloneFunction to bind.
* @param index The one based scatter index.
*/
def bindCloneInputs(cloneFunction: CloneFunction, index: Int)
}

View File

@ -4,6 +4,7 @@ import java.io.File
import org.broadinstitute.sting.queue.util._
import org.broadinstitute.sting.commandline.ArgumentSource
import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction}
import org.broadinstitute.sting.queue.QException
/**
* A function that can be run faster by splitting it up into pieces and then joining together the results.
@ -29,9 +30,8 @@ trait ScatterGatherableFunction extends CommandLineFunction {
/**
* Allows external modification of the ScatterFunction that will create the scatter pieces in the temporary directories.
* @param scatterFunction The function that will create the scatter pieces in the temporary directories.
* @param scatterField The input field being scattered.
*/
var setupScatterFunction: PartialFunction[(ScatterFunction, ArgumentSource), Unit] = _
var setupScatterFunction: PartialFunction[ScatterFunction, Unit] = _
/**
* Allows external modification of the GatherFunction that will collect the gather pieces in the temporary directories.
@ -50,10 +50,10 @@ trait ScatterGatherableFunction extends CommandLineFunction {
/**
* Returns true if the function is ready to be scatter / gathered.
* The base implementation checks if the scatter count is greater than one,
* and that the scatter field has a value.
* and that the scatter function can scatter this instance.
* @return true if the function is ready to be scatter / gathered.
*/
def scatterGatherable = this.scatterCount > 1 && hasFieldValue(this.scatterField)
def scatterGatherable = this.scatterCount > 1 && scatterFunction.isScatterGatherable(this)
/**
* Returns a list of scatter / gather and clones of this function
@ -68,17 +68,15 @@ trait ScatterGatherableFunction extends CommandLineFunction {
val inputFieldsWithValues = this.inputFields.filter(hasFieldValue(_))
// Only gather up fields that will have a value
val outputFieldsWithValues = this.outputFields.filter(hasFieldValue(_))
// The field containing the file to split
val originalInput = getFieldFile(scatterField)
// Create the scatter function based on @Scatter
val scatterFunction = this.newScatterFunction(this.scatterField)
syncFunction(scatterFunction)
scatterFunction.addOrder = this.addOrder :+ 1
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter-" + scatterField.field.getName)
scatterFunction.originalInput = originalInput
scatterFunction.setOriginalFunction(this, scatterField)
initScatterFunction(scatterFunction, this.scatterField)
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter")
scatterFunction.originalInputs = this.inputs
scatterFunction.isIntermediate = true
scatterFunction.setScatterGatherable(this)
initScatterFunction(scatterFunction)
functions :+= scatterFunction
// Create the gather functions for each output field
@ -92,7 +90,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
gatherFunction.addOrder = this.addOrder :+ gatherAddOrder
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
gatherFunction.originalOutput = this.getFieldFile(gatherField)
gatherFunction.setOriginalFunction(this, gatherField)
gatherFunction.isIntermediate = this.isIntermediate
initGatherFunction(gatherFunction, gatherField)
functions :+= gatherFunction
gatherFunctions += gatherField -> gatherFunction
@ -110,11 +108,11 @@ trait ScatterGatherableFunction extends CommandLineFunction {
cloneFunction.index = i
cloneFunction.addOrder = this.addOrder :+ (i+1)
cloneFunction.memoryLimit = this.memoryLimit
cloneFunction.isIntermediate = true
// Setup the fields on the clone function, outputting each as a relative file in the sg directory.
cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i)
var scatterPart = new File(originalInput.getName)
cloneFunction.setFieldValue(scatterField, scatterPart)
scatterFunction.initCloneInputs(cloneFunction, i)
for (gatherField <- outputFieldsWithValues) {
val gatherPart = new File(gatherOutputs(gatherField).getName)
cloneFunction.setFieldValue(gatherField, gatherPart)
@ -124,9 +122,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
initCloneFunction(cloneFunction, i)
// Get absolute paths to the files and bind the sg functions to the clone function via the absolute paths.
scatterPart = IOUtils.subDir(cloneFunction.commandDirectory, cloneFunction.getFieldFile(scatterField))
cloneFunction.setFieldValue(scatterField, scatterPart)
scatterFunction.scatterParts :+= scatterPart
scatterFunction.bindCloneInputs(cloneFunction, i)
for (gatherField <- outputFieldsWithValues) {
val gatherPart = IOUtils.subDir(cloneFunction.commandDirectory, cloneFunction.getFieldFile(gatherField))
cloneFunction.setFieldValue(gatherField, gatherPart)
@ -137,7 +133,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
}
functions ++= cloneFunctions
// Return all the various functions we created
// Return all the various created functions.
functions
}
@ -155,34 +151,24 @@ trait ScatterGatherableFunction extends CommandLineFunction {
}
/**
* Retrieves the scatter field from the first field that has the annotation @Scatter.
* Creates a new ScatterFunction.
* @return A ScatterFunction instantiated scatterClass
*/
protected lazy val scatterField =
this.inputFields.find(field => ReflectionUtils.hasAnnotation(field.field, classOf[Scatter])).get
/**
* Creates a new ScatterFunction for the scatterField.
* @param scatterField Field that defined @Scatter.
* @return A ScatterFunction instantiated from @Scatter or scatterClass if scatterClass was set on this ScatterGatherableFunction.
*/
protected def newScatterFunction(scatterField: ArgumentSource): ScatterFunction = {
var scatterClass = this.scatterClass
if (scatterClass == null)
scatterClass = ReflectionUtils.getAnnotation(scatterField.field, classOf[Scatter])
.value.asSubclass(classOf[ScatterFunction])
scatterClass.newInstance.asInstanceOf[ScatterFunction]
protected def newScatterFunction(): ScatterFunction = {
if (this.scatterClass == null)
throw new QException("scatterClass is null.")
this.scatterClass.newInstance.asInstanceOf[ScatterFunction]
}
/**
* Initializes the ScatterFunction created by newScatterFunction() that will create the scatter pieces in the temporary directories.
* Calls setupScatterFunction with scatterFunction.
* @param scatterFunction The function that will create the scatter pieces in the temporary directories.
* @param scatterField The input field being scattered.
*/
protected def initScatterFunction(scatterFunction: ScatterFunction, scatterField: ArgumentSource) = {
protected def initScatterFunction(scatterFunction: ScatterFunction) = {
if (this.setupScatterFunction != null)
if (this.setupScatterFunction.isDefinedAt(scatterFunction, scatterField))
this.setupScatterFunction(scatterFunction, scatterField)
if (this.setupScatterFunction.isDefinedAt(scatterFunction))
this.setupScatterFunction(scatterFunction)
}
/**
@ -236,7 +222,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
* @param newFunction newly created function.
*/
protected def syncFunction(newFunction: QFunction) = {
newFunction.isIntermediate = this.isIntermediate
newFunction.analysisName = this.analysisName
newFunction.qSettings = this.qSettings
newFunction.jobTempDir = this.jobTempDir
@ -249,6 +234,11 @@ trait ScatterGatherableFunction extends CommandLineFunction {
}
}
/**
* The scatter function.
*/
private lazy val scatterFunction = newScatterFunction()
/**
* Returns a temporary directory under this scatter gather directory.
* @param Sub directory under the scatter gather directory.

View File

@ -1,23 +1,14 @@
package org.broadinstitute.sting.queue.function.scattergather
import org.broadinstitute.sting.commandline.Argument
import org.broadinstitute.sting.queue.function.InProcessFunction
import org.apache.commons.io.FileUtils
import org.broadinstitute.sting.queue.QException
import collection.JavaConversions._
import java.io.PrintWriter
import org.apache.commons.io.{LineIterator, IOUtils, FileUtils}
/**
* Merges a text file.
* The script can be changed by setting rmdirScript.
* By default uses mergeText.sh in Sting/shell.
* The format of the call is <mergeTextScript> <file_output> <file_1> [.. <file_n>]
*/
class SimpleTextGatherFunction extends GatherFunction with InProcessFunction {
@Argument(doc="merge text script")
var mergeTextScript = "mergeText.sh"
def run() = {
if (gatherParts.size < 1) {
throw new QException("No files to gather to output: " + originalOutput)
@ -25,38 +16,48 @@ class SimpleTextGatherFunction extends GatherFunction with InProcessFunction {
FileUtils.copyFile(gatherParts(0), originalOutput)
} else {
val writer = new PrintWriter(originalOutput)
var startLine = 0
try {
var startLine = 0
val readerA = FileUtils.lineIterator(gatherParts(0))
val readerB = FileUtils.lineIterator(gatherParts(1))
var headersMatch = true
while (headersMatch) {
if (readerA.hasNext && readerB.hasNext) {
val headerA = readerA.nextLine
val headerB = readerB.nextLine
headersMatch = headerA == headerB
if (headersMatch) {
startLine += 1
writer.println(headerA)
val readerA = FileUtils.lineIterator(gatherParts(0))
val readerB = FileUtils.lineIterator(gatherParts(1))
try {
var headersMatch = true
while (headersMatch) {
if (readerA.hasNext && readerB.hasNext) {
val headerA = readerA.nextLine
val headerB = readerB.nextLine
headersMatch = headerA == headerB
if (headersMatch) {
startLine += 1
writer.println(headerA)
}
} else {
headersMatch = false
}
}
} else {
headersMatch = false
} finally {
LineIterator.closeQuietly(readerA)
LineIterator.closeQuietly(readerB)
}
}
readerA.close
readerB.close
for (file <- gatherParts) {
val reader = FileUtils.lineIterator(file)
var lineNum = 0
while (reader.hasNext && lineNum < startLine) {
reader.nextLine
lineNum += 1
for (file <- gatherParts) {
val reader = FileUtils.lineIterator(file)
try {
var lineNum = 0
while (reader.hasNext && lineNum < startLine) {
reader.nextLine
lineNum += 1
}
while (reader.hasNext)
writer.println(reader.nextLine)
} finally {
LineIterator.closeQuietly(reader)
}
}
while (reader.hasNext)
writer.println(reader.nextLine)
} finally {
IOUtils.closeQuietly(writer)
}
writer.close
}
}
}

View File

@ -23,7 +23,7 @@ class BamProcessing(yaml: File, gatkJar: File, fixMatesJar: File) {
trait StandardCommandLineGATK extends CommandLineGATK {
this.reference_sequence = library.attributes.getProject.getReferenceFile
this.intervals = library.attributes.getProject.getIntervalList
this.intervals :+= library.attributes.getProject.getIntervalList
this.DBSNP = library.attributes.getProject.getDbsnpFile
this.memoryLimit = Some(2)
this.jarFile = library.gatkJar
@ -35,7 +35,7 @@ class BamProcessing(yaml: File, gatkJar: File, fixMatesJar: File) {
*/
def StandardRealignerTargetCreator(bam: File, contigs: List[String], output: File) : RealignerTargetCreator = {
var rtc = new RealignerTargetCreator with StandardCommandLineGATK
rtc.intervals = null
rtc.intervals = Nil
rtc.intervalsString = contigs
rtc.input_file :+= bam
rtc.out = output
@ -51,7 +51,7 @@ class BamProcessing(yaml: File, gatkJar: File, fixMatesJar: File) {
def StandardIndelCleaner(bam: File, contigs: List[String], targets: File, outBam: File) : IndelRealigner = {
var realigner = new IndelRealigner with StandardCommandLineGATK
realigner.intervalsString = contigs
realigner.intervals = null
realigner.intervals = Nil
realigner.input_file :+= bam
realigner.out = outBam
realigner.targetIntervals = targets

View File

@ -23,7 +23,7 @@ class VariantCalling(yaml: File,gatkJar: File) {
*/
trait StandardCommandLineGATK extends CommandLineGATK {
this.reference_sequence = vc.attributes.getProject.getReferenceFile
this.intervals = vc.attributes.getProject.getIntervalList
this.intervals :+= vc.attributes.getProject.getIntervalList
this.DBSNP = vc.attributes.getProject.getDbsnpFile
// set global memory limit on the low side. Additional input bams will affect it.
this.memoryLimit = Some(2)

View File

@ -19,6 +19,19 @@ object ReflectionUtils {
*/
def hasAnnotation(field: Field, annotation: Class[_ <: Annotation]) = field.getAnnotation(annotation) != null
/**
 * Returns true if clazz or one of its superclasses has the annotation.
 * @param clazz Class to check.
 * @param annotation Class of the annotation to look for.
 * @return true if clazz or one of its superclasses has the annotation.
 */
def hasAnnotation(clazz: Class[_], annotation: Class[_ <: Annotation]): Boolean = {
  // Scala method parameters are vals, so walk the hierarchy via a local var.
  // The original loop never advanced clazz: it spun forever when the
  // annotation was absent and never actually inspected superclasses.
  var currentClass: Class[_] = clazz
  var foundAnnotation = false
  while (!foundAnnotation && currentClass != null) {
    foundAnnotation = (currentClass.getAnnotation(annotation) != null)
    currentClass = currentClass.getSuperclass // null once past Object
  }
  foundAnnotation
}
/**
* Gets the annotation or throws an exception if the annotation is not found.
* @param field Field to check.
@ -26,9 +39,26 @@ object ReflectionUtils {
* @return The annotation.
*/
def getAnnotation[T <: Annotation](field: Field, annotation: Class[T]): T = {
if (!hasAnnotation(field, annotation))
throw new QException("Field %s is missing annotation %s".format(field, annotation))
field.getAnnotation(annotation).asInstanceOf[T]
field.getAnnotation(annotation) match {
case null =>
throw new QException("Field %s is missing annotation %s".format(field, annotation))
case fieldAnnotation => fieldAnnotation.asInstanceOf[T]
}
}
/**
 * Gets the annotation from clazz or one of its superclasses, or throws an
 * exception if the annotation is not found anywhere in the hierarchy.
 * @param clazz Class to check.
 * @param annotation Class of the annotation to look for.
 * @return The annotation.
 */
def getAnnotation[T <: Annotation](clazz: Class[_], annotation: Class[T]): T = {
  // Walk the superclass chain with a local var: the original loop never
  // reassigned clazz (a val), so it looped forever when the annotation was
  // missing and never consulted superclasses.
  var currentClass: Class[_] = clazz
  var result: T = null.asInstanceOf[T]
  while (result == null && currentClass != null) {
    result = currentClass.getAnnotation(annotation)
    currentClass = currentClass.getSuperclass // null once past Object
  }
  if (result == null)
    throw new QException("Class %s is missing annotation %s".format(clazz, annotation))
  result
}
/**

View File

@ -0,0 +1,234 @@
package org.broadinstitute.sting.queue.extensions.gatk
import collection.JavaConversions._
import java.io.File
import org.junit.{Before, Assert, Test}
import org.broadinstitute.sting.BaseTest
import org.broadinstitute.sting.utils.interval.IntervalUtils
import org.broadinstitute.sting.utils.GenomeLocParser
import org.broadinstitute.sting.queue.QException
import net.sf.picard.reference.IndexedFastaSequenceFile
class IntervalScatterFunctionUnitTest extends BaseTest {
  private def reference = new File(BaseTest.b36KGReference)
  private var header: IndexedFastaSequenceFile = _

  @Before
  def setup() {
    // The contig ordering must be primed from the reference sequence
    // dictionary before GenomeLocParser can parse interval strings.
    header = new IndexedFastaSequenceFile(reference)
    GenomeLocParser.setupRefContigOrdering(header.getSequenceDictionary())
  }

  /** Builds the three per-shard .intervals output files for a test, named by prefix. */
  private def intervalFiles(prefix: String) =
    (1 to 3).toList.map(index => new File(testDir + prefix + "." + index + ".intervals"))

  /** Parses the intervals written to a single scatter output file. */
  private def parseLocs(file: File) =
    IntervalUtils.parseIntervalArguments(List(file.toString), false)

  /** Runs the scatter, then returns the parsed locations for each output file in order. */
  private def scatterAndParse(intervals: List[String], files: List[File], splitByContig: Boolean) = {
    IntervalScatterFunction.scatter(intervals, files, reference, splitByContig)
    files.map(parseLocs)
  }

  @Test
  def testBasicScatter = {
    val chr1 = GenomeLocParser.parseGenomeInterval("1")
    val chr2 = GenomeLocParser.parseGenomeInterval("2")
    val chr3 = GenomeLocParser.parseGenomeInterval("3")
    val locs = scatterAndParse(List("1", "2", "3"), intervalFiles("basic"), false)
    Assert.assertEquals(1, locs(0).size)
    Assert.assertEquals(1, locs(1).size)
    Assert.assertEquals(1, locs(2).size)
    Assert.assertEquals(chr1, locs(0).get(0))
    Assert.assertEquals(chr2, locs(1).get(0))
    Assert.assertEquals(chr3, locs(2).get(0))
  }

  @Test
  def testScatterLessFiles = {
    // Four contigs into three files: the first file absorbs the extra contig.
    val chr1 = GenomeLocParser.parseGenomeInterval("1")
    val chr2 = GenomeLocParser.parseGenomeInterval("2")
    val chr3 = GenomeLocParser.parseGenomeInterval("3")
    val chr4 = GenomeLocParser.parseGenomeInterval("4")
    val locs = scatterAndParse(List("1", "2", "3", "4"), intervalFiles("less"), false)
    Assert.assertEquals(2, locs(0).size)
    Assert.assertEquals(1, locs(1).size)
    Assert.assertEquals(1, locs(2).size)
    Assert.assertEquals(chr1, locs(0).get(0))
    Assert.assertEquals(chr2, locs(0).get(1))
    Assert.assertEquals(chr3, locs(1).get(0))
    Assert.assertEquals(chr4, locs(2).get(0))
  }

  @Test(expected=classOf[QException])
  def testScatterMoreFiles = {
    // More output files than intervals is an error.
    scatterAndParse(List("1", "2"), intervalFiles("more"), false)
  }

  @Test
  def testScatterIntervals = {
    val intervals = List("1:1-2", "1:4-5", "2:1-1", "3:2-2")
    val chr1a = GenomeLocParser.parseGenomeInterval("1:1-2")
    val chr1b = GenomeLocParser.parseGenomeInterval("1:4-5")
    val chr2 = GenomeLocParser.parseGenomeInterval("2:1-1")
    val chr3 = GenomeLocParser.parseGenomeInterval("3:2-2")
    val locs = scatterAndParse(intervals, intervalFiles("split"), true)
    Assert.assertEquals(2, locs(0).size)
    Assert.assertEquals(1, locs(1).size)
    Assert.assertEquals(1, locs(2).size)
    Assert.assertEquals(chr1a, locs(0).get(0))
    Assert.assertEquals(chr1b, locs(0).get(1))
    Assert.assertEquals(chr2, locs(1).get(0))
    Assert.assertEquals(chr3, locs(2).get(0))
  }

  @Test
  def testBasicScatterByContig = {
    val chr1 = GenomeLocParser.parseGenomeInterval("1")
    val chr2 = GenomeLocParser.parseGenomeInterval("2")
    val chr3 = GenomeLocParser.parseGenomeInterval("3")
    val locs = scatterAndParse(List("1", "2", "3"), intervalFiles("contig_basic"), true)
    Assert.assertEquals(1, locs(0).size)
    Assert.assertEquals(1, locs(1).size)
    Assert.assertEquals(1, locs(2).size)
    Assert.assertEquals(chr1, locs(0).get(0))
    Assert.assertEquals(chr2, locs(1).get(0))
    Assert.assertEquals(chr3, locs(2).get(0))
  }

  @Test
  def testScatterByContigLessFiles = {
    // Four contigs into three files when splitting by contig: the last
    // file absorbs the extra contig (contrast with testScatterLessFiles).
    val chr1 = GenomeLocParser.parseGenomeInterval("1")
    val chr2 = GenomeLocParser.parseGenomeInterval("2")
    val chr3 = GenomeLocParser.parseGenomeInterval("3")
    val chr4 = GenomeLocParser.parseGenomeInterval("4")
    val locs = scatterAndParse(List("1", "2", "3", "4"), intervalFiles("contig_less"), true)
    Assert.assertEquals(1, locs(0).size)
    Assert.assertEquals(1, locs(1).size)
    Assert.assertEquals(2, locs(2).size)
    Assert.assertEquals(chr1, locs(0).get(0))
    Assert.assertEquals(chr2, locs(1).get(0))
    Assert.assertEquals(chr3, locs(2).get(0))
    Assert.assertEquals(chr4, locs(2).get(1))
  }

  @Test(expected=classOf[QException])
  def testScatterByContigMoreFiles = {
    // More output files than contigs is an error.
    scatterAndParse(List("1", "2"), intervalFiles("contig_more"), true)
  }

  @Test
  def testScatterByContigIntervalsStart = {
    // The doubled-up contig appears at the start of the interval list.
    val intervals = List("1:1-2", "1:4-5", "2:1-1", "3:2-2")
    val chr1a = GenomeLocParser.parseGenomeInterval("1:1-2")
    val chr1b = GenomeLocParser.parseGenomeInterval("1:4-5")
    val chr2 = GenomeLocParser.parseGenomeInterval("2:1-1")
    val chr3 = GenomeLocParser.parseGenomeInterval("3:2-2")
    val locs = scatterAndParse(intervals, intervalFiles("contig_split_start"), true)
    Assert.assertEquals(2, locs(0).size)
    Assert.assertEquals(1, locs(1).size)
    Assert.assertEquals(1, locs(2).size)
    Assert.assertEquals(chr1a, locs(0).get(0))
    Assert.assertEquals(chr1b, locs(0).get(1))
    Assert.assertEquals(chr2, locs(1).get(0))
    Assert.assertEquals(chr3, locs(2).get(0))
  }

  @Test
  def testScatterByContigIntervalsMiddle = {
    // The doubled-up contig appears in the middle of the interval list.
    val intervals = List("1:1-1", "2:1-2", "2:4-5", "3:2-2")
    val chr1 = GenomeLocParser.parseGenomeInterval("1:1-1")
    val chr2a = GenomeLocParser.parseGenomeInterval("2:1-2")
    val chr2b = GenomeLocParser.parseGenomeInterval("2:4-5")
    val chr3 = GenomeLocParser.parseGenomeInterval("3:2-2")
    val locs = scatterAndParse(intervals, intervalFiles("contig_split_middle"), true)
    Assert.assertEquals(1, locs(0).size)
    Assert.assertEquals(2, locs(1).size)
    Assert.assertEquals(1, locs(2).size)
    Assert.assertEquals(chr1, locs(0).get(0))
    Assert.assertEquals(chr2a, locs(1).get(0))
    Assert.assertEquals(chr2b, locs(1).get(1))
    Assert.assertEquals(chr3, locs(2).get(0))
  }

  @Test
  def testScatterByContigIntervalsEnd = {
    // The doubled-up contig appears at the end of the interval list.
    val intervals = List("1:1-1", "2:2-2", "3:1-2", "3:4-5")
    val chr1 = GenomeLocParser.parseGenomeInterval("1:1-1")
    val chr2 = GenomeLocParser.parseGenomeInterval("2:2-2")
    val chr3a = GenomeLocParser.parseGenomeInterval("3:1-2")
    val chr3b = GenomeLocParser.parseGenomeInterval("3:4-5")
    val locs = scatterAndParse(intervals, intervalFiles("contig_split_end"), true)
    Assert.assertEquals(1, locs(0).size)
    Assert.assertEquals(1, locs(1).size)
    Assert.assertEquals(2, locs(2).size)
    Assert.assertEquals(chr1, locs(0).get(0))
    Assert.assertEquals(chr2, locs(1).get(0))
    Assert.assertEquals(chr3a, locs(2).get(0))
    Assert.assertEquals(chr3b, locs(2).get(1))
  }
}