Added another utility that can convert to RemoteFiles.

QScripts will now generate remote versions of files if the caller has not already passed in remote versions (or the QScript replaces the passed in remote references... not good)
Instead of having yet another plugin, combined QStatusMessenger and RemoteFileConverter under general QCommandPlugin trait.
This commit is contained in:
kshakir 2012-10-17 19:39:03 -04:00
parent 32ee2c7dff
commit 55ac4ba70b
5 changed files with 100 additions and 19 deletions

View File

@ -24,7 +24,6 @@
package org.broadinstitute.sting.queue
import function.QFunction
import java.io.File
import org.broadinstitute.sting.commandline._
import org.broadinstitute.sting.queue.util._
@ -96,18 +95,18 @@ class QCommandLine extends CommandLineProgram with Logging {
new PluginManager[QScript](classOf[QScript], Seq(qScriptClasses.toURI.toURL))
}
private lazy val qStatusMessengerPluginManager = {
new PluginManager[QStatusMessenger](classOf[QStatusMessenger])
private lazy val qCommandPlugin = {
new PluginManager[QCommandPlugin](classOf[QCommandPlugin])
}
ClassFieldCache.parsingEngine = new ParsingEngine(this)
/**
* Takes the QScripts passed in, runs their script() methods, retrieves their generated
* functions, and then builds and runs a QGraph based on the dependencies.
*/
def execute = {
val allStatusMessengers = qStatusMessengerPluginManager.createAllTypes()
ClassFieldCache.parsingEngine = this.parser
val allCommandPlugins = qCommandPlugin.createAllTypes()
if (settings.qSettings.runName == null)
settings.qSettings.runName = FilenameUtils.removeExtension(scripts.head.getName)
@ -115,14 +114,24 @@ class QCommandLine extends CommandLineProgram with Logging {
settings.qSettings.tempDirectory = IOUtils.absolute(settings.qSettings.runDirectory, ".queue/tmp")
qGraph.initializeWithSettings(settings)
for (statusMessenger <- allStatusMessengers) {
loadArgumentsIntoObject(statusMessenger)
for (commandPlugin <- allCommandPlugins) {
loadArgumentsIntoObject(commandPlugin)
}
for (statusMessenger <- allStatusMessengers) {
statusMessenger.started()
for (commandPlugin <- allCommandPlugins) {
if (commandPlugin.statusMessenger != null)
commandPlugin.statusMessenger.started()
}
// TODO: Default command plugin argument?
val remoteFileConverter = (
for (commandPlugin <- allCommandPlugins if (commandPlugin.remoteFileConverter != null))
yield commandPlugin.remoteFileConverter
).headOption.getOrElse(null)
if (remoteFileConverter != null)
loadArgumentsIntoObject(remoteFileConverter)
val allQScripts = qScriptPluginManager.createAllTypes()
for (script <- allQScripts) {
logger.info("Scripting " + qScriptPluginManager.getName(script.getClass.asSubclass(classOf[QScript])))
@ -137,10 +146,15 @@ class QCommandLine extends CommandLineProgram with Logging {
case e: Exception =>
throw new UserException.CannotExecuteQScript(script.getClass.getSimpleName + ".script() threw the following exception: " + e, e)
}
if (remoteFileConverter != null) {
if (remoteFileConverter.convertToRemoteEnabled)
script.mkRemoteOutputs(remoteFileConverter)
}
script.functions.foreach(qGraph.add(_))
logger.info("Added " + script.functions.size + " functions")
}
// Execute the job graph
qGraph.run()
@ -162,14 +176,16 @@ class QCommandLine extends CommandLineProgram with Logging {
if (!success) {
logger.info("Done with errors")
qGraph.logFailed()
for (statusMessenger <- allStatusMessengers)
statusMessenger.exit("Done with errors")
for (commandPlugin <- allCommandPlugins)
if (commandPlugin.statusMessenger != null)
commandPlugin.statusMessenger.exit("Done with errors")
1
} else {
if (settings.run) {
allQScripts.foreach(_.pushOutputs())
for (statusMessenger <- allStatusMessengers)
statusMessenger.done(allQScripts.map(_.remoteOutputs))
for (commandPlugin <- allCommandPlugins)
if (commandPlugin.statusMessenger != null)
commandPlugin.statusMessenger.done(allQScripts.map(_.remoteOutputs))
}
0
}
@ -189,7 +205,7 @@ class QCommandLine extends CommandLineProgram with Logging {
override def getArgumentSources = {
var plugins = Seq.empty[Class[_]]
plugins ++= qScriptPluginManager.getPlugins
plugins ++= qStatusMessengerPluginManager.getPlugins
plugins ++= qCommandPlugin.getPlugins
plugins.toArray
}
@ -200,11 +216,10 @@ class QCommandLine extends CommandLineProgram with Logging {
override def getArgumentSourceName(source: Class[_]) = {
if (classOf[QScript].isAssignableFrom(source))
qScriptPluginManager.getName(source.asSubclass(classOf[QScript]))
else if (classOf[QStatusMessenger].isAssignableFrom(source))
qStatusMessengerPluginManager.getName(source.asSubclass(classOf[QStatusMessenger]))
else if (classOf[QCommandPlugin].isAssignableFrom(source))
qCommandPlugin.getName(source.asSubclass(classOf[QCommandPlugin]))
else
null
}
/**

View File

@ -0,0 +1,9 @@
package org.broadinstitute.sting.queue
import engine.QStatusMessenger
import util.RemoteFileConverter
trait QCommandPlugin {
def statusMessenger: QStatusMessenger = null
def remoteFileConverter: RemoteFileConverter = null
}

View File

@ -108,6 +108,24 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon
functions.foreach( f => add(f) )
}
/**
* Convert all @Output files to remote output files.
* @param remoteFileConverter Converter for files to remote files.
*/
def mkRemoteOutputs(remoteFileConverter: RemoteFileConverter) {
for (field <- outputFields) {
val fieldFile = ClassFieldCache.getFieldFile(this, field)
if (fieldFile != null && !fieldFile.isInstanceOf[RemoteFile]) {
val fieldName = ClassFieldCache.fullName(field)
val remoteFile = remoteFileConverter.convertToRemote(fieldFile, fieldName)
ClassFieldCache.setFieldValue(this, field, remoteFile)
}
}
}
/**
* Pull all remote files to the local disk.
*/
def pullInputs() {
val inputs = ClassFieldCache.getFieldFiles(this, inputFields)
for (remoteFile <- filterRemoteFiles(inputs)) {
@ -116,6 +134,9 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon
}
}
/**
* Push all remote files from the local disk.
*/
def pushOutputs() {
val outputs = ClassFieldCache.getFieldFiles(this, outputFields)
for (remoteFile <- filterRemoteFiles(outputs)) {
@ -124,6 +145,10 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon
}
}
/**
* List out the remote outputs
* @return the RemoteFile outputs by argument source
*/
def remoteOutputs: Map[ArgumentSource, Seq[RemoteFile]] =
outputFields.map(field => (field -> filterRemoteFiles(ClassFieldCache.getFieldFiles(this, field)))).filter(tuple => !tuple._2.isEmpty).toMap

View File

@ -180,4 +180,15 @@ object ClassFieldCache {
case unknown => throw new QException("Non-file found. Try removing the annotation, change the annotation to @Argument, or extend File with FileExtension: %s: %s".format(field.field, unknown))
}
//
// other utilities
//
/**
* Retrieves the fullName of the argument
* @param field ArgumentSource to check
* @return Full name of the argument source
*/
def fullName(field: ArgumentSource) = field.createArgumentDefinitions().get(0).fullName
}

View File

@ -0,0 +1,21 @@
package org.broadinstitute.sting.queue.util
import java.io.File
trait RemoteFileConverter {
type RemoteFileType <: RemoteFile
/**
* If this remote file creator is capable of converting to a remote file.
* @return true if ready to convert
*/
def convertToRemoteEnabled: Boolean
/**
* Converts to a remote file
* @param file The original file
* @param name A "name" to use for the remote file
* @return The new version of this remote file.
*/
def convertToRemote(file: File, name: String): RemoteFileType
}