From a9ba9455958ba9dbdd3a85413aa5b7aa166c44dc Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Tue, 23 Aug 2011 10:09:51 -0400 Subject: [PATCH 01/31] onExecutionDone(jobs, successFlag) added to QScript. -- This function is called when the Qscript ends, so scripts can overload this function if they want to run some code after all of the jobs have completed --- .../org/broadinstitute/sting/queue/QCommandLine.scala | 10 ++++++++-- .../src/org/broadinstitute/sting/queue/QScript.scala | 8 ++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index f19d60930..14e4adbf8 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -95,7 +95,8 @@ class QCommandLine extends CommandLineProgram with Logging { def execute = { qGraph.settings = settings - for (script <- pluginManager.createAllTypes()) { + val allQScripts = pluginManager.createAllTypes(); + for (script <- allQScripts) { logger.info("Scripting " + pluginManager.getName(script.getClass.asSubclass(classOf[QScript]))) loadArgumentsIntoObject(script) try { @@ -108,14 +109,19 @@ class QCommandLine extends CommandLineProgram with Logging { logger.info("Added " + script.functions.size + " functions") } + // Execute the job graph qGraph.run() + // walk over each script, calling onExecutionDone + for (script <- allQScripts) { + script.onExecutionDone(script.functions, qGraph.success) + } + if (!qGraph.success) { logger.info("Done with errors") qGraph.logFailed() 1 } else { - logger.info("Done") 0 } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/QScript.scala b/public/scala/src/org/broadinstitute/sting/queue/QScript.scala index 5cb8d1d29..27f3e275b 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ 
b/public/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -57,6 +57,14 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon */ def script() + /** + * A default handler for the onExecutionDone() function. By default this doesn't do anything + * except print out a fine status message. + */ + def onExecutionDone(jobs: List[QFunction], success: Boolean) { + logger.info("Script %s with %d total jobs".format(if (success) "completed successfully" else "failed", jobs.size)) + } + /** * The command line functions that will be executed for this QScript. */ From 6d6feb55406d55f14165d6aa295849c4794452b6 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Tue, 23 Aug 2011 10:56:37 -0400 Subject: [PATCH 02/31] Better error message when you cannot determine a ROD type because the file doesn't exist or cannot be read --- .../sting/commandline/ArgumentTypeDescriptor.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java b/public/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java index ff992d77d..16358d05f 100644 --- a/public/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java +++ b/public/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java @@ -379,10 +379,14 @@ class RodBindingArgumentTypeDescriptor extends ArgumentTypeDescriptor { } if ( tribbleType == null ) - throw new UserException.CommandLineException( - String.format("No tribble type was provided on the command line and the type of the file could not be determined dynamically. " + - "Please add an explicit type tag :NAME listing the correct type from among the supported types:%n%s", - manager.userFriendlyListOfAvailableFeatures(parameterType))); + if ( ! file.canRead() | ! 
file.isFile() ) { + throw new UserException.BadArgumentValue(name, "Couldn't read file to determine type: " + file); + } else { + throw new UserException.CommandLineException( + String.format("No tribble type was provided on the command line and the type of the file could not be determined dynamically. " + + "Please add an explicit type tag :NAME listing the correct type from among the supported types:%n%s", + manager.userFriendlyListOfAvailableFeatures(parameterType))); + } } } From 31ec6e316c5647ca61602da8fd0f1d851cce3e1e Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Tue, 23 Aug 2011 16:51:54 -0400 Subject: [PATCH 03/31] First implementation of JobRunInfo -- onExecutionDone(Map(QFunction, JobRunInfo)) is the new signature, so that you can walk over your jobs and inspect their success/failure and runtime characteristics --- .../sting/queue/QCommandLine.scala | 2 +- .../broadinstitute/sting/queue/QScript.scala | 4 +- .../sting/queue/engine/FunctionEdge.scala | 3 + .../sting/queue/engine/JobRunInfo.scala | 75 +++++++++++++++++++ .../sting/queue/engine/JobRunner.scala | 5 ++ .../sting/queue/engine/QGraph.scala | 13 ++++ 6 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala diff --git a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index 14e4adbf8..94a6408c6 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -114,7 +114,7 @@ class QCommandLine extends CommandLineProgram with Logging { // walk over each script, calling onExecutionDone for (script <- allQScripts) { - script.onExecutionDone(script.functions, qGraph.success) + script.onExecutionDone(qGraph.getFunctionsAndStatus(script.functions), qGraph.success) } if (!qGraph.success) { diff --git 
a/public/scala/src/org/broadinstitute/sting/queue/QScript.scala b/public/scala/src/org/broadinstitute/sting/queue/QScript.scala index 27f3e275b..3120d5d62 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -24,6 +24,7 @@ package org.broadinstitute.sting.queue +import engine.JobRunInfo import org.broadinstitute.sting.queue.function.QFunction import annotation.target.field import io.Source @@ -61,8 +62,9 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon * A default handler for the onExecutionDone() function. By default this doesn't do anything * except print out a fine status message. */ - def onExecutionDone(jobs: List[QFunction], success: Boolean) { + def onExecutionDone(jobs: Map[QFunction, JobRunInfo], success: Boolean) { logger.info("Script %s with %d total jobs".format(if (success) "completed successfully" else "failed", jobs.size)) + for ( (f, info) <- jobs ) logger.info(" %s %s".format(f.jobName, info)) } /** diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index 68bc7ae61..39354c5a4 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -23,6 +23,8 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod */ var depth = -1 + var runInfo: JobRunInfo = JobRunInfo.default // todo: replace after testing with _ + /** * Initializes with the current status of the function. 
*/ @@ -88,6 +90,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod tailError() } else if (currentStatus == RunnerStatus.DONE) { try { + runInfo = runner.getRunInfo runner.cleanup() function.doneOutputs.foreach(_.createNewFile()) } catch { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala new file mode 100644 index 000000000..384874613 --- /dev/null +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +package org.broadinstitute.sting.queue.engine + +import java.util.Date + +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +/** + * Base class containing all of the information about a job run. 
+ */ +class JobRunInfo(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: String, status: RunnerStatus.Value) { + def getStatus = status + def getStartTime = startTime + def getDoneTime = doneTime + def getMemoryUsedInGb = memUsedInGb + def getHostname = hostName + + def getRuntimeInMs: Long = { + getDoneTime.getTime - getStartTime.getTime + } + + override def toString: String = + "started %s ended %s runtime %s on host %s using %d Gb memory".format(getStartTime, getDoneTime, getRuntimeInMs, getHostname, getMemoryUsedInGb) +} + +object JobRunInfo { + def default = new JobRunInfo(new Date(), new Date(), 1, "localhost", RunnerStatus.DONE) + def detailed(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: String) = + new JobRunInfo(startTime, doneTime, memUsedInGb, hostName, RunnerStatus.DONE) +} \ No newline at end of file diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index de5fbde05..510aa33eb 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -69,6 +69,11 @@ trait JobRunner[TFunction <: QFunction] { def cleanup() { } + /** + * Must be overloaded + */ + def getRunInfo: JobRunInfo = JobRunInfo.default + /** * Calls back to a hook that an expert user can setup to modify a job. * @param value Value to modify. 
diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 766d9db94..0baa5dfe2 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -38,6 +38,7 @@ import org.apache.commons.lang.StringUtils import org.broadinstitute.sting.queue.util._ import collection.immutable.{TreeSet, TreeMap} import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction, GatherFunction, ScatterGatherableFunction} +import java.util.Date /** * The internal dependency tracker between sets of function input and output files. @@ -939,6 +940,14 @@ class QGraph extends Logging { edges.sorted(functionOrdering).foreach(edge => if (running) f(edge)) } + /** + * Utility function for running a method over all function edges. + * @param edgeFunction Function to run for each FunctionEdge. + */ + private def getFunctionEdges: List[FunctionEdge] = { + jobGraph.edgeSet.toList.filter(_.isInstanceOf[FunctionEdge]).asInstanceOf[List[FunctionEdge]] + } + /** * Utility function for running a method over all functions, but traversing the nodes in order of dependency. * @param edgeFunction Function to run for each FunctionEdge. @@ -1028,6 +1037,10 @@ class QGraph extends Logging { */ def isShutdown = !running + def getFunctionsAndStatus(functions: List[QFunction]): Map[QFunction, JobRunInfo] = { + getFunctionEdges.map(edge => (edge.function, edge.runInfo)).toMap + } + /** * Kills any forked jobs still running. */ From 569e1a1089c766913ba17f727cc43db794d65693 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Tue, 23 Aug 2011 16:53:06 -0400 Subject: [PATCH 04/31] Walker.isDone() aborts execution early -- Useful if you want to have a parameter like MAX_RECORDS that wants the walker to stop after some number of map calls without having to resort to the old System.exit() call directly. 
--- .../gatk/executive/LinearMicroScheduler.java | 6 +++- .../gatk/traversals/TraverseDuplicates.java | 3 ++ .../sting/gatk/traversals/TraverseLoci.java | 11 +++++--- .../gatk/traversals/TraverseReadPairs.java | 4 +++ .../sting/gatk/traversals/TraverseReads.java | 3 ++ .../sting/gatk/walkers/Walker.java | 11 ++++++++ .../walkers/variantutils/VariantsToTable.java | 28 +++++++++---------- 7 files changed, 47 insertions(+), 19 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 48fd73e0b..65ff27497 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -48,9 +48,10 @@ public class LinearMicroScheduler extends MicroScheduler { walker.initialize(); Accumulator accumulator = Accumulator.create(engine,walker); + boolean done = walker.isDone(); int counter = 0; for (Shard shard : shardStrategy ) { - if ( shard == null ) // we ran out of shards that aren't owned + if ( done || shard == null ) // we ran out of shards that aren't owned break; if(shard.getShardType() == Shard.ShardType.LOCUS) { @@ -61,6 +62,7 @@ public class LinearMicroScheduler extends MicroScheduler { Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); accumulator.accumulate(dataProvider,result); dataProvider.close(); + if ( walker.isDone() ) break; } windowMaker.close(); } @@ -70,6 +72,8 @@ public class LinearMicroScheduler extends MicroScheduler { accumulator.accumulate(dataProvider,result); dataProvider.close(); } + + done = walker.isDone(); } Object result = accumulator.finishTraversal(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java index 1ba48ca5f..046003154 
100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java @@ -173,7 +173,9 @@ public class TraverseDuplicates extends TraversalEngine those with the same mate pair position, for paired reads * -> those flagged as unpaired and duplicated but having the same start and end */ + boolean done = walker.isDone(); for (SAMRecord read : iter) { + if ( done ) break; // get the genome loc from the read GenomeLoc site = engine.getGenomeLocParser().createGenomeLoc(read); @@ -194,6 +196,7 @@ public class TraverseDuplicates extends TraversalEngine extends TraversalEngine,Locu logger.debug(String.format("TraverseLoci.traverse: Shard is %s", dataProvider)); LocusView locusView = getLocusView( walker, dataProvider ); + boolean done = false; if ( locusView.hasNext() ) { // trivial optimization to avoid unnecessary processing when there's nothing here at all @@ -46,7 +47,7 @@ public class TraverseLoci extends TraversalEngine,Locu LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider ); // We keep processing while the next reference location is within the interval - while( locusView.hasNext() ) { + while( locusView.hasNext() && ! 
done ) { AlignmentContext locus = locusView.next(); GenomeLoc location = locus.getLocation(); @@ -76,15 +77,17 @@ public class TraverseLoci extends TraversalEngine,Locu if (keepMeP) { M x = walker.map(tracker, refContext, locus); sum = walker.reduce(x, sum); + done = walker.isDone(); } printProgress(dataProvider.getShard(),locus.getLocation()); } } - // We have a final map call to execute here to clean up the skipped based from the - // last position in the ROD to that in the interval - if ( WalkerManager.getWalkerDataSource(walker) == DataSource.REFERENCE_ORDERED_DATA ) { + // We have a final map call to execute here to clean up the skipped based from the + // last position in the ROD to that in the interval + if ( WalkerManager.getWalkerDataSource(walker) == DataSource.REFERENCE_ORDERED_DATA && ! walker.isDone() ) { + // only do this if the walker isn't done! RodLocusView rodLocusView = (RodLocusView)locusView; long nSkipped = rodLocusView.getLastSkippedBases(); if ( nSkipped > 0 ) { diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java index 196d54036..dd4402d82 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java @@ -50,7 +50,9 @@ public class TraverseReadPairs extends TraversalEngine pairs = new ArrayList(); + boolean done = walker.isDone(); for(SAMRecord read: reads) { + if ( done ) break; dataProvider.getShard().getReadMetrics().incrementNumReadsSeen(); if(pairs.size() == 0 || pairs.get(0).getReadName().equals(read.getReadName())) { @@ -65,6 +67,8 @@ public class TraverseReadPairs extends TraversalEngine extends TraversalEngine,Read // get the reference ordered data ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider); + boolean done = walker.isDone(); // while we still have more 
reads for (SAMRecord read : reads) { + if ( done ) break; // ReferenceContext -- the reference bases covered by the read ReferenceContext refContext = null; @@ -106,6 +108,7 @@ public class TraverseReads extends TraversalEngine,Read GenomeLoc locus = read.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX ? null : engine.getGenomeLocParser().createGenomeLoc(read.getReferenceName(),read.getAlignmentStart()); printProgress(dataProvider.getShard(),locus); + done = walker.isDone(); } return sum; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java index 9e261a0b1..c88c7c3c4 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java @@ -126,6 +126,17 @@ public abstract class Walker { public void initialize() { } + /** + * A function for overloading in subclasses providing a mechanism to abort early from a walker. + * + * If this ever returns true, then the Traversal engine will stop executing map calls + * and start the process of shutting down the walker in an orderly fashion. + * @return + */ + public boolean isDone() { + return false; + } + /** * Provide an initial value for reduce computations. * @return Initial value of reduce. 
diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java index 19db58e0c..5dd75c858 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java @@ -147,22 +147,22 @@ public class VariantsToTable extends RodWalker { if ( tracker == null ) // RodWalkers can make funky map calls return 0; - if ( ++nRecords < MAX_RECORDS || MAX_RECORDS == -1 ) { - for ( VariantContext vc : tracker.getValues(variantCollection.variants, context.getLocation())) { - if ( (keepMultiAllelic || vc.isBiallelic()) && ( showFiltered || vc.isNotFiltered() ) ) { - List vals = extractFields(vc, fieldsToTake, ALLOW_MISSING_DATA); - out.println(Utils.join("\t", vals)); - } + nRecords++; + for ( VariantContext vc : tracker.getValues(variantCollection.variants, context.getLocation())) { + if ( (keepMultiAllelic || vc.isBiallelic()) && ( showFiltered || vc.isNotFiltered() ) ) { + List vals = extractFields(vc, fieldsToTake, ALLOW_MISSING_DATA); + out.println(Utils.join("\t", vals)); } - - return 1; - } else { - if ( nRecords >= MAX_RECORDS ) { - logger.warn("Calling sys exit to leave after " + nRecords + " records"); - System.exit(0); // todo -- what's the recommend way to abort like this? 
- } - return 0; } + + return 1; + } + + @Override + public boolean isDone() { + boolean done = MAX_RECORDS != -1 && nRecords >= MAX_RECORDS; + if ( done) logger.warn("isDone() will return true to leave after " + nRecords + " records"); + return done ; } private static final boolean isWildCard(String s) { From b8bc03bb4238fde0ee84c05947dacfe12a51d503 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Tue, 23 Aug 2011 17:11:22 -0400 Subject: [PATCH 05/31] JobRunInfo improvements -- dry-run now adds some info, for testing -- InProcessRunner adds some, but not all, of the information we want --- .../sting/queue/engine/FunctionEdge.scala | 7 +++++-- .../sting/queue/engine/InProcessRunner.scala | 5 +++++ .../sting/queue/engine/JobRunInfo.scala | 12 ++++++++---- .../sting/queue/engine/JobRunner.scala | 3 ++- .../broadinstitute/sting/queue/engine/QGraph.scala | 4 +++- 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index 39354c5a4..ef7f2afb0 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -23,7 +23,7 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod */ var depth = -1 - var runInfo: JobRunInfo = JobRunInfo.default // todo: replace after testing with _ + val myRunInfo: JobRunInfo = JobRunInfo.default // purely for dryRun testing /** * Initializes with the current status of the function. 
@@ -90,7 +90,6 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod tailError() } else if (currentStatus == RunnerStatus.DONE) { try { - runInfo = runner.getRunInfo runner.cleanup() function.doneOutputs.foreach(_.createNewFile()) } catch { @@ -182,4 +181,8 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod printWriter.close IOUtils.writeContents(functionErrorFile, stackTrace.toString) } + + def getRunInfo = { + if ( runner == null ) myRunInfo else runner.getRunInfo + } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala index d583a55ef..85c3db699 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -2,6 +2,7 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.InProcessFunction import org.broadinstitute.sting.queue.util.IOUtils +import java.util.Date /** * Runs a function that executes in process and does not fork out an external process. 
@@ -10,8 +11,12 @@ class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProce private var runStatus: RunnerStatus.Value = _ def start() = { + runInfo.startTime = new Date() runStatus = RunnerStatus.RUNNING + function.run() + + runInfo.doneTime = new Date() val content = "%s%nDone.".format(function.description) IOUtils.writeContents(function.jobOutputFile, content) runStatus = RunnerStatus.DONE diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala index 384874613..07bf1d1da 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala @@ -53,7 +53,13 @@ import java.util.Date /** * Base class containing all of the information about a job run. */ -class JobRunInfo(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: String, status: RunnerStatus.Value) { +class JobRunInfo { + var startTime: Date = _ + var doneTime: Date = _ + var memUsedInGb: Int = -1 + var hostName: String = "localhost" + var status: RunnerStatus.Value = RunnerStatus.DONE + def getStatus = status def getStartTime = startTime def getDoneTime = doneTime @@ -69,7 +75,5 @@ class JobRunInfo(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: St } object JobRunInfo { - def default = new JobRunInfo(new Date(), new Date(), 1, "localhost", RunnerStatus.DONE) - def detailed(startTime: Date, doneTime: Date, memUsedInGb: Int, hostName: String) = - new JobRunInfo(startTime, doneTime, memUsedInGb, hostName, RunnerStatus.DONE) + def default: JobRunInfo = new JobRunInfo() } \ No newline at end of file diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index 510aa33eb..6dca5d89f 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ 
b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -72,7 +72,8 @@ trait JobRunner[TFunction <: QFunction] { /** * Must be overloaded */ - def getRunInfo: JobRunInfo = JobRunInfo.default + val runInfo = JobRunInfo.default + def getRunInfo = runInfo /** * Calls back to a hook that an expert user can setup to modify a job. diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 0baa5dfe2..557230b05 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -320,7 +320,9 @@ class QGraph extends Logging { logger.debug("+++++++") foreachFunction(readyJobs.toList, edge => { if (running) { + edge.myRunInfo.startTime = new Date() logEdge(edge) + edge.myRunInfo.doneTime = new Date() edge.markAsDone } }) @@ -1038,7 +1040,7 @@ class QGraph extends Logging { def isShutdown = !running def getFunctionsAndStatus(functions: List[QFunction]): Map[QFunction, JobRunInfo] = { - getFunctionEdges.map(edge => (edge.function, edge.runInfo)).toMap + getFunctionEdges.map(edge => (edge.function, edge.getRunInfo)).toMap } /** From 28ee6dac418ca12309f26c3e52b84d9b57f303a0 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 24 Aug 2011 10:14:45 -0400 Subject: [PATCH 06/31] Fixed spelling mistake --- .../sting/gatk/walkers/diffengine/DiffEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/diffengine/DiffEngine.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/diffengine/DiffEngine.java index 4e3342609..2159bc839 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/diffengine/DiffEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/diffengine/DiffEngine.java @@ -234,7 +234,7 @@ public class DiffEngine { // now that we have a specific list of values we 
want to show, display them GATKReport report = new GATKReport(); - final String tableName = "diffences"; + final String tableName = "differences"; report.addTable(tableName, "Summarized differences between the master and test files. See http://www.broadinstitute.org/gsa/wiki/index.php/DiffEngine for more information", false); GATKReportTable table = report.getTable(tableName); table.addPrimaryKey("Difference", true); From 3ae68e23976e6439b694bdfdf3149ebbb86957ad Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 24 Aug 2011 13:36:39 -0400 Subject: [PATCH 07/31] JobLogging trait now writes out GATKReport log of jobs --- .../sting/queue/QCommandLine.scala | 1 + .../sting/queue/util/JobLogging.scala | 129 ++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala diff --git a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index 94a6408c6..becd1e1cf 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -115,6 +115,7 @@ class QCommandLine extends CommandLineProgram with Logging { // walk over each script, calling onExecutionDone for (script <- allQScripts) { script.onExecutionDone(qGraph.getFunctionsAndStatus(script.functions), qGraph.success) + JobLogging.printLogs(qGraph.getFunctionsAndStatus(script.functions)) } if (!qGraph.success) { diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala b/public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala new file mode 100644 index 000000000..d24ebe92b --- /dev/null +++ b/public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software 
and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.util +import org.broadinstitute.sting.queue.function.QFunction +import org.broadinstitute.sting.gatk.report.{GATKReportTable, GATKReport} +import org.broadinstitute.sting.utils.exceptions.UserException +import org.broadinstitute.sting.queue.engine.JobRunInfo + +/** + * A mixin to add Job info to the class + */ +trait JobLogging extends QFunction { + private var group: String = _ + private var features: Map[String, String] = null + + def getGroup = group + def isEnabled = group != null + def getFeatureNames: List[String] = features.keys.toList + def getFeatures = features + def get(key: String): String = { + features.get(key) match { + case Some(x) => x + case None => throw new RuntimeException("Get called with key %s but no value was found".format(key)) + } + } + def getName: String = features.get("jobName").get + + private def addRunInfo(info: JobRunInfo) { + features = features ++ Map( + "analysisName" -> this.analysisName, + "jobName" -> 
this.jobName, + "intermediate" -> this.isIntermediate, + "startTime" -> info.getStartTime, + "doneTime" -> info.getDoneTime, + "memUsedInGb" -> info.getMemoryUsedInGb, + "runtime" -> info.getRuntimeInMs, + "hostName" -> info.getHostname).mapValues(_.toString) + } + + def setJobLogging(group: String) { + this.group = group + } + + def setJobLogging(group: String, features: Map[String, Any]) { + this.group = group + this.features = features.mapValues(_.toString) + } +} + +object JobLogging { + def printLogs(jobs: Map[QFunction, JobRunInfo]) { + val jobLogs: List[JobLogging] = jobLoggingSublist(jobs.keys.toList) + jobLogs.foreach((job: JobLogging) => job.addRunInfo(jobs.get(job).get)) + printJobLogging(jobLogs) + } + + private def jobLoggingSublist(l: List[QFunction]): List[JobLogging] = { + def asJogLogging(qf: QFunction): JobLogging = qf match { + case x: JobLogging => x + case _ => null + } + + l.map(asJogLogging).filter(_ != null) + } + + /** + * Prints the JobLogging logs to a GATKReport. 
First splits up the + * logs by group, and for each group generates a GATKReportTable + */ + private def printJobLogging(logs: List[JobLogging]) { + // create the report + val report: GATKReport = new GATKReport + + // create a table for each group of logs + for ( (group, groupLogs) <- groupLogs(logs) ) { + report.addTable(group, "Job logs for " + group) + val table: GATKReportTable = report.getTable(group) + table.addPrimaryKey("jobName", false) + val keys = logKeys(groupLogs) + + // add the columns + keys.foreach(table.addColumn(_, 0)) + for (log <- groupLogs) { + for ( key <- keys ) + table.set(log.getName, key, log.get(key)) + } + } + + report.print(System.out) + } + + private def groupLogs(logs: List[JobLogging]): Map[String, List[JobLogging]] = { + logs.groupBy(_.getGroup) + } + + private def logKeys(logs: List[JobLogging]): Set[String] = { + // the keys should be the same for each log, but we will check that + val keys = Set[String](logs(0).getFeatureNames : _*) + + for ( log <- logs ) + if ( keys.sameElements(Set(log.getFeatureNames)) ) + throw new UserException(("All JobLogging jobs in the same group must have the same set of features. 
" + + "We found one with %s and another with %s").format(keys, log.getFeatureNames)) + + keys + } +} From d047c19ad1a6a5087425a0321340aa24adf7b82f Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 24 Aug 2011 13:52:05 -0400 Subject: [PATCH 08/31] Writes output to file --- .../org/broadinstitute/sting/queue/QCommandLine.scala | 4 +++- .../org/broadinstitute/sting/queue/util/JobLogging.scala | 9 +++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index becd1e1cf..c6d1f3883 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -115,7 +115,9 @@ class QCommandLine extends CommandLineProgram with Logging { // walk over each script, calling onExecutionDone for (script <- allQScripts) { script.onExecutionDone(qGraph.getFunctionsAndStatus(script.functions), qGraph.success) - JobLogging.printLogs(qGraph.getFunctionsAndStatus(script.functions)) + val reportFile = new File("joblogging.gatkreport.txt") + logger.info("Writing JobLogging GATKReport to file " + reportFile) + JobLogging.printLogs(qGraph.getFunctionsAndStatus(script.functions), reportFile) } if (!qGraph.success) { diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala b/public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala index d24ebe92b..3bb43508a 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala @@ -27,6 +27,7 @@ import org.broadinstitute.sting.queue.function.QFunction import org.broadinstitute.sting.gatk.report.{GATKReportTable, GATKReport} import org.broadinstitute.sting.utils.exceptions.UserException import org.broadinstitute.sting.queue.engine.JobRunInfo +import java.io.{FileOutputStream, 
PrintStream, File} /** * A mixin to add Job info to the class @@ -70,10 +71,10 @@ trait JobLogging extends QFunction { } object JobLogging { - def printLogs(jobs: Map[QFunction, JobRunInfo]) { + def printLogs(jobs: Map[QFunction, JobRunInfo], dest: File) { val jobLogs: List[JobLogging] = jobLoggingSublist(jobs.keys.toList) jobLogs.foreach((job: JobLogging) => job.addRunInfo(jobs.get(job).get)) - printJobLogging(jobLogs) + printJobLogging(jobLogs, new PrintStream(new FileOutputStream(dest))) } private def jobLoggingSublist(l: List[QFunction]): List[JobLogging] = { @@ -89,7 +90,7 @@ object JobLogging { * Prints the JobLogging logs to a GATKReport. First splits up the * logs by group, and for each group generates a GATKReportTable */ - private def printJobLogging(logs: List[JobLogging]) { + private def printJobLogging(logs: List[JobLogging], stream: PrintStream) { // create the report val report: GATKReport = new GATKReport @@ -108,7 +109,7 @@ object JobLogging { } } - report.print(System.out) + report.print(stream) } private def groupLogs(logs: List[JobLogging]): Map[String, List[JobLogging]] = { From 16d8360592ef30d00337828e6ce1f4466660c1d2 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 24 Aug 2011 13:59:14 -0400 Subject: [PATCH 09/31] QJobReport is now the official capability name --- .../sting/queue/QCommandLine.scala | 5 ++-- .../sting/queue/engine/QGraphSettings.scala | 3 +++ .../{JobLogging.scala => QJobReport.scala} | 26 ++++++++++--------- 3 files changed, 19 insertions(+), 15 deletions(-) rename public/scala/src/org/broadinstitute/sting/queue/util/{JobLogging.scala => QJobReport.scala} (84%) diff --git a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index c6d1f3883..668bd9604 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -115,9 +115,8 @@ class 
QCommandLine extends CommandLineProgram with Logging { // walk over each script, calling onExecutionDone for (script <- allQScripts) { script.onExecutionDone(qGraph.getFunctionsAndStatus(script.functions), qGraph.success) - val reportFile = new File("joblogging.gatkreport.txt") - logger.info("Writing JobLogging GATKReport to file " + reportFile) - JobLogging.printLogs(qGraph.getFunctionsAndStatus(script.functions), reportFile) + logger.info("Writing JobLogging GATKReport to file " + settings.jobReportFile) + QJobReport.printReport(qGraph.getFunctionsAndStatus(script.functions), settings.jobReportFile) } if (!qGraph.success) { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index 6ece600dd..13c841778 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -69,6 +69,9 @@ class QGraphSettings { @Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. 
Otherwise overwrites the dot_graph", required=false) var expandedDotFile: File = _ + @Argument(fullName="jobReport", shortName="jobReport", doc="File where we will write the Queue job report", required=false) + var jobReportFile: File = new File("queue_job_report.gatkreport.txt") + @ArgumentCollection val qSettings = new QSettings } diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala similarity index 84% rename from public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala rename to public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala index 3bb43508a..882831f6b 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/JobLogging.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala @@ -32,7 +32,7 @@ import java.io.{FileOutputStream, PrintStream, File} /** * A mixin to add Job info to the class */ -trait JobLogging extends QFunction { +trait QJobReport extends QFunction { private var group: String = _ private var features: Map[String, String] = null @@ -70,16 +70,18 @@ trait JobLogging extends QFunction { } } -object JobLogging { - def printLogs(jobs: Map[QFunction, JobRunInfo], dest: File) { - val jobLogs: List[JobLogging] = jobLoggingSublist(jobs.keys.toList) - jobLogs.foreach((job: JobLogging) => job.addRunInfo(jobs.get(job).get)) - printJobLogging(jobLogs, new PrintStream(new FileOutputStream(dest))) +object QJobReport { + def printReport(jobs: Map[QFunction, JobRunInfo], dest: File) { + val jobLogs: List[QJobReport] = jobLoggingSublist(jobs.keys.toList) + jobLogs.foreach((job: QJobReport) => job.addRunInfo(jobs.get(job).get)) + val stream = new PrintStream(new FileOutputStream(dest)) + printJobLogging(jobLogs, stream) + stream.close() } - private def jobLoggingSublist(l: List[QFunction]): List[JobLogging] = { - def asJogLogging(qf: QFunction): JobLogging = qf match { - case x: JobLogging => x + 
private def jobLoggingSublist(l: List[QFunction]): List[QJobReport] = { + def asJogLogging(qf: QFunction): QJobReport = qf match { + case x: QJobReport => x case _ => null } @@ -90,7 +92,7 @@ object JobLogging { * Prints the JobLogging logs to a GATKReport. First splits up the * logs by group, and for each group generates a GATKReportTable */ - private def printJobLogging(logs: List[JobLogging], stream: PrintStream) { + private def printJobLogging(logs: List[QJobReport], stream: PrintStream) { // create the report val report: GATKReport = new GATKReport @@ -112,11 +114,11 @@ object JobLogging { report.print(stream) } - private def groupLogs(logs: List[JobLogging]): Map[String, List[JobLogging]] = { + private def groupLogs(logs: List[QJobReport]): Map[String, List[QJobReport]] = { logs.groupBy(_.getGroup) } - private def logKeys(logs: List[JobLogging]): Set[String] = { + private def logKeys(logs: List[QJobReport]): Set[String] = { // the keys should be the same for each log, but we will check that val keys = Set[String](logs(0).getFeatureNames : _*) From 4918519a582333ab3d2f0423d6755068d7a12663 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 24 Aug 2011 14:14:01 -0400 Subject: [PATCH 10/31] No more NPE in getRuntime() when you cntr-c out of Queue --- .../org/broadinstitute/sting/queue/engine/JobRunInfo.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala index 07bf1d1da..2316f3968 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala @@ -67,7 +67,10 @@ class JobRunInfo { def getHostname = hostName def getRuntimeInMs: Long = { - getDoneTime.getTime - getStartTime.getTime + if ( getDoneTime != null && getStartTime != null ) + getDoneTime.getTime - getStartTime.getTime + else + -1 } 
override def toString: String = From 06e30a81d120225a1247ae7a0047623c5f749431 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 24 Aug 2011 15:30:09 -0400 Subject: [PATCH 11/31] Fixes throughout for getting job information -- no more hostname -- it's just not going to be important --- .../sting/queue/engine/InProcessRunner.scala | 6 +++--- .../sting/queue/engine/JobRunInfo.scala | 13 ++++++++++--- .../sting/queue/engine/QGraphSettings.scala | 2 +- .../sting/queue/engine/lsf/Lsf706JobRunner.scala | 14 ++++++++++++-- .../sting/queue/engine/shell/ShellJobRunner.scala | 3 +++ .../sting/queue/util/QJobReport.scala | 11 ++++++----- 6 files changed, 35 insertions(+), 14 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala index 85c3db699..25baaca4f 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -1,8 +1,8 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.InProcessFunction -import org.broadinstitute.sting.queue.util.IOUtils import java.util.Date +import org.broadinstitute.sting.queue.util.{Logging, IOUtils} /** * Runs a function that executes in process and does not fork out an external process. 
@@ -11,12 +11,12 @@ class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProce private var runStatus: RunnerStatus.Value = _ def start() = { - runInfo.startTime = new Date() + getRunInfo.startTime = new Date() runStatus = RunnerStatus.RUNNING function.run() - runInfo.doneTime = new Date() + getRunInfo.doneTime = new Date() val content = "%s%nDone.".format(function.description) IOUtils.writeContents(function.jobOutputFile, content) runStatus = RunnerStatus.DONE diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala index 2316f3968..563a32486 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala @@ -25,6 +25,7 @@ package org.broadinstitute.sting.queue.engine import java.util.Date +import java.text.SimpleDateFormat /* * Copyright (c) 2011, The Broad Institute @@ -57,14 +58,20 @@ class JobRunInfo { var startTime: Date = _ var doneTime: Date = _ var memUsedInGb: Int = -1 - var hostName: String = "localhost" var status: RunnerStatus.Value = RunnerStatus.DONE def getStatus = status + def getStartTime = startTime def getDoneTime = doneTime + def getFormattedStartTime = formatTime(getStartTime) + def getFormattedDoneTime = formatTime(getDoneTime) + + val formatter = new SimpleDateFormat("dd.MM.yy/H:mm:ss:SSS"); + private def formatTime(d: Date) = if ( d != null ) formatter.format(d) else "null" + def getMemoryUsedInGb = memUsedInGb - def getHostname = hostName + def isFilledIn = startTime != null def getRuntimeInMs: Long = { if ( getDoneTime != null && getStartTime != null ) @@ -74,7 +81,7 @@ class JobRunInfo { } override def toString: String = - "started %s ended %s runtime %s on host %s using %d Gb memory".format(getStartTime, getDoneTime, getRuntimeInMs, getHostname, getMemoryUsedInGb) + "started %s ended %s runtime %s using %d Gb 
memory".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs, getMemoryUsedInGb) } object JobRunInfo { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index 13c841778..166596316 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -70,7 +70,7 @@ class QGraphSettings { var expandedDotFile: File = _ @Argument(fullName="jobReport", shortName="jobReport", doc="File where we will write the Queue job report", required=false) - var jobReportFile: File = new File("queue_job_report.gatkreport.txt") + var jobReportFile: File = new File("/dev/stdout") @ArgumentCollection val qSettings = new QSettings diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index bb711344c..47f0d2c18 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -31,11 +31,12 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.jna.clibrary.LibC import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} -import com.sun.jna.ptr.IntByReference import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} -import com.sun.jna.{Structure, StringArray, NativeLong} import java.util.regex.Pattern import java.lang.StringBuffer +import java.util.Date +import com.sun.jna.{Pointer, Structure, StringArray, NativeLong} +import com.sun.jna.ptr.{PointerByReference, IntByReference} /** * Runs jobs on an LSF compute cluster. 
@@ -271,12 +272,21 @@ object Lsf706JobRunner extends Logging { logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo)) + def updateRunInfo() { + runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue) + runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue) + runner.getRunInfo.hostName = "unavailable" // TODO : exHosts + runner.getRunInfo.memUsedInGb = jobInfo.runRusage.mem + } + runner.updateStatus( if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { // Done successfully. + updateRunInfo() RunnerStatus.DONE } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { // Exited function that (probably) won't be retried. + updateRunInfo() RunnerStatus.FAILED } else { // Note that we still saw the job in the system. diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala index 03f9d3315..4124f65a0 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala @@ -27,6 +27,7 @@ package org.broadinstitute.sting.queue.engine.shell import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util.ShellJob import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} +import java.util.Date /** * Runs jobs one at a time locally @@ -50,8 +51,10 @@ class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRu // Allow advanced users to update the job. 
updateJobRun(job) + getRunInfo.startTime = new Date() updateStatus(RunnerStatus.RUNNING) job.run() + getRunInfo.doneTime = new Date() updateStatus(RunnerStatus.DONE) } diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala index 882831f6b..be0c2a5fc 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala @@ -49,15 +49,15 @@ trait QJobReport extends QFunction { def getName: String = features.get("jobName").get private def addRunInfo(info: JobRunInfo) { + logger.info("info " + info) features = features ++ Map( "analysisName" -> this.analysisName, "jobName" -> this.jobName, "intermediate" -> this.isIntermediate, - "startTime" -> info.getStartTime, - "doneTime" -> info.getDoneTime, + "startTime" -> info.getFormattedStartTime, + "doneTime" -> info.getFormattedDoneTime, "memUsedInGb" -> info.getMemoryUsedInGb, - "runtime" -> info.getRuntimeInMs, - "hostName" -> info.getHostname).mapValues(_.toString) + "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") } def setJobLogging(group: String) { @@ -71,7 +71,8 @@ trait QJobReport extends QFunction { } object QJobReport { - def printReport(jobs: Map[QFunction, JobRunInfo], dest: File) { + def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) { + val jobs = jobsRaw.filter(_._2.isFilledIn) val jobLogs: List[QJobReport] = jobLoggingSublist(jobs.keys.toList) jobLogs.foreach((job: QJobReport) => job.addRunInfo(jobs.get(job).get)) val stream = new PrintStream(new FileOutputStream(dest)) From 08fb21f127bf00d7b15790be3f6ba8ca1ef8753b Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 24 Aug 2011 16:45:50 -0400 Subject: [PATCH 12/31] Removing hostname --- .../sting/queue/engine/lsf/Lsf706JobRunner.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index 47f0d2c18..b31a66150 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -275,8 +275,7 @@ object Lsf706JobRunner extends Logging { def updateRunInfo() { runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue) runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue) - runner.getRunInfo.hostName = "unavailable" // TODO : exHosts - runner.getRunInfo.memUsedInGb = jobInfo.runRusage.mem + runner.getRunInfo.memUsedInGb = jobInfo.runRusage.mem // todo -- ask khalid about units here } runner.updateStatus( From a7d6946b22e3689f658361be0b1a79d63775847a Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 25 Aug 2011 13:13:55 -0400 Subject: [PATCH 14/31] Refactored QJobReport and QFunction, which is now automatically tracked -- All QFunctions, including sg ones, are tracked -- Removed memory information --- .../sting/queue/engine/JobRunInfo.scala | 6 +- .../queue/engine/lsf/Lsf706JobRunner.scala | 1 - .../sting/queue/function/QFunction.scala | 14 ++- .../sting/queue/util/QJobReport.scala | 105 +++++++++++------- 4 files changed, 75 insertions(+), 51 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala index 563a32486..c99cbb1fc 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala @@ -57,7 +57,6 @@ import java.text.SimpleDateFormat class JobRunInfo { var startTime: Date = _ var doneTime: Date = _ - var memUsedInGb: Int = -1 var status: RunnerStatus.Value = RunnerStatus.DONE def getStatus = status @@ -67,10 +66,9 @@ class JobRunInfo 
{ def getFormattedStartTime = formatTime(getStartTime) def getFormattedDoneTime = formatTime(getDoneTime) - val formatter = new SimpleDateFormat("dd.MM.yy/H:mm:ss:SSS"); + val formatter = new SimpleDateFormat("yy-MM-dd H:mm:ss:SSS"); private def formatTime(d: Date) = if ( d != null ) formatter.format(d) else "null" - def getMemoryUsedInGb = memUsedInGb def isFilledIn = startTime != null def getRuntimeInMs: Long = { @@ -81,7 +79,7 @@ class JobRunInfo { } override def toString: String = - "started %s ended %s runtime %s using %d Gb memory".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs, getMemoryUsedInGb) + "started %s ended %s runtime %s".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs) } object JobRunInfo { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index b31a66150..02066c74f 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -275,7 +275,6 @@ object Lsf706JobRunner extends Logging { def updateRunInfo() { runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue) runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue) - runner.getRunInfo.memUsedInGb = jobInfo.runRusage.mem // todo -- ask khalid about units here } runner.updateStatus( diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 7048b6413..c905581fa 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -30,14 +30,14 @@ import org.broadinstitute.sting.commandline._ import org.broadinstitute.sting.queue.{QException, QSettings} import collection.JavaConversions._ import 
org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction -import org.broadinstitute.sting.queue.util.{Logging, CollectionUtils, IOUtils, ReflectionUtils} +import org.broadinstitute.sting.queue.util._ /** * The base interface for all functions in Queue. * Inputs and outputs are specified as Sets of values. * Inputs are matched to other outputs by using .equals() */ -trait QFunction extends Logging { +trait QFunction extends Logging with QJobReport { /** A short description of this step in the graph */ var analysisName: String = "" @@ -83,11 +83,17 @@ trait QFunction extends Logging { */ var deleteIntermediateOutputs = true + // ------------------------------------------------------- + // + // job run information + // + // ------------------------------------------------------- + /** * Copies settings from this function to another function. * @param function QFunction to copy values to. */ - def copySettingsTo(function: QFunction) { + override def copySettingsTo(function: QFunction) { function.analysisName = this.analysisName function.jobName = this.jobName function.qSettings = this.qSettings @@ -99,6 +105,8 @@ trait QFunction extends Logging { function.updateJobRun = this.updateJobRun function.isIntermediate = this.isIntermediate function.deleteIntermediateOutputs = this.deleteIntermediateOutputs + function.reportGroup = this.reportGroup + function.reportFeatures = this.reportFeatures } /** File to redirect any output. 
Defaults to .out */ diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala index be0c2a5fc..3e393e4c0 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala @@ -28,72 +28,91 @@ import org.broadinstitute.sting.gatk.report.{GATKReportTable, GATKReport} import org.broadinstitute.sting.utils.exceptions.UserException import org.broadinstitute.sting.queue.engine.JobRunInfo import java.io.{FileOutputStream, PrintStream, File} +import org.broadinstitute.sting.queue.function.scattergather.{GathererFunction, ScatterFunction} /** * A mixin to add Job info to the class */ -trait QJobReport extends QFunction { - private var group: String = _ - private var features: Map[String, String] = null +// todo -- need to enforce QFunction to have copySettingTo work +trait QJobReport extends Logging { + self: QFunction => - def getGroup = group - def isEnabled = group != null - def getFeatureNames: List[String] = features.keys.toList - def getFeatures = features - def get(key: String): String = { - features.get(key) match { + // todo -- might make more sense to mix in the variables + protected var reportGroup: String = null + protected var reportFeatures: Map[String, String] = Map() + + def includeInReport = getReportGroup != null + def setRunInfo(info: JobRunInfo) { + logger.info("info " + info) + reportFeatures = reportFeatures ++ Map( + "analysisName" -> self.analysisName, + "jobName" -> QJobReport.workAroundSameJobNames(this), + "intermediate" -> self.isIntermediate, + "startTime" -> info.getStartTime.getTime, + "doneTime" -> info.getDoneTime.getTime, + "formattedStartTime" -> info.getFormattedStartTime, + "formattedDoneTime" -> info.getFormattedDoneTime, + "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") + } + + def getReportGroup = 
reportGroup + def getReportFeatures = reportFeatures + + def getReportFeatureNames: List[String] = getReportFeatures.keys.toList + def getReportFeature(key: String): String = { + getReportFeatures.get(key) match { case Some(x) => x case None => throw new RuntimeException("Get called with key %s but no value was found".format(key)) } } - def getName: String = features.get("jobName").get - private def addRunInfo(info: JobRunInfo) { - logger.info("info " + info) - features = features ++ Map( - "analysisName" -> this.analysisName, - "jobName" -> this.jobName, - "intermediate" -> this.isIntermediate, - "startTime" -> info.getFormattedStartTime, - "doneTime" -> info.getFormattedDoneTime, - "memUsedInGb" -> info.getMemoryUsedInGb, - "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") + def getReportName: String = getReportFeature("jobName") + + def configureJobReport(group: String) { + this.reportGroup = group } - def setJobLogging(group: String) { - this.group = group + def configureJobReport(group: String, features: Map[String, Any]) { + this.reportGroup = group + this.reportFeatures = features.mapValues(_.toString) } - def setJobLogging(group: String, features: Map[String, Any]) { - this.group = group - this.features = features.mapValues(_.toString) + // copy the QJobReport information -- todo : what's the best way to do this? 
+ override def copySettingsTo(function: QFunction) { + self.copySettingsTo(function) + function.reportGroup = this.reportGroup + function.reportFeatures = this.reportFeatures } } object QJobReport { + // todo -- fixme to have a unique name for Scatter/gather jobs as well + var seenCounter = 1 + var seenNames = Set[String]() + def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) { - val jobs = jobsRaw.filter(_._2.isFilledIn) - val jobLogs: List[QJobReport] = jobLoggingSublist(jobs.keys.toList) - jobLogs.foreach((job: QJobReport) => job.addRunInfo(jobs.get(job).get)) + val jobs = jobsRaw.filter(_._2.isFilledIn).filter(_._1.includeInReport) + jobs foreach {case (qf, info) => qf.setRunInfo(info)} val stream = new PrintStream(new FileOutputStream(dest)) - printJobLogging(jobLogs, stream) + printJobLogging(jobs.keys.toList, stream) stream.close() } - private def jobLoggingSublist(l: List[QFunction]): List[QJobReport] = { - def asJogLogging(qf: QFunction): QJobReport = qf match { - case x: QJobReport => x - case _ => null + def workAroundSameJobNames(func: QFunction):String = { + if ( seenNames.apply(func.jobName) ) { + seenCounter += 1 + "%s_%d".format(func.jobName, seenCounter) + } else { + seenNames += func.jobName + func.jobName } - - l.map(asJogLogging).filter(_ != null) } /** * Prints the JobLogging logs to a GATKReport. 
First splits up the * logs by group, and for each group generates a GATKReportTable */ - private def printJobLogging(logs: List[QJobReport], stream: PrintStream) { + private def printJobLogging(logs: List[QFunction], stream: PrintStream) { // create the report val report: GATKReport = new GATKReport @@ -108,25 +127,25 @@ object QJobReport { keys.foreach(table.addColumn(_, 0)) for (log <- groupLogs) { for ( key <- keys ) - table.set(log.getName, key, log.get(key)) + table.set(log.getReportName, key, log.getReportFeature(key)) } } report.print(stream) } - private def groupLogs(logs: List[QJobReport]): Map[String, List[QJobReport]] = { - logs.groupBy(_.getGroup) + private def groupLogs(logs: List[QFunction]): Map[String, List[QFunction]] = { + logs.groupBy(_.getReportGroup) } - private def logKeys(logs: List[QJobReport]): Set[String] = { + private def logKeys(logs: List[QFunction]): Set[String] = { // the keys should be the same for each log, but we will check that - val keys = Set[String](logs(0).getFeatureNames : _*) + val keys = Set[String](logs(0).getReportFeatureNames : _*) for ( log <- logs ) - if ( keys.sameElements(Set(log.getFeatureNames)) ) + if ( keys.sameElements(Set(log.getReportFeatureNames)) ) throw new UserException(("All JobLogging jobs in the same group must have the same set of features. 
" + - "We found one with %s and another with %s").format(keys, log.getFeatureNames)) + "We found one with %s and another with %s").format(keys, log.getReportFeatureNames)) keys } From d65faf509c208506153aa285b5f9371eed367ef1 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 25 Aug 2011 13:15:20 -0400 Subject: [PATCH 15/31] Default output name for Queue JobReport is queue_jobreport.gatkreport.txt --- .../org/broadinstitute/sting/queue/engine/QGraphSettings.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index 166596316..c7a67363d 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -70,7 +70,7 @@ class QGraphSettings { var expandedDotFile: File = _ @Argument(fullName="jobReport", shortName="jobReport", doc="File where we will write the Queue job report", required=false) - var jobReportFile: File = new File("/dev/stdout") + var jobReportFile: File = new File("queue_jobreport.gatkreport.txt") @ArgumentCollection val qSettings = new QSettings From 0f4be2c4a4ae36d209a50e00f5ba2374aacc0084 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 25 Aug 2011 13:32:03 -0400 Subject: [PATCH 16/31] Argument to disable queueJobReport entirely -- Minor improvements to RodPerformanceGoals --- .../src/org/broadinstitute/sting/queue/QCommandLine.scala | 6 ++++-- .../broadinstitute/sting/queue/engine/QGraphSettings.scala | 6 +++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index 668bd9604..d877575df 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ 
b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -115,8 +115,10 @@ class QCommandLine extends CommandLineProgram with Logging { // walk over each script, calling onExecutionDone for (script <- allQScripts) { script.onExecutionDone(qGraph.getFunctionsAndStatus(script.functions), qGraph.success) - logger.info("Writing JobLogging GATKReport to file " + settings.jobReportFile) - QJobReport.printReport(qGraph.getFunctionsAndStatus(script.functions), settings.jobReportFile) + if ( ! settings.disableJobReport ) { + logger.info("Writing JobLogging GATKReport to file " + settings.jobReportFile) + QJobReport.printReport(qGraph.getFunctionsAndStatus(script.functions), settings.jobReportFile) + // todo -- execute Rscript here once generic RScript execution system is implemented } } if (!qGraph.success) { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index c7a67363d..dbc3e3886 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -26,8 +26,8 @@ package org.broadinstitute.sting.queue.engine import java.io.File import org.broadinstitute.sting.queue.QSettings -import org.broadinstitute.sting.commandline.{ArgumentCollection, Argument} import org.broadinstitute.sting.queue.util.SystemUtils +import org.broadinstitute.sting.commandline.{Advanced, ArgumentCollection, Argument} /** * Command line options for a QGraph. 
@@ -72,6 +72,10 @@ class QGraphSettings { @Argument(fullName="jobReport", shortName="jobReport", doc="File where we will write the Queue job report", required=false) var jobReportFile: File = new File("queue_jobreport.gatkreport.txt") + @Advanced + @Argument(fullName="disableJobReport", shortName="disabpleJobReport", doc="If provided, we will not create a job report", required=false) + var disableJobReport: Boolean = false + @ArgumentCollection val qSettings = new QSettings } From 8bbef79fc275be0f250b2eaf8fdf6d2892b985a1 Mon Sep 17 00:00:00 2001 From: Eric Banks Date: Thu, 25 Aug 2011 15:37:26 -0400 Subject: [PATCH 18/31] Create clipped alleles during allele parsing instead of creating a full VC, clipping alleles, and regenerating the VC from scratch. --- .../utils/codecs/vcf/AbstractVCFCodec.java | 143 +++++------------- .../utils/variantcontext/VariantContext.java | 5 +- .../variantcontext/VariantContextUtils.java | 74 ++++++++- 3 files changed, 111 insertions(+), 111 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java b/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java index a3100030e..3a4ca2478 100755 --- a/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java +++ b/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java @@ -281,7 +281,7 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec, VariantContext vc = null; try { - vc = new VariantContext(name, contig, pos, loc, alleles, qual, filters, attributes); + vc = new VariantContext(name, contig, pos, loc, alleles, qual, filters, attributes, ref.getBytes()[0]); } catch (Exception e) { generateException(e.getMessage()); } @@ -291,7 +291,7 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec, vc.getGenotypes(); // Trim bases of all alleles if necessary - return createVariantContextWithTrimmedAlleles(vc); + return vc; } /** 
@@ -516,25 +516,44 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec, return true; } - private static int computeForwardClipping(List unclippedAlleles, String ref) { + public static int computeForwardClipping(List unclippedAlleles, String ref) { boolean clipping = true; - // Note that the computation of forward clipping here is meant only to see whether there is a common - // base to all alleles, and to correctly compute reverse clipping, - // but it is not used for actually changing alleles - this is done in function - // createVariantContextWithTrimmedAlleles() below. - for (Allele a : unclippedAlleles) { - if (a.isSymbolic()) { + for ( Allele a : unclippedAlleles ) { + if ( a.isSymbolic() ) continue; - } - if (a.length() < 1 || (a.getBases()[0] != ref.getBytes()[0])) { + + if ( a.length() < 1 || (a.getBases()[0] != ref.getBytes()[0]) ) { clipping = false; + break; } } - return (clipping) ? 1 : 0; + return (clipping) ? 1 : 0; } + protected static int computeReverseClipping(List unclippedAlleles, String ref, int forwardClipping, int lineNo) { + int clipping = 0; + boolean stillClipping = true; + + while ( stillClipping ) { + for ( Allele a : unclippedAlleles ) { + if ( a.isSymbolic() ) + continue; + + if ( a.length() - clipping <= forwardClipping || a.length() - forwardClipping == 0 ) + stillClipping = false; + else if ( ref.length() == clipping ) + generateException("bad alleles encountered", lineNo); + else if ( a.getBases()[a.length()-clipping-1] != ref.getBytes()[ref.length()-clipping-1] ) + stillClipping = false; + } + if ( stillClipping ) + clipping++; + } + + return clipping; + } /** * clip the alleles, based on the reference * @@ -542,122 +561,30 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec, * @param ref the reference string * @param unclippedAlleles the list of unclipped alleles * @param clippedAlleles output list of clipped alleles + * @param lineNo the current line number in the file * 
@return the new reference end position of this event */ protected static int clipAlleles(int position, String ref, List unclippedAlleles, List clippedAlleles, int lineNo) { - // Note that the computation of forward clipping here is meant only to see whether there is a common - // base to all alleles, and to correctly compute reverse clipping, - // but it is not used for actually changing alleles - this is done in function - // createVariantContextWithTrimmedAlleles() below. - int forwardClipping = computeForwardClipping(unclippedAlleles, ref); - - int reverseClipped = 0; - boolean clipping = true; - while (clipping) { - for (Allele a : unclippedAlleles) { - if (a.isSymbolic()) { - continue; - } - if (a.length() - reverseClipped <= forwardClipping || a.length() - forwardClipping == 0) - clipping = false; - else if (ref.length() == reverseClipped) - generateException("bad alleles encountered", lineNo); - else if (a.getBases()[a.length()-reverseClipped-1] != ref.getBytes()[ref.length()-reverseClipped-1]) - clipping = false; - } - if (clipping) reverseClipped++; - } + int reverseClipping = computeReverseClipping(unclippedAlleles, ref, forwardClipping, lineNo); if ( clippedAlleles != null ) { for ( Allele a : unclippedAlleles ) { if ( a.isSymbolic() ) { clippedAlleles.add(a); } else { - clippedAlleles.add(Allele.create(Arrays.copyOfRange(a.getBases(),0,a.getBases().length-reverseClipped),a.isReference())); + clippedAlleles.add(Allele.create(Arrays.copyOfRange(a.getBases(), forwardClipping, a.getBases().length-reverseClipping), a.isReference())); } } } // the new reference length - int refLength = ref.length() - reverseClipped; + int refLength = ref.length() - reverseClipping; return position+Math.max(refLength - 1,0); } - public static VariantContext createVariantContextWithTrimmedAlleles(VariantContext inputVC) { - // see if we need to trim common reference base from all alleles - boolean trimVC; - - // We need to trim common reference base from all alleles in all 
genotypes if a ref base is common to all alleles - Allele refAllele = inputVC.getReference(); - if (!inputVC.isVariant()) - trimVC = false; - else if (refAllele.isNull()) - trimVC = false; - else { - trimVC = (computeForwardClipping(new ArrayList(inputVC.getAlternateAlleles()), - inputVC.getReference().getDisplayString()) > 0); - } - - // nothing to do if we don't need to trim bases - if (trimVC) { - List alleles = new ArrayList(); - Map genotypes = new TreeMap(); - - // set the reference base for indels in the attributes - Map attributes = new TreeMap(inputVC.getAttributes()); - - Map originalToTrimmedAlleleMap = new HashMap(); - - for (Allele a : inputVC.getAlleles()) { - if (a.isSymbolic()) { - alleles.add(a); - originalToTrimmedAlleleMap.put(a, a); - } else { - // get bases for current allele and create a new one with trimmed bases - byte[] newBases = Arrays.copyOfRange(a.getBases(), 1, a.length()); - Allele trimmedAllele = Allele.create(newBases, a.isReference()); - alleles.add(trimmedAllele); - originalToTrimmedAlleleMap.put(a, trimmedAllele); - } - } - - // detect case where we're trimming bases but resulting vc doesn't have any null allele. 
In that case, we keep original representation - // example: mixed records such as {TA*,TGA,TG} - boolean hasNullAlleles = false; - - for (Allele a: originalToTrimmedAlleleMap.values()) { - if (a.isNull()) - hasNullAlleles = true; - if (a.isReference()) - refAllele = a; - } - - if (!hasNullAlleles) - return inputVC; - // now we can recreate new genotypes with trimmed alleles - for ( Map.Entry sample : inputVC.getGenotypes().entrySet() ) { - - List originalAlleles = sample.getValue().getAlleles(); - List trimmedAlleles = new ArrayList(); - for ( Allele a : originalAlleles ) { - if ( a.isCalled() ) - trimmedAlleles.add(originalToTrimmedAlleleMap.get(a)); - else - trimmedAlleles.add(Allele.NO_CALL); - } - genotypes.put(sample.getKey(), Genotype.modifyAlleles(sample.getValue(), trimmedAlleles)); - - } - return new VariantContext(inputVC.getSource(), inputVC.getChr(), inputVC.getStart(), inputVC.getEnd(), alleles, genotypes, inputVC.getNegLog10PError(), inputVC.filtersWereApplied() ? inputVC.getFilters() : null, attributes, new Byte(inputVC.getReference().getBases()[0])); - - } - - return inputVC; - } - public final static boolean canDecodeFile(final File potentialInput, final String MAGIC_HEADER_LINE) { try { return isVCFStream(new FileInputStream(potentialInput), MAGIC_HEADER_LINE) || diff --git a/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContext.java b/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContext.java index 1fde8879b..673fe4529 100755 --- a/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContext.java +++ b/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContext.java @@ -258,9 +258,10 @@ public class VariantContext implements Feature { // to enable tribble intergrati * @param negLog10PError qual * @param filters filters: use null for unfiltered and empty set for passes filters * @param attributes attributes + * @param referenceBaseForIndel padded reference base */ - 
public VariantContext(String source, String contig, long start, long stop, Collection alleles, double negLog10PError, Set filters, Map attributes) { - this(source, contig, start, stop, alleles, NO_GENOTYPES, negLog10PError, filters, attributes, null, true); + public VariantContext(String source, String contig, long start, long stop, Collection alleles, double negLog10PError, Set filters, Map attributes, Byte referenceBaseForIndel) { + this(source, contig, start, stop, alleles, NO_GENOTYPES, negLog10PError, filters, attributes, referenceBaseForIndel, true); } /** diff --git a/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContextUtils.java b/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContextUtils.java index 834ad0917..986d6305c 100755 --- a/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContextUtils.java +++ b/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContextUtils.java @@ -657,12 +657,84 @@ public class VariantContextUtils { VariantContext merged = new VariantContext(name, loc.getContig(), loc.getStart(), loc.getStop(), alleles, genotypes, negLog10PError, filters, (mergeInfoWithMaxAC ? 
attributesWithMaxAC : attributes) ); // Trim the padded bases of all alleles if necessary - merged = AbstractVCFCodec.createVariantContextWithTrimmedAlleles(merged); + merged = createVariantContextWithTrimmedAlleles(merged); if ( printMessages && remapped ) System.out.printf("Remapped => %s%n", merged); return merged; } + public static VariantContext createVariantContextWithTrimmedAlleles(VariantContext inputVC) { + // see if we need to trim common reference base from all alleles + boolean trimVC; + + // We need to trim common reference base from all alleles in all genotypes if a ref base is common to all alleles + Allele refAllele = inputVC.getReference(); + if (!inputVC.isVariant()) + trimVC = false; + else if (refAllele.isNull()) + trimVC = false; + else { + trimVC = (AbstractVCFCodec.computeForwardClipping(new ArrayList(inputVC.getAlternateAlleles()), + inputVC.getReference().getDisplayString()) > 0); + } + + // nothing to do if we don't need to trim bases + if (trimVC) { + List alleles = new ArrayList(); + Map genotypes = new TreeMap(); + + // set the reference base for indels in the attributes + Map attributes = new TreeMap(inputVC.getAttributes()); + + Map originalToTrimmedAlleleMap = new HashMap(); + + for (Allele a : inputVC.getAlleles()) { + if (a.isSymbolic()) { + alleles.add(a); + originalToTrimmedAlleleMap.put(a, a); + } else { + // get bases for current allele and create a new one with trimmed bases + byte[] newBases = Arrays.copyOfRange(a.getBases(), 1, a.length()); + Allele trimmedAllele = Allele.create(newBases, a.isReference()); + alleles.add(trimmedAllele); + originalToTrimmedAlleleMap.put(a, trimmedAllele); + } + } + + // detect case where we're trimming bases but resulting vc doesn't have any null allele. 
In that case, we keep original representation + // example: mixed records such as {TA*,TGA,TG} + boolean hasNullAlleles = false; + + for (Allele a: originalToTrimmedAlleleMap.values()) { + if (a.isNull()) + hasNullAlleles = true; + if (a.isReference()) + refAllele = a; + } + + if (!hasNullAlleles) + return inputVC; + // now we can recreate new genotypes with trimmed alleles + for ( Map.Entry sample : inputVC.getGenotypes().entrySet() ) { + + List originalAlleles = sample.getValue().getAlleles(); + List trimmedAlleles = new ArrayList(); + for ( Allele a : originalAlleles ) { + if ( a.isCalled() ) + trimmedAlleles.add(originalToTrimmedAlleleMap.get(a)); + else + trimmedAlleles.add(Allele.NO_CALL); + } + genotypes.put(sample.getKey(), Genotype.modifyAlleles(sample.getValue(), trimmedAlleles)); + + } + return new VariantContext(inputVC.getSource(), inputVC.getChr(), inputVC.getStart(), inputVC.getEnd(), alleles, genotypes, inputVC.getNegLog10PError(), inputVC.filtersWereApplied() ? inputVC.getFilters() : null, attributes, new Byte(inputVC.getReference().getBases()[0])); + + } + + return inputVC; + } + public static Map stripPLs(Map genotypes) { Map newGs = new HashMap(genotypes.size()); From 09a729da3aba44d319c603a93058e902fb7a1aa0 Mon Sep 17 00:00:00 2001 From: Eric Banks Date: Thu, 25 Aug 2011 15:42:52 -0400 Subject: [PATCH 19/31] Removing incorrect comment --- .../broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java | 1 - 1 file changed, 1 deletion(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java b/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java index 3a4ca2478..bb212e128 100755 --- a/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java +++ b/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java @@ -290,7 +290,6 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec, if ( 
!header.samplesWereAlreadySorted() ) vc.getGenotypes(); - // Trim bases of all alleles if necessary return vc; } From e01273ca7cbdcfab86fb228d8bfd6faf4c8a84d0 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 25 Aug 2011 16:57:11 -0400 Subject: [PATCH 20/31] Queue now writes out queueJobReport.pdf -- General purpose RScript executor in java (please use when invoking RScripts) -- Removed groupName. This is now analysisName -- Explicitly added capability to enable/disable individual QFunction --- .../sting/utils/R/RScriptExecutor.java | 119 ++++++++++++++++++ .../sting/queue/QCommandLine.scala | 3 +- .../sting/queue/engine/QGraphSettings.scala | 4 + .../sting/queue/util/QJobReport.scala | 36 ++++-- 4 files changed, 150 insertions(+), 12 deletions(-) create mode 100644 public/java/src/org/broadinstitute/sting/utils/R/RScriptExecutor.java diff --git a/public/java/src/org/broadinstitute/sting/utils/R/RScriptExecutor.java b/public/java/src/org/broadinstitute/sting/utils/R/RScriptExecutor.java new file mode 100644 index 000000000..868ea89b5 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/R/RScriptExecutor.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. 
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.utils.R; + +import org.apache.commons.io.FileUtils; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.commandline.Advanced; +import org.broadinstitute.sting.commandline.Argument; +import org.broadinstitute.sting.commandline.ArgumentCollection; +import org.broadinstitute.sting.gatk.walkers.recalibration.Covariate; +import org.broadinstitute.sting.utils.PathUtils; +import org.broadinstitute.sting.utils.Utils; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +/** + * Generic service for executing RScripts in the GATK directory + * + * @author Your Name + * @since Date created + */ +public class RScriptExecutor { + /** + * our log + */ + protected static Logger logger = Logger.getLogger(RScriptExecutor.class); + + public static class RScriptArgumentCollection { + @Advanced + @Argument(fullName = "path_to_Rscript", shortName = "Rscript", doc = "The path to your implementation of Rscript. 
For Broad users this is maybe /broad/tools/apps/R-2.6.0/bin/Rscript", required = false) + private String PATH_TO_RSCRIPT = "Rscript"; + + @Advanced + @Argument(fullName = "path_to_resources", shortName = "resources", doc = "Path to resources folder holding the Sting R scripts.", required = false) + private List PATH_TO_RESOURCES = Arrays.asList("public/R/", "private/R/"); + } + + final RScriptArgumentCollection myArgs; + final boolean exceptOnError; + + public RScriptExecutor(final RScriptArgumentCollection myArgs, final boolean exceptOnError) { + this.myArgs = myArgs; + this.exceptOnError = exceptOnError; + } + + public void callRScripts(String scriptName, String... scriptArgs) { + callRScripts(scriptName, Arrays.asList(scriptArgs)); + } + + public void callRScripts(String scriptName, List scriptArgs) { + try { + final File pathToScript = findScript(scriptName); + if ( pathToScript == null ) return; // we failed but shouldn't exception out + final String argString = Utils.join(" ", scriptArgs); + final String cmdLine = Utils.join(" ", Arrays.asList(myArgs.PATH_TO_RSCRIPT, pathToScript, argString)); + logger.info("Executing RScript: " + cmdLine); + Runtime.getRuntime().exec(cmdLine).waitFor(); + } catch (InterruptedException e) { + generateException(e); + } catch (IOException e) { + generateException("Fatal Exception: Perhaps RScript jobs are being spawned too quickly?", e); + } + } + + public File findScript(final String scriptName) { + for ( String pathToResource : myArgs.PATH_TO_RESOURCES ) { + final File f = new File(pathToResource + "/" + scriptName); + if ( f.exists() ) { + if ( f.canRead() ) + return f; + else + generateException("Script exists but couldn't be read: " + scriptName); + } + } + + generateException("Couldn't find script: " + scriptName + " in " + myArgs.PATH_TO_RSCRIPT); + return null; + } + + private void generateException(String msg) { + generateException(msg, null); + } + + private void generateException(Throwable e) { + 
generateException("", e); + } + + private void generateException(String msg, Throwable e) { + if ( exceptOnError ) + throw new RuntimeException(msg, e); + else + logger.warn(msg + (e == null ? "" : ":" + e.getMessage())); + } +} diff --git a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index d877575df..138003cdd 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -118,7 +118,8 @@ class QCommandLine extends CommandLineProgram with Logging { if ( ! settings.disableJobReport ) { logger.info("Writing JobLogging GATKReport to file " + settings.jobReportFile) QJobReport.printReport(qGraph.getFunctionsAndStatus(script.functions), settings.jobReportFile) - // todo -- execute Rscript here once generic RScript execution system is implemented } + QJobReport.plotReport(settings.rScriptArgs, settings.jobReportFile) + } } if (!qGraph.success) { diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index dbc3e3886..46063fc24 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -28,6 +28,7 @@ import java.io.File import org.broadinstitute.sting.queue.QSettings import org.broadinstitute.sting.queue.util.SystemUtils import org.broadinstitute.sting.commandline.{Advanced, ArgumentCollection, Argument} +import org.broadinstitute.sting.utils.R.RScriptExecutor /** * Command line options for a QGraph. 
@@ -76,6 +77,9 @@ class QGraphSettings { @Argument(fullName="disableJobReport", shortName="disabpleJobReport", doc="If provided, we will not create a job report", required=false) var disableJobReport: Boolean = false + @ArgumentCollection + var rScriptArgs = new RScriptExecutor.RScriptArgumentCollection + @ArgumentCollection val qSettings = new QSettings } diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala index 3e393e4c0..c84f0b17a 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala @@ -29,22 +29,28 @@ import org.broadinstitute.sting.utils.exceptions.UserException import org.broadinstitute.sting.queue.engine.JobRunInfo import java.io.{FileOutputStream, PrintStream, File} import org.broadinstitute.sting.queue.function.scattergather.{GathererFunction, ScatterFunction} +import org.broadinstitute.sting.utils.R.RScriptExecutor.RScriptArgumentCollection +import org.broadinstitute.sting.utils.R.RScriptExecutor +import org.broadinstitute.sting.queue.QScript /** * A mixin to add Job info to the class */ -// todo -- need to enforce QFunction to have copySettingTo work trait QJobReport extends Logging { self: QFunction => - // todo -- might make more sense to mix in the variables protected var reportGroup: String = null protected var reportFeatures: Map[String, String] = Map() + protected var reportEnabled: Boolean = true + + def includeInReport = reportEnabled + def enableReport() { reportEnabled = true } + def disableReport() { reportEnabled = false } - def includeInReport = getReportGroup != null def setRunInfo(info: JobRunInfo) { logger.info("info " + info) reportFeatures = reportFeatures ++ Map( + "iteration" -> 1, "analysisName" -> self.analysisName, "jobName" -> QJobReport.workAroundSameJobNames(this), "intermediate" -> self.isIntermediate, @@ -53,9 +59,15 @@ 
trait QJobReport extends Logging { "formattedStartTime" -> info.getFormattedStartTime, "formattedDoneTime" -> info.getFormattedDoneTime, "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") + +// // handle the special case of iterations +// reportFeatures.get("iteration") match { +// case None => reportFeatures("iteration") = 1 +// case _ => ; +// } } - def getReportGroup = reportGroup + def getReportGroup = analysisName def getReportFeatures = reportFeatures def getReportFeatureNames: List[String] = getReportFeatures.keys.toList @@ -68,24 +80,20 @@ trait QJobReport extends Logging { def getReportName: String = getReportFeature("jobName") - def configureJobReport(group: String) { - this.reportGroup = group - } - - def configureJobReport(group: String, features: Map[String, Any]) { - this.reportGroup = group + def configureJobReport(features: Map[String, Any]) { this.reportFeatures = features.mapValues(_.toString) } // copy the QJobReport information -- todo : what's the best way to do this? 
override def copySettingsTo(function: QFunction) { self.copySettingsTo(function) - function.reportGroup = this.reportGroup function.reportFeatures = this.reportFeatures } } object QJobReport { + val JOB_REPORT_QUEUE_SCRIPT = "queueJobReport.R" + // todo -- fixme to have a unique name for Scatter/gather jobs as well var seenCounter = 1 var seenNames = Set[String]() @@ -98,6 +106,12 @@ object QJobReport { stream.close() } + def plotReport(args: RScriptArgumentCollection, jobReportFile: File) { + val executor = new RScriptExecutor(args, false) // don't except on error + val pdf = jobReportFile.getAbsolutePath + ".pdf" + executor.callRScripts(JOB_REPORT_QUEUE_SCRIPT, jobReportFile.getAbsolutePath, pdf) + } + def workAroundSameJobNames(func: QFunction):String = { if ( seenNames.apply(func.jobName) ) { seenCounter += 1 From e03dfdb0ab318ed740f05a263f3fccee2c9d0521 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 25 Aug 2011 16:59:02 -0400 Subject: [PATCH 21/31] Automatic iteration field addition works properly. 
--- .../src/org/broadinstitute/sting/queue/util/QJobReport.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala index c84f0b17a..a1fa10cd8 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala @@ -49,7 +49,7 @@ trait QJobReport extends Logging { def setRunInfo(info: JobRunInfo) { logger.info("info " + info) - reportFeatures = reportFeatures ++ Map( + reportFeatures = Map( "iteration" -> 1, "analysisName" -> self.analysisName, "jobName" -> QJobReport.workAroundSameJobNames(this), @@ -58,7 +58,7 @@ trait QJobReport extends Logging { "doneTime" -> info.getDoneTime.getTime, "formattedStartTime" -> info.getFormattedStartTime, "formattedDoneTime" -> info.getFormattedDoneTime, - "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") + "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") ++ reportFeatures // // handle the special case of iterations // reportFeatures.get("iteration") match { From 9b7512fd94f532d18b6793d2c81b03c2a306656c Mon Sep 17 00:00:00 2001 From: Eric Banks Date: Thu, 25 Aug 2011 22:42:14 -0400 Subject: [PATCH 25/31] Just because there's a ref base doesn't mean the VC needs to be padded --- .../sting/gatk/walkers/variantutils/VariantsToTable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java index 7d0b4c3d4..0cf425e6f 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java @@ 
-271,7 +271,7 @@ public class VariantsToTable extends RodWalker { getters.put("REF", new Getter() { public String get(VariantContext vc) { String x = ""; - if ( vc.hasReferenceBaseForIndel() ) { + if ( vc.hasReferenceBaseForIndel() && !vc.isSNP() ) { Byte refByte = vc.getReferenceBaseForIndel(); x=x+new String(new byte[]{refByte}); } @@ -283,7 +283,7 @@ public class VariantsToTable extends RodWalker { StringBuilder x = new StringBuilder(); int n = vc.getAlternateAlleles().size(); if ( n == 0 ) return "."; - if ( vc.hasReferenceBaseForIndel() ) { + if ( vc.hasReferenceBaseForIndel() && !vc.isSNP() ) { Byte refByte = vc.getReferenceBaseForIndel(); x.append(new String(new byte[]{refByte})); } From c0503283dff9902dd146acd4fa029f417574fec1 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 26 Aug 2011 07:40:44 -0400 Subject: [PATCH 28/31] Spelling fix requires md5 updates --- .../gatk/walkers/diffengine/DiffObjectsIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/public/java/test/org/broadinstitute/sting/gatk/walkers/diffengine/DiffObjectsIntegrationTest.java b/public/java/test/org/broadinstitute/sting/gatk/walkers/diffengine/DiffObjectsIntegrationTest.java index f9aaaecc1..1f11b5886 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/walkers/diffengine/DiffObjectsIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/walkers/diffengine/DiffObjectsIntegrationTest.java @@ -50,8 +50,8 @@ public class DiffObjectsIntegrationTest extends WalkerTest { @DataProvider(name = "data") public Object[][] createData() { - new TestParams(testDir + "diffTestMaster.vcf", testDir + "diffTestTest.vcf", "92311de76dda3f38aac289d807ef23d0"); - new TestParams(testDir + "exampleBAM.bam", testDir + "exampleBAM.simple.bam", "0c69412c385fda50210f2a612e1ffe4a"); + new TestParams(testDir + "diffTestMaster.vcf", testDir + "diffTestTest.vcf", "dc1ca75c6ecf32641967d61e167acfff"); + new TestParams(testDir + "exampleBAM.bam", 
testDir + "exampleBAM.simple.bam", "df0fcb568a3a49fc74830103b2e26f6c"); return TestParams.getTests(TestParams.class); } From 415d5d53010bc7bbcc0075654171b7b9cd62e34a Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 26 Aug 2011 09:18:28 -0400 Subject: [PATCH 30/31] LSF long times are in seconds, convert to milliseconds to meet standard --- .../sting/queue/engine/lsf/Lsf706JobRunner.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index 02066c74f..166008c26 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -273,8 +273,9 @@ object Lsf706JobRunner extends Logging { logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo)) def updateRunInfo() { - runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue) - runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue) + // the platform LSF startTimes are in seconds, not milliseconds, so convert to the java convention + runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue * 1000) + runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue * 1000) } runner.updateStatus( From 0cb1605df0d7e84dda53535484d89e2bac4be614 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 26 Aug 2011 09:22:58 -0400 Subject: [PATCH 31/31] Clean documentation for JobRunInfo --- .../sting/queue/engine/JobRunInfo.scala | 48 +++++++------------ 1 file changed, 17 insertions(+), 31 deletions(-) diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala index c99cbb1fc..03124a420 100644 --- 
a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala @@ -27,52 +27,38 @@ package org.broadinstitute.sting.queue.engine import java.util.Date import java.text.SimpleDateFormat -/* - * Copyright (c) 2011, The Broad Institute - * - * Permission is hereby granted, free of charge, to any person - * obtaining a copy of this software and associated documentation - * files (the "Software"), to deal in the Software without - * restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following - * conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES - * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. - */ - /** - * Base class containing all of the information about a job run. + * Class containing tracked information about a job run. 
*/ + // todo -- it might be nice to have the hostname class JobRunInfo { - var startTime: Date = _ - var doneTime: Date = _ - var status: RunnerStatus.Value = RunnerStatus.DONE + /** constant date format */ + val formatter = new SimpleDateFormat("yy-MM-dd H:mm:ss:SSS"); - def getStatus = status + /** The start time with millisecond resolution of this job */ + var startTime: Date = _ + /** The done time with millisecond resolution of this job */ + var doneTime: Date = _ def getStartTime = startTime def getDoneTime = doneTime def getFormattedStartTime = formatTime(getStartTime) def getFormattedDoneTime = formatTime(getDoneTime) - val formatter = new SimpleDateFormat("yy-MM-dd H:mm:ss:SSS"); + /** Helper function that pretty prints the date */ private def formatTime(d: Date) = if ( d != null ) formatter.format(d) else "null" + /** + * Was any information set for this jobInfo? JobInfo can be unset because + * the job never ran or because it already completed. + */ def isFilledIn = startTime != null + /** + * How long did the job run (in wall time)? Returns -1 if this jobInfo isn't filled in + */ def getRuntimeInMs: Long = { - if ( getDoneTime != null && getStartTime != null ) + if ( isFilledIn ) getDoneTime.getTime - getStartTime.getTime else -1