diff --git a/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/SortingVCFWriterBase.java b/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/SortingVCFWriterBase.java index c299511db..84ecc7fcd 100755 --- a/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/SortingVCFWriterBase.java +++ b/public/java/src/org/broadinstitute/sting/utils/codecs/vcf/SortingVCFWriterBase.java @@ -27,10 +27,8 @@ package org.broadinstitute.sting.utils.codecs.vcf; import org.broadinstitute.sting.utils.variantcontext.VariantContext; -import java.util.Comparator; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; +import java.util.concurrent.PriorityBlockingQueue; /** * This class writes VCF files, allowing records to be passed in unsorted. @@ -39,20 +37,26 @@ import java.util.TreeSet; public abstract class SortingVCFWriterBase implements VCFWriter { // The VCFWriter to which to actually write the sorted VCF records - private VCFWriter innerWriter = null; + private final VCFWriter innerWriter; // the current queue of un-emitted records - private PriorityQueue queue = null; + private final Queue queue; // The locus until which we are permitted to write out (inclusive) protected Integer mostUpstreamWritableLoc; protected static final int BEFORE_MOST_UPSTREAM_LOC = 0; // No real locus index is <= 0 // The set of chromosomes already passed over and to which it is forbidden to return - private Set finishedChromosomes = null; + private final Set finishedChromosomes; // Should we call innerWriter.close() in close() - private boolean takeOwnershipOfInner; + private final boolean takeOwnershipOfInner; + + // -------------------------------------------------------------------------------- + // + // Constructors + // + // -------------------------------------------------------------------------------- /** * create a local-sorting VCF writer, given an inner VCF writer to write to @@ -62,16 +66,27 @@ public abstract class SortingVCFWriterBase implements VCFWriter { */ public SortingVCFWriterBase(VCFWriter innerWriter, boolean takeOwnershipOfInner) { this.innerWriter = innerWriter; - this.queue = new PriorityQueue(50, new VariantContextComparator()); - this.mostUpstreamWritableLoc = BEFORE_MOST_UPSTREAM_LOC; this.finishedChromosomes = new TreeSet(); this.takeOwnershipOfInner = takeOwnershipOfInner; + + // has to be PriorityBlockingQueue to be thread-safe + // see http://getsatisfaction.com/gsa/topics/missing_loci_output_in_multi_thread_mode_when_implement_sortingvcfwriterbase?utm_content=topic_link&utm_medium=email&utm_source=new_topic + this.queue = new PriorityBlockingQueue(50, new VariantContextComparator()); + + this.mostUpstreamWritableLoc = BEFORE_MOST_UPSTREAM_LOC; } public SortingVCFWriterBase(VCFWriter innerWriter) { this(innerWriter, false); // by default, don't own inner } + // -------------------------------------------------------------------------------- + // + // public interface functions + // + // -------------------------------------------------------------------------------- + + @Override public void writeHeader(VCFHeader header) { innerWriter.writeHeader(header); } @@ -79,6 +94,7 @@ public abstract class SortingVCFWriterBase implements VCFWriter { /** * attempt to close the VCF file; we need to flush the queue first */ + @Override public void close() { stopWaitingToSort(); @@ -86,27 +102,14 @@ public abstract class SortingVCFWriterBase implements VCFWriter { innerWriter.close(); } - private void stopWaitingToSort() { - emitRecords(true); - mostUpstreamWritableLoc = BEFORE_MOST_UPSTREAM_LOC; - } - - protected void emitSafeRecords() { - emitRecords(false); - } - - protected void noteCurrentRecord(VariantContext vc) { - // did the user break the contract by giving a record too late? - if (mostUpstreamWritableLoc != null && vc.getStart() < mostUpstreamWritableLoc) // went too far back, since may have already written anything that is <= mostUpstreamWritableLoc - throw new IllegalArgumentException("Permitted to write any record upstream of position " + mostUpstreamWritableLoc + ", but a record at " + vc.getChr() + ":" + vc.getStart() + " was just added."); - } /** * add a record to the file * * @param vc the Variant Context object */ - public void add(VariantContext vc) { + @Override + public synchronized void add(VariantContext vc) { /* Note that the code below does not prevent the successive add()-ing of: (chr1, 10), (chr20, 200), (chr15, 100) since there is no implicit ordering of chromosomes: */ @@ -125,7 +128,43 @@ public abstract class SortingVCFWriterBase implements VCFWriter { emitSafeRecords(); } - private void emitRecords(boolean emitUnsafe) { + /** + * Gets a string representation of this object. + * @return a string representation of this object + */ + @Override + public String toString() { + return getClass().getName(); + } + + // -------------------------------------------------------------------------------- + // + // protected interface functions for subclasses to use + // + // -------------------------------------------------------------------------------- + + private synchronized void stopWaitingToSort() { + emitRecords(true); + mostUpstreamWritableLoc = BEFORE_MOST_UPSTREAM_LOC; + } + + protected synchronized void emitSafeRecords() { + emitRecords(false); + } + + protected void noteCurrentRecord(VariantContext vc) { + // did the user break the contract by giving a record too late? + if (mostUpstreamWritableLoc != null && vc.getStart() < mostUpstreamWritableLoc) // went too far back, since may have already written anything that is <= mostUpstreamWritableLoc + throw new IllegalArgumentException("Permitted to write any record upstream of position " + mostUpstreamWritableLoc + ", but a record at " + vc.getChr() + ":" + vc.getStart() + " was just added."); + } + + // -------------------------------------------------------------------------------- + // + // private implementation functions + // + // -------------------------------------------------------------------------------- + + private synchronized void emitRecords(boolean emitUnsafe) { while (!queue.isEmpty()) { VCFRecord firstRec = queue.peek(); @@ -140,15 +179,6 @@ public abstract class SortingVCFWriterBase implements VCFWriter { } } - /** - * Gets a string representation of this object. - * @return a string representation of this object - */ - @Override - public String toString() { - return getClass().getName(); - } - private static class VariantContextComparator implements Comparator { public int compare(VCFRecord r1, VCFRecord r2) { return r1.vc.getStart() - r2.vc.getStart();