Finally figured out what data in the LSF C API call lsb_readjobinfo is causing JNA to SIGSEGV with a strlen error:

- LSF recycles memory for C arrays, but sets a separate variable setting the size of the array to zero.
- JNA only sees the non-NULL pointer and starts to auto-access it, sometimes causing a SIGSEGV.
- In the short term neutered the jobInfoEnt structure so that this bad array is not autoRead().

QGraph updates:
- Job status is now checked in bulk every 30 seconds instead of one job at a time, even in the middle of dispatching jobs.
- If there is a hiccup (unexpected but not fatal error) during status check then the the error is ignored and status is checked again 30 seconds later.
- Jobs prefer to dispatch depth vs. breadth first.

More refactoring of SG framework separating the reusable code from the implementations.
The DistributedScatterFunction is still a work in progress and is not enabled yet. Still need to think through how Queue handles when a job dies.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5387 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
kshakir 2011-03-07 04:29:06 +00:00
parent 60ddc08cdf
commit 92045ecaa6
16 changed files with 475 additions and 316 deletions

View File

@ -6409,7 +6409,28 @@ public class LibBat {
/** /**
* < Detail reservation information for each kind of resource * < Detail reservation information for each kind of resource
*/ */
public reserveItem.ByReference items; /*
TODO: Go against the JNA recommendations at
http://jna.java.net/#structure_use and instead change each structure
to use Pointers instead of Structure.ByReference while also providing
Structure(Pointer) constructors for use for future API users.
LSF will often reuse memory for structure arrays but will set the
array size / count (reserveCnt above) to zero when the array should
not be accessed. When LSF has reused memory and points to a non-null
structure pointer (items) the inner structure may contain further
garbage pointers (especially items->resName).
When JNA sees a non-null Structure.ByReference it will autoRead() the
member. When autoRead() eventually gets to the items->resName trying
to run strlen on the bad memory address causes a SIGSEGV.
By using a Pointer instead of the Structure.ByReference JNA will not
automatically autoRead(), and the API user will have to pass the
pointer to the Structure on their own.
*/
//public reserveItem.ByReference items;
public Pointer items;
/** /**
* < Absolute priority scheduling (APS) string set by administrators to denote ADMIN factor APS value. * < Absolute priority scheduling (APS) string set by administrators to denote ADMIN factor APS value.

View File

@ -15,15 +15,6 @@ trait CommandLineJobRunner extends JobRunner[CommandLineFunction] with Logging {
/** Which directory to use for the job status files. */ /** Which directory to use for the job status files. */
protected def jobStatusDir = function.jobTempDir protected def jobStatusDir = function.jobTempDir
/** The last time the status returned unknown. */
protected var firstUnknownTime: Option[Long] = None
/** Amount of time the status can return unknown before giving up. */
protected val unknownStatusMaxSeconds = 5 * 60
/** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. */
protected val retryExpiredSeconds = 5 * 60
/** /**
* Writes the function command line to an exec file. * Writes the function command line to an exec file.
*/ */

View File

@ -18,6 +18,11 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
*/ */
var retries = 0 var retries = 0
/**
* The depth of this edge in the graph.
*/
var depth = -1
/** /**
* Initializes with the current status of the function. * Initializes with the current status of the function.
*/ */

View File

@ -17,9 +17,16 @@ trait JobManager[TFunction <: QFunction, TRunner <: JobRunner[TFunction]] {
*/ */
def create(function: TFunction): TRunner def create(function: TFunction): TRunner
/**
* Updates the status on a list of functions.
* @param runners Runners to update.
*/
def updateStatus(runners: List[TRunner]) {
}
/** /**
* Stops a list of functions. * Stops a list of functions.
* @param runner Runners to stop. * @param runners Runners to stop.
*/ */
def tryStop(runners: List[TRunner]) { def tryStop(runners: List[TRunner]) {
} }

View File

@ -9,5 +9,7 @@ class Lsf706JobManager extends JobManager[CommandLineFunction, Lsf706JobRunner]
def runnerType = classOf[Lsf706JobRunner] def runnerType = classOf[Lsf706JobRunner]
def functionType = classOf[CommandLineFunction] def functionType = classOf[CommandLineFunction]
def create(function: CommandLineFunction) = new Lsf706JobRunner(function) def create(function: CommandLineFunction) = new Lsf706JobRunner(function)
override def updateStatus(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.updateStatus(runners) }
override def tryStop(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) } override def tryStop(runners: List[Lsf706JobRunner]) { Lsf706JobRunner.tryStop(runners) }
} }

View File

@ -7,7 +7,6 @@ import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat}
import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.utils.Utils
import org.broadinstitute.sting.jna.clibrary.LibC import org.broadinstitute.sting.jna.clibrary.LibC
import java.util.Date
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit}
import com.sun.jna.ptr.IntByReference import com.sun.jna.ptr.IntByReference
import com.sun.jna.{StringArray, NativeLong} import com.sun.jna.{StringArray, NativeLong}
@ -21,10 +20,13 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
Lsf706JobRunner Lsf706JobRunner
/** Job Id of the currently executing job. */ /** Job Id of the currently executing job. */
var jobId = -1L private var jobId = -1L
/** Last known run status */ /** Last known status */
private var runStatus: RunnerStatus.Value = _ private var lastStatus: RunnerStatus.Value = _
/** The last time the status was updated */
protected var lastStatusUpdate: Long = _
/** /**
* Dispatches the function on the LSF cluster. * Dispatches the function on the LSF cluster.
@ -82,7 +84,7 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
// Allow advanced users to update the request. // Allow advanced users to update the request.
updateJobRun(request) updateJobRun(request)
runStatus = RunnerStatus.RUNNING updateStatus(RunnerStatus.RUNNING)
Retry.attempt(() => { Retry.attempt(() => {
val reply = new submitReply val reply = new submitReply
jobId = LibBat.lsb_submit(request, reply) jobId = LibBat.lsb_submit(request, reply)
@ -93,93 +95,24 @@ class Lsf706JobRunner(val function: CommandLineFunction) extends CommandLineJobR
} }
} }
/** def status = this.lastStatus
* Updates and returns the status.
*/
def status = {
Lsf706JobRunner.lsfLibLock.synchronized {
var jobStatus = LibBat.JOB_STAT_UNKWN
var exitStatus = 0
var exitInfo = 0
var endTime: NativeLong = null
var result = 0 private def updateStatus(updatedStatus: RunnerStatus.Value) = {
Retry.attempt(() => { this.lastStatus = updatedStatus
result = LibBat.lsb_openjobinfo(jobId, null, null, null, null, LibBat.ALL_JOB) this.lastStatusUpdate = System.currentTimeMillis
if (result < 0)
throw new QException(LibBat.lsb_sperror("Unable to open LSF job info for job id: " + jobId))
}, 0.5, 1, 2)
try {
if (result > 1)
throw new QException(LibBat.lsb_sperror("Recieved " + result + " LSF results for job id: " + jobId))
else if (result == 1) {
val more = new IntByReference(result)
val jobInfo = LibBat.lsb_readjobinfo(more)
if (jobInfo == null)
throw new QException(LibBat.lsb_sperror("lsb_readjobinfo returned null for job id: " + jobId))
jobStatus = jobInfo.status
exitStatus = jobInfo.exitStatus
exitInfo = jobInfo.exitInfo
endTime = jobInfo.endTime
}
} finally {
LibBat.lsb_closejobinfo()
}
logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(jobId, jobStatus, exitStatus, exitInfo))
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_UNKWN)) {
val now = new Date().getTime
if (firstUnknownTime.isEmpty) {
firstUnknownTime = Some(now)
logger.debug("First unknown status for job id: " + jobId)
}
if ((firstUnknownTime.get - now) >= (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runStatus = RunnerStatus.FAILED
logger.error("Unknown status for %d seconds: job id %d: %s".format(unknownStatusMaxSeconds, jobId, function.description))
}
} else {
// Reset the last time an unknown status was seen.
firstUnknownTime = None
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
// Exited function that (probably) won't be retried.
runStatus = RunnerStatus.FAILED
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
// Done successfully.
runStatus = RunnerStatus.DONE
}
}
runStatus
}
} }
/**
* Returns true if LSF is expected to retry running the function.
* @param exitInfo The reason the job exited.
* @param endTime THe time the job exited.
* @return true if LSF is expected to retry running the function.
*/
private def willRetry(exitInfo: Int, endTime: NativeLong) = {
exitInfo match {
case LibBat.EXIT_NORMAL => false
case _ => {
val seconds = LibC.difftime(LibC.time(null), endTime)
(seconds <= retryExpiredSeconds)
}
}
}
} }
object Lsf706JobRunner extends Logging { object Lsf706JobRunner extends Logging {
private val lsfLibLock = new Object private val lsfLibLock = new Object
private val SIGTERM = 15 private val SIGTERM = 15
/** Number of seconds for a non-normal exit status before we give up on expecting LSF to retry the function. */
private val retryExpiredSeconds = 5 * 60
/** Amount of time a job can go without status before giving up. */
private val unknownStatusMaxSeconds = 5 * 60
init() init()
/** The name of the default queue. */ /** The name of the default queue. */
@ -238,6 +171,44 @@ object Lsf706JobRunner extends Logging {
} }
} }
/**
* Updates the status of a list of jobs.
*/
def updateStatus(runners: List[Lsf706JobRunner]) {
var updatedRunners = List.empty[Lsf706JobRunner]
Lsf706JobRunner.lsfLibLock.synchronized {
val result = LibBat.lsb_openjobinfo(0L, null, null, null, null, LibBat.ALL_JOB)
if (result < 0) {
logger.error(LibBat.lsb_sperror("Unable to check LSF job info"))
} else {
try {
val more = new IntByReference(result)
while (more.getValue > 0) {
val jobInfo = LibBat.lsb_readjobinfo(more)
if (jobInfo == null) {
logger.error(LibBat.lsb_sperror("Unable to read LSF job info"))
more.setValue(0)
} else {
runners.find(runner => runner.jobId == jobInfo.jobId) match {
case Some(runner) =>
updateRunnerStatus(runner, jobInfo)
updatedRunners :+= runner
case None => /* not our job */
}
}
}
} finally {
LibBat.lsb_closejobinfo()
}
}
}
for (runner <- runners.diff(updatedRunners)) {
checkUnknownStatus(runner)
}
}
/** /**
* Tries to stop any running jobs. * Tries to stop any running jobs.
* @param runners Runners to stop. * @param runners Runners to stop.
@ -246,15 +217,67 @@ object Lsf706JobRunner extends Logging {
lsfLibLock.synchronized { lsfLibLock.synchronized {
// lsb_killbulkjobs does not seem to forward SIGTERM, // lsb_killbulkjobs does not seem to forward SIGTERM,
// only SIGKILL, so send the Ctrl-C (SIGTERM) one by one. // only SIGKILL, so send the Ctrl-C (SIGTERM) one by one.
for (jobRunner <- runners.filterNot(_.jobId < 0)) { for (runner <- runners.filterNot(_.jobId < 0)) {
try { try {
if (LibBat.lsb_signaljob(jobRunner.jobId, SIGTERM) < 0) if (LibBat.lsb_signaljob(runner.jobId, SIGTERM) < 0)
logger.error(LibBat.lsb_sperror("Unable to kill job " + jobRunner.jobId)) logger.error(LibBat.lsb_sperror("Unable to kill job " + runner.jobId))
} catch { } catch {
case e => case e =>
logger.error("Unable to kill job " + jobRunner.jobId, e) logger.error("Unable to kill job " + runner.jobId, e)
} }
} }
} }
} }
private def updateRunnerStatus(runner: Lsf706JobRunner, jobInfo: LibBat.jobInfoEnt) {
val jobStatus = jobInfo.status
val exitStatus = jobInfo.exitStatus
val exitInfo = jobInfo.exitInfo
val endTime = jobInfo.endTime
logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo))
runner.updateStatus(
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
// Done successfully.
RunnerStatus.DONE
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
// Exited function that (probably) won't be retried.
RunnerStatus.FAILED
} else {
// Note that we still saw the job in the system.
RunnerStatus.RUNNING
}
)
}
private def checkUnknownStatus(runner: Lsf706JobRunner) {
// TODO: Need a second pass through either of the two archive logs using lsb_geteventrecbyline() for disappeared jobs.
// Can also tell if we wake up and the last time we saw status was greater than lsb_parameterinfo().cleanPeriod
// LSB_SHAREDIR/cluster_name/logdir/lsb.acct (man bacct)
// LSB_SHAREDIR/cluster_name/logdir/lsb.events (man bhist)
logger.debug("Job Id %s status / exitStatus / exitInfo: ??? / ??? / ???".format(runner.jobId))
val unknownStatusSeconds = (System.currentTimeMillis - runner.lastStatusUpdate)
if (unknownStatusSeconds > (unknownStatusMaxSeconds * 1000L)) {
// Unknown status has been returned for a while now.
runner.updateStatus(RunnerStatus.FAILED)
logger.error("Unable to read LSF status for %d minutes: job id %d: %s".format(unknownStatusSeconds/60, runner.jobId, runner.function.description))
}
}
/**
* Returns true if LSF is expected to retry running the function.
* @param exitInfo The reason the job exited.
* @param endTime THe time the job exited.
* @return true if LSF is expected to retry running the function.
*/
private def willRetry(exitInfo: Int, endTime: NativeLong) = {
exitInfo match {
case LibBat.EXIT_NORMAL => false
case _ => {
val seconds = LibC.difftime(LibC.time(null), endTime)
(seconds <= retryExpiredSeconds)
}
}
}
} }

View File

@ -13,7 +13,7 @@ import org.broadinstitute.sting.queue.function.{InProcessFunction, CommandLineFu
import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction} import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, GatherFunction, ScatterGatherableFunction}
import org.apache.commons.lang.StringUtils import org.apache.commons.lang.StringUtils
import org.broadinstitute.sting.queue.util._ import org.broadinstitute.sting.queue.util._
import collection.immutable.TreeMap import collection.immutable.{TreeSet, TreeMap}
/** /**
* The internal dependency tracker between sets of function input and output files. * The internal dependency tracker between sets of function input and output files.
@ -26,18 +26,24 @@ class QGraph extends Logging {
private var numMissingValues = 0 private var numMissingValues = 0
private val jobGraph = newGraph private val jobGraph = newGraph
private val functionOrdering = Ordering.by[FunctionEdge, Iterable[Int]](edge => -graphDepth(edge) +: edge.function.addOrder)
private val fileOrdering = Ordering.by[File,String](_.getAbsolutePath)
// A map of nodes by list of files. // A map of nodes by list of files.
private var nodeMap = TreeMap.empty[Iterable[File], QNode](Ordering.Iterable(Ordering.by[File,String](_.getAbsolutePath))) private var nodeMap = TreeMap.empty[Iterable[File], QNode](Ordering.Iterable(fileOrdering))
// The next unique id for a node if not found in the nodeMap. // The next unique id for a node if not found in the nodeMap.
private var nextNodeId = 0 private var nextNodeId = 0
private var running = true private var running = true
private val runningLock = new Object private val runningLock = new Object
private var runningJobs = Set.empty[FunctionEdge]
private val nl = "%n".format() private val nl = "%n".format()
private val inProcessManager = new InProcessJobManager private val inProcessManager = new InProcessJobManager
private var commandLineManager: JobManager[CommandLineFunction, _<:JobRunner[CommandLineFunction]] = _ private var commandLineManager: JobManager[CommandLineFunction, _<:JobRunner[CommandLineFunction]] = _
private def managers = List[Any](inProcessManager, commandLineManager)
/** /**
* Adds a QScript created CommandLineFunction to the graph. * Adds a QScript created CommandLineFunction to the graph.
* @param command Function to add to the graph. * @param command Function to add to the graph.
@ -48,8 +54,8 @@ class QGraph extends Logging {
if (running) { if (running) {
command.qSettings = settings.qSettings command.qSettings = settings.qSettings
command.freeze command.freeze
val inputs = getQNode(command.inputs.toList.sortWith(_.compareTo(_) < 0)) val inputs = getQNode(command.inputs.toList.sorted(fileOrdering))
val outputs = getQNode(command.outputs.toList.sortWith(_.compareTo(_) < 0)) val outputs = getQNode(command.outputs.toList.sorted(fileOrdering))
addEdge(new FunctionEdge(command, inputs, outputs)) addEdge(new FunctionEdge(command, inputs, outputs))
} }
} }
@ -143,19 +149,17 @@ class QGraph extends Logging {
} }
/** /**
* Walks up the graph looking for the previous command line edges. * Walks up the graph looking for the previous function edges.
* @param function Function to examine for a previous command line job. * @param edge Graph edge to examine for the previous functions.
* @param qGraph The graph that contains the jobs. * @return A list of prior function edges.
* @return A list of prior jobs.
*/ */
private def previousFunctions(edge: QEdge): List[FunctionEdge] = { private def previousFunctions(edge: QEdge): List[FunctionEdge] = {
var previous = List.empty[FunctionEdge] var previous = List.empty[FunctionEdge]
val source = this.jobGraph.getEdgeSource(edge) val source = this.jobGraph.getEdgeSource(edge)
for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) { for (incomingEdge <- this.jobGraph.incomingEdgesOf(source)) {
incomingEdge match { incomingEdge match {
// Stop recursing when we find a job along the edge and return its job id // Stop recursing when we find a function edge and return it
case functionEdge: FunctionEdge => previous :+= functionEdge case functionEdge: FunctionEdge => previous :+= functionEdge
// For any other type of edge find the jobs preceding the edge // For any other type of edge find the jobs preceding the edge
@ -190,19 +194,12 @@ class QGraph extends Logging {
}) })
} }
private def getReadyJobs = { private def getReadyJobs(): Set[FunctionEdge] = {
jobGraph.edgeSet.filter{ jobGraph.edgeSet.filter{
case f: FunctionEdge => case f: FunctionEdge =>
this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING this.previousFunctions(f).forall(_.status == RunnerStatus.DONE) && f.status == RunnerStatus.PENDING
case _ => false case _ => false
}.toList.asInstanceOf[List[FunctionEdge]].sortWith(compare(_,_)) }.toSet.asInstanceOf[Set[FunctionEdge]]
}
private def getRunningJobs = {
jobGraph.edgeSet.filter{
case f: FunctionEdge => f.status == RunnerStatus.RUNNING
case _ => false
}.toList.asInstanceOf[List[FunctionEdge]].sortWith(compare(_,_))
} }
/** /**
@ -260,8 +257,13 @@ class QGraph extends Logging {
* Dry-runs the jobs by traversing the graph. * Dry-runs the jobs by traversing the graph.
*/ */
private def dryRunJobs() { private def dryRunJobs() {
updateGraphStatus(false) if (settings.startFromScratch) {
var readyJobs = getReadyJobs logger.info("Will remove outputs from previous runs.")
foreachFunction(_.resetToPending(false))
} else
updateGraphStatus(false)
var readyJobs = getReadyJobs()
while (running && readyJobs.size > 0) { while (running && readyJobs.size > 0) {
logger.debug("+++++++") logger.debug("+++++++")
readyJobs.foreach(edge => { readyJobs.foreach(edge => {
@ -270,7 +272,7 @@ class QGraph extends Logging {
edge.markAsDone edge.markAsDone
} }
}) })
readyJobs = getReadyJobs readyJobs = getReadyJobs()
} }
} }
@ -311,39 +313,45 @@ class QGraph extends Logging {
} else } else
updateGraphStatus(true) updateGraphStatus(true)
var readyJobs = getReadyJobs var readyJobs = TreeSet.empty[FunctionEdge](functionOrdering)
var runningJobs = Set.empty[FunctionEdge] readyJobs ++= getReadyJobs()
runningJobs = Set.empty[FunctionEdge]
var lastRunningCheck = System.currentTimeMillis
while (running && readyJobs.size + runningJobs.size > 0) { while (running && readyJobs.size + runningJobs.size > 0) {
var exitedJobs = List.empty[FunctionEdge]
while (running && readyJobs.size > 0 && nextRunningCheck(lastRunningCheck) > 0) {
val edge = readyJobs.head
edge.runner = newRunner(edge.function)
edge.start()
runningJobs += edge
readyJobs -= edge
}
if (readyJobs.size == 0 && runningJobs.size > 0)
Thread.sleep(nextRunningCheck(lastRunningCheck))
lastRunningCheck = System.currentTimeMillis
updateStatus()
var doneJobs = List.empty[FunctionEdge]
var failedJobs = List.empty[FunctionEdge] var failedJobs = List.empty[FunctionEdge]
runningJobs.foreach(runner => runner.status match { runningJobs.foreach(edge => edge.status match {
case RunnerStatus.DONE => doneJobs :+= edge
case RunnerStatus.FAILED => failedJobs :+= edge
case RunnerStatus.RUNNING => /* do nothing while still running */ case RunnerStatus.RUNNING => /* do nothing while still running */
case RunnerStatus.FAILED => exitedJobs :+= runner; failedJobs :+= runner
case RunnerStatus.DONE => exitedJobs :+= runner
})
exitedJobs.foreach(runner => runningJobs -= runner)
readyJobs.foreach(f => {
if (running) {
f.runner = newRunner(f.function)
f.start()
f.status match {
case RunnerStatus.RUNNING => runningJobs += f
case RunnerStatus.FAILED => failedJobs :+= f
case RunnerStatus.DONE => /* do nothing and move on */
}
}
}) })
if (failedJobs.size > 0) { runningJobs --= doneJobs
runningJobs --= failedJobs
if (running && failedJobs.size > 0) {
emailFailedJobs(failedJobs) emailFailedJobs(failedJobs)
checkRetryJobs(failedJobs) checkRetryJobs(failedJobs)
} }
if (readyJobs.size == 0 && runningJobs.size > 0) readyJobs ++= getReadyJobs()
Thread.sleep(30000L)
readyJobs = getReadyJobs
} }
deleteIntermediateOutputs() deleteIntermediateOutputs()
@ -356,6 +364,8 @@ class QGraph extends Logging {
} }
} }
private def nextRunningCheck(lastRunningCheck: Long) = 0L max ((30 * 1000L) - (System.currentTimeMillis - lastRunningCheck))
/** /**
* Updates the status of edges in the graph. * Updates the status of edges in the graph.
* @param cleanOutputs If true will delete outputs when setting edges to pending. * @param cleanOutputs If true will delete outputs when setting edges to pending.
@ -388,6 +398,22 @@ class QGraph extends Logging {
} }
} }
/**
* Returns the graph depth for the function.
* @param edge Function edge to get the edge for.
* @return the graph depth for the function.
*/
private def graphDepth(edge: FunctionEdge): Int = {
if (edge.depth < 0) {
val previous = previousFunctions(edge)
if (previous.size == 0)
edge.depth = 0
else
edge.depth = previous.map(f => graphDepth(f)).max + 1
}
edge.depth
}
/** /**
* From the previous edges, resets any that are marked as skipped to pending. * From the previous edges, resets any that are marked as skipped to pending.
* If those that are reset have skipped edges, those skipped edges are recursively also set * If those that are reset have skipped edges, those skipped edges are recursively also set
@ -415,7 +441,7 @@ class QGraph extends Logging {
} }
private def emailFailedJobs(failed: List[FunctionEdge]) { private def emailFailedJobs(failed: List[FunctionEdge]) {
if (running && settings.statusEmailTo.size > 0) { if (settings.statusEmailTo.size > 0) {
val emailMessage = new EmailMessage val emailMessage = new EmailMessage
emailMessage.from = settings.statusEmailFrom emailMessage.from = settings.statusEmailFrom
emailMessage.to = settings.statusEmailTo emailMessage.to = settings.statusEmailTo
@ -700,32 +726,10 @@ class QGraph extends Logging {
jobGraph.edgeSet.toList jobGraph.edgeSet.toList
.filter(_.isInstanceOf[FunctionEdge]) .filter(_.isInstanceOf[FunctionEdge])
.asInstanceOf[List[FunctionEdge]] .asInstanceOf[List[FunctionEdge]]
.sortWith(compare(_,_)) .sorted(functionOrdering)
.foreach(edge => if (running) f(edge)) .foreach(edge => if (running) f(edge))
} }
private def compare(f1: FunctionEdge, f2: FunctionEdge): Boolean =
compare(f1.function, f2.function)
private def compare(f1: QFunction, f2: QFunction): Boolean = {
val len1 = f1.addOrder.size
val len2 = f2.addOrder.size
val len = len1 min len2
for (i <- 0 until len) {
val order1 = f1.addOrder(i)
val order2 = f2.addOrder(i)
if (order1 < order2)
return true
if (order1 > order2)
return false
}
if (len1 < len2)
return true
else
return false
}
/** /**
* Utility function for running a method over all functions, but traversing the nodes in order of dependency. * Utility function for running a method over all functions, but traversing the nodes in order of dependency.
* @param edgeFunction Function to run for each FunctionEdge. * @param edgeFunction Function to run for each FunctionEdge.
@ -800,6 +804,25 @@ class QGraph extends Logging {
}) })
} }
private def updateStatus() {
val runners = runningJobs.map(_.runner)
for (mgr <- managers) {
if (mgr != null) {
val manager = mgr.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]]
val managerRunners = runners
.filter(runner => manager.runnerType.isAssignableFrom(runner.getClass))
.asInstanceOf[Set[JobRunner[QFunction]]]
if (managerRunners.size > 0)
try {
manager.updateStatus(managerRunners.toList)
} catch {
case e => /* ignore */
}
}
}
}
/** /**
* Returns true if the graph was shutdown instead of exiting on its own. * Returns true if the graph was shutdown instead of exiting on its own.
*/ */
@ -813,26 +836,29 @@ class QGraph extends Logging {
running = false running = false
// Wait for the thread to finish and exit normally. // Wait for the thread to finish and exit normally.
runningLock.synchronized { runningLock.synchronized {
val runners = getRunningJobs.map(_.runner) val runners = runningJobs.map(_.runner)
val manager = commandLineManager.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]] runningJobs = Set.empty[FunctionEdge]
if (manager != null) { for (mgr <- managers) {
val managerRunners = runners if (mgr != null) {
.filter(runner => manager.runnerType.isAssignableFrom(runner.getClass)) val manager = mgr.asInstanceOf[JobManager[QFunction,JobRunner[QFunction]]]
.asInstanceOf[List[JobRunner[QFunction]]] val managerRunners = runners
if (managerRunners.size > 0) .filter(runner => manager.runnerType.isAssignableFrom(runner.getClass))
try { .asInstanceOf[Set[JobRunner[QFunction]]]
manager.tryStop(managerRunners) if (managerRunners.size > 0)
} catch { try {
case e => /* ignore */ manager.tryStop(managerRunners.toList)
} catch {
case e => /* ignore */
}
for (runner <- managerRunners) {
try {
runner.removeTemporaryFiles()
} catch {
case e => /* ignore */
}
} }
}
runners.foreach(runner =>
try {
runner.removeTemporaryFiles()
} catch {
case e => /* ignore */
} }
) }
} }
} }
} }

View File

@ -35,12 +35,8 @@ class ContigScatterFunction extends GATKScatterFunction with InProcessFunction {
// Include unmapped reads by default. // Include unmapped reads by default.
this.includeUnmapped = true this.includeUnmapped = true
protected override def maxIntervals = { protected override def maxIntervals =
if (this.intervalFilesExist) IntervalUtils.countIntervalArguments(this.referenceSequence, this.intervals, true)
IntervalUtils.countIntervalArguments(this.referenceSequence, this.intervals, true)
else
this.scatterCount
}
def run() { def run() {
IntervalUtils.scatterIntervalArguments(this.referenceSequence, this.intervals, this.scatterOutputFiles, true) IntervalUtils.scatterIntervalArguments(this.referenceSequence, this.intervals, this.scatterOutputFiles, true)

View File

@ -25,23 +25,24 @@
package org.broadinstitute.sting.queue.extensions.gatk package org.broadinstitute.sting.queue.extensions.gatk
import java.io.File import java.io.File
import org.broadinstitute.sting.queue.function.scattergather.CloneFunction
import org.broadinstitute.sting.queue.function.InProcessFunction import org.broadinstitute.sting.queue.function.InProcessFunction
import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction}
import org.broadinstitute.sting.commandline.Output
import util.Random
/** /**
* An scatter function that uses the Distributed GATK. * An scatter function that uses the Distributed GATK.
*/ */
class DistributedScatterFunction extends GATKScatterFunction with InProcessFunction { class DistributedScatterFunction extends ScatterFunction with InProcessFunction {
private final val processingTracker = "processingTracker" @Output(doc="processingTracker")
var processingTracker: File = _
this.scatterOutputFiles = List(new File(processingTracker)) override def init() {
this.processingTracker = new File(this.commandDirectory, "processingTracker.%8d".format(Random.nextInt(100000000)))
override def initCloneInputs(cloneFunction: CloneFunction, index: Int) {
cloneFunction.setFieldValue("processingTracker", new File(this.commandDirectory, this.processingTracker))
} }
override def bindCloneInputs(cloneFunction: CloneFunction, index: Int) { override def initCloneInputs(cloneFunction: CloneFunction, index: Int) {
/* no further work needed after init. */ cloneFunction.setFieldValue("processingTracker", this.processingTracker)
} }
def run() { def run() {

View File

@ -28,64 +28,53 @@ import org.broadinstitute.sting.utils.interval.IntervalUtils
import java.io.File import java.io.File
import collection.JavaConversions._ import collection.JavaConversions._
import org.broadinstitute.sting.queue.util.IOUtils import org.broadinstitute.sting.queue.util.IOUtils
import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, ScatterGatherableFunction, ScatterFunction} import org.broadinstitute.sting.queue.function.scattergather.{CloneFunction, ScatterFunction}
import org.broadinstitute.sting.commandline.Output import org.broadinstitute.sting.commandline.Output
trait GATKScatterFunction extends ScatterFunction { trait GATKScatterFunction extends ScatterFunction {
/** The total number of clone jobs that will be created. */
var scatterCount: Int = _
/** The reference sequence for the GATK function. */
protected var referenceSequence: File = _
/** The runtime field to set for specifying an interval file. */ /** The runtime field to set for specifying an interval file. */
private final val intervalsField = "intervals" private final val intervalsField = "intervals"
/** The runtime field to set for specifying an interval string. */ /** The runtime field to set for specifying an interval string. */
private final val intervalsStringField = "intervalsString" private final val intervalsStringField = "intervalsString"
@Output(doc="Scatter function outputs")
var scatterOutputFiles: List[File] = Nil
/** The original GATK function. */
protected var originalGATK: CommandLineGATK = _
/** The reference sequence for the GATK function. */
protected var referenceSequence: File = _
/** The list of interval files ("/path/to/interval.list") or interval strings ("chr1", "chr2") to parse into smaller parts. */ /** The list of interval files ("/path/to/interval.list") or interval strings ("chr1", "chr2") to parse into smaller parts. */
protected var intervals: List[String] = Nil protected var intervals: List[String] = Nil
/** Whether the last scatter job should also include any unmapped reads. */ /** Whether the last scatter job should also include any unmapped reads. */
protected var includeUnmapped: Boolean = _ protected var includeUnmapped: Boolean = _
@Output(doc="Scatter function outputs") /** The total number of clone jobs that will be created. */
var scatterOutputFiles: List[File] = Nil override def scatterCount = if (intervalFilesExist) super.scatterCount min this.maxIntervals else super.scatterCount
/** override def init() {
* Checks if the function is scatter gatherable. this.originalGATK = this.originalFunction.asInstanceOf[CommandLineGATK]
* @param originalFunction Function to check. this.referenceSequence = this.originalGATK.reference_sequence
* @return true if the function is a GATK function with the reference sequence set. if (this.originalGATK.intervals.isEmpty && this.originalGATK.intervalsString.isEmpty) {
* @throws IllegalArgumentException if -BTI or -BTIMR are set. QScripts should not try to scatter gather with those option set.
*/
override def isScatterGatherable(originalFunction: ScatterGatherableFunction): Boolean = {
val gatk = originalFunction.asInstanceOf[CommandLineGATK]
if (gatk.BTI != null && gatk.BTIMR == null)
throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)")
gatk.reference_sequence != null
}
/**
* Sets the scatter gatherable function.
* @param originalFunction Function to bind.
*/
override def setScatterGatherable(originalFunction: ScatterGatherableFunction) = {
val gatk = originalFunction.asInstanceOf[CommandLineGATK]
this.referenceSequence = gatk.reference_sequence
if (gatk.intervals.isEmpty && gatk.intervalsString.isEmpty) {
this.intervals ++= IntervalUtils.distinctContigs(this.referenceSequence).toList this.intervals ++= IntervalUtils.distinctContigs(this.referenceSequence).toList
} else { } else {
this.intervals ++= gatk.intervals.map(_.toString) this.intervals ++= this.originalGATK.intervals.map(_.toString)
this.intervals ++= gatk.intervalsString.filterNot(interval => IntervalUtils.isUnmapped(interval)) this.intervals ++= this.originalGATK.intervalsString.filterNot(interval => IntervalUtils.isUnmapped(interval))
this.includeUnmapped = gatk.intervalsString.exists(interval => IntervalUtils.isUnmapped(interval)) this.includeUnmapped = this.originalGATK.intervalsString.exists(interval => IntervalUtils.isUnmapped(interval))
} }
this.scatterCount = originalFunction.scatterCount
this.scatterCount = this.scatterCount min this.maxIntervals
} }
def initCloneInputs(cloneFunction: CloneFunction, index: Int) = { override def isScatterGatherable = {
if (this.originalGATK.BTI != null && this.originalGATK.BTIMR.isEmpty)
throw new IllegalArgumentException("BTI requires BTIMR for use with scatter-gather (recommended: INTERSECTION)")
this.originalGATK.reference_sequence != null
}
override def initCloneInputs(cloneFunction: CloneFunction, index: Int) {
cloneFunction.setFieldValue(this.intervalsField, List(new File("scatter.intervals"))) cloneFunction.setFieldValue(this.intervalsField, List(new File("scatter.intervals")))
if (index == this.scatterCount && this.includeUnmapped) if (index == this.scatterCount && this.includeUnmapped)
cloneFunction.setFieldValue(this.intervalsStringField, List("unmapped")) cloneFunction.setFieldValue(this.intervalsStringField, List("unmapped"))
@ -93,7 +82,7 @@ trait GATKScatterFunction extends ScatterFunction {
cloneFunction.setFieldValue(this.intervalsStringField, List.empty[String]) cloneFunction.setFieldValue(this.intervalsStringField, List.empty[String])
} }
def bindCloneInputs(cloneFunction: CloneFunction, index: Int) = { override def bindCloneInputs(cloneFunction: CloneFunction, index: Int) {
val scatterPart = cloneFunction.getFieldValue(this.intervalsField) val scatterPart = cloneFunction.getFieldValue(this.intervalsField)
.asInstanceOf[List[File]] .asInstanceOf[List[File]]
.map(file => IOUtils.absolute(cloneFunction.commandDirectory, file)) .map(file => IOUtils.absolute(cloneFunction.commandDirectory, file))
@ -112,5 +101,5 @@ trait GATKScatterFunction extends ScatterFunction {
* Returns the maximum number of intervals or this.scatterCount if the maximum can't be determined ahead of time. * Returns the maximum number of intervals or this.scatterCount if the maximum can't be determined ahead of time.
* @return the maximum number of intervals or this.scatterCount if the maximum can't be determined ahead of time. * @return the maximum number of intervals or this.scatterCount if the maximum can't be determined ahead of time.
*/ */
protected def maxIntervals = this.scatterCount protected def maxIntervals: Int
} }

View File

@ -32,12 +32,8 @@ import org.broadinstitute.sting.queue.function.InProcessFunction
* An interval scatter function. * An interval scatter function.
*/ */
class IntervalScatterFunction extends GATKScatterFunction with InProcessFunction { class IntervalScatterFunction extends GATKScatterFunction with InProcessFunction {
protected override def maxIntervals = { protected override def maxIntervals =
if (this.intervalFilesExist) IntervalUtils.countIntervalArguments(this.referenceSequence, this.intervals, false)
IntervalUtils.countIntervalArguments(this.referenceSequence, this.intervals, false)
else
this.scatterCount
}
def run() { def run() {
IntervalUtils.scatterIntervalArguments(this.referenceSequence, this.intervals, this.scatterOutputFiles, false) IntervalUtils.scatterIntervalArguments(this.referenceSequence, this.intervals, this.scatterOutputFiles, false)

View File

@ -1,17 +1,37 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.extensions.gatk package org.broadinstitute.sting.queue.extensions.gatk
import org.broadinstitute.sting.queue.function.scattergather.{ScatterGatherableFunction, GatherFunction} import org.broadinstitute.sting.queue.function.scattergather.GatherFunction
/** /**
* Merges a vcf text file. * Merges a vcf text file.
*/ */
class VcfGatherFunction extends CombineVariants with GatherFunction { class VcfGatherFunction extends CombineVariants with GatherFunction {
private var originalGATK: CommandLineGATK = _ private lazy val originalGATK = this.originalFunction.asInstanceOf[CommandLineGATK]
override def setScatterGatherable(originalFunction: ScatterGatherableFunction) {
this.originalGATK = originalFunction.asInstanceOf[CommandLineGATK]
}
override def freezeFieldValues = { override def freezeFieldValues = {
this.memoryLimit = Some(1) this.memoryLimit = Some(1)

View File

@ -1,8 +1,31 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.function.scattergather package org.broadinstitute.sting.queue.function.scattergather
import org.broadinstitute.sting.commandline.ArgumentSource import org.broadinstitute.sting.commandline.ArgumentSource
import java.io.File import java.io.File
import org.broadinstitute.sting.queue.QException
import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction} import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction}
/** /**
@ -10,7 +33,7 @@ import org.broadinstitute.sting.queue.function.{QFunction, CommandLineFunction}
*/ */
class CloneFunction extends CommandLineFunction { class CloneFunction extends CommandLineFunction {
var originalFunction: ScatterGatherableFunction = _ var originalFunction: ScatterGatherableFunction = _
var index: Int = _ var cloneIndex: Int = _
private var overriddenFields = Map.empty[ArgumentSource, Any] private var overriddenFields = Map.empty[ArgumentSource, Any]
private var withScatterPartCount = 0 private var withScatterPartCount = 0

View File

@ -10,20 +10,21 @@ import org.broadinstitute.sting.queue.util.IOUtils
* Base class for Gather command line functions. * Base class for Gather command line functions.
*/ */
trait GatherFunction extends QFunction { trait GatherFunction extends QFunction {
var originalFunction: ScatterGatherableFunction = _
@Output(doc="The original output of the scattered function")
var originalOutput: File = _
@Input(doc="Parts to gather back into the original output") @Input(doc="Parts to gather back into the original output")
var gatherParts: List[File] = Nil var gatherParts: List[File] = Nil
@Input(doc="Other log files that will be gathered before this output", required=false) @Input(doc="Other log files that will be gathered before this output", required=false)
var originalLogFiles: List[File] = Nil var originalLogFiles: List[File] = Nil
@Output(doc="The original output of the scattered function")
var originalOutput: File = _
/** /**
* Sets the original ScatterGatherableFunction to be gathered. * Called to initialize the gather function values after all other values have been setup for this function.
* @param originalFunction The original function to with inputs bind to this scatter function.
*/ */
def setScatterGatherable(originalFunction: ScatterGatherableFunction) {} def init() {}
/** /**
* Waits for gather parts to propagate over NFS or throws an exception. * Waits for gather parts to propagate over NFS or throws an exception.

View File

@ -1,3 +1,27 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.function.scattergather package org.broadinstitute.sting.queue.function.scattergather
import java.io.File import java.io.File
@ -8,26 +32,27 @@ import org.broadinstitute.sting.queue.function.QFunction
* Base class for Scatter functions. * Base class for Scatter functions.
*/ */
trait ScatterFunction extends QFunction { trait ScatterFunction extends QFunction {
var originalFunction: ScatterGatherableFunction = _
@Input(doc="Original inputs to scatter") @Input(doc="Original inputs to scatter")
var originalInputs: Set[File] = _ var originalInputs: Set[File] = _
/** /**
* Returns true if the scatter function can scatter this original function. * Called to initialize scatter function values after all other values have been setup for this function.
* @param originalFunction The original function to check.
* @return true if the scatter function can scatter this original function.
*/
def isScatterGatherable(originalFunction: ScatterGatherableFunction) = true
/**
* Sets the original ScatterGatherableFunction to be scattered.
* @param originalFunction The original function to with inputs bind to this scatter function. * @param originalFunction The original function to with inputs bind to this scatter function.
*/ */
def setScatterGatherable(originalFunction: ScatterGatherableFunction) {} def init() {}
/** /**
* After a call to setScatterGatherable(), returns the number of clones that should be created. * Returns true if the scatter function can scatter this original function.
* @return true if the scatter function can scatter this original function.
*/ */
def scatterCount: Int def isScatterGatherable = true
/**
* Returns the number of clones that should be created.
*/
def scatterCount: Int = originalFunction.scatterCount
/** /**
* Initializes the input fields for the clone function. * Initializes the input fields for the clone function.
@ -36,7 +61,7 @@ trait ScatterFunction extends QFunction {
* @param cloneFunction CloneFunction to initialize. * @param cloneFunction CloneFunction to initialize.
* @param index The one based scatter index. * @param index The one based scatter index.
*/ */
def initCloneInputs(cloneFunction: CloneFunction, index: Int) def initCloneInputs(cloneFunction: CloneFunction, index: Int) {}
/** /**
* Binds the input fields for the clone function to this scatter function. * Binds the input fields for the clone function to this scatter function.
@ -45,5 +70,5 @@ trait ScatterFunction extends QFunction {
* @param cloneFunction CloneFunction to bind. * @param cloneFunction CloneFunction to bind.
* @param index The one based scatter index. * @param index The one based scatter index.
*/ */
def bindCloneInputs(cloneFunction: CloneFunction, index: Int) def bindCloneInputs(cloneFunction: CloneFunction, index: Int) {}
} }

View File

@ -1,3 +1,27 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.queue.function.scattergather package org.broadinstitute.sting.queue.function.scattergather
import java.io.File import java.io.File
@ -53,7 +77,40 @@ trait ScatterGatherableFunction extends CommandLineFunction {
* and that the scatter function can scatter this instance. * and that the scatter function can scatter this instance.
* @return true if the function is ready to be scatter / gathered. * @return true if the function is ready to be scatter / gathered.
*/ */
def scatterGatherable = this.scatterCount > 1 && scatterFunction.isScatterGatherable(this) def scatterGatherable = this.scatterCount > 1 && scatterFunction.isScatterGatherable
/**
* Sets the scatter gather directory to the command directory if it is not already set.
*/
override def freezeFieldValues = {
super.freezeFieldValues
if (this.scatterGatherDirectory == null) {
if (qSettings.jobScatterGatherDirectory != null) {
this.scatterGatherDirectory = IOUtils.absolute(qSettings.jobScatterGatherDirectory)
} else {
this.scatterGatherDirectory = IOUtils.absolute(this.commandDirectory, "queueScatterGather")
}
}
}
/**
* The scatter function.
*/
private lazy val scatterFunction = {
val scatterFunction = newScatterFunction()
this.copySettingsTo(scatterFunction)
scatterFunction.originalFunction = this
scatterFunction.originalInputs = this.inputs
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter")
scatterFunction.isIntermediate = true
scatterFunction.addOrder = this.addOrder :+ 1
initScatterFunction(scatterFunction)
scatterFunction.absoluteCommandDirectory()
scatterFunction.init()
scatterFunction
}
/** /**
* Returns a list of scatter / gather and clones of this function * Returns a list of scatter / gather and clones of this function
@ -70,14 +127,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
val outputFieldsWithValues = this.outputFields.filter(hasFieldValue(_)) val outputFieldsWithValues = this.outputFields.filter(hasFieldValue(_))
// Create the scatter function based on @Scatter // Create the scatter function based on @Scatter
this.copySettingsTo(scatterFunction)
scatterFunction.addOrder = this.addOrder :+ 1
scatterFunction.commandDirectory = this.scatterGatherTempDir("scatter")
scatterFunction.originalInputs = this.inputs
scatterFunction.isIntermediate = true
scatterFunction.setScatterGatherable(this)
initScatterFunction(scatterFunction)
scatterFunction.absoluteCommandDirectory()
functions :+= scatterFunction functions :+= scatterFunction
// Ask the scatter function how many clones to create. // Ask the scatter function how many clones to create.
@ -93,20 +142,13 @@ trait ScatterGatherableFunction extends CommandLineFunction {
var gatherOutputs = Map.empty[ArgumentSource, File] var gatherOutputs = Map.empty[ArgumentSource, File]
var gatherAddOrder = numClones + 2 var gatherAddOrder = numClones + 2
for (gatherField <- outputFieldsWithValues) { for (gatherField <- outputFieldsWithValues) {
val gatherFunction = this.newGatherFunction(gatherField)
val gatherOutput = getFieldFile(gatherField) val gatherOutput = getFieldFile(gatherField)
this.copySettingsTo(gatherFunction)
gatherFunction.addOrder = this.addOrder :+ gatherAddOrder
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
gatherFunction.originalOutput = gatherOutput
gatherFunction.setScatterGatherable(this)
initGatherFunction(gatherFunction, gatherField)
gatherFunction.absoluteCommandDirectory()
functions :+= gatherFunction
gatherFunctions += gatherField -> gatherFunction
gatherOutputs += gatherField -> gatherOutput
gatherAddOrder += 1
val gatherFunction = this.newGatherFunction(gatherField)
this.copySettingsTo(gatherFunction)
gatherFunction.originalFunction = this
gatherFunction.originalOutput = gatherOutput
gatherFunction.commandDirectory = this.scatterGatherTempDir("gather-" + gatherField.field.getName)
// If this is a gather for a log file, make the gather intermediate just in case the log file name changes // If this is a gather for a log file, make the gather intermediate just in case the log file name changes
// Otherwise have the regular output function wait on the log files to gather // Otherwise have the regular output function wait on the log files to gather
if (isLogFile(gatherOutput)) { if (isLogFile(gatherOutput)) {
@ -117,6 +159,17 @@ trait ScatterGatherableFunction extends CommandLineFunction {
} else { } else {
gatherFunction.originalLogFiles = logFiles gatherFunction.originalLogFiles = logFiles
} }
gatherFunction.addOrder = this.addOrder :+ gatherAddOrder
initGatherFunction(gatherFunction, gatherField)
gatherFunction.absoluteCommandDirectory()
gatherFunction.init()
functions :+= gatherFunction
gatherFunctions += gatherField -> gatherFunction
gatherOutputs += gatherField -> gatherOutput
gatherAddOrder += 1
} }
// Create the clone functions for running the parallel jobs // Create the clone functions for running the parallel jobs
@ -126,12 +179,12 @@ trait ScatterGatherableFunction extends CommandLineFunction {
this.copySettingsTo(cloneFunction) this.copySettingsTo(cloneFunction)
cloneFunction.originalFunction = this cloneFunction.originalFunction = this
cloneFunction.index = i cloneFunction.cloneIndex = i
cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i)
cloneFunction.addOrder = this.addOrder :+ (i+1) cloneFunction.addOrder = this.addOrder :+ (i+1)
cloneFunction.isIntermediate = true cloneFunction.isIntermediate = true
// Setup the fields on the clone function, outputting each as a relative file in the sg directory. // Setup the fields on the clone function, outputting each as a relative file in the sg directory.
cloneFunction.commandDirectory = this.scatterGatherTempDir("temp-"+i)
scatterFunction.initCloneInputs(cloneFunction, i) scatterFunction.initCloneInputs(cloneFunction, i)
for (gatherField <- outputFieldsWithValues) { for (gatherField <- outputFieldsWithValues) {
val gatherPart = new File(gatherOutputs(gatherField).getName) val gatherPart = new File(gatherOutputs(gatherField).getName)
@ -160,21 +213,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
functions functions
} }
/**
* Sets the scatter gather directory to the command directory if it is not already set.
*/
override def freezeFieldValues = {
super.freezeFieldValues
if (this.scatterGatherDirectory == null) {
if (qSettings.jobScatterGatherDirectory != null) {
this.scatterGatherDirectory = IOUtils.absolute(qSettings.jobScatterGatherDirectory)
} else {
this.scatterGatherDirectory = IOUtils.absolute(this.commandDirectory, "queueScatterGather")
}
}
}
/** /**
* Creates a new ScatterFunction. * Creates a new ScatterFunction.
* @return A ScatterFunction instantiated scatterClass * @return A ScatterFunction instantiated scatterClass
@ -242,11 +280,6 @@ trait ScatterGatherableFunction extends CommandLineFunction {
this.setupCloneFunction(cloneFunction, index) this.setupCloneFunction(cloneFunction, index)
} }
/**
* The scatter function.
*/
private lazy val scatterFunction = newScatterFunction()
/** /**
* Returns a temporary directory under this scatter gather directory. * Returns a temporary directory under this scatter gather directory.
* @param Sub directory under the scatter gather directory. * @param Sub directory under the scatter gather directory.