Want to get this into Ryan's hands asap: First working version of a distributed scatter function.

More refactoring to do so that other new scatter functions can be implemented very easily and annotated on walkers.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5363 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-03-03 01:40:08 +00:00
parent 0181d95fe4
commit a0309e7fb0
7 changed files with 105 additions and 56 deletions

View File

@ -0,0 +1,50 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.extensions.gatk
import java.io.File
import org.broadinstitute.sting.queue.function.scattergather.CloneFunction
import org.broadinstitute.sting.queue.function.InProcessFunction
/**
 * A scatter function that uses the Distributed GATK.
 *
 * Runs as an in-process function and does no scattering work itself: each clone
 * is simply pointed at the same processing tracker file inside this function's
 * command directory.
 */
class DistributedScatterFunction extends GATKScatterFunction with InProcessFunction {
  /** File name of the processing tracker. */
  private final val processingTracker = "processingTracker"

  // The tracker file is the only output declared by this scatter function.
  this.scatterOutputFiles = List(new File(processingTracker))

  /**
   * Sets the clone's "processingTracker" field to the tracker file located in
   * this function's command directory.
   * @param cloneFunction The clone to initialize.
   * @param index The index of the clone; unused, every clone gets the same path.
   */
  override def initCloneInputs(cloneFunction: CloneFunction, index: Int): Unit = {
    val trackerFile = new File(this.commandDirectory, this.processingTracker)
    cloneFunction.setFieldValue("processingTracker", trackerFile)
  }

  /**
   * Intentionally empty: everything a clone needs is set in initCloneInputs.
   * @param cloneFunction The clone to bind.
   * @param index The index of the clone.
   */
  override def bindCloneInputs(cloneFunction: CloneFunction, index: Int): Unit = {
    // no further work needed after init.
  }

  /** Intentionally empty: this function doesn't actually need to run. */
  def run(): Unit = {
    // nothing to do.
  }
}

View File

@ -28,9 +28,8 @@ import org.broadinstitute.sting.utils.interval.IntervalUtils
import java.io.File
import collection.JavaConversions._
import org.broadinstitute.sting.queue.util.IOUtils
import org.broadinstitute.sting.queue.function.QFunction
import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, ScatterGatherableFunction, ScatterFunction}
import org.broadinstitute.sting.commandline.{Output, ArgumentSource}
import org.broadinstitute.sting.commandline.Output
trait GATKScatterFunction extends ScatterFunction {
/** The total number of clone jobs that will be created. */
@ -40,10 +39,10 @@ trait GATKScatterFunction extends ScatterFunction {
protected var referenceSequence: File = _
/** The runtime field to set for specifying an interval file. */
protected var intervalsField: ArgumentSource = _
private final val intervalsField = "intervals"
/** The runtime field to set for specifying an interval string. */
protected var intervalsStringField: ArgumentSource = _
private final val intervalsStringField = "intervalsString"
/** The list of interval files ("/path/to/interval.list") or interval strings ("chr1", "chr2") to parse into smaller parts. */
protected var intervals: List[String] = Nil
@ -60,22 +59,19 @@ trait GATKScatterFunction extends ScatterFunction {
* @return true if the function is a GATK function with the reference sequence set.
* @throws IllegalArgumentException if -BTI is set without -BTIMR. Scatter-gather with -BTI requires -BTIMR (recommended: INTERSECTION).
*/
def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean = {
if (originalFunction.isInstanceOf[CommandLineGATK]) {
val gatk = originalFunction.asInstanceOf[CommandLineGATK]
if ( gatk.BTI != null && gatk.BTIMR == null) throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)")
gatk.reference_sequence != null
} else false
override def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean = {
  // Match on the runtime type instead of casting blindly: a function that is not a
  // CommandLineGATK is reported as not scatter-gatherable (false) rather than
  // failing with a ClassCastException.
  originalFunction match {
    case gatk: CommandLineGATK =>
      // BTI on its own cannot be combined with scattered intervals; a merge rule is required.
      if (gatk.BTI != null && gatk.BTIMR == null)
        throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)")
      // A GATK function is scatterable only when its reference sequence is set.
      gatk.reference_sequence != null
    case _ =>
      false
  }
}
/**
* Sets the scatter gatherable function.
* @param originalFunction Function to bind.
*/
def setScatterGatherable(originalFunction: ScatterGatherableFunction) = {
override def setScatterGatherable(originalFunction: ScatterGatherableFunction) = {
val gatk = originalFunction.asInstanceOf[CommandLineGATK]
this.intervalsField = QFunction.findField(originalFunction.getClass, "intervals")
this.intervalsStringField = QFunction.findField(originalFunction.getClass, "intervalsString")
this.referenceSequence = gatk.reference_sequence
if (gatk.intervals.isEmpty && gatk.intervalsString.isEmpty) {
this.intervals ++= IntervalUtils.distinctContigs(this.referenceSequence).toList

View File

@ -1,47 +1,31 @@
package org.broadinstitute.sting.queue.extensions.gatk
import org.broadinstitute.sting.queue.function.InProcessFunction
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
import java.io.{FileReader, PrintWriter}
import org.apache.commons.io.{LineIterator, IOUtils, FileUtils}
import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, GatherFunction}
/**
* Merges a vcf text file.
*/
class VcfGatherFunction extends GatherFunction with InProcessFunction {
def run() = {
waitForGatherParts
if (gatherParts.size < 1) {
throw new QException("No files to gather to output: " + originalOutput)
} else {
val writer = new PrintWriter(originalOutput)
try {
var reader = new FileReader(gatherParts(0))
try {
IOUtils.copy(reader, writer)
} finally {
IOUtils.closeQuietly(reader)
}
class VcfGatherFunction extends CombineVariants with GatherFunction {
for (file <- gatherParts.tail) {
var inHeaders = true
val itor = FileUtils.lineIterator(file)
try {
while (itor.hasNext) {
val nextLine = itor.nextLine
if (inHeaders && nextLine(0) != '#')
inHeaders = false
if (!inHeaders)
writer.println(nextLine)
}
} finally {
LineIterator.closeQuietly(itor)
}
}
} finally {
IOUtils.closeQuietly(writer)
}
}
/** The GATK function being gathered; assigned by setScatterGatherable. */
private var originalGATK: CommandLineGATK = _

/**
 * Records the original function so its settings can be copied later.
 * @param originalFunction The function being scatter-gathered; it is cast to CommandLineGATK.
 */
override def setScatterGatherable(originalFunction: ScatterGatherableFunction): Unit = {
  val gatk = originalFunction.asInstanceOf[CommandLineGATK]
  this.originalGATK = gatk
}
}
/**
 * Fills in this function's fields from the original GATK function and the
 * gather parts, then delegates to the parent implementation.
 */
override def freezeFieldValues = {
  val original = this.originalGATK

  // Settings copied straight from the original GATK function.
  this.jarFile = original.jarFile
  this.reference_sequence = original.reference_sequence
  this.intervals = original.intervals
  this.intervalsString = original.intervalsString

  // Each gather part becomes a VCF rod named input0, input1, ...; the priority
  // list names the same rods in the same order.
  this.rodBind = this.gatherParts.zipWithIndex.map { case (part, i) => new RodBind("input" + i, "VCF", part) }
  this.rod_priority_list = (0 until this.gatherParts.size).map(i => "input" + i).mkString(",")

  // Fixed settings for the gather itself.
  this.memoryLimit = Some(1)
  this.out = this.originalOutput
  this.assumeIdenticalSamples = true

  super.freezeFieldValues
}
}

View File

@ -1,9 +1,9 @@
package org.broadinstitute.sting.queue.function.scattergather
import org.broadinstitute.sting.queue.function.CommandLineFunction
import org.broadinstitute.sting.commandline.ArgumentSource
import java.io.File
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction}
/**
* Shadow clones another command line function.
@ -47,7 +47,12 @@ class CloneFunction extends CommandLineFunction {
// Returns the original function's command line, evaluated through withScatterPart.
def commandLine = withScatterPart(() => originalFunction.commandLine)
override def getFieldValue(source: ArgumentSource) = {
/**
 * Reads a field value by field name.
 * The name is resolved against the original function's class and the lookup is
 * then delegated to getFieldValue(ArgumentSource).
 * @param field Name of the field to read.
 * @return The value returned for the resolved field.
 */
def getFieldValue(field: String): AnyRef =
  getFieldValue(QFunction.findField(originalFunction.getClass, field))
override def getFieldValue(source: ArgumentSource): AnyRef = {
source.field.getName match {
case "jobOutputFile" => jobOutputFile
case "jobErrorFile" => jobErrorFile
@ -62,6 +67,11 @@ class CloneFunction extends CommandLineFunction {
}
}
/**
 * Writes a field value by field name.
 * The name is resolved against the original function's class and the update is
 * then delegated to setFieldValue(ArgumentSource, Any).
 * @param field Name of the field to set.
 * @param value The new value for the field.
 */
def setFieldValue(field: String, value: Any): Unit =
  setFieldValue(QFunction.findField(originalFunction.getClass, field), value)
override def setFieldValue(source: ArgumentSource, value: Any): Unit = {
source.field.getName match {
case "jobOutputFile" => jobOutputFile = value.asInstanceOf[File]

View File

@ -19,6 +19,12 @@ trait GatherFunction extends QFunction {
@Output(doc="The original output of the scattered function")
var originalOutput: File = _
/**
 * Sets the original ScatterGatherableFunction to be gathered.
 * The default implementation does nothing.
 * @param originalFunction The original function being gathered by this gather function.
 */
def setScatterGatherable(originalFunction: ScatterGatherableFunction) {}
/**
* Waits for gather parts to propagate over NFS or throws an exception.
*/

View File

@ -16,13 +16,13 @@ trait ScatterFunction extends QFunction {
* @param originalFunction The original function to check.
* @return true if the scatter function can scatter this original function.
*/
def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean
// Default: every original function is accepted.
def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean = true
/**
* Sets the original ScatterGatherableFunction to be scattered.
* @param originalFunction The original function to with inputs bind to this scatter function.
*/
def setScatterGatherable(originalFunction: ScatterGatherableFunction)
// Default: nothing is recorded.
def setScatterGatherable(originalFunction: ScatterGatherableFunction): Unit = {}
/**
* After a call to setScatterGatherable(), returns the number of clones that should be created.

View File

@ -77,6 +77,7 @@ trait ScatterGatherableFunction extends CommandLineFunction {
scatterFunction.isIntermediate = true
scatterFunction.setScatterGatherable(this)
initScatterFunction(scatterFunction)
scatterFunction.absoluteCommandDirectory()
functions :+= scatterFunction
// Ask the scatter function how many clones to create.
@ -98,7 +99,9 @@ trait ScatterGatherableFunction extends CommandLineFunction {
gatherFunction.addOrder = this.addOrder :+ gatherAddOrder
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
gatherFunction.originalOutput = gatherOutput
gatherFunction.setScatterGatherable(this)
initGatherFunction(gatherFunction, gatherField)
gatherFunction.absoluteCommandDirectory()
functions :+= gatherFunction
gatherFunctions += gatherField -> gatherFunction
gatherOutputs += gatherField -> gatherOutput