Merge branch 'master' of ssh://nickel.broadinstitute.org/humgen/gsa-scr1/gsa-engineering/git/unstable
This commit is contained in:
commit
8f90a22555
|
|
@ -379,10 +379,14 @@ class RodBindingArgumentTypeDescriptor extends ArgumentTypeDescriptor {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( tribbleType == null )
|
if ( tribbleType == null )
|
||||||
throw new UserException.CommandLineException(
|
// Report an unreadable path or a non-regular file here; a readable regular file
// falls through to the "no tribble type" message in the else branch.
if ( ! file.canRead() || ! file.isFile() ) {
|
||||||
String.format("No tribble type was provided on the command line and the type of the file could not be determined dynamically. " +
|
throw new UserException.BadArgumentValue(name, "Couldn't read file to determine type: " + file);
|
||||||
"Please add an explicit type tag :NAME listing the correct type from among the supported types:%n%s",
|
} else {
|
||||||
manager.userFriendlyListOfAvailableFeatures(parameterType)));
|
throw new UserException.CommandLineException(
|
||||||
|
String.format("No tribble type was provided on the command line and the type of the file could not be determined dynamically. " +
|
||||||
|
"Please add an explicit type tag :NAME listing the correct type from among the supported types:%n%s",
|
||||||
|
manager.userFriendlyListOfAvailableFeatures(parameterType)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -48,9 +48,10 @@ public class LinearMicroScheduler extends MicroScheduler {
|
||||||
walker.initialize();
|
walker.initialize();
|
||||||
Accumulator accumulator = Accumulator.create(engine,walker);
|
Accumulator accumulator = Accumulator.create(engine,walker);
|
||||||
|
|
||||||
|
boolean done = walker.isDone();
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
for (Shard shard : shardStrategy ) {
|
for (Shard shard : shardStrategy ) {
|
||||||
if ( shard == null ) // we ran out of shards that aren't owned
|
if ( done || shard == null ) // we ran out of shards that aren't owned
|
||||||
break;
|
break;
|
||||||
|
|
||||||
if(shard.getShardType() == Shard.ShardType.LOCUS) {
|
if(shard.getShardType() == Shard.ShardType.LOCUS) {
|
||||||
|
|
@ -61,6 +62,7 @@ public class LinearMicroScheduler extends MicroScheduler {
|
||||||
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
|
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
|
||||||
accumulator.accumulate(dataProvider,result);
|
accumulator.accumulate(dataProvider,result);
|
||||||
dataProvider.close();
|
dataProvider.close();
|
||||||
|
if ( walker.isDone() ) break;
|
||||||
}
|
}
|
||||||
windowMaker.close();
|
windowMaker.close();
|
||||||
}
|
}
|
||||||
|
|
@ -70,6 +72,8 @@ public class LinearMicroScheduler extends MicroScheduler {
|
||||||
accumulator.accumulate(dataProvider,result);
|
accumulator.accumulate(dataProvider,result);
|
||||||
dataProvider.close();
|
dataProvider.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
done = walker.isDone();
|
||||||
}
|
}
|
||||||
|
|
||||||
Object result = accumulator.finishTraversal();
|
Object result = accumulator.finishTraversal();
|
||||||
|
|
|
||||||
|
|
@ -173,7 +173,9 @@ public class TraverseDuplicates<M,T> extends TraversalEngine<M,T,DuplicateWalker
|
||||||
* -> those with the same mate pair position, for paired reads
|
* -> those with the same mate pair position, for paired reads
|
||||||
* -> those flagged as unpaired and duplicated but having the same start and end
|
* -> those flagged as unpaired and duplicated but having the same start and end
|
||||||
*/
|
*/
|
||||||
|
boolean done = walker.isDone();
|
||||||
for (SAMRecord read : iter) {
|
for (SAMRecord read : iter) {
|
||||||
|
if ( done ) break;
|
||||||
// get the genome loc from the read
|
// get the genome loc from the read
|
||||||
GenomeLoc site = engine.getGenomeLocParser().createGenomeLoc(read);
|
GenomeLoc site = engine.getGenomeLocParser().createGenomeLoc(read);
|
||||||
|
|
||||||
|
|
@ -194,6 +196,7 @@ public class TraverseDuplicates<M,T> extends TraversalEngine<M,T,DuplicateWalker
|
||||||
}
|
}
|
||||||
|
|
||||||
printProgress(dataProvider.getShard(),site);
|
printProgress(dataProvider.getShard(),site);
|
||||||
|
done = walker.isDone();
|
||||||
}
|
}
|
||||||
|
|
||||||
return sum;
|
return sum;
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,7 @@ public class TraverseLoci<M,T> extends TraversalEngine<M,T,LocusWalker<M,T>,Locu
|
||||||
logger.debug(String.format("TraverseLoci.traverse: Shard is %s", dataProvider));
|
logger.debug(String.format("TraverseLoci.traverse: Shard is %s", dataProvider));
|
||||||
|
|
||||||
LocusView locusView = getLocusView( walker, dataProvider );
|
LocusView locusView = getLocusView( walker, dataProvider );
|
||||||
|
boolean done = false;
|
||||||
|
|
||||||
if ( locusView.hasNext() ) { // trivial optimization to avoid unnecessary processing when there's nothing here at all
|
if ( locusView.hasNext() ) { // trivial optimization to avoid unnecessary processing when there's nothing here at all
|
||||||
|
|
||||||
|
|
@ -46,7 +47,7 @@ public class TraverseLoci<M,T> extends TraversalEngine<M,T,LocusWalker<M,T>,Locu
|
||||||
LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider );
|
LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider );
|
||||||
|
|
||||||
// We keep processing while the next reference location is within the interval
|
// We keep processing while the next reference location is within the interval
|
||||||
while( locusView.hasNext() ) {
|
while( locusView.hasNext() && ! done ) {
|
||||||
AlignmentContext locus = locusView.next();
|
AlignmentContext locus = locusView.next();
|
||||||
GenomeLoc location = locus.getLocation();
|
GenomeLoc location = locus.getLocation();
|
||||||
|
|
||||||
|
|
@ -76,15 +77,17 @@ public class TraverseLoci<M,T> extends TraversalEngine<M,T,LocusWalker<M,T>,Locu
|
||||||
if (keepMeP) {
|
if (keepMeP) {
|
||||||
M x = walker.map(tracker, refContext, locus);
|
M x = walker.map(tracker, refContext, locus);
|
||||||
sum = walker.reduce(x, sum);
|
sum = walker.reduce(x, sum);
|
||||||
|
done = walker.isDone();
|
||||||
}
|
}
|
||||||
|
|
||||||
printProgress(dataProvider.getShard(),locus.getLocation());
|
printProgress(dataProvider.getShard(),locus.getLocation());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// We have a final map call to execute here to clean up the skipped bases from the
|
// We have a final map call to execute here to clean up the skipped bases from the
|
||||||
// last position in the ROD to that in the interval
|
// last position in the ROD to that in the interval
|
||||||
if ( WalkerManager.getWalkerDataSource(walker) == DataSource.REFERENCE_ORDERED_DATA ) {
|
if ( WalkerManager.getWalkerDataSource(walker) == DataSource.REFERENCE_ORDERED_DATA && ! walker.isDone() ) {
|
||||||
|
// only do this if the walker isn't done!
|
||||||
RodLocusView rodLocusView = (RodLocusView)locusView;
|
RodLocusView rodLocusView = (RodLocusView)locusView;
|
||||||
long nSkipped = rodLocusView.getLastSkippedBases();
|
long nSkipped = rodLocusView.getLastSkippedBases();
|
||||||
if ( nSkipped > 0 ) {
|
if ( nSkipped > 0 ) {
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,9 @@ public class TraverseReadPairs<M,T> extends TraversalEngine<M,T, ReadPairWalker<
|
||||||
ReadView reads = new ReadView(dataProvider);
|
ReadView reads = new ReadView(dataProvider);
|
||||||
List<SAMRecord> pairs = new ArrayList<SAMRecord>();
|
List<SAMRecord> pairs = new ArrayList<SAMRecord>();
|
||||||
|
|
||||||
|
boolean done = walker.isDone();
|
||||||
for(SAMRecord read: reads) {
|
for(SAMRecord read: reads) {
|
||||||
|
if ( done ) break;
|
||||||
dataProvider.getShard().getReadMetrics().incrementNumReadsSeen();
|
dataProvider.getShard().getReadMetrics().incrementNumReadsSeen();
|
||||||
|
|
||||||
if(pairs.size() == 0 || pairs.get(0).getReadName().equals(read.getReadName())) {
|
if(pairs.size() == 0 || pairs.get(0).getReadName().equals(read.getReadName())) {
|
||||||
|
|
@ -65,6 +67,8 @@ public class TraverseReadPairs<M,T> extends TraversalEngine<M,T, ReadPairWalker<
|
||||||
|
|
||||||
printProgress(dataProvider.getShard(),null);
|
printProgress(dataProvider.getShard(),null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
done = walker.isDone();
|
||||||
}
|
}
|
||||||
|
|
||||||
// If any data was left in the queue, process it.
|
// If any data was left in the queue, process it.
|
||||||
|
|
|
||||||
|
|
@ -82,8 +82,10 @@ public class TraverseReads<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,Read
|
||||||
// get the reference ordered data
|
// get the reference ordered data
|
||||||
ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
|
ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
|
||||||
|
|
||||||
|
boolean done = walker.isDone();
|
||||||
// while we still have more reads
|
// while we still have more reads
|
||||||
for (SAMRecord read : reads) {
|
for (SAMRecord read : reads) {
|
||||||
|
if ( done ) break;
|
||||||
// ReferenceContext -- the reference bases covered by the read
|
// ReferenceContext -- the reference bases covered by the read
|
||||||
ReferenceContext refContext = null;
|
ReferenceContext refContext = null;
|
||||||
|
|
||||||
|
|
@ -106,6 +108,7 @@ public class TraverseReads<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,Read
|
||||||
|
|
||||||
GenomeLoc locus = read.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX ? null : engine.getGenomeLocParser().createGenomeLoc(read.getReferenceName(),read.getAlignmentStart());
|
GenomeLoc locus = read.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX ? null : engine.getGenomeLocParser().createGenomeLoc(read.getReferenceName(),read.getAlignmentStart());
|
||||||
printProgress(dataProvider.getShard(),locus);
|
printProgress(dataProvider.getShard(),locus);
|
||||||
|
done = walker.isDone();
|
||||||
}
|
}
|
||||||
return sum;
|
return sum;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -126,6 +126,17 @@ public abstract class Walker<MapType, ReduceType> {
|
||||||
|
|
||||||
public void initialize() { }
|
public void initialize() { }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A hook for subclasses to override, providing a mechanism to abort early from a walker.
|
||||||
|
*
|
||||||
|
* If this ever returns true, then the Traversal engine will stop executing map calls
|
||||||
|
* and start the process of shutting down the walker in an orderly fashion.
|
||||||
|
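* @return true if the traversal should stop issuing map calls; this default implementation always returns false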
|
||||||
|
*/
|
||||||
|
public boolean isDone() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provide an initial value for reduce computations.
|
* Provide an initial value for reduce computations.
|
||||||
* @return Initial value of reduce.
|
* @return Initial value of reduce.
|
||||||
|
|
|
||||||
|
|
@ -234,7 +234,7 @@ public class DiffEngine {
|
||||||
|
|
||||||
// now that we have a specific list of values we want to show, display them
|
// now that we have a specific list of values we want to show, display them
|
||||||
GATKReport report = new GATKReport();
|
GATKReport report = new GATKReport();
|
||||||
final String tableName = "diffences";
|
final String tableName = "differences";
|
||||||
report.addTable(tableName, "Summarized differences between the master and test files. See http://www.broadinstitute.org/gsa/wiki/index.php/DiffEngine for more information", false);
|
report.addTable(tableName, "Summarized differences between the master and test files. See http://www.broadinstitute.org/gsa/wiki/index.php/DiffEngine for more information", false);
|
||||||
GATKReportTable table = report.getTable(tableName);
|
GATKReportTable table = report.getTable(tableName);
|
||||||
table.addPrimaryKey("Difference", true);
|
table.addPrimaryKey("Difference", true);
|
||||||
|
|
|
||||||
|
|
@ -151,22 +151,21 @@ public class VariantsToTable extends RodWalker<Integer, Integer> {
|
||||||
if ( tracker == null ) // RodWalkers can make funky map calls
|
if ( tracker == null ) // RodWalkers can make funky map calls
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
if ( ++nRecords < MAX_RECORDS || MAX_RECORDS == -1 ) {
|
for ( VariantContext vc : tracker.getValues(variantCollection.variants, context.getLocation())) {
|
||||||
for ( VariantContext vc : tracker.getValues(variantCollection.variants, context.getLocation())) {
|
if ( (keepMultiAllelic || vc.isBiallelic()) && ( showFiltered || vc.isNotFiltered() ) ) {
|
||||||
if ( (keepMultiAllelic || vc.isBiallelic()) && ( showFiltered || vc.isNotFiltered() ) ) {
|
List<String> vals = extractFields(vc, fieldsToTake, ALLOW_MISSING_DATA, keepMultiAllelic, logACSum);
|
||||||
List<String> vals = extractFields(vc, fieldsToTake, ALLOW_MISSING_DATA, keepMultiAllelic, logACSum);
|
out.println(Utils.join("\t", vals));
|
||||||
out.println(Utils.join("\t", vals));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
|
||||||
} else {
|
|
||||||
if ( nRecords >= MAX_RECORDS ) {
|
|
||||||
logger.warn("Calling sys exit to leave after " + nRecords + " records");
|
|
||||||
System.exit(0); // todo -- what's the recommend way to abort like this?
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDone() {
|
||||||
|
boolean done = MAX_RECORDS != -1 && nRecords >= MAX_RECORDS;
|
||||||
|
if ( done) logger.warn("isDone() will return true to leave after " + nRecords + " records");
|
||||||
|
return done ;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final boolean isWildCard(String s) {
|
private static final boolean isWildCard(String s) {
|
||||||
|
|
@ -271,7 +270,7 @@ public class VariantsToTable extends RodWalker<Integer, Integer> {
|
||||||
getters.put("REF", new Getter() {
|
getters.put("REF", new Getter() {
|
||||||
public String get(VariantContext vc) {
|
public String get(VariantContext vc) {
|
||||||
String x = "";
|
String x = "";
|
||||||
if ( vc.hasReferenceBaseForIndel() ) {
|
if ( vc.hasReferenceBaseForIndel() && !vc.isSNP() ) {
|
||||||
Byte refByte = vc.getReferenceBaseForIndel();
|
Byte refByte = vc.getReferenceBaseForIndel();
|
||||||
x=x+new String(new byte[]{refByte});
|
x=x+new String(new byte[]{refByte});
|
||||||
}
|
}
|
||||||
|
|
@ -283,7 +282,7 @@ public class VariantsToTable extends RodWalker<Integer, Integer> {
|
||||||
StringBuilder x = new StringBuilder();
|
StringBuilder x = new StringBuilder();
|
||||||
int n = vc.getAlternateAlleles().size();
|
int n = vc.getAlternateAlleles().size();
|
||||||
if ( n == 0 ) return ".";
|
if ( n == 0 ) return ".";
|
||||||
if ( vc.hasReferenceBaseForIndel() ) {
|
if ( vc.hasReferenceBaseForIndel() && !vc.isSNP() ) {
|
||||||
Byte refByte = vc.getReferenceBaseForIndel();
|
Byte refByte = vc.getReferenceBaseForIndel();
|
||||||
x.append(new String(new byte[]{refByte}));
|
x.append(new String(new byte[]{refByte}));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,119 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2011, The Broad Institute
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person
|
||||||
|
* obtaining a copy of this software and associated documentation
|
||||||
|
* files (the "Software"), to deal in the Software without
|
||||||
|
* restriction, including without limitation the rights to use,
|
||||||
|
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the
|
||||||
|
* Software is furnished to do so, subject to the following
|
||||||
|
* conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||||
|
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||||
|
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||||
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||||
|
* OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.broadinstitute.sting.utils.R;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.broadinstitute.sting.commandline.Advanced;
|
||||||
|
import org.broadinstitute.sting.commandline.Argument;
|
||||||
|
import org.broadinstitute.sting.commandline.ArgumentCollection;
|
||||||
|
import org.broadinstitute.sting.gatk.walkers.recalibration.Covariate;
|
||||||
|
import org.broadinstitute.sting.utils.PathUtils;
|
||||||
|
import org.broadinstitute.sting.utils.Utils;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generic service for executing RScripts in the GATK directory
|
||||||
|
*
|
||||||
|
* @author Your Name
|
||||||
|
* @since Date created
|
||||||
|
*/
|
||||||
|
public class RScriptExecutor {
|
||||||
|
/**
|
||||||
|
* our log
|
||||||
|
*/
|
||||||
|
protected static Logger logger = Logger.getLogger(RScriptExecutor.class);
|
||||||
|
|
||||||
|
public static class RScriptArgumentCollection {
|
||||||
|
@Advanced
|
||||||
|
@Argument(fullName = "path_to_Rscript", shortName = "Rscript", doc = "The path to your implementation of Rscript. For Broad users this is maybe /broad/tools/apps/R-2.6.0/bin/Rscript", required = false)
|
||||||
|
private String PATH_TO_RSCRIPT = "Rscript";
|
||||||
|
|
||||||
|
@Advanced
|
||||||
|
@Argument(fullName = "path_to_resources", shortName = "resources", doc = "Path to resources folder holding the Sting R scripts.", required = false)
|
||||||
|
private List<String> PATH_TO_RESOURCES = Arrays.asList("public/R/", "private/R/");
|
||||||
|
}
|
||||||
|
|
||||||
|
final RScriptArgumentCollection myArgs;
|
||||||
|
final boolean exceptOnError;
|
||||||
|
|
||||||
|
public RScriptExecutor(final RScriptArgumentCollection myArgs, final boolean exceptOnError) {
|
||||||
|
this.myArgs = myArgs;
|
||||||
|
this.exceptOnError = exceptOnError;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void callRScripts(String scriptName, String... scriptArgs) {
|
||||||
|
callRScripts(scriptName, Arrays.asList(scriptArgs));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void callRScripts(String scriptName, List<String> scriptArgs) {
|
||||||
|
try {
|
||||||
|
final File pathToScript = findScript(scriptName);
|
||||||
|
if ( pathToScript == null ) return; // we failed but shouldn't exception out
|
||||||
|
final String argString = Utils.join(" ", scriptArgs);
|
||||||
|
final String cmdLine = Utils.join(" ", Arrays.asList(myArgs.PATH_TO_RSCRIPT, pathToScript, argString));
|
||||||
|
logger.info("Executing RScript: " + cmdLine);
|
||||||
|
Runtime.getRuntime().exec(cmdLine).waitFor();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
generateException(e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
generateException("Fatal Exception: Perhaps RScript jobs are being spawned too quickly?", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public File findScript(final String scriptName) {
|
||||||
|
for ( String pathToResource : myArgs.PATH_TO_RESOURCES ) {
|
||||||
|
final File f = new File(pathToResource + "/" + scriptName);
|
||||||
|
if ( f.exists() ) {
|
||||||
|
if ( f.canRead() )
|
||||||
|
return f;
|
||||||
|
else
|
||||||
|
generateException("Script exists but couldn't be read: " + scriptName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
generateException("Couldn't find script: " + scriptName + " in " + myArgs.PATH_TO_RSCRIPT);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void generateException(String msg) {
|
||||||
|
generateException(msg, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void generateException(Throwable e) {
|
||||||
|
generateException("", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void generateException(String msg, Throwable e) {
|
||||||
|
if ( exceptOnError )
|
||||||
|
throw new RuntimeException(msg, e);
|
||||||
|
else
|
||||||
|
logger.warn(msg + (e == null ? "" : ":" + e.getMessage()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -281,7 +281,7 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec,
|
||||||
|
|
||||||
VariantContext vc = null;
|
VariantContext vc = null;
|
||||||
try {
|
try {
|
||||||
vc = new VariantContext(name, contig, pos, loc, alleles, qual, filters, attributes);
|
vc = new VariantContext(name, contig, pos, loc, alleles, qual, filters, attributes, ref.getBytes()[0]);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
generateException(e.getMessage());
|
generateException(e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
@ -290,8 +290,7 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec,
|
||||||
if ( !header.samplesWereAlreadySorted() )
|
if ( !header.samplesWereAlreadySorted() )
|
||||||
vc.getGenotypes();
|
vc.getGenotypes();
|
||||||
|
|
||||||
// Trim bases of all alleles if necessary
|
return vc;
|
||||||
return createVariantContextWithTrimmedAlleles(vc);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -516,25 +515,44 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int computeForwardClipping(List<Allele> unclippedAlleles, String ref) {
|
public static int computeForwardClipping(List<Allele> unclippedAlleles, String ref) {
|
||||||
boolean clipping = true;
|
boolean clipping = true;
|
||||||
// Note that the computation of forward clipping here is meant only to see whether there is a common
|
|
||||||
// base to all alleles, and to correctly compute reverse clipping,
|
|
||||||
// but it is not used for actually changing alleles - this is done in function
|
|
||||||
// createVariantContextWithTrimmedAlleles() below.
|
|
||||||
|
|
||||||
for (Allele a : unclippedAlleles) {
|
for ( Allele a : unclippedAlleles ) {
|
||||||
if (a.isSymbolic()) {
|
if ( a.isSymbolic() )
|
||||||
continue;
|
continue;
|
||||||
}
|
|
||||||
if (a.length() < 1 || (a.getBases()[0] != ref.getBytes()[0])) {
|
if ( a.length() < 1 || (a.getBases()[0] != ref.getBytes()[0]) ) {
|
||||||
clipping = false;
|
clipping = false;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return (clipping) ? 1 : 0;
|
|
||||||
|
|
||||||
|
return (clipping) ? 1 : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static int computeReverseClipping(List<Allele> unclippedAlleles, String ref, int forwardClipping, int lineNo) {
|
||||||
|
int clipping = 0;
|
||||||
|
boolean stillClipping = true;
|
||||||
|
|
||||||
|
while ( stillClipping ) {
|
||||||
|
for ( Allele a : unclippedAlleles ) {
|
||||||
|
if ( a.isSymbolic() )
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if ( a.length() - clipping <= forwardClipping || a.length() - forwardClipping == 0 )
|
||||||
|
stillClipping = false;
|
||||||
|
else if ( ref.length() == clipping )
|
||||||
|
generateException("bad alleles encountered", lineNo);
|
||||||
|
else if ( a.getBases()[a.length()-clipping-1] != ref.getBytes()[ref.length()-clipping-1] )
|
||||||
|
stillClipping = false;
|
||||||
|
}
|
||||||
|
if ( stillClipping )
|
||||||
|
clipping++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return clipping;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* clip the alleles, based on the reference
|
* clip the alleles, based on the reference
|
||||||
*
|
*
|
||||||
|
|
@ -542,122 +560,30 @@ public abstract class AbstractVCFCodec implements FeatureCodec, NameAwareCodec,
|
||||||
* @param ref the reference string
|
* @param ref the reference string
|
||||||
* @param unclippedAlleles the list of unclipped alleles
|
* @param unclippedAlleles the list of unclipped alleles
|
||||||
* @param clippedAlleles output list of clipped alleles
|
* @param clippedAlleles output list of clipped alleles
|
||||||
|
* @param lineNo the current line number in the file
|
||||||
* @return the new reference end position of this event
|
* @return the new reference end position of this event
|
||||||
*/
|
*/
|
||||||
protected static int clipAlleles(int position, String ref, List<Allele> unclippedAlleles, List<Allele> clippedAlleles, int lineNo) {
|
protected static int clipAlleles(int position, String ref, List<Allele> unclippedAlleles, List<Allele> clippedAlleles, int lineNo) {
|
||||||
|
|
||||||
// Note that the computation of forward clipping here is meant only to see whether there is a common
|
|
||||||
// base to all alleles, and to correctly compute reverse clipping,
|
|
||||||
// but it is not used for actually changing alleles - this is done in function
|
|
||||||
// createVariantContextWithTrimmedAlleles() below.
|
|
||||||
|
|
||||||
int forwardClipping = computeForwardClipping(unclippedAlleles, ref);
|
int forwardClipping = computeForwardClipping(unclippedAlleles, ref);
|
||||||
|
int reverseClipping = computeReverseClipping(unclippedAlleles, ref, forwardClipping, lineNo);
|
||||||
int reverseClipped = 0;
|
|
||||||
boolean clipping = true;
|
|
||||||
while (clipping) {
|
|
||||||
for (Allele a : unclippedAlleles) {
|
|
||||||
if (a.isSymbolic()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (a.length() - reverseClipped <= forwardClipping || a.length() - forwardClipping == 0)
|
|
||||||
clipping = false;
|
|
||||||
else if (ref.length() == reverseClipped)
|
|
||||||
generateException("bad alleles encountered", lineNo);
|
|
||||||
else if (a.getBases()[a.length()-reverseClipped-1] != ref.getBytes()[ref.length()-reverseClipped-1])
|
|
||||||
clipping = false;
|
|
||||||
}
|
|
||||||
if (clipping) reverseClipped++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( clippedAlleles != null ) {
|
if ( clippedAlleles != null ) {
|
||||||
for ( Allele a : unclippedAlleles ) {
|
for ( Allele a : unclippedAlleles ) {
|
||||||
if ( a.isSymbolic() ) {
|
if ( a.isSymbolic() ) {
|
||||||
clippedAlleles.add(a);
|
clippedAlleles.add(a);
|
||||||
} else {
|
} else {
|
||||||
clippedAlleles.add(Allele.create(Arrays.copyOfRange(a.getBases(),0,a.getBases().length-reverseClipped),a.isReference()));
|
clippedAlleles.add(Allele.create(Arrays.copyOfRange(a.getBases(), forwardClipping, a.getBases().length-reverseClipping), a.isReference()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// the new reference length
|
// the new reference length
|
||||||
int refLength = ref.length() - reverseClipped;
|
int refLength = ref.length() - reverseClipping;
|
||||||
|
|
||||||
return position+Math.max(refLength - 1,0);
|
return position+Math.max(refLength - 1,0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static VariantContext createVariantContextWithTrimmedAlleles(VariantContext inputVC) {
|
|
||||||
// see if we need to trim common reference base from all alleles
|
|
||||||
boolean trimVC;
|
|
||||||
|
|
||||||
// We need to trim common reference base from all alleles in all genotypes if a ref base is common to all alleles
|
|
||||||
Allele refAllele = inputVC.getReference();
|
|
||||||
if (!inputVC.isVariant())
|
|
||||||
trimVC = false;
|
|
||||||
else if (refAllele.isNull())
|
|
||||||
trimVC = false;
|
|
||||||
else {
|
|
||||||
trimVC = (computeForwardClipping(new ArrayList<Allele>(inputVC.getAlternateAlleles()),
|
|
||||||
inputVC.getReference().getDisplayString()) > 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// nothing to do if we don't need to trim bases
|
|
||||||
if (trimVC) {
|
|
||||||
List<Allele> alleles = new ArrayList<Allele>();
|
|
||||||
Map<String, Genotype> genotypes = new TreeMap<String, Genotype>();
|
|
||||||
|
|
||||||
// set the reference base for indels in the attributes
|
|
||||||
Map<String,Object> attributes = new TreeMap<String,Object>(inputVC.getAttributes());
|
|
||||||
|
|
||||||
Map<Allele, Allele> originalToTrimmedAlleleMap = new HashMap<Allele, Allele>();
|
|
||||||
|
|
||||||
for (Allele a : inputVC.getAlleles()) {
|
|
||||||
if (a.isSymbolic()) {
|
|
||||||
alleles.add(a);
|
|
||||||
originalToTrimmedAlleleMap.put(a, a);
|
|
||||||
} else {
|
|
||||||
// get bases for current allele and create a new one with trimmed bases
|
|
||||||
byte[] newBases = Arrays.copyOfRange(a.getBases(), 1, a.length());
|
|
||||||
Allele trimmedAllele = Allele.create(newBases, a.isReference());
|
|
||||||
alleles.add(trimmedAllele);
|
|
||||||
originalToTrimmedAlleleMap.put(a, trimmedAllele);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// detect case where we're trimming bases but resulting vc doesn't have any null allele. In that case, we keep original representation
|
|
||||||
// example: mixed records such as {TA*,TGA,TG}
|
|
||||||
boolean hasNullAlleles = false;
|
|
||||||
|
|
||||||
for (Allele a: originalToTrimmedAlleleMap.values()) {
|
|
||||||
if (a.isNull())
|
|
||||||
hasNullAlleles = true;
|
|
||||||
if (a.isReference())
|
|
||||||
refAllele = a;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!hasNullAlleles)
|
|
||||||
return inputVC;
|
|
||||||
// now we can recreate new genotypes with trimmed alleles
|
|
||||||
for ( Map.Entry<String, Genotype> sample : inputVC.getGenotypes().entrySet() ) {
|
|
||||||
|
|
||||||
List<Allele> originalAlleles = sample.getValue().getAlleles();
|
|
||||||
List<Allele> trimmedAlleles = new ArrayList<Allele>();
|
|
||||||
for ( Allele a : originalAlleles ) {
|
|
||||||
if ( a.isCalled() )
|
|
||||||
trimmedAlleles.add(originalToTrimmedAlleleMap.get(a));
|
|
||||||
else
|
|
||||||
trimmedAlleles.add(Allele.NO_CALL);
|
|
||||||
}
|
|
||||||
genotypes.put(sample.getKey(), Genotype.modifyAlleles(sample.getValue(), trimmedAlleles));
|
|
||||||
|
|
||||||
}
|
|
||||||
return new VariantContext(inputVC.getSource(), inputVC.getChr(), inputVC.getStart(), inputVC.getEnd(), alleles, genotypes, inputVC.getNegLog10PError(), inputVC.filtersWereApplied() ? inputVC.getFilters() : null, attributes, new Byte(inputVC.getReference().getBases()[0]));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return inputVC;
|
|
||||||
}
|
|
||||||
|
|
||||||
public final static boolean canDecodeFile(final File potentialInput, final String MAGIC_HEADER_LINE) {
|
public final static boolean canDecodeFile(final File potentialInput, final String MAGIC_HEADER_LINE) {
|
||||||
try {
|
try {
|
||||||
return isVCFStream(new FileInputStream(potentialInput), MAGIC_HEADER_LINE) ||
|
return isVCFStream(new FileInputStream(potentialInput), MAGIC_HEADER_LINE) ||
|
||||||
|
|
|
||||||
|
|
@ -258,9 +258,10 @@ public class VariantContext implements Feature { // to enable tribble intergrati
|
||||||
* @param negLog10PError qual
|
* @param negLog10PError qual
|
||||||
* @param filters filters: use null for unfiltered and empty set for passes filters
|
* @param filters filters: use null for unfiltered and empty set for passes filters
|
||||||
* @param attributes attributes
|
* @param attributes attributes
|
||||||
|
* @param referenceBaseForIndel padded reference base
|
||||||
*/
|
*/
|
||||||
public VariantContext(String source, String contig, long start, long stop, Collection<Allele> alleles, double negLog10PError, Set<String> filters, Map<String, ?> attributes) {
|
public VariantContext(String source, String contig, long start, long stop, Collection<Allele> alleles, double negLog10PError, Set<String> filters, Map<String, ?> attributes, Byte referenceBaseForIndel) {
|
||||||
this(source, contig, start, stop, alleles, NO_GENOTYPES, negLog10PError, filters, attributes, null, true);
|
this(source, contig, start, stop, alleles, NO_GENOTYPES, negLog10PError, filters, attributes, referenceBaseForIndel, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -657,12 +657,84 @@ public class VariantContextUtils {
|
||||||
|
|
||||||
VariantContext merged = new VariantContext(name, loc.getContig(), loc.getStart(), loc.getStop(), alleles, genotypes, negLog10PError, filters, (mergeInfoWithMaxAC ? attributesWithMaxAC : attributes) );
|
VariantContext merged = new VariantContext(name, loc.getContig(), loc.getStart(), loc.getStop(), alleles, genotypes, negLog10PError, filters, (mergeInfoWithMaxAC ? attributesWithMaxAC : attributes) );
|
||||||
// Trim the padded bases of all alleles if necessary
|
// Trim the padded bases of all alleles if necessary
|
||||||
merged = AbstractVCFCodec.createVariantContextWithTrimmedAlleles(merged);
|
merged = createVariantContextWithTrimmedAlleles(merged);
|
||||||
|
|
||||||
if ( printMessages && remapped ) System.out.printf("Remapped => %s%n", merged);
|
if ( printMessages && remapped ) System.out.printf("Remapped => %s%n", merged);
|
||||||
return merged;
|
return merged;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static VariantContext createVariantContextWithTrimmedAlleles(VariantContext inputVC) {
|
||||||
|
// see if we need to trim common reference base from all alleles
|
||||||
|
boolean trimVC;
|
||||||
|
|
||||||
|
// We need to trim common reference base from all alleles in all genotypes if a ref base is common to all alleles
|
||||||
|
Allele refAllele = inputVC.getReference();
|
||||||
|
if (!inputVC.isVariant())
|
||||||
|
trimVC = false;
|
||||||
|
else if (refAllele.isNull())
|
||||||
|
trimVC = false;
|
||||||
|
else {
|
||||||
|
trimVC = (AbstractVCFCodec.computeForwardClipping(new ArrayList<Allele>(inputVC.getAlternateAlleles()),
|
||||||
|
inputVC.getReference().getDisplayString()) > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// nothing to do if we don't need to trim bases
|
||||||
|
if (trimVC) {
|
||||||
|
List<Allele> alleles = new ArrayList<Allele>();
|
||||||
|
Map<String, Genotype> genotypes = new TreeMap<String, Genotype>();
|
||||||
|
|
||||||
|
// set the reference base for indels in the attributes
|
||||||
|
Map<String,Object> attributes = new TreeMap<String,Object>(inputVC.getAttributes());
|
||||||
|
|
||||||
|
Map<Allele, Allele> originalToTrimmedAlleleMap = new HashMap<Allele, Allele>();
|
||||||
|
|
||||||
|
for (Allele a : inputVC.getAlleles()) {
|
||||||
|
if (a.isSymbolic()) {
|
||||||
|
alleles.add(a);
|
||||||
|
originalToTrimmedAlleleMap.put(a, a);
|
||||||
|
} else {
|
||||||
|
// get bases for current allele and create a new one with trimmed bases
|
||||||
|
byte[] newBases = Arrays.copyOfRange(a.getBases(), 1, a.length());
|
||||||
|
Allele trimmedAllele = Allele.create(newBases, a.isReference());
|
||||||
|
alleles.add(trimmedAllele);
|
||||||
|
originalToTrimmedAlleleMap.put(a, trimmedAllele);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// detect case where we're trimming bases but resulting vc doesn't have any null allele. In that case, we keep original representation
|
||||||
|
// example: mixed records such as {TA*,TGA,TG}
|
||||||
|
boolean hasNullAlleles = false;
|
||||||
|
|
||||||
|
for (Allele a: originalToTrimmedAlleleMap.values()) {
|
||||||
|
if (a.isNull())
|
||||||
|
hasNullAlleles = true;
|
||||||
|
if (a.isReference())
|
||||||
|
refAllele = a;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!hasNullAlleles)
|
||||||
|
return inputVC;
|
||||||
|
// now we can recreate new genotypes with trimmed alleles
|
||||||
|
for ( Map.Entry<String, Genotype> sample : inputVC.getGenotypes().entrySet() ) {
|
||||||
|
|
||||||
|
List<Allele> originalAlleles = sample.getValue().getAlleles();
|
||||||
|
List<Allele> trimmedAlleles = new ArrayList<Allele>();
|
||||||
|
for ( Allele a : originalAlleles ) {
|
||||||
|
if ( a.isCalled() )
|
||||||
|
trimmedAlleles.add(originalToTrimmedAlleleMap.get(a));
|
||||||
|
else
|
||||||
|
trimmedAlleles.add(Allele.NO_CALL);
|
||||||
|
}
|
||||||
|
genotypes.put(sample.getKey(), Genotype.modifyAlleles(sample.getValue(), trimmedAlleles));
|
||||||
|
|
||||||
|
}
|
||||||
|
return new VariantContext(inputVC.getSource(), inputVC.getChr(), inputVC.getStart(), inputVC.getEnd(), alleles, genotypes, inputVC.getNegLog10PError(), inputVC.filtersWereApplied() ? inputVC.getFilters() : null, attributes, new Byte(inputVC.getReference().getBases()[0]));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return inputVC;
|
||||||
|
}
|
||||||
|
|
||||||
public static Map<String, Genotype> stripPLs(Map<String, Genotype> genotypes) {
|
public static Map<String, Genotype> stripPLs(Map<String, Genotype> genotypes) {
|
||||||
Map<String, Genotype> newGs = new HashMap<String, Genotype>(genotypes.size());
|
Map<String, Genotype> newGs = new HashMap<String, Genotype>(genotypes.size());
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -50,8 +50,8 @@ public class DiffObjectsIntegrationTest extends WalkerTest {
|
||||||
|
|
||||||
@DataProvider(name = "data")
|
@DataProvider(name = "data")
|
||||||
public Object[][] createData() {
|
public Object[][] createData() {
|
||||||
new TestParams(testDir + "diffTestMaster.vcf", testDir + "diffTestTest.vcf", "92311de76dda3f38aac289d807ef23d0");
|
new TestParams(testDir + "diffTestMaster.vcf", testDir + "diffTestTest.vcf", "dc1ca75c6ecf32641967d61e167acfff");
|
||||||
new TestParams(testDir + "exampleBAM.bam", testDir + "exampleBAM.simple.bam", "0c69412c385fda50210f2a612e1ffe4a");
|
new TestParams(testDir + "exampleBAM.bam", testDir + "exampleBAM.simple.bam", "df0fcb568a3a49fc74830103b2e26f6c");
|
||||||
return TestParams.getTests(TestParams.class);
|
return TestParams.getTests(TestParams.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,8 @@ class QCommandLine extends CommandLineProgram with Logging {
|
||||||
def execute = {
|
def execute = {
|
||||||
qGraph.settings = settings
|
qGraph.settings = settings
|
||||||
|
|
||||||
for (script <- pluginManager.createAllTypes()) {
|
val allQScripts = pluginManager.createAllTypes();
|
||||||
|
for (script <- allQScripts) {
|
||||||
logger.info("Scripting " + pluginManager.getName(script.getClass.asSubclass(classOf[QScript])))
|
logger.info("Scripting " + pluginManager.getName(script.getClass.asSubclass(classOf[QScript])))
|
||||||
loadArgumentsIntoObject(script)
|
loadArgumentsIntoObject(script)
|
||||||
try {
|
try {
|
||||||
|
|
@ -108,14 +109,24 @@ class QCommandLine extends CommandLineProgram with Logging {
|
||||||
logger.info("Added " + script.functions.size + " functions")
|
logger.info("Added " + script.functions.size + " functions")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Execute the job graph
|
||||||
qGraph.run()
|
qGraph.run()
|
||||||
|
|
||||||
|
// walk over each script, calling onExecutionDone
|
||||||
|
for (script <- allQScripts) {
|
||||||
|
script.onExecutionDone(qGraph.getFunctionsAndStatus(script.functions), qGraph.success)
|
||||||
|
if ( ! settings.disableJobReport ) {
|
||||||
|
logger.info("Writing JobLogging GATKReport to file " + settings.jobReportFile)
|
||||||
|
QJobReport.printReport(qGraph.getFunctionsAndStatus(script.functions), settings.jobReportFile)
|
||||||
|
QJobReport.plotReport(settings.rScriptArgs, settings.jobReportFile)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!qGraph.success) {
|
if (!qGraph.success) {
|
||||||
logger.info("Done with errors")
|
logger.info("Done with errors")
|
||||||
qGraph.logFailed()
|
qGraph.logFailed()
|
||||||
1
|
1
|
||||||
} else {
|
} else {
|
||||||
logger.info("Done")
|
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
package org.broadinstitute.sting.queue
|
package org.broadinstitute.sting.queue
|
||||||
|
|
||||||
|
import engine.JobRunInfo
|
||||||
import org.broadinstitute.sting.queue.function.QFunction
|
import org.broadinstitute.sting.queue.function.QFunction
|
||||||
import annotation.target.field
|
import annotation.target.field
|
||||||
import io.Source
|
import io.Source
|
||||||
|
|
@ -57,6 +58,15 @@ trait QScript extends Logging with PrimitiveOptionConversions with StringFileCon
|
||||||
*/
|
*/
|
||||||
def script()
|
def script()
|
||||||
|
|
||||||
|
/**
 * Default onExecutionDone() handler. It only logs: one summary line saying whether
 * the script succeeded or failed and how many jobs it had, then one line per job
 * with the job name and its run info.
 * @param jobs Map from each function to the run info recorded for it.
 * @param success True if the run completed successfully.
 */
def onExecutionDone(jobs: Map[QFunction, JobRunInfo], success: Boolean) {
  val outcome = if (success) "completed successfully" else "failed"
  logger.info("Script %s with %d total jobs".format(outcome, jobs.size))
  jobs.foreach { case (function, runInfo) =>
    logger.info(" %s %s".format(function.jobName, runInfo))
  }
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The command line functions that will be executed for this QScript.
|
* The command line functions that will be executed for this QScript.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,8 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
||||||
*/
|
*/
|
||||||
var depth = -1
|
var depth = -1
|
||||||
|
|
||||||
|
val myRunInfo: JobRunInfo = JobRunInfo.default // purely for dryRun testing
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes with the current status of the function.
|
* Initializes with the current status of the function.
|
||||||
*/
|
*/
|
||||||
|
|
@ -179,4 +181,8 @@ class FunctionEdge(val function: QFunction, val inputs: QNode, val outputs: QNod
|
||||||
printWriter.close
|
printWriter.close
|
||||||
IOUtils.writeContents(functionErrorFile, stackTrace.toString)
|
IOUtils.writeContents(functionErrorFile, stackTrace.toString)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Run info for this edge: the runner's record when a runner is attached,
 * otherwise this edge's own myRunInfo.
 */
def getRunInfo = runner match {
  case null => myRunInfo
  case attachedRunner => attachedRunner.getRunInfo
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
package org.broadinstitute.sting.queue.engine
|
package org.broadinstitute.sting.queue.engine
|
||||||
|
|
||||||
import org.broadinstitute.sting.queue.function.InProcessFunction
|
import org.broadinstitute.sting.queue.function.InProcessFunction
|
||||||
import org.broadinstitute.sting.queue.util.IOUtils
|
import java.util.Date
|
||||||
|
import org.broadinstitute.sting.queue.util.{Logging, IOUtils}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs a function that executes in process and does not fork out an external process.
|
* Runs a function that executes in process and does not fork out an external process.
|
||||||
|
|
@ -10,8 +11,12 @@ class InProcessRunner(val function: InProcessFunction) extends JobRunner[InProce
|
||||||
private var runStatus: RunnerStatus.Value = _
|
private var runStatus: RunnerStatus.Value = _
|
||||||
|
|
||||||
def start() = {
|
def start() = {
|
||||||
|
getRunInfo.startTime = new Date()
|
||||||
runStatus = RunnerStatus.RUNNING
|
runStatus = RunnerStatus.RUNNING
|
||||||
|
|
||||||
function.run()
|
function.run()
|
||||||
|
|
||||||
|
getRunInfo.doneTime = new Date()
|
||||||
val content = "%s%nDone.".format(function.description)
|
val content = "%s%nDone.".format(function.description)
|
||||||
IOUtils.writeContents(function.jobOutputFile, content)
|
IOUtils.writeContents(function.jobOutputFile, content)
|
||||||
runStatus = RunnerStatus.DONE
|
runStatus = RunnerStatus.DONE
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2011, The Broad Institute
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person
|
||||||
|
* obtaining a copy of this software and associated documentation
|
||||||
|
* files (the "Software"), to deal in the Software without
|
||||||
|
* restriction, including without limitation the rights to use,
|
||||||
|
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the
|
||||||
|
* Software is furnished to do so, subject to the following
|
||||||
|
* conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||||
|
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||||
|
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||||
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||||
|
* OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.broadinstitute.sting.queue.engine
|
||||||
|
|
||||||
|
import java.util.Date
|
||||||
|
import java.text.SimpleDateFormat
|
||||||
|
|
||||||
|
/**
 * Class containing tracked information about a job run.
 */
// todo -- it might be nice to have the hostname
class JobRunInfo {
  /** Date format used by the getFormatted* accessors */
  val formatter = new SimpleDateFormat("yy-MM-dd H:mm:ss:SSS")

  /** The start time with millisecond resolution of this job */
  var startTime: Date = _
  /** The done time with millisecond resolution of this job */
  var doneTime: Date = _

  def getStartTime: Date = startTime
  def getDoneTime: Date = doneTime
  def getFormattedStartTime: String = formatTime(getStartTime)
  def getFormattedDoneTime: String = formatTime(getDoneTime)

  /** Helper function that pretty prints the date; an unset (null) date prints as "null" */
  private def formatTime(d: Date): String = if ( d != null ) formatter.format(d) else "null"

  /**
   * Was any information set for this jobInfo?  True once startTime has been set.
   * JobInfo can be unset because the job never ran or because it already completed.
   */
  def isFilledIn: Boolean = startTime != null

  /**
   * How long did the job run (in wall time)?  Returns -1 unless both the start
   * and the done time have been recorded, so a job that started but has no done
   * time yet reports -1 instead of failing on the missing value.
   */
  def getRuntimeInMs: Long = {
    if ( startTime != null && doneTime != null )
      doneTime.getTime - startTime.getTime
    else
      -1
  }

  override def toString: String =
    "started %s ended %s runtime %s".format(getFormattedStartTime, getFormattedDoneTime, getRuntimeInMs)
}

object JobRunInfo {
  /** Creates a fresh JobRunInfo with neither time set; every call returns a new instance */
  def default: JobRunInfo = new JobRunInfo()
}
|
||||||
|
|
@ -69,6 +69,12 @@ trait JobRunner[TFunction <: QFunction] {
|
||||||
def cleanup() {
|
def cleanup() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Must be overloaded
|
||||||
|
*/
|
||||||
|
val runInfo = JobRunInfo.default
|
||||||
|
def getRunInfo = runInfo
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calls back to a hook that an expert user can setup to modify a job.
|
* Calls back to a hook that an expert user can setup to modify a job.
|
||||||
* @param value Value to modify.
|
* @param value Value to modify.
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ import org.apache.commons.lang.StringUtils
|
||||||
import org.broadinstitute.sting.queue.util._
|
import org.broadinstitute.sting.queue.util._
|
||||||
import collection.immutable.{TreeSet, TreeMap}
|
import collection.immutable.{TreeSet, TreeMap}
|
||||||
import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction, GatherFunction, ScatterGatherableFunction}
|
import org.broadinstitute.sting.queue.function.scattergather.{ScatterFunction, CloneFunction, GatherFunction, ScatterGatherableFunction}
|
||||||
|
import java.util.Date
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The internal dependency tracker between sets of function input and output files.
|
* The internal dependency tracker between sets of function input and output files.
|
||||||
|
|
@ -319,7 +320,9 @@ class QGraph extends Logging {
|
||||||
logger.debug("+++++++")
|
logger.debug("+++++++")
|
||||||
foreachFunction(readyJobs.toList, edge => {
|
foreachFunction(readyJobs.toList, edge => {
|
||||||
if (running) {
|
if (running) {
|
||||||
|
edge.myRunInfo.startTime = new Date()
|
||||||
logEdge(edge)
|
logEdge(edge)
|
||||||
|
edge.myRunInfo.doneTime = new Date()
|
||||||
edge.markAsDone
|
edge.markAsDone
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
@ -939,6 +942,14 @@ class QGraph extends Logging {
|
||||||
edges.sorted(functionOrdering).foreach(edge => if (running) f(edge))
|
edges.sorted(functionOrdering).foreach(edge => if (running) f(edge))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Collects every edge of the job graph that is a FunctionEdge.
 * @return The FunctionEdges found in jobGraph.edgeSet, as a List.
 */
private def getFunctionEdges: List[FunctionEdge] = {
  // collect selects and narrows in one step, so no unchecked cast of the whole list is needed
  jobGraph.edgeSet.toList.collect { case functionEdge: FunctionEdge => functionEdge }
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility function for running a method over all functions, but traversing the nodes in order of dependency.
|
* Utility function for running a method over all functions, but traversing the nodes in order of dependency.
|
||||||
* @param edgeFunction Function to run for each FunctionEdge.
|
* @param edgeFunction Function to run for each FunctionEdge.
|
||||||
|
|
@ -1028,6 +1039,10 @@ class QGraph extends Logging {
|
||||||
*/
|
*/
|
||||||
def isShutdown = !running
|
def isShutdown = !running
|
||||||
|
|
||||||
|
/**
 * Builds a map from the function of each FunctionEdge in the graph to that edge's run info.
 * NOTE(review): the functions argument is not consulted here; every function edge
 * contributes an entry -- confirm whether restricting to the given functions was intended.
 * @param functions Currently unused.
 * @return Map of function to its JobRunInfo.
 */
def getFunctionsAndStatus(functions: List[QFunction]): Map[QFunction, JobRunInfo] = {
  val functionStatusPairs = for (edge <- getFunctionEdges) yield (edge.function, edge.getRunInfo)
  functionStatusPairs.toMap
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Kills any forked jobs still running.
|
* Kills any forked jobs still running.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -26,8 +26,9 @@ package org.broadinstitute.sting.queue.engine
|
||||||
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import org.broadinstitute.sting.queue.QSettings
|
import org.broadinstitute.sting.queue.QSettings
|
||||||
import org.broadinstitute.sting.commandline.{ArgumentCollection, Argument}
|
|
||||||
import org.broadinstitute.sting.queue.util.SystemUtils
|
import org.broadinstitute.sting.queue.util.SystemUtils
|
||||||
|
import org.broadinstitute.sting.commandline.{Advanced, ArgumentCollection, Argument}
|
||||||
|
import org.broadinstitute.sting.utils.R.RScriptExecutor
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Command line options for a QGraph.
|
* Command line options for a QGraph.
|
||||||
|
|
@ -69,6 +70,16 @@ class QGraphSettings {
|
||||||
@Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false)
|
@Argument(fullName="expanded_dot_graph", shortName="expandedDot", doc="Outputs the queue graph of scatter gather to a .dot file. Otherwise overwrites the dot_graph", required=false)
|
||||||
var expandedDotFile: File = _
|
var expandedDotFile: File = _
|
||||||
|
|
||||||
|
@Argument(fullName="jobReport", shortName="jobReport", doc="File where we will write the Queue job report", required=false)
|
||||||
|
var jobReportFile: File = new File("queue_jobreport.gatkreport.txt")
|
||||||
|
|
||||||
|
// Advanced switch: when set, no job report is created.
@Advanced
@Argument(fullName="disableJobReport", shortName="disableJobReport", doc="If provided, we will not create a job report", required=false)
var disableJobReport: Boolean = false
|
||||||
|
|
||||||
|
@ArgumentCollection
|
||||||
|
var rScriptArgs = new RScriptExecutor.RScriptArgumentCollection
|
||||||
|
|
||||||
@ArgumentCollection
|
@ArgumentCollection
|
||||||
val qSettings = new QSettings
|
val qSettings = new QSettings
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -31,11 +31,12 @@ import org.broadinstitute.sting.jna.lsf.v7_0_6.{LibLsf, LibBat}
|
||||||
import org.broadinstitute.sting.utils.Utils
|
import org.broadinstitute.sting.utils.Utils
|
||||||
import org.broadinstitute.sting.jna.clibrary.LibC
|
import org.broadinstitute.sting.jna.clibrary.LibC
|
||||||
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit}
|
import org.broadinstitute.sting.jna.lsf.v7_0_6.LibBat.{submitReply, submit}
|
||||||
import com.sun.jna.ptr.IntByReference
|
|
||||||
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
|
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
|
||||||
import com.sun.jna.{Structure, StringArray, NativeLong}
|
|
||||||
import java.util.regex.Pattern
|
import java.util.regex.Pattern
|
||||||
import java.lang.StringBuffer
|
import java.lang.StringBuffer
|
||||||
|
import java.util.Date
|
||||||
|
import com.sun.jna.{Pointer, Structure, StringArray, NativeLong}
|
||||||
|
import com.sun.jna.ptr.{PointerByReference, IntByReference}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs jobs on an LSF compute cluster.
|
* Runs jobs on an LSF compute cluster.
|
||||||
|
|
@ -271,12 +272,20 @@ object Lsf706JobRunner extends Logging {
|
||||||
|
|
||||||
logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo))
|
logger.debug("Job Id %s status / exitStatus / exitInfo: 0x%02x / 0x%02x / 0x%02x".format(runner.jobId, jobStatus, exitStatus, exitInfo))
|
||||||
|
|
||||||
|
// Copies the start and end times from jobInfo onto the runner's run info.
def updateRunInfo() {
  // the platform LSF startTimes are in seconds, not milliseconds, so convert to the java convention
  val startMillis = jobInfo.startTime.longValue * 1000
  runner.getRunInfo.startTime = new Date(startMillis)
  val doneMillis = jobInfo.endTime.longValue * 1000
  runner.getRunInfo.doneTime = new Date(doneMillis)
}
|
||||||
|
|
||||||
runner.updateStatus(
|
runner.updateStatus(
|
||||||
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
|
if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_DONE)) {
|
||||||
// Done successfully.
|
// Done successfully.
|
||||||
|
updateRunInfo()
|
||||||
RunnerStatus.DONE
|
RunnerStatus.DONE
|
||||||
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
|
} else if (Utils.isFlagSet(jobStatus, LibBat.JOB_STAT_EXIT) && !willRetry(exitInfo, endTime)) {
|
||||||
// Exited function that (probably) won't be retried.
|
// Exited function that (probably) won't be retried.
|
||||||
|
updateRunInfo()
|
||||||
RunnerStatus.FAILED
|
RunnerStatus.FAILED
|
||||||
} else {
|
} else {
|
||||||
// Note that we still saw the job in the system.
|
// Note that we still saw the job in the system.
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ package org.broadinstitute.sting.queue.engine.shell
|
||||||
import org.broadinstitute.sting.queue.function.CommandLineFunction
|
import org.broadinstitute.sting.queue.function.CommandLineFunction
|
||||||
import org.broadinstitute.sting.queue.util.ShellJob
|
import org.broadinstitute.sting.queue.util.ShellJob
|
||||||
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
|
import org.broadinstitute.sting.queue.engine.{RunnerStatus, CommandLineJobRunner}
|
||||||
|
import java.util.Date
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs jobs one at a time locally
|
* Runs jobs one at a time locally
|
||||||
|
|
@ -50,8 +51,10 @@ class ShellJobRunner(val function: CommandLineFunction) extends CommandLineJobRu
|
||||||
// Allow advanced users to update the job.
|
// Allow advanced users to update the job.
|
||||||
updateJobRun(job)
|
updateJobRun(job)
|
||||||
|
|
||||||
|
getRunInfo.startTime = new Date()
|
||||||
updateStatus(RunnerStatus.RUNNING)
|
updateStatus(RunnerStatus.RUNNING)
|
||||||
job.run()
|
job.run()
|
||||||
|
getRunInfo.doneTime = new Date()
|
||||||
updateStatus(RunnerStatus.DONE)
|
updateStatus(RunnerStatus.DONE)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,14 +30,14 @@ import org.broadinstitute.sting.commandline._
|
||||||
import org.broadinstitute.sting.queue.{QException, QSettings}
|
import org.broadinstitute.sting.queue.{QException, QSettings}
|
||||||
import collection.JavaConversions._
|
import collection.JavaConversions._
|
||||||
import org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction
|
import org.broadinstitute.sting.queue.function.scattergather.SimpleTextGatherFunction
|
||||||
import org.broadinstitute.sting.queue.util.{Logging, CollectionUtils, IOUtils, ReflectionUtils}
|
import org.broadinstitute.sting.queue.util._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The base interface for all functions in Queue.
|
* The base interface for all functions in Queue.
|
||||||
* Inputs and outputs are specified as Sets of values.
|
* Inputs and outputs are specified as Sets of values.
|
||||||
* Inputs are matched to other outputs by using .equals()
|
* Inputs are matched to other outputs by using .equals()
|
||||||
*/
|
*/
|
||||||
trait QFunction extends Logging {
|
trait QFunction extends Logging with QJobReport {
|
||||||
/** A short description of this step in the graph */
|
/** A short description of this step in the graph */
|
||||||
var analysisName: String = "<function>"
|
var analysisName: String = "<function>"
|
||||||
|
|
||||||
|
|
@ -83,11 +83,17 @@ trait QFunction extends Logging {
|
||||||
*/
|
*/
|
||||||
var deleteIntermediateOutputs = true
|
var deleteIntermediateOutputs = true
|
||||||
|
|
||||||
|
// -------------------------------------------------------
|
||||||
|
//
|
||||||
|
// job run information
|
||||||
|
//
|
||||||
|
// -------------------------------------------------------
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Copies settings from this function to another function.
|
* Copies settings from this function to another function.
|
||||||
* @param function QFunction to copy values to.
|
* @param function QFunction to copy values to.
|
||||||
*/
|
*/
|
||||||
def copySettingsTo(function: QFunction) {
|
override def copySettingsTo(function: QFunction) {
|
||||||
function.analysisName = this.analysisName
|
function.analysisName = this.analysisName
|
||||||
function.jobName = this.jobName
|
function.jobName = this.jobName
|
||||||
function.qSettings = this.qSettings
|
function.qSettings = this.qSettings
|
||||||
|
|
@ -99,6 +105,8 @@ trait QFunction extends Logging {
|
||||||
function.updateJobRun = this.updateJobRun
|
function.updateJobRun = this.updateJobRun
|
||||||
function.isIntermediate = this.isIntermediate
|
function.isIntermediate = this.isIntermediate
|
||||||
function.deleteIntermediateOutputs = this.deleteIntermediateOutputs
|
function.deleteIntermediateOutputs = this.deleteIntermediateOutputs
|
||||||
|
function.reportGroup = this.reportGroup
|
||||||
|
function.reportFeatures = this.reportFeatures
|
||||||
}
|
}
|
||||||
|
|
||||||
/** File to redirect any output. Defaults to <jobName>.out */
|
/** File to redirect any output. Defaults to <jobName>.out */
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,166 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2011, The Broad Institute
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person
|
||||||
|
* obtaining a copy of this software and associated documentation
|
||||||
|
* files (the "Software"), to deal in the Software without
|
||||||
|
* restriction, including without limitation the rights to use,
|
||||||
|
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the
|
||||||
|
* Software is furnished to do so, subject to the following
|
||||||
|
* conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||||
|
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||||
|
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||||
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||||
|
* OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.broadinstitute.sting.queue.util
|
||||||
|
import org.broadinstitute.sting.queue.function.QFunction
|
||||||
|
import org.broadinstitute.sting.gatk.report.{GATKReportTable, GATKReport}
|
||||||
|
import org.broadinstitute.sting.utils.exceptions.UserException
|
||||||
|
import org.broadinstitute.sting.queue.engine.JobRunInfo
|
||||||
|
import java.io.{FileOutputStream, PrintStream, File}
|
||||||
|
import org.broadinstitute.sting.queue.function.scattergather.{GathererFunction, ScatterFunction}
|
||||||
|
import org.broadinstitute.sting.utils.R.RScriptExecutor.RScriptArgumentCollection
|
||||||
|
import org.broadinstitute.sting.utils.R.RScriptExecutor
|
||||||
|
import org.broadinstitute.sting.queue.QScript
|
||||||
|
|
||||||
|
/**
 * A mixin to add Job info to the class.
 *
 * Keeps a map of named string features describing one job, plus a flag
 * controlling whether the job is included in the job report.
 */
trait QJobReport extends Logging {
  self: QFunction =>

  protected var reportGroup: String = null
  protected var reportFeatures: Map[String, String] = Map()
  protected var reportEnabled: Boolean = true

  /** True if this job should appear in the report (enabled by default) */
  def includeInReport = reportEnabled
  def enableReport() { reportEnabled = true }
  def disableReport() { reportEnabled = false }

  /**
   * Fills reportFeatures with the standard features derived from this function and
   * the given run info.  Features that were already present in reportFeatures take
   * precedence over the standard ones with the same key.
   * @param info Run info of the job; a start or done time that is not set is
   *             reported as the string "null" rather than dereferenced.
   */
  def setRunInfo(info: JobRunInfo) {
    logger.info("info " + info)

    // epoch milliseconds of the date, or null (rendered as "null" below) when the date is unset
    def timeOrNull(date: java.util.Date): Any = if (date != null) date.getTime else null

    reportFeatures = Map(
      "iteration" -> 1,
      "analysisName" -> self.analysisName,
      "jobName" -> QJobReport.workAroundSameJobNames(this),
      "intermediate" -> self.isIntermediate,
      "startTime" -> timeOrNull(info.getStartTime),
      "doneTime" -> timeOrNull(info.getDoneTime),
      "formattedStartTime" -> info.getFormattedStartTime,
      "formattedDoneTime" -> info.getFormattedDoneTime,
      "runtime" -> info.getRuntimeInMs).mapValues((x:Any) => if (x != null) x.toString else "null") ++ reportFeatures

//    // handle the special case of iterations
//    reportFeatures.get("iteration") match {
//      case None => reportFeatures("iteration") = 1
//      case _ => ;
//    }
  }

  /** The group this job is reported under; this is the function's analysisName */
  def getReportGroup = analysisName
  def getReportFeatures = reportFeatures

  def getReportFeatureNames: List[String] = getReportFeatures.keys.toList

  /**
   * Looks up a single feature value.
   * @param key Name of the feature.
   * @return The value stored for key.
   * @throws RuntimeException if no value is stored for key.
   */
  def getReportFeature(key: String): String = {
    getReportFeatures.getOrElse(key,
      throw new RuntimeException("Get called with key %s but no value was found".format(key)))
  }

  /** The name of this job in the report: the "jobName" feature */
  def getReportName: String = getReportFeature("jobName")

  /**
   * Replaces the current report features with the given ones, converting each value with toString.
   * @param features Feature name to value.
   */
  def configureJobReport(features: Map[String, Any]) {
    this.reportFeatures = features.mapValues(_.toString)
  }

  // copy the QJobReport information -- todo : what's the best way to do this?
  override def copySettingsTo(function: QFunction) {
    self.copySettingsTo(function)
    function.reportFeatures = this.reportFeatures
  }
}
|
||||||
|
|
||||||
|
/**
 * Helpers for writing the job report of a run to a GATKReport file and for plotting that
 * file with an R script.
 */
object QJobReport {
  /** Name of the R script that `plotReport` runs on a written job report. */
  val JOB_REPORT_QUEUE_SCRIPT = "queueJobReport.R"

  // todo -- fixme to have a unique name for Scatter/gather jobs as well
  /** Counter used to build the numeric suffix for job names that have been seen before. */
  var seenCounter = 1
  /** Job names already handed out, unchanged, by `workAroundSameJobNames`. */
  var seenNames = Set[String]()

  /**
   * Writes the job report to `dest`.
   *
   * Only jobs whose run info `isFilledIn` and whose function has `includeInReport` set are
   * reported; each of those functions receives its run info through `setRunInfo` first.
   *
   * @param jobsRaw every function of the run together with its run information
   * @param dest    file the report is written to
   */
  def printReport(jobsRaw: Map[QFunction, JobRunInfo], dest: File): Unit = {
    val jobs = jobsRaw.filter(_._2.isFilledIn).filter(_._1.includeInReport)
    jobs foreach { case (qf, info) => qf.setRunInfo(info) }
    val stream = new PrintStream(new FileOutputStream(dest))
    try {
      printJobLogging(jobs.keys.toList, stream)
    } finally {
      // release the file handle even when building or printing the report fails
      stream.close()
    }
  }

  /**
   * Runs the job report R script on a written report; the output path handed to the script
   * is the report path with ".pdf" appended.
   *
   * @param args          settings used to construct the R script executor
   * @param jobReportFile report file previously written by `printReport`
   */
  def plotReport(args: RScriptArgumentCollection, jobReportFile: File): Unit = {
    val executor = new RScriptExecutor(args, false) // don't except on error
    val pdf = jobReportFile.getAbsolutePath + ".pdf"
    executor.callRScripts(JOB_REPORT_QUEUE_SCRIPT, jobReportFile.getAbsolutePath, pdf)
  }

  /**
   * Returns a report name for `func`: its `jobName` the first time that name is seen, and
   * `jobName_N` (with N taken from the shared `seenCounter`) on every later occurrence.
   *
   * @param func function whose job name should be disambiguated
   * @return the name to use for this job in the report
   */
  def workAroundSameJobNames(func: QFunction): String = {
    if (seenNames.contains(func.jobName)) {
      seenCounter += 1
      "%s_%d".format(func.jobName, seenCounter)
    } else {
      seenNames += func.jobName
      func.jobName
    }
  }

  /**
   * Prints the JobLogging logs to a GATKReport.  First splits up the
   * logs by group, and for each group generates a GATKReportTable
   */
  private def printJobLogging(logs: List[QFunction], stream: PrintStream): Unit = {
    // create the report
    val report: GATKReport = new GATKReport

    // create a table for each group of logs
    for ((group, logsInGroup) <- groupLogs(logs)) {
      report.addTable(group, "Job logs for " + group)
      val table: GATKReportTable = report.getTable(group)
      table.addPrimaryKey("jobName", false)
      val keys = logKeys(logsInGroup)

      // add the columns
      keys.foreach(table.addColumn(_, 0))
      for (log <- logsInGroup; key <- keys)
        table.set(log.getReportName, key, log.getReportFeature(key))
    }

    report.print(stream)
  }

  /** Buckets the functions by their report group. */
  private def groupLogs(logs: List[QFunction]): Map[String, List[QFunction]] =
    logs.groupBy(_.getReportGroup)

  /**
   * Determines the feature names shared by every log of one group.
   *
   * @param logs the functions of a single report group
   * @return the feature names of the group; empty when `logs` is empty
   * @throws UserException if any log's feature names differ from those of the first log
   */
  private def logKeys(logs: List[QFunction]): Set[String] = logs match {
    case Nil => Set.empty[String]
    case first :: _ =>
      // the keys should be the same for each log, but we will check that
      val keys = Set[String](first.getReportFeatureNames: _*)

      for (log <- logs) {
        // Compare as sets of names: the previous check wrapped the whole name list in a
        // one-element Set and threw on equality, so a mismatch could never be reported.
        if (keys != log.getReportFeatureNames.toSet)
          throw new UserException(("All JobLogging jobs in the same group must have the same set of features. " +
            "We found one with %s and another with %s").format(keys, log.getReportFeatureNames))
      }

      keys
  }
}
|
||||||
Loading…
Reference in New Issue