diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/DistributedScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/DistributedScatterFunction.scala new file mode 100644 index 000000000..953fe0830 --- /dev/null +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/DistributedScatterFunction.scala @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.extensions.gatk + +import java.io.File +import org.broadinstitute.sting.queue.function.scattergather.CloneFunction +import org.broadinstitute.sting.queue.function.InProcessFunction + +/** + * An scatter function that uses the Distributed GATK. + */ +class DistributedScatterFunction extends GATKScatterFunction with InProcessFunction { + private final val processingTracker = "processingTracker" + + this.scatterOutputFiles = List(new File(processingTracker)) + + override def initCloneInputs(cloneFunction: CloneFunction, index: Int) { + cloneFunction.setFieldValue("processingTracker", new File(this.commandDirectory, this.processingTracker)) + } + + override def bindCloneInputs(cloneFunction: CloneFunction, index: Int) { + /* no further work needed after init. */ + } + + def run() { + /* doesn't actually need to run. */ + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/GATKScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/GATKScatterFunction.scala index f4da2aa4c..4abac23d3 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/GATKScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/GATKScatterFunction.scala @@ -28,9 +28,8 @@ import org.broadinstitute.sting.utils.interval.IntervalUtils import java.io.File import collection.JavaConversions._ import org.broadinstitute.sting.queue.util.IOUtils -import org.broadinstitute.sting.queue.function.QFunction import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, ScatterGatherableFunction, ScatterFunction} -import org.broadinstitute.sting.commandline.{Output, ArgumentSource} +import org.broadinstitute.sting.commandline.Output trait GATKScatterFunction extends ScatterFunction { /** The total number of clone jobs that will be created. */ @@ -40,10 +39,10 @@ trait GATKScatterFunction extends ScatterFunction { protected var referenceSequence: File = _ /** The runtime field to set for specifying an interval file. */ - protected var intervalsField: ArgumentSource = _ + private final val intervalsField = "intervals" /** The runtime field to set for specifying an interval string. */ - protected var intervalsStringField: ArgumentSource = _ + private final val intervalsStringField = "intervalsString" /** The list of interval files ("/path/to/interval.list") or interval strings ("chr1", "chr2") to parse into smaller parts. */ protected var intervals: List[String] = Nil @@ -60,22 +59,19 @@ trait GATKScatterFunction extends ScatterFunction { * @return true if the function is a GATK function with the reference sequence set. * @throws IllegalArgumentException if -BTI or -BTIMR are set. QScripts should not try to scatter gather with those option set. */ - def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean = { - if (originalFunction.isInstanceOf[CommandLineGATK]) { - val gatk = originalFunction.asInstanceOf[CommandLineGATK] - if ( gatk.BTI != null && gatk.BTIMR == null) throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)") - gatk.reference_sequence != null - } else false + override def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean = { + val gatk = originalFunction.asInstanceOf[CommandLineGATK] + if (gatk.BTI != null && gatk.BTIMR == null) + throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)") + gatk.reference_sequence != null } /** * Sets the scatter gatherable function. * @param originalFunction Function to bind. */ - def setScatterGatherable(originalFunction: ScatterGatherableFunction) = { + override def setScatterGatherable(originalFunction: ScatterGatherableFunction) = { val gatk = originalFunction.asInstanceOf[CommandLineGATK] - this.intervalsField = QFunction.findField(originalFunction.getClass, "intervals") - this.intervalsStringField = QFunction.findField(originalFunction.getClass, "intervalsString") this.referenceSequence = gatk.reference_sequence if (gatk.intervals.isEmpty && gatk.intervalsString.isEmpty) { this.intervals ++= IntervalUtils.distinctContigs(this.referenceSequence).toList diff --git a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala index 7dc15ba44..95f311505 100644 --- a/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/extensions/gatk/VcfGatherFunction.scala @@ -1,47 +1,31 @@ package org.broadinstitute.sting.queue.extensions.gatk -import org.broadinstitute.sting.queue.function.InProcessFunction -import org.broadinstitute.sting.queue.QException -import org.broadinstitute.sting.queue.function.scattergather.GatherFunction -import java.io.{FileReader, PrintWriter} -import org.apache.commons.io.{LineIterator, IOUtils, FileUtils} +import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, GatherFunction} /** * Merges a vcf text file. */ -class VcfGatherFunction extends GatherFunction with InProcessFunction { - def run() = { - waitForGatherParts - if (gatherParts.size < 1) { - throw new QException("No files to gather to output: " + originalOutput) - } else { - val writer = new PrintWriter(originalOutput) - try { - var reader = new FileReader(gatherParts(0)) - try { - IOUtils.copy(reader, writer) - } finally { - IOUtils.closeQuietly(reader) - } +class VcfGatherFunction extends CombineVariants with GatherFunction { - for (file <- gatherParts.tail) { - var inHeaders = true - val itor = FileUtils.lineIterator(file) - try { - while (itor.hasNext) { - val nextLine = itor.nextLine - if (inHeaders && nextLine(0) != '#') - inHeaders = false - if (!inHeaders) - writer.println(nextLine) - } - } finally { - LineIterator.closeQuietly(itor) - } - } - } finally { - IOUtils.closeQuietly(writer) - } - } + private var originalGATK: CommandLineGATK = _ + + override def setScatterGatherable(originalFunction: ScatterGatherableFunction) { + this.originalGATK = originalFunction.asInstanceOf[CommandLineGATK] } -} \ No newline at end of file + + override def freezeFieldValues = { + this.memoryLimit = Some(1) + + this.jarFile = this.originalGATK.jarFile + this.reference_sequence = this.originalGATK.reference_sequence + this.intervals = this.originalGATK.intervals + this.intervalsString = this.originalGATK.intervalsString + + this.rodBind = this.gatherParts.zipWithIndex map { case (input, index) => new RodBind("input"+index, "VCF", input) } + this.rod_priority_list = (0 until this.gatherParts.size).map("input"+_).mkString(",") + this.out = this.originalOutput + this.assumeIdenticalSamples = true + + super.freezeFieldValues + } +} diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala index f0bb1598b..bb983e6c0 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/CloneFunction.scala @@ -1,9 +1,9 @@ package org.broadinstitute.sting.queue.function.scattergather -import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.commandline.ArgumentSource import java.io.File import org.broadinstitute.sting.queue.QException +import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} /** * Shadow clones another command line function. @@ -47,7 +47,12 @@ class CloneFunction extends CommandLineFunction { def commandLine = withScatterPart(() => originalFunction.commandLine) - override def getFieldValue(source: ArgumentSource) = { + def getFieldValue(field: String): AnyRef = { + val source = QFunction.findField(originalFunction.getClass, field) + getFieldValue(source) + } + + override def getFieldValue(source: ArgumentSource): AnyRef = { source.field.getName match { case "jobOutputFile" => jobOutputFile case "jobErrorFile" => jobErrorFile @@ -62,6 +67,11 @@ class CloneFunction extends CommandLineFunction { } } + def setFieldValue(field: String, value: Any): Unit = { + val source = QFunction.findField(originalFunction.getClass, field) + setFieldValue(source, value) + } + override def setFieldValue(source: ArgumentSource, value: Any): Unit = { source.field.getName match { case "jobOutputFile" => jobOutputFile = value.asInstanceOf[File] diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala index 28c3e89a2..cd60cbe02 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/GatherFunction.scala @@ -19,6 +19,12 @@ trait GatherFunction extends QFunction { @Output(doc="The original output of the scattered function") var originalOutput: File = _ + /** + * Sets the original ScatterGatherableFunction to be gathered. + * @param originalFunction The original function to with inputs bind to this scatter function. + */ + def setScatterGatherable(originalFunction: ScatterGatherableFunction) {} + /** * Waits for gather parts to propagate over NFS or throws an exception. */ diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala index 790c575bf..2639521d9 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterFunction.scala @@ -16,13 +16,13 @@ trait ScatterFunction extends QFunction { * @param originalFunction The original function to check. * @return true if the scatter function can scatter this original function. */ - def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean + def isScatterGatherable(originalFunction: ScatterGatherableFunction) = true /** * Sets the original ScatterGatherableFunction to be scattered. * @param originalFunction The original function to with inputs bind to this scatter function. */ - def setScatterGatherable(originalFunction: ScatterGatherableFunction) + def setScatterGatherable(originalFunction: ScatterGatherableFunction) {} /** * After a call to setScatterGatherable(), returns the number of clones that should be created. diff --git a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala index 91561609b..202d7230a 100644 --- a/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala +++ b/scala/src/org/broadinstitute/sting/queue/function/scattergather/ScatterGatherableFunction.scala @@ -77,6 +77,7 @@ trait ScatterGatherableFunction extends CommandLineFunction { scatterFunction.isIntermediate = true scatterFunction.setScatterGatherable(this) initScatterFunction(scatterFunction) + scatterFunction.absoluteCommandDirectory() functions :+= scatterFunction // Ask the scatter function how many clones to create. @@ -98,7 +99,9 @@ trait ScatterGatherableFunction extends CommandLineFunction { gatherFunction.addOrder = this.addOrder :+ gatherAddOrder gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName) gatherFunction.originalOutput = gatherOutput + gatherFunction.setScatterGatherable(this) initGatherFunction(gatherFunction, gatherField) + gatherFunction.absoluteCommandDirectory() functions :+= gatherFunction gatherFunctions += gatherField -> gatherFunction gatherOutputs += gatherField -> gatherOutput