Implemented TreeReducible - if num threads > 1, the output will be accumulated in memory and written to a vcf file at the end - in onTraveralDone(..). If num threads == 1, things will work as before - where vcf records are written to disk as soon as they are computed with map(..).
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3530 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
3b375cb237
commit
06fc5eecf8
|
|
@ -56,6 +56,7 @@ import org.broadinstitute.sting.gatk.refdata.features.annotator.AnnotatorInputTa
|
|||
import org.broadinstitute.sting.gatk.walkers.By;
|
||||
import org.broadinstitute.sting.gatk.walkers.DataSource;
|
||||
import org.broadinstitute.sting.gatk.walkers.RodWalker;
|
||||
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
||||
import org.broadinstitute.sting.gatk.walkers.annotator.VariantAnnotatorEngine;
|
||||
import org.broadinstitute.sting.utils.BaseUtils;
|
||||
import org.broadinstitute.sting.utils.SampleUtils;
|
||||
|
|
@ -73,7 +74,7 @@ import org.broadinstitute.sting.utils.genotype.vcf.VCFWriter;
|
|||
//@Allows(value={DataSource.READS, DataSource.REFERENCE})
|
||||
//@Reference(window=@Window(start=-50,stop=50))
|
||||
@By(DataSource.REFERENCE)
|
||||
public class GenomicAnnotator extends RodWalker<Integer, Integer> {
|
||||
public class GenomicAnnotator extends RodWalker<LinkedList<VCFRecord>, LinkedList<VCFRecord>> implements TreeReducible<LinkedList<VCFRecord>> {
|
||||
@Argument(fullName="vcfOutput", shortName="vcf", doc="VCF file to which all variants should be written with annotations", required=true)
|
||||
protected File VCF_OUT;
|
||||
|
||||
|
|
@ -96,19 +97,20 @@ public class GenomicAnnotator extends RodWalker<Integer, Integer> {
|
|||
|
||||
private boolean strict = true;
|
||||
|
||||
private boolean multiThreadedMode = false; //whether map will be called by more than one thread.
|
||||
|
||||
/**
|
||||
* Prepare the output file and the list of available features.
|
||||
*/
|
||||
public void initialize() {
|
||||
|
||||
multiThreadedMode = getToolkit().getArguments().numberOfThreads > 1;
|
||||
|
||||
// get the list of all sample names from the various VCF input rods
|
||||
TreeSet<String> samples = new TreeSet<String>();
|
||||
SampleUtils.getUniquifiedSamplesFromRods(getToolkit(), samples, new HashMap<Pair<String, String>, String>());
|
||||
|
||||
|
||||
|
||||
|
||||
//read all ROD file headers and construct a set of all column names to be used for validation of command-line args
|
||||
final Set<String> allFullyQualifiedColumnNames = new LinkedHashSet<String>();
|
||||
final Set<String> allBindingNames = new LinkedHashSet<String>();
|
||||
|
|
@ -247,7 +249,7 @@ public class GenomicAnnotator extends RodWalker<Integer, Integer> {
|
|||
*
|
||||
* @return 0
|
||||
*/
|
||||
public Integer reduceInit() { return 0; }
|
||||
public LinkedList<VCFRecord> reduceInit() { return new LinkedList<VCFRecord>(); }
|
||||
|
||||
|
||||
/**
|
||||
|
|
@ -265,23 +267,25 @@ public class GenomicAnnotator extends RodWalker<Integer, Integer> {
|
|||
* @param context the context for the given locus
|
||||
* @return 1 if the locus was successfully processed, 0 if otherwise
|
||||
*/
|
||||
public Integer map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) {
|
||||
public LinkedList<VCFRecord> map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) {
|
||||
LinkedList<VCFRecord> result = new LinkedList<VCFRecord>();
|
||||
|
||||
if ( tracker == null )
|
||||
return 0;
|
||||
return result;
|
||||
|
||||
List<Object> rods = tracker.getReferenceMetaData("variant");
|
||||
// ignore places where we don't have a variant
|
||||
if ( rods.size() == 0 )
|
||||
return 0;
|
||||
return result;
|
||||
|
||||
Object variant = rods.get(0);
|
||||
if( BaseUtils.isNBase(ref.getBase()) ) {
|
||||
return 0; //TODO Currently, VariantContextAdaptors.toVCF(annotatedVC, ref.getBase()) fails when base is 'N'. is this right?
|
||||
return result; //TODO Currently, VariantContextAdaptors.toVCF(annotatedVC, ref.getBase()) fails when base is 'N'. is this right?
|
||||
}
|
||||
|
||||
VariantContext vc = VariantContextAdaptors.toVariantContext("variant", variant, ref);
|
||||
if ( vc == null )
|
||||
return 0;
|
||||
return result;
|
||||
|
||||
// if the reference base is not ambiguous, we can annotate
|
||||
Collection<VariantContext> annotatedVCs = Arrays.asList(vc);
|
||||
|
|
@ -292,40 +296,69 @@ public class GenomicAnnotator extends RodWalker<Integer, Integer> {
|
|||
}
|
||||
}
|
||||
|
||||
if ( variant instanceof VCFRecord) { //TODO is this if-statement necessary?
|
||||
if(multiThreadedMode) {
|
||||
//keep results in memory, only writing them in onTraversalDone(..) after they have been merged via treeReduce(..)
|
||||
for(VariantContext annotatedVC : annotatedVCs ) {
|
||||
result.add(VariantContextAdaptors.toVCF(annotatedVC, ref.getBase()));
|
||||
}
|
||||
} else {
|
||||
//write results to disk immediately
|
||||
for(VariantContext annotatedVC : annotatedVCs ) {
|
||||
vcfWriter.addRecord(VariantContextAdaptors.toVCF(annotatedVC, ref.getBase()));
|
||||
}
|
||||
}
|
||||
|
||||
return annotatedVCs.size();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Increment the number of loci processed.
|
||||
* Merge lists.
|
||||
*
|
||||
* @param value result of the map.
|
||||
* @param sum accumulator for the reduce.
|
||||
* @return the new number of loci processed.
|
||||
*/
|
||||
public Integer reduce(Integer value, Integer sum) {
|
||||
return sum + value;
|
||||
public LinkedList<VCFRecord> reduce(LinkedList<VCFRecord> value, LinkedList<VCFRecord> sum) {
|
||||
sum.addAll(value);
|
||||
return sum;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Merge lists.
|
||||
*/
|
||||
public LinkedList<VCFRecord> treeReduce(LinkedList<VCFRecord> lhs, LinkedList<VCFRecord> rhs) {
|
||||
lhs.addAll(rhs);
|
||||
return lhs;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Tell the user the number of loci processed and close out the new variants file.
|
||||
*
|
||||
* @param result the number of loci seen.
|
||||
*/
|
||||
public void onTraversalDone(Integer totalOutputVCFRecords) {
|
||||
out.printf("Generated %d annotated VCF records.\n", totalOutputVCFRecords);
|
||||
public void onTraversalDone(LinkedList<VCFRecord> totalOutputVCFRecords) {
|
||||
if(multiThreadedMode) {
|
||||
//finally write results to disk
|
||||
for(VCFRecord vcfRecord : totalOutputVCFRecords ) {
|
||||
vcfWriter.addRecord(vcfRecord);
|
||||
}
|
||||
}
|
||||
|
||||
//out.printf("Generated %d annotated VCF records.\n", totalOutputVCFRecords);
|
||||
Map<String, Integer> inputTableHitCounter = engine.getInputTableHitCounter();
|
||||
for(Entry<String, Integer> e : inputTableHitCounter.entrySet()) {
|
||||
final String bindingName = e.getKey();
|
||||
final int counter = e.getValue();
|
||||
final float percent = 100 * counter /(float) totalOutputVCFRecords;
|
||||
out.printf(" %-6.1f%% (%d) annotated with %s.\n", percent, counter, bindingName );
|
||||
//final float percent = 100 * counter /(float) totalOutputVCFRecords;
|
||||
//out.printf(" %-6.1f%% (%d) annotated with %s.\n", percent, counter, bindingName );
|
||||
out.printf(" %d annotated with %s.\n", counter, bindingName );
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -333,6 +366,5 @@ public class GenomicAnnotator extends RodWalker<Integer, Integer> {
|
|||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue