diff --git a/java/src/org/broadinstitute/sting/playground/gatk/walkers/annotator/GenomicAnnotator.java b/java/src/org/broadinstitute/sting/playground/gatk/walkers/annotator/GenomicAnnotator.java index cf08e9b58..914c356c7 100644 --- a/java/src/org/broadinstitute/sting/playground/gatk/walkers/annotator/GenomicAnnotator.java +++ b/java/src/org/broadinstitute/sting/playground/gatk/walkers/annotator/GenomicAnnotator.java @@ -56,6 +56,7 @@ import org.broadinstitute.sting.gatk.refdata.features.annotator.AnnotatorInputTa import org.broadinstitute.sting.gatk.walkers.By; import org.broadinstitute.sting.gatk.walkers.DataSource; import org.broadinstitute.sting.gatk.walkers.RodWalker; +import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.annotator.VariantAnnotatorEngine; import org.broadinstitute.sting.utils.BaseUtils; import org.broadinstitute.sting.utils.SampleUtils; @@ -73,7 +74,7 @@ import org.broadinstitute.sting.utils.genotype.vcf.VCFWriter; //@Allows(value={DataSource.READS, DataSource.REFERENCE}) //@Reference(window=@Window(start=-50,stop=50)) @By(DataSource.REFERENCE) -public class GenomicAnnotator extends RodWalker { +public class GenomicAnnotator extends RodWalker, LinkedList> implements TreeReducible> { @Argument(fullName="vcfOutput", shortName="vcf", doc="VCF file to which all variants should be written with annotations", required=true) protected File VCF_OUT; @@ -96,19 +97,20 @@ public class GenomicAnnotator extends RodWalker { private boolean strict = true; + private boolean multiThreadedMode = false; //whether map will be called by more than one thread. /** * Prepare the output file and the list of available features. */ public void initialize() { + multiThreadedMode = getToolkit().getArguments().numberOfThreads > 1; + // get the list of all sample names from the various VCF input rods TreeSet samples = new TreeSet(); SampleUtils.getUniquifiedSamplesFromRods(getToolkit(), samples, new HashMap, String>()); - - //read all ROD file headers and construct a set of all column names to be used for validation of command-line args final Set allFullyQualifiedColumnNames = new LinkedHashSet(); final Set allBindingNames = new LinkedHashSet(); @@ -247,7 +249,7 @@ public class GenomicAnnotator extends RodWalker { * * @return 0 */ - public Integer reduceInit() { return 0; } + public LinkedList reduceInit() { return new LinkedList(); } /** @@ -265,23 +267,25 @@ public class GenomicAnnotator extends RodWalker { * @param context the context for the given locus * @return 1 if the locus was successfully processed, 0 if otherwise */ - public Integer map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) { + public LinkedList map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) { + LinkedList result = new LinkedList(); + if ( tracker == null ) - return 0; + return result; List rods = tracker.getReferenceMetaData("variant"); // ignore places where we don't have a variant if ( rods.size() == 0 ) - return 0; + return result; Object variant = rods.get(0); if( BaseUtils.isNBase(ref.getBase()) ) { - return 0; //TODO Currently, VariantContextAdaptors.toVCF(annotatedVC, ref.getBase()) fails when base is 'N'. is this right? + return result; //TODO Currently, VariantContextAdaptors.toVCF(annotatedVC, ref.getBase()) fails when base is 'N'. is this right? } VariantContext vc = VariantContextAdaptors.toVariantContext("variant", variant, ref); if ( vc == null ) - return 0; + return result; // if the reference base is not ambiguous, we can annotate Collection annotatedVCs = Arrays.asList(vc); @@ -292,40 +296,69 @@ public class GenomicAnnotator extends RodWalker { } } - if ( variant instanceof VCFRecord) { //TODO is this if-statement necessary? + if(multiThreadedMode) { + //keep results in memory, only writing them in onTraversalDone(..) after they have been merged via treeReduce(..) + for(VariantContext annotatedVC : annotatedVCs ) { + result.add(VariantContextAdaptors.toVCF(annotatedVC, ref.getBase())); + } + } else { + //write results to disk immediately for(VariantContext annotatedVC : annotatedVCs ) { vcfWriter.addRecord(VariantContextAdaptors.toVCF(annotatedVC, ref.getBase())); } } - return annotatedVCs.size(); + + return result; } /** - * Increment the number of loci processed. + * Merge lists. * * @param value result of the map. * @param sum accumulator for the reduce. * @return the new number of loci processed. */ - public Integer reduce(Integer value, Integer sum) { - return sum + value; + public LinkedList reduce(LinkedList value, LinkedList sum) { + sum.addAll(value); + return sum; } + + + /** + * Merge lists. + */ + public LinkedList treeReduce(LinkedList lhs, LinkedList rhs) { + lhs.addAll(rhs); + return lhs; + } + + + + /** * Tell the user the number of loci processed and close out the new variants file. * * @param result the number of loci seen. */ - public void onTraversalDone(Integer totalOutputVCFRecords) { - out.printf("Generated %d annotated VCF records.\n", totalOutputVCFRecords); + public void onTraversalDone(LinkedList totalOutputVCFRecords) { + if(multiThreadedMode) { + //finally write results to disk + for(VCFRecord vcfRecord : totalOutputVCFRecords ) { + vcfWriter.addRecord(vcfRecord); + } + } + + //out.printf("Generated %d annotated VCF records.\n", totalOutputVCFRecords); Map inputTableHitCounter = engine.getInputTableHitCounter(); for(Entry e : inputTableHitCounter.entrySet()) { final String bindingName = e.getKey(); final int counter = e.getValue(); - final float percent = 100 * counter /(float) totalOutputVCFRecords; - out.printf(" %-6.1f%% (%d) annotated with %s.\n", percent, counter, bindingName ); + //final float percent = 100 * counter /(float) totalOutputVCFRecords; + //out.printf(" %-6.1f%% (%d) annotated with %s.\n", percent, counter, bindingName ); + out.printf(" %d annotated with %s.\n", counter, bindingName ); } @@ -333,6 +366,5 @@ public class GenomicAnnotator extends RodWalker { } - }