2010-06-15 12:43:46 +08:00
package org.broadinstitute.sting.queue.function
2010-08-10 00:42:48 +08:00
import java.io.File
2010-10-07 02:29:56 +08:00
import java.lang.annotation.Annotation
import org.broadinstitute.sting.commandline._
import org.broadinstitute.sting.queue. { QException , QSettings }
import collection.JavaConversions._
2010-10-16 01:01:36 +08:00
import org.broadinstitute.sting.queue.function.scattergather. { Gather , SimpleTextGatherFunction }
2010-10-21 14:37:28 +08:00
import org.broadinstitute.sting.queue.util. { Logging , CollectionUtils , IOUtils , ReflectionUtils }
2010-08-10 00:42:48 +08:00
2010-06-23 02:39:20 +08:00
/**
 * The base interface for all functions in Queue.
 * Inputs and outputs are specified as Sets of values.
 * The inputs of one function are matched to the outputs of other functions using .equals().
 */
2010-10-21 14:37:28 +08:00
trait QFunction extends Logging {
  /** Analysis function name. */
  var analysisName: String = _

  /** Prefix for automatic job name creation. Defaults to qSettings.jobNamePrefix in freezeFieldValues. */
  var jobNamePrefix: String = _

  /** The name of the job. Generated from jobNamePrefix in freezeFieldValues when left null. */
  var jobName: String = _

  /** Default settings. */
  var qSettings: QSettings = _

  /**
   * Directory to run the command in. In freezeFieldValues a relative directory is rooted
   * under qSettings.runDirectory, and the result is made absolute.
   */
  var commandDirectory: File = new File(".")

  /** Temporary directory to write any files. Defaults to qSettings.tempDirectory in freezeFieldValues. */
  var jobTempDir: File = null

  /** Order the function was added to the graph. */
  var addOrder: List[Int] = Nil

  /** File to redirect any output. Defaults to <jobName>.out in freezeFieldValues. */
  @Output(doc = "File to redirect any output", required = false)
  @Gather(classOf[SimpleTextGatherFunction])
  var jobOutputFile: File = _

  /**
   * File to redirect any errors. freezeFieldValues does not assign a default, so this may stay null.
   * NOTE(review): an earlier doc said "Defaults to <jobName>.out"; presumably errors are merged
   * into jobOutputFile by the job runner when this is null. Confirm.
   */
  @Output(doc = "File to redirect any errors", required = false)
  @Gather(classOf[SimpleTextGatherFunction])
  var jobErrorFile: File = _

  /** Description of this command line function. */
  def description: String

  /** The function description in .dot files, formatted as "<jobName> => <description>". */
  def dotString: String = jobName + " => " + description

  /**
   * Reports whether the function is done, based on its ".done" status files.
   * @return Some(true) if every done file exists, Some(false) if any is missing,
   *         None if there are no status files (done status unknown).
   */
  def isDone: Option[Boolean] = {
    val files = doneOutputs
    if (files.isEmpty) None else Some(files.forall(_.exists))
  }

  /**
   * Reports whether the function has failed, based on its ".fail" status files.
   * @return Some(true) if any fail file exists, Some(false) if none exist,
   *         None if there are no status files (fail status unknown).
   */
  def isFail: Option[Boolean] = {
    val files = failOutputs
    if (files.isEmpty) None else Some(files.exists(_.exists))
  }

  /**
   * Returns true if the file should be used for status output.
   * The base implementation excludes this function's log files.
   * @param file Output file to check.
   * @return true if the file should be used for status output.
   */
  def useStatusOutput(file: File): Boolean = !isLogFile(file)

  /**
   * Returns true if the file is a log file (jobOutputFile or jobErrorFile) for this function.
   * @param file File to check.
   */
  protected def isLogFile(file: File): Boolean =
    file == jobOutputFile || file == jobErrorFile

  /**
   * Returns the base paths of the hidden status files: for each status-eligible output
   * "dir/name", the path "dir/.name". The suffixes ".done" and ".fail" are appended by the callers.
   * @return Set[String] status base paths for this function.
   */
  private def statusPaths: Set[String] = outputs
    .filter(file => useStatusOutput(file))
    // File(parent, child) treats a null parent (a bare relative name) as "no parent",
    // instead of producing the literal path "null/.name".
    .map(file => new File(file.getParentFile, "." + file.getName).getPath)

  /**
   * Returns the ".done" status files marking successful completion of this function.
   * @return Set[File] done status files for this function.
   */
  def doneOutputs: Set[File] = statusPaths.map(path => new File(path + ".done"))

  /**
   * Returns the ".fail" status files marking failure of this function.
   * @return Set[File] fail status files for this function.
   */
  def failOutputs: Set[File] = statusPaths.map(path => new File(path + ".fail"))

  /** The complete list of fields on this function. */
  def functionFields: List[ArgumentSource] = QFunction.classFields(this.functionFieldClass).functionFields

  /** The @Input fields on this function. */
  def inputFields: List[ArgumentSource] = QFunction.classFields(this.functionFieldClass).inputFields

  /** The @Output fields on this function. */
  def outputFields: List[ArgumentSource] = QFunction.classFields(this.functionFieldClass).outputFields

  /** The @Argument fields on this function. */
  def argumentFields: List[ArgumentSource] = QFunction.classFields(this.functionFieldClass).argumentFields

  /**
   * If true, unless another unfinished function is dependent on this function,
   * this function will NOT be run even if the outputs have not been created.
   */
  var isIntermediate = false

  /**
   * Returns the class that should be used for looking up fields.
   */
  protected def functionFieldClass = this.getClass

  /**
   * Returns the input files for this function.
   * @return Set[File] inputs for this function.
   */
  def inputs: Set[File] = getFieldFiles(inputFields)

  /**
   * Returns the output files for this function.
   * @return Set[File] outputs for this function.
   */
  def outputs: Set[File] = getFieldFiles(outputFields)

  /**
   * Returns the set of directories where files may be written: the command directory,
   * the temp directory when set, and the parent directory of every output that has one.
   */
  def outputDirectories: Set[File] = {
    // Outputs given as bare relative names have no parent; a null entry would make
    // mkOutputDirectories fail with a NullPointerException.
    val outputParents = outputs.map(_.getParentFile).filter(_ != null)
    val tempDirs = if (jobTempDir == null) Set.empty[File] else Set(jobTempDir)
    Set(commandDirectory) ++ tempDirs ++ outputParents
  }

  /**
   * Deletes the log files for this function. Files that were never assigned are skipped.
   */
  def deleteLogs(): Unit = {
    if (jobOutputFile != null)
      IOUtils.tryDelete(jobOutputFile)
    if (jobErrorFile != null)
      IOUtils.tryDelete(jobErrorFile)
  }

  /**
   * Deletes the output files (excluding log files) and all the status files for this function.
   */
  def deleteOutputs(): Unit = {
    outputs.filterNot(file => isLogFile(file)).foreach(file => IOUtils.tryDelete(file))
    doneOutputs.foreach(file => IOUtils.tryDelete(file))
    failOutputs.foreach(file => IOUtils.tryDelete(file))
  }

  /**
   * Creates any of the output directories for this function that do not exist.
   * @throws QException if a directory cannot be created.
   */
  def mkOutputDirectories(): Unit = {
    outputDirectories.foreach(dir => {
      if (!dir.exists && !dir.mkdirs)
        throw new QException("Unable to create directory: " + dir)
    })
  }

  /**
   * Returns the required fields that have no value, where no field they are
   * exclusive with has a value either.
   * @return List[String] descriptions of the fields missing values, sorted.
   */
  def missingFields: List[String] = {
    val missingInputs = missingFields(inputFields, classOf[Input])
    val missingOutputs = missingFields(outputFields, classOf[Output])
    val missingArguments = missingFields(argumentFields, classOf[Argument])
    (missingInputs | missingOutputs | missingArguments).toList.sorted
  }

  /**
   * Returns the required fields that have no value, where no field they are
   * exclusive with has a value either.
   * @param sources Fields to check.
   * @param annotation Annotation type whose required/exclusiveOf/doc attributes are read.
   * @return Set[String] entries formatted as "@<Annotation>: <field> - <doc>".
   */
  private def missingFields(sources: List[ArgumentSource], annotation: Class[_ <: Annotation]): Set[String] =
    sources
      .filter(source =>
        isRequired(source, annotation) &&
        !hasFieldValue(source) &&
        !exclusiveOf(source, annotation).exists(otherSource => hasFieldValue(otherSource)))
      .map(source => "@%s: %s - %s".format(annotation.getSimpleName, source.field.getName, doc(source, annotation)))
      .toSet

  /**
   * Gets the files from the fields. The fields must be a File, a FileExtension, or a List or Set of either.
   * @param fields Fields to get files.
   * @return Set[File] for the fields.
   */
  private def getFieldFiles(fields: List[ArgumentSource]): Set[File] =
    fields.foldLeft(Set.empty[File])((files, field) => files ++ getFieldFiles(field))

  /**
   * Gets the files from the field. The field must be a File, a FileExtension, or a List or Set of either.
   * Null values are skipped.
   * @param field Field to get files.
   * @return Set[File] for the field.
   */
  def getFieldFiles(field: ArgumentSource): Set[File] = {
    var files = Set.empty[File]
    CollectionUtils.foreach(getFieldValue(field), (fieldValue) => {
      val file = fieldValueToFile(field, fieldValue)
      if (file != null)
        files += file
    })
    files
  }

  /**
   * Gets the file from the field. The field must be a File or a FileExtension and not a List or Set.
   * @param field Field to get the file.
   * @return File for the field, or null if the field is unset.
   */
  def getFieldFile(field: ArgumentSource): File =
    fieldValueToFile(field, getFieldValue(field))

  /**
   * Converts the field value to a file. The field must be a File or a FileExtension.
   * @param field Field to get the file.
   * @param value Value of the File or FileExtension or null.
   * @return Null if value is null, otherwise the File.
   * @throws QException if the value is not a File or FileExtension.
   */
  private def fieldValueToFile(field: ArgumentSource, value: Any): File = value match {
    case file: File => file
    case null => null
    case unknown => throw new QException("Non-file found. Try removing the annotation, change the annotation to @Argument, or extend File with FileExtension: %s: %s".format(field.field, unknown))
  }

  /**
   * After a function is frozen no more updates are allowed by the user.
   * The function is allowed to make necessary updates internally to make sure
   * the inputs and outputs will be equal to other inputs and outputs.
   */
  final def freeze: Unit = {
    freezeFieldValues
    canonFieldValues
  }

  /**
   * Fills in defaults for unset job settings from qSettings, and roots the command directory.
   */
  def freezeFieldValues: Unit = {
    if (jobNamePrefix == null)
      jobNamePrefix = qSettings.jobNamePrefix
    if (jobName == null)
      jobName = QFunction.nextJobName(jobNamePrefix)
    if (jobOutputFile == null)
      jobOutputFile = new File(jobName + ".out")
    if (jobTempDir == null)
      jobTempDir = qSettings.tempDirectory
    // Only a relative command directory gets the run directory inserted ahead of it.
    // java.io.File(parent, child) would otherwise nest an absolute path under the run
    // directory, and a repeated freeze would nest it again.
    val rootedCommandDirectory =
      if (commandDirectory.isAbsolute) commandDirectory
      else new File(qSettings.runDirectory, commandDirectory.getPath)
    commandDirectory = IOUtils.absolute(rootedCommandDirectory)
  }

  /**
   * Makes all field values canonical so that the graph can match the
   * inputs of one function to the output of another using equals().
   */
  def canonFieldValues: Unit = {
    for (field <- this.functionFields) {
      val canonicalValue = CollectionUtils.updated(this.getFieldValue(field), canon).asInstanceOf[AnyRef]
      this.setFieldValue(field, canonicalValue)
    }
  }

  /**
   * Set value to a uniform value across functions.
   * Base implementation changes any relative path to an absolute path.
   * @param value to be updated
   * @return the modified value, or a copy if the value is immutable
   * @throws QException if the value is a subclass of File that does not extend FileExtension.
   */
  protected def canon(value: Any): Any = {
    value match {
      case fileExtension: FileExtension =>
        val newFile = absolute(fileExtension)
        fileExtension.withPath(newFile.getPath)
      case file: File =>
        // Only FileExtension knows how to rebuild itself with a new path.
        if (file.getClass != classOf[File])
          throw new QException("Extensions of file must also extend with FileExtension so that the path can be modified.")
        absolute(file)
      case x => x
    }
  }

  /**
   * Returns the absolute path to the file relative to the job command directory.
   * @param file File to root relative to the command directory if it is not already absolute.
   * @return The absolute path to file.
   */
  private def absolute(file: File) = IOUtils.absolute(commandDirectory, file)

  /**
   * Scala sugar type for reading required, exclusiveOf and doc from the argument annotations.
   */
  private type ArgumentAnnotation = {
    def required(): Boolean
    def exclusiveOf(): String
    def doc(): String
  }

  /**
   * Returns the required value from the field annotation.
   * @param field Field to check.
   * @param annotation Annotation.
   * @return the required value from the field annotation.
   */
  private def isRequired(field: ArgumentSource, annotation: Class[_ <: Annotation]) =
    ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].required

  /**
   * Returns the ArgumentSources from functionFields listed in the comma-separated exclusiveOf of the original field.
   * @param field Field to check.
   * @param annotation Annotation.
   * @return the Array[ArgumentSource] that may be set instead of the field.
   * @throws QException if a listed field name does not exist on this function.
   */
  private def exclusiveOf(field: ArgumentSource, annotation: Class[_ <: Annotation]) =
    ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].exclusiveOf
      .split(",").map(_.trim).filter(_.length > 0)
      .map(fieldName => functionFields.find(fieldName == _.field.getName) match {
        case Some(x) => x
        case None => throw new QException("Unable to find exclusion field %s on %s".format(fieldName, this.getClass.getSimpleName))
      })

  /**
   * Returns the doc value from the field annotation.
   * @param field Field to check.
   * @param annotation Annotation.
   * @return the doc value from the field annotation.
   */
  private def doc(field: ArgumentSource, annotation: Class[_ <: Annotation]) =
    ReflectionUtils.getAnnotation(field.field, annotation).asInstanceOf[ArgumentAnnotation].doc

  /**
   * Returns true if the field has a value.
   * @param source Field to check for a value.
   * @return true if the field has a value.
   */
  protected def hasFieldValue(source: ArgumentSource) = this.hasValue(this.getFieldValue(source))

  /**
   * Returns false if the value is null or an empty collection.
   * @param param Value to test for null, or a collection to test if it is empty.
   * @return false if the value is null, or false if the collection is empty, otherwise true.
   */
  protected def hasValue(param: Any) = CollectionUtils.isNotNullOrNotEmpty(param)

  /**
   * Gets the value of a field.
   * @param source Field to get the value for.
   * @return value of the field.
   */
  def getFieldValue(source: ArgumentSource) = ReflectionUtils.getValue(invokeObj(source), source.field)

  /**
   * Sets the value of a field.
   * @param source Field to set the value for.
   * @param value New value for the field.
   */
  def setFieldValue(source: ArgumentSource, value: Any) = ReflectionUtils.setValue(invokeObj(source), source.field, value)

  /**
   * Walks the parent fields of the source, starting from this object, to find
   * the object holding the field to be retrieved or set.
   * @param source Field to find the invoke object for.
   * @return Object to invoke the field on.
   */
  private def invokeObj(source: ArgumentSource) = source.parentFields.foldLeft[AnyRef](this)(ReflectionUtils.getValue(_, _))
}
2010-10-14 06:22:01 +08:00
object QFunction {
  /** Running count of the job names generated during this run of Queue. */
  private var jobIndex = 0

  /**
   * Engine used to extract the argument sources declared on a function class.
   * It is dereferenced the first time a class's fields are looked up, so it must be set before then.
   */
  var parsingEngine: ParsingEngine = _

  /**
   * Advances the job counter and builds a job name from it.
   * @param prefix Prefix of the job name.
   * @return "<prefix>-<index>", where the index starts at 1 and grows by one per call.
   */
  private def nextJobName(prefix: String): String = {
    val index = jobIndex + 1
    jobIndex = index
    prefix + "-" + index
  }

  /**
   * The argument sources declared on one class, grouped by annotation.
   * @param clazz The class whose fields are extracted.
   */
  private class ClassFields(clazz: Class[_]) {
    /** Every argument source the parsing engine finds on the class. */
    val functionFields: List[ArgumentSource] = parsingEngine.extractArgumentSources(clazz).toList
    /** The sources annotated with @Input. */
    val inputFields = functionFields.filter(s => ReflectionUtils.hasAnnotation(s.field, classOf[Input]))
    /** The sources annotated with @Output. */
    val outputFields = functionFields.filter(s => ReflectionUtils.hasAnnotation(s.field, classOf[Output]))
    /** The sources annotated with @Argument. */
    val argumentFields = functionFields.filter(s => ReflectionUtils.hasAnnotation(s.field, classOf[Argument]))
  }

  /** Cache of the fields already extracted, keyed by class. */
  private var classFieldsMap = Map.empty[Class[_], ClassFields]

  /**
   * Looks up a field by name on a class.
   * @param clazz Class to search.
   * @param name Name of the field to return.
   * @return Argument source for the field.
   * @throws QException if the class has no such field.
   */
  def findField(clazz: Class[_], name: String): ArgumentSource = {
    val candidate = classFields(clazz).functionFields.find(source => source.field.getName == name)
    candidate.getOrElse(throw new QException("Could not find a field on class %s with name %s".format(clazz, name)))
  }

  /**
   * Returns the fields for a class. They are computed on first request and cached afterwards.
   * @param clazz Class to retrieve fields for.
   * @return the fields for the class.
   */
  private def classFields(clazz: Class[_]): ClassFields =
    classFieldsMap.getOrElse(clazz, {
      val extracted = new ClassFields(clazz)
      classFieldsMap += clazz -> extracted
      extracted
    })
}