diff --git a/public/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java b/public/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java index ff992d77d..16358d05f 100644 --- a/public/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java +++ b/public/java/src/org/broadinstitute/sting/commandline/ArgumentTypeDescriptor.java @@ -379,10 +379,14 @@ class RodBindingArgumentTypeDescriptor extends ArgumentTypeDescriptor { } if ( tribbleType == null ) - throw new UserException.CommandLineException( - String.format("No tribble type was provided on the command line and the type of the file could not be determined dynamically. " + - "Please add an explicit type tag :NAME listing the correct type from among the supported types:%n%s", - manager.userFriendlyListOfAvailableFeatures(parameterType))); + if ( ! file.canRead() || ! file.isFile() ) { + throw new UserException.BadArgumentValue(name, "Couldn't read file to determine type: " + file); + } else { + throw new UserException.CommandLineException( + String.format("No tribble type was provided on the command line and the type of the file could not be determined dynamically. 
" + + "Please add an explicit type tag :NAME listing the correct type from among the supported types:%n%s", + manager.userFriendlyListOfAvailableFeatures(parameterType))); + } } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 48fd73e0b..65ff27497 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -48,9 +48,10 @@ public class LinearMicroScheduler extends MicroScheduler { walker.initialize(); Accumulator accumulator = Accumulator.create(engine,walker); + boolean done = walker.isDone(); int counter = 0; for (Shard shard : shardStrategy ) { - if ( shard == null ) // we ran out of shards that aren't owned + if ( done || shard == null ) // we ran out of shards that aren't owned break; if(shard.getShardType() == Shard.ShardType.LOCUS) { @@ -61,6 +62,7 @@ public class LinearMicroScheduler extends MicroScheduler { Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); accumulator.accumulate(dataProvider,result); dataProvider.close(); + if ( walker.isDone() ) break; } windowMaker.close(); } @@ -70,6 +72,8 @@ public class LinearMicroScheduler extends MicroScheduler { accumulator.accumulate(dataProvider,result); dataProvider.close(); } + + done = walker.isDone(); } Object result = accumulator.finishTraversal(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java index 1ba48ca5f..046003154 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseDuplicates.java @@ -173,7 +173,9 @@ public class TraverseDuplicates extends TraversalEngine those 
with the same mate pair position, for paired reads * -> those flagged as unpaired and duplicated but having the same start and end */ + boolean done = walker.isDone(); for (SAMRecord read : iter) { + if ( done ) break; // get the genome loc from the read GenomeLoc site = engine.getGenomeLocParser().createGenomeLoc(read); @@ -194,6 +196,7 @@ public class TraverseDuplicates extends TraversalEngine extends TraversalEngine,Locu logger.debug(String.format("TraverseLoci.traverse: Shard is %s", dataProvider)); LocusView locusView = getLocusView( walker, dataProvider ); + boolean done = false; if ( locusView.hasNext() ) { // trivial optimization to avoid unnecessary processing when there's nothing here at all @@ -46,7 +47,7 @@ public class TraverseLoci extends TraversalEngine,Locu LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider ); // We keep processing while the next reference location is within the interval - while( locusView.hasNext() ) { + while( locusView.hasNext() && ! done ) { AlignmentContext locus = locusView.next(); GenomeLoc location = locus.getLocation(); @@ -76,15 +77,17 @@ public class TraverseLoci extends TraversalEngine,Locu if (keepMeP) { M x = walker.map(tracker, refContext, locus); sum = walker.reduce(x, sum); + done = walker.isDone(); } printProgress(dataProvider.getShard(),locus.getLocation()); } } - // We have a final map call to execute here to clean up the skipped based from the - // last position in the ROD to that in the interval - if ( WalkerManager.getWalkerDataSource(walker) == DataSource.REFERENCE_ORDERED_DATA ) { + // We have a final map call to execute here to clean up the skipped based from the + // last position in the ROD to that in the interval + if ( WalkerManager.getWalkerDataSource(walker) == DataSource.REFERENCE_ORDERED_DATA && ! walker.isDone() ) { + // only do this if the walker isn't done! 
RodLocusView rodLocusView = (RodLocusView)locusView; long nSkipped = rodLocusView.getLastSkippedBases(); if ( nSkipped > 0 ) { diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java index 196d54036..dd4402d82 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java @@ -50,7 +50,9 @@ public class TraverseReadPairs extends TraversalEngine pairs = new ArrayList(); + boolean done = walker.isDone(); for(SAMRecord read: reads) { + if ( done ) break; dataProvider.getShard().getReadMetrics().incrementNumReadsSeen(); if(pairs.size() == 0 || pairs.get(0).getReadName().equals(read.getReadName())) { @@ -65,6 +67,8 @@ public class TraverseReadPairs extends TraversalEngine extends TraversalEngine,Read // get the reference ordered data ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider); + boolean done = walker.isDone(); // while we still have more reads for (SAMRecord read : reads) { + if ( done ) break; // ReferenceContext -- the reference bases covered by the read ReferenceContext refContext = null; @@ -106,6 +108,7 @@ public class TraverseReads extends TraversalEngine,Read GenomeLoc locus = read.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX ? 
null : engine.getGenomeLocParser().createGenomeLoc(read.getReferenceName(),read.getAlignmentStart()); printProgress(dataProvider.getShard(),locus); + done = walker.isDone(); } return sum; } diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java index 9e261a0b1..c88c7c3c4 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java @@ -126,6 +126,17 @@ public abstract class Walker { public void initialize() { } + /** + * A function for overloading in subclasses providing a mechanism to abort early from a walker. + * + * If this ever returns true, then the Traversal engine will stop executing map calls + * and start the process of shutting down the walker in an orderly fashion. + * @return + */ + public boolean isDone() { + return false; + } + /** * Provide an initial value for reduce computations. * @return Initial value of reduce. diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/diffengine/DiffEngine.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/diffengine/DiffEngine.java index 4e3342609..2159bc839 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/diffengine/DiffEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/diffengine/DiffEngine.java @@ -234,7 +234,7 @@ public class DiffEngine { // now that we have a specific list of values we want to show, display them GATKReport report = new GATKReport(); - final String tableName = "diffences"; + final String tableName = "differences"; report.addTable(tableName, "Summarized differences between the master and test files. 
See http://www.broadinstitute.org/gsa/wiki/index.php/DiffEngine for more information", false); GATKReportTable table = report.getTable(tableName); table.addPrimaryKey("Difference", true); diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java index 7d0b4c3d4..2a877fb09 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/variantutils/VariantsToTable.java @@ -151,22 +151,21 @@ public class VariantsToTable extends RodWalker { if ( tracker == null ) // RodWalkers can make funky map calls return 0; - if ( ++nRecords < MAX_RECORDS || MAX_RECORDS == -1 ) { - for ( VariantContext vc : tracker.getValues(variantCollection.variants, context.getLocation())) { - if ( (keepMultiAllelic || vc.isBiallelic()) && ( showFiltered || vc.isNotFiltered() ) ) { - List vals = extractFields(vc, fieldsToTake, ALLOW_MISSING_DATA, keepMultiAllelic, logACSum); - out.println(Utils.join("\t", vals)); - } + for ( VariantContext vc : tracker.getValues(variantCollection.variants, context.getLocation())) { + if ( (keepMultiAllelic || vc.isBiallelic()) && ( showFiltered || vc.isNotFiltered() ) ) { + List vals = extractFields(vc, fieldsToTake, ALLOW_MISSING_DATA, keepMultiAllelic, logACSum); + out.println(Utils.join("\t", vals)); } - - return 1; - } else { - if ( nRecords >= MAX_RECORDS ) { - logger.warn("Calling sys exit to leave after " + nRecords + " records"); - System.exit(0); // todo -- what's the recommend way to abort like this? 
- } - return 0; } + + return 1; + } + + @Override + public boolean isDone() { + boolean done = MAX_RECORDS != -1 && nRecords >= MAX_RECORDS; + if ( done) logger.warn("isDone() will return true to leave after " + nRecords + " records"); + return done ; } private static final boolean isWildCard(String s) { @@ -271,7 +270,7 @@ public class VariantsToTable extends RodWalker { getters.put("REF", new Getter() { public String get(VariantContext vc) { String x = ""; - if ( vc.hasReferenceBaseForIndel() ) { + if ( vc.hasReferenceBaseForIndel() && !vc.isSNP() ) { Byte refByte = vc.getReferenceBaseForIndel(); x=x+new String(new byte[]{refByte}); } @@ -283,7 +282,7 @@ public class VariantsToTable extends RodWalker { StringBuilder x = new StringBuilder(); int n = vc.getAlternateAlleles().size(); if ( n == 0 ) return "."; - if ( vc.hasReferenceBaseForIndel() ) { + if ( vc.hasReferenceBaseForIndel() && !vc.isSNP() ) { Byte refByte = vc.getReferenceBaseForIndel(); x.append(new String(new byte[]{refByte})); } diff --git a/public/java/src/org/broadinstitute/sting/utils/R/RScriptExecutor.java b/public/java/src/org/broadinstitute/sting/utils/R/RScriptExecutor.java new file mode 100644 index 000000000..868ea89b5 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/R/RScriptExecutor.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. 
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.utils.R; + +import org.apache.commons.io.FileUtils; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.commandline.Advanced; +import org.broadinstitute.sting.commandline.Argument; +import org.broadinstitute.sting.commandline.ArgumentCollection; +import org.broadinstitute.sting.gatk.walkers.recalibration.Covariate; +import org.broadinstitute.sting.utils.PathUtils; +import org.broadinstitute.sting.utils.Utils; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +/** + * Generic service for executing RScripts in the GATK directory + * + * @author Your Name + * @since Date created + */ +public class RScriptExecutor { + /** + * our log + */ + protected static Logger logger = Logger.getLogger(RScriptExecutor.class); + + public static class RScriptArgumentCollection { + @Advanced + @Argument(fullName = "path_to_Rscript", shortName = "Rscript", doc = "The path to your implementation of Rscript. 
For Broad users this is maybe /broad/tools/apps/R-2.6.0/bin/Rscript", required = false) + private String PATH_TO_RSCRIPT = "Rscript"; + + @Advanced + @Argument(fullName = "path_to_resources", shortName = "resources", doc = "Path to resources folder holding the Sting R scripts.", required = false) + private List PATH_TO_RESOURCES = Arrays.asList("public/R/", "private/R/"); + } + + final RScriptArgumentCollection myArgs; + final boolean exceptOnError; + + public RScriptExecutor(final RScriptArgumentCollection myArgs, final boolean exceptOnError) { + this.myArgs = myArgs; + this.exceptOnError = exceptOnError; + } + + public void callRScripts(String scriptName, String... scriptArgs) { + callRScripts(scriptName, Arrays.asList(scriptArgs)); + } + + public void callRScripts(String scriptName, List scriptArgs) { + try { + final File pathToScript = findScript(scriptName); + if ( pathToScript == null ) return; // we failed but shouldn't exception out + final String argString = Utils.join(" ", scriptArgs); + final String cmdLine = Utils.join(" ", Arrays.asList(myArgs.PATH_TO_RSCRIPT, pathToScript, argString)); + logger.info("Executing RScript: " + cmdLine); + Runtime.getRuntime().exec(cmdLine).waitFor(); + } catch (InterruptedException e) { + generateException(e); + } catch (IOException e) { + generateException("Fatal Exception: Perhaps RScript jobs are being spawned too quickly?", e); + } + } + + public File findScript(final String scriptName) { + for ( String pathToResource : myArgs.PATH_TO_RESOURCES ) { + final File f = new File(pathToResource + "/" + scriptName); + if ( f.exists() ) { + if ( f.canRead() ) + return f; + else + generateException("Script exists but couldn't be read: " + scriptName); + } + } + + generateException("Couldn't find script: " + scriptName + " in " + myArgs.PATH_TO_RESOURCES); + return null; + } + + private void generateException(String msg) { + generateException(msg, null); + } + + private void generateException(Throwable e) { + 
generateException("", e); + } + + private void generateException(String msg, Throwable e) { + if ( exceptOnError ) + throw new RuntimeException(msg, e); + else + logger.warn(msg + (e == null ? "" : ":" + e.getMessage())); + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java b/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java index a3100030e..bb212e128 100755 --- a/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java +++ b/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/AbstractVCFCodec.java @@ -281,7 +281,7 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec, VariantContext vc = null; try { - vc = new VariantContext(name, contig, pos, loc, alleles, qual, filters, attributes); + vc = new VariantContext(name, contig, pos, loc, alleles, qual, filters, attributes, ref.getBytes()[0]); } catch (Exception e) { generateException(e.getMessage()); } @@ -290,8 +290,7 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec, if ( !header.samplesWereAlreadySorted() ) vc.getGenotypes(); - // Trim bases of all alleles if necessary - return createVariantContextWithTrimmedAlleles(vc); + return vc; } /** @@ -516,25 +515,44 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec, return true; } - private static int computeForwardClipping(List unclippedAlleles, String ref) { + public static int computeForwardClipping(List unclippedAlleles, String ref) { boolean clipping = true; - // Note that the computation of forward clipping here is meant only to see whether there is a common - // base to all alleles, and to correctly compute reverse clipping, - // but it is not used for actually changing alleles - this is done in function - // createVariantContextWithTrimmedAlleles() below. 
- for (Allele a : unclippedAlleles) { - if (a.isSymbolic()) { + for ( Allele a : unclippedAlleles ) { + if ( a.isSymbolic() ) continue; - } - if (a.length() < 1 || (a.getBases()[0] != ref.getBytes()[0])) { + + if ( a.length() < 1 || (a.getBases()[0] != ref.getBytes()[0]) ) { clipping = false; + break; } } - return (clipping) ? 1 : 0; + return (clipping) ? 1 : 0; } + protected static int computeReverseClipping(List unclippedAlleles, String ref, int forwardClipping, int lineNo) { + int clipping = 0; + boolean stillClipping = true; + + while ( stillClipping ) { + for ( Allele a : unclippedAlleles ) { + if ( a.isSymbolic() ) + continue; + + if ( a.length() - clipping <= forwardClipping || a.length() - forwardClipping == 0 ) + stillClipping = false; + else if ( ref.length() == clipping ) + generateException("bad alleles encountered", lineNo); + else if ( a.getBases()[a.length()-clipping-1] != ref.getBytes()[ref.length()-clipping-1] ) + stillClipping = false; + } + if ( stillClipping ) + clipping++; + } + + return clipping; + } /** * clip the alleles, based on the reference * @@ -542,122 +560,30 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec, * @param ref the reference string * @param unclippedAlleles the list of unclipped alleles * @param clippedAlleles output list of clipped alleles + * @param lineNo the current line number in the file * @return the new reference end position of this event */ protected static int clipAlleles(int position, String ref, List unclippedAlleles, List clippedAlleles, int lineNo) { - // Note that the computation of forward clipping here is meant only to see whether there is a common - // base to all alleles, and to correctly compute reverse clipping, - // but it is not used for actually changing alleles - this is done in function - // createVariantContextWithTrimmedAlleles() below. 
- int forwardClipping = computeForwardClipping(unclippedAlleles, ref); - - int reverseClipped = 0; - boolean clipping = true; - while (clipping) { - for (Allele a : unclippedAlleles) { - if (a.isSymbolic()) { - continue; - } - if (a.length() - reverseClipped <= forwardClipping || a.length() - forwardClipping == 0) - clipping = false; - else if (ref.length() == reverseClipped) - generateException("bad alleles encountered", lineNo); - else if (a.getBases()[a.length()-reverseClipped-1] != ref.getBytes()[ref.length()-reverseClipped-1]) - clipping = false; - } - if (clipping) reverseClipped++; - } + int reverseClipping = computeReverseClipping(unclippedAlleles, ref, forwardClipping, lineNo); if ( clippedAlleles != null ) { for ( Allele a : unclippedAlleles ) { if ( a.isSymbolic() ) { clippedAlleles.add(a); } else { - clippedAlleles.add(Allele.create(Arrays.copyOfRange(a.getBases(),0,a.getBases().length-reverseClipped),a.isReference())); + clippedAlleles.add(Allele.create(Arrays.copyOfRange(a.getBases(), forwardClipping, a.getBases().length-reverseClipping), a.isReference())); } } } // the new reference length - int refLength = ref.length() - reverseClipped; + int refLength = ref.length() - reverseClipping; return position+Math.max(refLength - 1,0); } - public static VariantContext createVariantContextWithTrimmedAlleles(VariantContext inputVC) { - // see if we need to trim common reference base from all alleles - boolean trimVC; - - // We need to trim common reference base from all alleles in all genotypes if a ref base is common to all alleles - Allele refAllele = inputVC.getReference(); - if (!inputVC.isVariant()) - trimVC = false; - else if (refAllele.isNull()) - trimVC = false; - else { - trimVC = (computeForwardClipping(new ArrayList(inputVC.getAlternateAlleles()), - inputVC.getReference().getDisplayString()) > 0); - } - - // nothing to do if we don't need to trim bases - if (trimVC) { - List alleles = new ArrayList(); - Map genotypes = new TreeMap(); - - // set the 
reference base for indels in the attributes - Map attributes = new TreeMap(inputVC.getAttributes()); - - Map originalToTrimmedAlleleMap = new HashMap(); - - for (Allele a : inputVC.getAlleles()) { - if (a.isSymbolic()) { - alleles.add(a); - originalToTrimmedAlleleMap.put(a, a); - } else { - // get bases for current allele and create a new one with trimmed bases - byte[] newBases = Arrays.copyOfRange(a.getBases(), 1, a.length()); - Allele trimmedAllele = Allele.create(newBases, a.isReference()); - alleles.add(trimmedAllele); - originalToTrimmedAlleleMap.put(a, trimmedAllele); - } - } - - // detect case where we're trimming bases but resulting vc doesn't have any null allele. In that case, we keep original representation - // example: mixed records such as {TA*,TGA,TG} - boolean hasNullAlleles = false; - - for (Allele a: originalToTrimmedAlleleMap.values()) { - if (a.isNull()) - hasNullAlleles = true; - if (a.isReference()) - refAllele = a; - } - - if (!hasNullAlleles) - return inputVC; - // now we can recreate new genotypes with trimmed alleles - for ( Map.Entry sample : inputVC.getGenotypes().entrySet() ) { - - List originalAlleles = sample.getValue().getAlleles(); - List trimmedAlleles = new ArrayList(); - for ( Allele a : originalAlleles ) { - if ( a.isCalled() ) - trimmedAlleles.add(originalToTrimmedAlleleMap.get(a)); - else - trimmedAlleles.add(Allele.NO_CALL); - } - genotypes.put(sample.getKey(), Genotype.modifyAlleles(sample.getValue(), trimmedAlleles)); - - } - return new VariantContext(inputVC.getSource(), inputVC.getChr(), inputVC.getStart(), inputVC.getEnd(), alleles, genotypes, inputVC.getNegLog10PError(), inputVC.filtersWereApplied() ? 
inputVC.getFilters() : null, attributes, new Byte(inputVC.getReference().getBases()[0])); - - } - - return inputVC; - } - public final static boolean canDecodeFile(final File potentialInput, final String MAGIC_HEADER_LINE) { try { return isVCFStream(new FileInputStream(potentialInput), MAGIC_HEADER_LINE) || diff --git a/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContext.java b/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContext.java index 1fde8879b..673fe4529 100755 --- a/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContext.java +++ b/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContext.java @@ -258,9 +258,10 @@ public class VariantContext implements Feature { // to enable tribble intergrati * @param negLog10PError qual * @param filters filters: use null for unfiltered and empty set for passes filters * @param attributes attributes + * @param referenceBaseForIndel padded reference base */ - public VariantContext(String source, String contig, long start, long stop, Collection alleles, double negLog10PError, Set filters, Map attributes) { - this(source, contig, start, stop, alleles, NO_GENOTYPES, negLog10PError, filters, attributes, null, true); + public VariantContext(String source, String contig, long start, long stop, Collection alleles, double negLog10PError, Set filters, Map attributes, Byte referenceBaseForIndel) { + this(source, contig, start, stop, alleles, NO_GENOTYPES, negLog10PError, filters, attributes, referenceBaseForIndel, true); } /** diff --git a/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContextUtils.java b/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContextUtils.java index 834ad0917..986d6305c 100755 --- a/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContextUtils.java +++ b/public/java/src/org/broadinstitute/sting/utils/variantcontext/VariantContextUtils.java @@ -657,12 +657,84 
@@ public class VariantContextUtils { VariantContext merged = new VariantContext(name, loc.getContig(), loc.getStart(), loc.getStop(), alleles, genotypes, negLog10PError, filters, (mergeInfoWithMaxAC ? attributesWithMaxAC : attributes) ); // Trim the padded bases of all alleles if necessary - merged = AbstractVCFCodec.createVariantContextWithTrimmedAlleles(merged); + merged = createVariantContextWithTrimmedAlleles(merged); if ( printMessages && remapped ) System.out.printf("Remapped => %s%n", merged); return merged; } + public static VariantContext createVariantContextWithTrimmedAlleles(VariantContext inputVC) { + // see if we need to trim common reference base from all alleles + boolean trimVC; + + // We need to trim common reference base from all alleles in all genotypes if a ref base is common to all alleles + Allele refAllele = inputVC.getReference(); + if (!inputVC.isVariant()) + trimVC = false; + else if (refAllele.isNull()) + trimVC = false; + else { + trimVC = (AbstractVCFCodec.computeForwardClipping(new ArrayList(inputVC.getAlternateAlleles()), + inputVC.getReference().getDisplayString()) > 0); + } + + // nothing to do if we don't need to trim bases + if (trimVC) { + List alleles = new ArrayList(); + Map genotypes = new TreeMap(); + + // set the reference base for indels in the attributes + Map attributes = new TreeMap(inputVC.getAttributes()); + + Map originalToTrimmedAlleleMap = new HashMap(); + + for (Allele a : inputVC.getAlleles()) { + if (a.isSymbolic()) { + alleles.add(a); + originalToTrimmedAlleleMap.put(a, a); + } else { + // get bases for current allele and create a new one with trimmed bases + byte[] newBases = Arrays.copyOfRange(a.getBases(), 1, a.length()); + Allele trimmedAllele = Allele.create(newBases, a.isReference()); + alleles.add(trimmedAllele); + originalToTrimmedAlleleMap.put(a, trimmedAllele); + } + } + + // detect case where we're trimming bases but resulting vc doesn't have any null allele. 
In that case, we keep original representation + // example: mixed records such as {TA*,TGA,TG} + boolean hasNullAlleles = false; + + for (Allele a: originalToTrimmedAlleleMap.values()) { + if (a.isNull()) + hasNullAlleles = true; + if (a.isReference()) + refAllele = a; + } + + if (!hasNullAlleles) + return inputVC; + // now we can recreate new genotypes with trimmed alleles + for ( Map.Entry sample : inputVC.getGenotypes().entrySet() ) { + + List originalAlleles = sample.getValue().getAlleles(); + List trimmedAlleles = new ArrayList(); + for ( Allele a : originalAlleles ) { + if ( a.isCalled() ) + trimmedAlleles.add(originalToTrimmedAlleleMap.get(a)); + else + trimmedAlleles.add(Allele.NO_CALL); + } + genotypes.put(sample.getKey(), Genotype.modifyAlleles(sample.getValue(), trimmedAlleles)); + + } + return new VariantContext(inputVC.getSource(), inputVC.getChr(), inputVC.getStart(), inputVC.getEnd(), alleles, genotypes, inputVC.getNegLog10PError(), inputVC.filtersWereApplied() ? inputVC.getFilters() : null, attributes, new Byte(inputVC.getReference().getBases()[0])); + + } + + return inputVC; + } + public static Map stripPLs(Map genotypes) { Map newGs = new HashMap(genotypes.size()); diff --git a/public/java/test/org/broadinstitute/sting/gatk/walkers/diffengine/DiffObjectsIntegrationTest.java b/public/java/test/org/broadinstitute/sting/gatk/walkers/diffengine/DiffObjectsIntegrationTest.java index f9aaaecc1..1f11b5886 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/walkers/diffengine/DiffObjectsIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/walkers/diffengine/DiffObjectsIntegrationTest.java @@ -50,8 +50,8 @@ public class DiffObjectsIntegrationTest extends WalkerTest { @DataProvider(name = "data") public Object[][] createData() { - new TestParams(testDir + "diffTestMaster.vcf", testDir + "diffTestTest.vcf", "92311de76dda3f38aac289d807ef23d0"); - new TestParams(testDir + "exampleBAM.bam", testDir + "exampleBAM.simple.bam", 
"0c69412c385fda50210f2a612e1ffe4a"); + new TestParams(testDir + "diffTestMaster.vcf", testDir + "diffTestTest.vcf", "dc1ca75c6ecf32641967d61e167acfff"); + new TestParams(testDir + "exampleBAM.bam", testDir + "exampleBAM.simple.bam", "df0fcb568a3a49fc74830103b2e26f6c"); return TestParams.getTests(TestParams.class); } diff --git a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala index f19d60930..138003cdd 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QCommandLine.scala @@ -95,7 +95,8 @@ class QCommandLine extends CommandLineProgram with Logging { def execute = { qGraph.settings = settings - for (script <- pluginManager.createAllTypes()) { + val allQScripts = pluginManager.createAllTypes(); + for (script <- allQScripts) { logger.info("Scripting " + pluginManager.getName(script.getClass.asSubclass(classOf[QScript]))) loadArgumentsIntoObject(script) try { @@ -108,14 +109,24 @@ class QCommandLine extends CommandLineProgram with Logging { logger.info("Added " + script.functions.size + " functions") } + // Execute the job graph qGraph.run() + // walk over each script, calling onExecutionDone + for (script <- allQScripts) { + script.onExecutionDone(qGraph.getFunctionsAndStatus(script.functions), qGraph.success) + if ( ! 
settings.disableJobReport ) { + logger.info("Writing JobLogging GATKReport to file " + settings.jobReportFile) + QJobReport.printReport(qGraph.getFunctionsAndStatus(script.functions), settings.jobReportFile) + QJobReport.plotReport(settings.rScriptArgs, settings.jobReportFile) + } + } + if (!qGraph.success) { logger.info("Done with errors") qGraph.logFailed() 1 } else { - logger.info("Done") 0 } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/QScript.scala b/public/scala/src/org/broadinstitute/sting/queue/QScript.scala index 5cb8d1d29..3120d5d62 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/QScript.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/QScript.scala @@ -24,6 +24,7 @@ package org.broadinstitute.sting.queue +import engine.JobRunInfo import org.broadinstitute.sting.queue.function.QFunction import annotation.target.field import io.Source @@ -57,6 +58,15 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon */ def script() + /** + * A default handler for the onExecutionDone() function. By default this doesn't do anything + * except print out a fine status message. + */ + def onExecutionDone(jobs: Map[QFunction, JobRunInfo], success: Boolean) { + logger.info("Script %s with %d total jobs".format(if (success) "completed successfully" else "failed", jobs.size)) + for ( (f, info) <- jobs ) logger.info(" %s %s".format(f.jobName, info)) + } + /** * The command line functions that will be executed for this QScript. 
*/ diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala index 68bc7ae61..ef7f2afb0 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/FunctionEdge.scala @@ -23,6 +23,8 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod */ var depth = -1 + val myRunInfo: JobRunInfo = JobRunInfo.default // purely for dryRun testing + /** * Initializes with the current status of the function. */ @@ -179,4 +181,8 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod printWriter.close IOUtils.writeContents(functionErrorFile, stackTrace.toString) } + + def getRunInfo = { + if ( runner == null ) myRunInfo else runner.getRunInfo + } } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala index d583a55ef..25baaca4f 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/InProcessRunner.scala @@ -1,7 +1,8 @@ package org.broadinstitute.sting.queue.engine import org.broadinstitute.sting.queue.function.InProcessFunction -import org.broadinstitute.sting.queue.util.IOUtils +import java.util.Date +import org.broadinstitute.sting.queue.util.{Logging, IOUtils} /** * Runs a function that executes in process and does not fork out an external process. 
@@ -10,8 +11,12 @@ class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProce private var runStatus: RunnerStatus.Value = _ def start() = { + getRunInfo.startTime = new Date() runStatus = RunnerStatus.RUNNING + function.run() + + getRunInfo.doneTime = new Date() val content = "%s%nDone.".format(function.description) IOUtils.writeContents(function.jobOutputFile, content) runStatus = RunnerStatus.DONE diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala new file mode 100644 index 000000000..03124a420 --- /dev/null +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunInfo.scala @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +package org.broadinstitute.sting.queue.engine + +import java.util.Date +import java.text.SimpleDateFormat + +/** + * Class containing tracked information about a job run. + */ + // todo -- it might be nice to have the hostname +class JobRunInfo { + /** constant date format */ + val formatter = new SimpleDateFormat("yy-MM-dd H:mm:ss:SSS"); + + /** The start time with millisecond resolution of this job */ + var startTime: Date = _ + /** The done time with millisecond resolution of this job */ + var doneTime: Date = _ + + def getStartTime = startTime + def getDoneTime = doneTime + def getFormattedStartTime = formatTime(getStartTime) + def getFormattedDoneTime = formatTime(getDoneTime) + + /** Helper function that pretty prints the date */ + private def formatTime(d: Date) = if ( d != null ) formatter.format(d) else "null" + + /** + * Was any information set for this jobInfo? JobInfo can be unset because + * the job never ran or because it already completed. + */ + def isFilledIn = startTime != null + + /** + * How long did the job run (in wall time)? 
Returns -1 if this jobInfo isn't filled in + */ + def getRuntimeInMs: Long = { + if ( isFilledIn ) + getDoneTime.getTime - getStartTime.getTime + else + -1 + } + + override def toString: String = + "started %s ended %s runtime %s".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs) +} + +object JobRunInfo { + def default: JobRunInfo = new JobRunInfo() +} \ No newline at end of file diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala index de5fbde05..6dca5d89f 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/JobRunner.scala @@ -69,6 +69,12 @@ trait JobRunner[TFunction <: QFunction] { def cleanup() { } + /** + * Must be overloaded + */ + val runInfo = JobRunInfo.default + def getRunInfo = runInfo + /** * Calls back to a hook that an expert user can setup to modify a job. * @param value Value to modify. diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala index 766d9db94..557230b05 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraph.scala @@ -38,6 +38,7 @@ import org.apache.commons.lang.StringUtils import org.broadinstitute.sting.queue.util._ import collection.immutable.{TreeSet, TreeMap} import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction, GatherFunction, ScatterGatherableFunction} +import java.util.Date /** * The internal dependency tracker between sets of function input and output files. 
@@ -319,7 +320,9 @@ class QGraph extends Logging { logger.debug("+++++++") foreachFunction(readyJobs.toList, edge => { if (running) { + edge.myRunInfo.startTime = new Date() logEdge(edge) + edge.myRunInfo.doneTime = new Date() edge.markAsDone } }) @@ -939,6 +942,14 @@ class QGraph extends Logging { edges.sorted(functionOrdering).foreach(edge => if (running) f(edge)) } + /** + * Utility function for running a method over all function edges. + * @param edgeFunction Function to run for each FunctionEdge. + */ + private def getFunctionEdges: List[FunctionEdge] = { + jobGraph.edgeSet.toList.filter(_.isInstanceOf[FunctionEdge]).asInstanceOf[List[FunctionEdge]] + } + /** * Utility function for running a method over all functions, but traversing the nodes in order of dependency. * @param edgeFunction Function to run for each FunctionEdge. @@ -1028,6 +1039,10 @@ class QGraph extends Logging { */ def isShutdown = !running + def getFunctionsAndStatus(functions: List[QFunction]): Map[QFunction, JobRunInfo] = { + getFunctionEdges.map(edge => (edge.function, edge.getRunInfo)).toMap + } + /** * Kills any forked jobs still running. */ diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala index 6ece600dd..46063fc24 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/QGraphSettings.scala @@ -26,8 +26,9 @@ package org.broadinstitute.sting.queue.engine import java.io.File import org.broadinstitute.sting.queue.QSettings -import org.broadinstitute.sting.commandline.{ArgumentCollection, Argument} import org.broadinstitute.sting.queue.util.SystemUtils +import org.broadinstitute.sting.commandline.{Advanced, ArgumentCollection, Argument} +import org.broadinstitute.sting.utils.R.RScriptExecutor /** * Command line options for a QGraph. 
@@ -69,6 +70,16 @@ class QGraphSettings { @Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false) var expandedDotFile: File = _ + @Argument(fullName="jobReport", shortName="jobReport", doc="File where we will write the Queue job report", required=false) + var jobReportFile: File = new File("queue_jobreport.gatkreport.txt") + + @Advanced + @Argument(fullName="disableJobReport", shortName="disableJobReport", doc="If provided, we will not create a job report", required=false) + var disableJobReport: Boolean = false + + @ArgumentCollection + var rScriptArgs = new RScriptExecutor.RScriptArgumentCollection + @ArgumentCollection val qSettings = new QSettings } diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala index bb711344c..166008c26 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/lsf/Lsf706JobRunner.scala @@ -31,11 +31,12 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat} import org.broadinstitute.sting.utils.Utils import org.broadinstitute.sting.jna.clibrary.LibC import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit} -import com.sun.jna.ptr.IntByReference import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} -import com.sun.jna.{Structure, StringArray, NativeLong} import java.util.regex.Pattern import java.lang.StringBuffer +import java.util.Date +import com.sun.jna.{Pointer, Structure, StringArray, NativeLong} +import com.sun.jna.ptr.{PointerByReference, IntByReference} /** * Runs jobs on an LSF compute cluster. 
@@ -271,12 +272,20 @@ object Lsf706JobRunner extends Logging { logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo)) + def updateRunInfo() { + // the platform LSF startTimes are in seconds, not milliseconds, so convert to the java convention + runner.getRunInfo.startTime = new Date(jobInfo.startTime.longValue * 1000) + runner.getRunInfo.doneTime = new Date(jobInfo.endTime.longValue * 1000) + } + runner.updateStatus( if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) { // Done successfully. + updateRunInfo() RunnerStatus.DONE } else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) { // Exited function that (probably) won't be retried. + updateRunInfo() RunnerStatus.FAILED } else { // Note that we still saw the job in the system. diff --git a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala index 03f9d3315..4124f65a0 100755 --- a/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/engine/shell/ShellJobRunner.scala @@ -27,6 +27,7 @@ package org.broadinstitute.sting.queue.engine.shell import org.broadinstitute.sting.queue.function.CommandLineFunction import org.broadinstitute.sting.queue.util.ShellJob import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner} +import java.util.Date /** * Runs jobs one at a time locally @@ -50,8 +51,10 @@ class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRu // Allow advanced users to update the job. 
updateJobRun(job) + getRunInfo.startTime = new Date() updateStatus(RunnerStatus.RUNNING) job.run() + getRunInfo.doneTime = new Date() updateStatus(RunnerStatus.DONE) } diff --git a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala index 7048b6413..c905581fa 100644 --- a/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala +++ b/public/scala/src/org/broadinstitute/sting/queue/function/QFunction.scala @@ -30,14 +30,14 @@ import org.broadinstitute.sting.commandline._ import org.broadinstitute.sting.queue.{QException, QSettings} import collection.JavaConversions._ import org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction -import org.broadinstitute.sting.queue.util.{Logging, CollectionUtils, IOUtils, ReflectionUtils} +import org.broadinstitute.sting.queue.util._ /** * The base interface for all functions in Queue. * Inputs and outputs are specified as Sets of values. * Inputs are matched to other outputs by using .equals() */ -trait QFunction extends Logging { +trait QFunction extends Logging with QJobReport { /** A short description of this step in the graph */ var analysisName: String = "" @@ -83,11 +83,17 @@ trait QFunction extends Logging { */ var deleteIntermediateOutputs = true + // ------------------------------------------------------- + // + // job run information + // + // ------------------------------------------------------- + /** * Copies settings from this function to another function. * @param function QFunction to copy values to. 
*/ - def copySettingsTo(function: QFunction) { + override def copySettingsTo(function: QFunction) { function.analysisName = this.analysisName function.jobName = this.jobName function.qSettings = this.qSettings @@ -99,6 +105,8 @@ trait QFunction extends Logging { function.updateJobRun = this.updateJobRun function.isIntermediate = this.isIntermediate function.deleteIntermediateOutputs = this.deleteIntermediateOutputs + function.reportGroup = this.reportGroup + function.reportFeatures = this.reportFeatures } /** File to redirect any output. Defaults to .out */ diff --git a/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala new file mode 100644 index 000000000..a1fa10cd8 --- /dev/null +++ b/public/scala/src/org/broadinstitute/sting/queue/util/QJobReport.scala @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.queue.util +import org.broadinstitute.sting.queue.function.QFunction +import org.broadinstitute.sting.gatk.report.{GATKReportTable, GATKReport} +import org.broadinstitute.sting.utils.exceptions.UserException +import org.broadinstitute.sting.queue.engine.JobRunInfo +import java.io.{FileOutputStream, PrintStream, File} +import org.broadinstitute.sting.queue.function.scattergather.{GathererFunction, ScatterFunction} +import org.broadinstitute.sting.utils.R.RScriptExecutor.RScriptArgumentCollection +import org.broadinstitute.sting.utils.R.RScriptExecutor +import org.broadinstitute.sting.queue.QScript + +/** + * A mixin to add Job info to the class + */ +trait QJobReport extends Logging { + self: QFunction => + + protected var reportGroup: String = null + protected var reportFeatures: Map[String, String] = Map() + protected var reportEnabled: Boolean = true + + def includeInReport = reportEnabled + def enableReport() { reportEnabled = true } + def disableReport() { reportEnabled = false } + + def setRunInfo(info: JobRunInfo) { + logger.info("info " + info) + reportFeatures = Map( + "iteration" -> 1, + "analysisName" -> self.analysisName, + "jobName" -> QJobReport.workAroundSameJobNames(this), + "intermediate" -> self.isIntermediate, + "startTime" -> info.getStartTime.getTime, + "doneTime" -> info.getDoneTime.getTime, + "formattedStartTime" -> info.getFormattedStartTime, + "formattedDoneTime" -> info.getFormattedDoneTime, + "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") ++ reportFeatures + +// // handle the special case of iterations +// reportFeatures.get("iteration") match { +// case None => 
reportFeatures("iteration") = 1 +// case _ => ; +// } + } + + def getReportGroup = analysisName + def getReportFeatures = reportFeatures + + def getReportFeatureNames: List[String] = getReportFeatures.keys.toList + def getReportFeature(key: String): String = { + getReportFeatures.get(key) match { + case Some(x) => x + case None => throw new RuntimeException("Get called with key %s but no value was found".format(key)) + } + } + + def getReportName: String = getReportFeature("jobName") + + def configureJobReport(features: Map[String, Any]) { + this.reportFeatures = features.mapValues(_.toString) + } + + // copy the QJobReport information -- todo : what's the best way to do this? + override def copySettingsTo(function: QFunction) { + self.copySettingsTo(function) + function.reportFeatures = this.reportFeatures + } +} + +object QJobReport { + val JOB_REPORT_QUEUE_SCRIPT = "queueJobReport.R" + + // todo -- fixme to have a unique name for Scatter/gather jobs as well + var seenCounter = 1 + var seenNames = Set[String]() + + def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File) { + val jobs = jobsRaw.filter(_._2.isFilledIn).filter(_._1.includeInReport) + jobs foreach {case (qf, info) => qf.setRunInfo(info)} + val stream = new PrintStream(new FileOutputStream(dest)) + printJobLogging(jobs.keys.toList, stream) + stream.close() + } + + def plotReport(args: RScriptArgumentCollection, jobReportFile: File) { + val executor = new RScriptExecutor(args, false) // don't except on error + val pdf = jobReportFile.getAbsolutePath + ".pdf" + executor.callRScripts(JOB_REPORT_QUEUE_SCRIPT, jobReportFile.getAbsolutePath, pdf) + } + + def workAroundSameJobNames(func: QFunction):String = { + if ( seenNames.apply(func.jobName) ) { + seenCounter += 1 + "%s_%d".format(func.jobName, seenCounter) + } else { + seenNames += func.jobName + func.jobName + } + } + + /** + * Prints the JobLogging logs to a GATKReport. 
First splits up the + * logs by group, and for each group generates a GATKReportTable + */ + private def printJobLogging(logs: List[QFunction], stream: PrintStream) { + // create the report + val report: GATKReport = new GATKReport + + // create a table for each group of logs + for ( (group, groupLogs) <- groupLogs(logs) ) { + report.addTable(group, "Job logs for " + group) + val table: GATKReportTable = report.getTable(group) + table.addPrimaryKey("jobName", false) + val keys = logKeys(groupLogs) + + // add the columns + keys.foreach(table.addColumn(_, 0)) + for (log <- groupLogs) { + for ( key <- keys ) + table.set(log.getReportName, key, log.getReportFeature(key)) + } + } + + report.print(stream) + } + + private def groupLogs(logs: List[QFunction]): Map[String, List[QFunction]] = { + logs.groupBy(_.getReportGroup) + } + + private def logKeys(logs: List[QFunction]): Set[String] = { + // the keys should be the same for each log, but we will check that + val keys = Set[String](logs(0).getReportFeatureNames : _*) + + for ( log <- logs ) + if ( keys != Set(log.getReportFeatureNames : _*) ) + throw new UserException(("All JobLogging jobs in the same group must have the same set of features. " + + "We found one with %s and another with %s").format(keys, log.getReportFeatureNames)) + + keys + } +}