Removed default use of @Output syntax.

If compile completes for QScripts, sending runtime errors during execute.
This commit is contained in:
kshakir 2012-11-29 13:31:08 -05:00
parent cf56ca3bc9
commit a6c1fcd151
3 changed files with 97 additions and 112 deletions

View File

@ -110,95 +110,103 @@ class QCommandLine extends CommandLineProgram with Logging {
* functions, and then builds and runs a QGraph based on the dependencies.
*/
def execute = {
ClassFieldCache.parsingEngine = this.parser
var success = false
var result = 1
try {
ClassFieldCache.parsingEngine = this.parser
if (settings.qSettings.runName == null)
settings.qSettings.runName = FilenameUtils.removeExtension(scripts.head.getName)
if (IOUtils.isDefaultTempDir(settings.qSettings.tempDirectory))
settings.qSettings.tempDirectory = IOUtils.absolute(settings.qSettings.runDirectory, ".queue/tmp")
qGraph.initializeWithSettings(settings)
if (settings.qSettings.runName == null)
settings.qSettings.runName = FilenameUtils.removeExtension(scripts.head.getName)
if (IOUtils.isDefaultTempDir(settings.qSettings.tempDirectory))
settings.qSettings.tempDirectory = IOUtils.absolute(settings.qSettings.runDirectory, ".queue/tmp")
qGraph.initializeWithSettings(settings)
for (commandPlugin <- allCommandPlugins) {
loadArgumentsIntoObject(commandPlugin)
}
for (commandPlugin <- allCommandPlugins) {
if (commandPlugin.statusMessenger != null)
commandPlugin.statusMessenger.started()
}
qGraph.messengers = allCommandPlugins.filter(_.statusMessenger != null).map(_.statusMessenger).toSeq
// TODO: Default command plugin argument?
val remoteFileConverter = (
for (commandPlugin <- allCommandPlugins if (commandPlugin.remoteFileConverter != null))
yield commandPlugin.remoteFileConverter
).headOption.getOrElse(null)
if (remoteFileConverter != null)
loadArgumentsIntoObject(remoteFileConverter)
val allQScripts = qScriptPluginManager.createAllTypes()
for (script <- allQScripts) {
logger.info("Scripting " + qScriptPluginManager.getName(script.getClass.asSubclass(classOf[QScript])))
loadArgumentsIntoObject(script)
allCommandPlugins.foreach(_.initScript(script))
// TODO: Pulling inputs can be time/io expensive! Some scripts are using the files to generate functions-- even for dry runs-- so pull it all down for now.
//if (settings.run)
script.pullInputs()
script.qSettings = settings.qSettings
try {
script.script()
} catch {
case e: Exception =>
throw new UserException.CannotExecuteQScript(script.getClass.getSimpleName + ".script() threw the following exception: " + e, e)
for (commandPlugin <- allCommandPlugins) {
loadArgumentsIntoObject(commandPlugin)
}
if (remoteFileConverter != null) {
if (remoteFileConverter.convertToRemoteEnabled)
script.mkRemoteOutputs(remoteFileConverter)
}
script.functions.foreach(qGraph.add(_))
logger.info("Added " + script.functions.size + " functions")
}
// Execute the job graph
qGraph.run()
val functionsAndStatus = qGraph.getFunctionsAndStatus
val success = qGraph.success
// walk over each script, calling onExecutionDone
for (script <- allQScripts) {
val scriptFunctions = functionsAndStatus.filterKeys(f => script.functions.contains(f))
script.onExecutionDone(scriptFunctions, success)
}
logger.info("Script %s with %d total jobs".format(if (success) "completed successfully" else "failed", functionsAndStatus.size))
// write the final complete job report
logger.info("Writing final jobs report...")
qGraph.writeJobsReport()
if (!success) {
logger.info("Done with errors")
qGraph.logFailed()
for (commandPlugin <- allCommandPlugins)
for (commandPlugin <- allCommandPlugins) {
if (commandPlugin.statusMessenger != null)
commandPlugin.statusMessenger.exit("Done with errors: %s".format(qGraph.formattedStatusCounts))
1
} else {
if (settings.run) {
allQScripts.foreach(_.pushOutputs())
for (commandPlugin <- allCommandPlugins)
if (commandPlugin.statusMessenger != null) {
val allInputs = allQScripts.map(_.remoteInputs)
val allOutputs = allQScripts.map(_.remoteOutputs)
commandPlugin.statusMessenger.done(allInputs, allOutputs)
}
commandPlugin.statusMessenger.started()
}
qGraph.messengers = allCommandPlugins.filter(_.statusMessenger != null).map(_.statusMessenger).toSeq
// TODO: Default command plugin argument?
val remoteFileConverter = (
for (commandPlugin <- allCommandPlugins if (commandPlugin.remoteFileConverter != null))
yield commandPlugin.remoteFileConverter
).headOption.getOrElse(null)
if (remoteFileConverter != null)
loadArgumentsIntoObject(remoteFileConverter)
val allQScripts = qScriptPluginManager.createAllTypes()
for (script <- allQScripts) {
logger.info("Scripting " + qScriptPluginManager.getName(script.getClass.asSubclass(classOf[QScript])))
loadArgumentsIntoObject(script)
allCommandPlugins.foreach(_.initScript(script))
// TODO: Pulling inputs can be time/io expensive! Some scripts are using the files to generate functions-- even for dry runs-- so pull it all down for now.
//if (settings.run)
script.pullInputs()
script.qSettings = settings.qSettings
try {
script.script()
} catch {
case e: Exception =>
throw new UserException.CannotExecuteQScript(script.getClass.getSimpleName + ".script() threw the following exception: " + e, e)
}
if (remoteFileConverter != null) {
if (remoteFileConverter.convertToRemoteEnabled)
script.mkRemoteOutputs(remoteFileConverter)
}
script.functions.foreach(qGraph.add(_))
logger.info("Added " + script.functions.size + " functions")
}
// Execute the job graph
qGraph.run()
val functionsAndStatus = qGraph.getFunctionsAndStatus
// walk over each script, calling onExecutionDone
for (script <- allQScripts) {
val scriptFunctions = functionsAndStatus.filterKeys(f => script.functions.contains(f))
script.onExecutionDone(scriptFunctions, success)
}
logger.info("Script %s with %d total jobs".format(if (success) "completed successfully" else "failed", functionsAndStatus.size))
// write the final complete job report
logger.info("Writing final jobs report...")
qGraph.writeJobsReport()
if (qGraph.success) {
if (settings.run) {
allQScripts.foreach(_.pushOutputs())
for (commandPlugin <- allCommandPlugins)
if (commandPlugin.statusMessenger != null) {
val allInputs = allQScripts.map(_.remoteInputs)
val allOutputs = allQScripts.map(_.remoteOutputs)
commandPlugin.statusMessenger.done(allInputs, allOutputs)
}
}
success = true
result = 0
}
} finally {
if (!success) {
logger.info("Done with errors")
qGraph.logFailed()
if (settings.run) {
for (commandPlugin <- allCommandPlugins)
if (commandPlugin.statusMessenger != null)
commandPlugin.statusMessenger.exit("Done with errors: %s".format(qGraph.formattedStatusCounts))
}
}
0
}
result
}
/**

View File

@ -124,49 +124,26 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon
}
/**
* Pull all remote files to the local disk.
* Pull all remote files to the local disk
*/
def pullInputs() {
val inputs = ClassFieldCache.getFieldFiles(this, inputFields)
for (remoteFile <- filterRemoteFiles(inputs)) {
logger.info("Pulling %s from %s".format(remoteFile.getAbsolutePath, remoteFile.remoteDescription))
remoteFile.pullToLocal()
}
}
/**
* Push all remote files from the local disk.
* Push all remote files from the local disk
*/
def pushOutputs() {
val outputs = ClassFieldCache.getFieldFiles(this, outputFields)
for (remoteFile <- filterRemoteFiles(outputs)) {
logger.info("Pushing %s to %s".format(remoteFile.getAbsolutePath, remoteFile.remoteDescription))
remoteFile.pushToRemote()
}
}
/**
* List out the remote outputs
* @return the RemoteFile outputs by argument source
* @return the inputs or null if there are no inputs
*/
def remoteInputs: Map[String, Seq[RemoteFile]] = tagMap(remoteFieldMap(inputFields))
def remoteInputs: AnyRef = null
/**
* List out the remote outputs
* @return the RemoteFile outputs by argument source
* @return the outputs or null if there are no outputs
*/
def remoteOutputs: Map[String, Seq[RemoteFile]] = tagMap(remoteFieldMap(outputFields))
private def tagMap(remoteFieldMap: Map[ArgumentSource, Seq[RemoteFile]]): Map[String, Seq[RemoteFile]] = {
remoteFieldMap.collect{ case (k, v) => ClassFieldCache.fullName(k) -> v }.toMap
}
private def remoteFieldMap(fields: Seq[ArgumentSource]): Map[ArgumentSource, Seq[RemoteFile]] = {
fields.map(field => (field -> filterRemoteFiles(ClassFieldCache.getFieldFiles(this, field)))).filter(tuple => !tuple._2.isEmpty).toMap
}
private def filterRemoteFiles(fields: Seq[File]): Seq[RemoteFile] =
fields.filter(field => field != null && field.isInstanceOf[RemoteFile]).map(_.asInstanceOf[RemoteFile])
def remoteOutputs: AnyRef = null
/** The complete list of fields. */
def functionFields: Seq[ArgumentSource] = ClassFieldCache.classFunctionFields(this.getClass)

View File

@ -7,7 +7,7 @@ import org.broadinstitute.sting.queue.util.RemoteFile
*/
trait QStatusMessenger {
def started()
def done(inputs: Seq[Map[String, Seq[RemoteFile]]], outputs: Seq[Map[String, Seq[RemoteFile]]])
def done(inputs: Seq[_], outputs: Seq[_])
def exit(message: String)
def started(job: String)