Updated SortingVCFWriterBase to use PriorityBlockingQueue so that the class is thread-safe

-- Uses PriorityBlockingQueue instead of PriorityQueue
-- synchronized keywords added to all key functions that modify internal state

Note that this hasn't been tested extensively. Based on this report:

http://getsatisfaction.com/gsa/topics/missing_loci_output_in_multi_thread_mode_when_implement_sortingvcfwriterbase?utm_content=topic_link&utm_medium=email&utm_source=new_topic
This commit is contained in:
Mark DePristo 2012-01-13 09:24:13 -05:00
parent 28aa353501
commit b06074d6e7
1 changed files with 65 additions and 35 deletions

View File

@ -27,10 +27,8 @@ package org.broadinstitute.sting.utils.codecs.vcf;
import org.broadinstitute.sting.utils.variantcontext.VariantContext;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;
/**
* This class writes VCF files, allowing records to be passed in unsorted.
@ -39,20 +37,26 @@ import java.util.TreeSet;
public abstract class SortingVCFWriterBase implements VCFWriter {
// The VCFWriter to which to actually write the sorted VCF records
private VCFWriter innerWriter = null;
private final VCFWriter innerWriter;
// the current queue of un-emitted records
private PriorityQueue<VCFRecord> queue = null;
private final Queue<VCFRecord> queue;
// The locus until which we are permitted to write out (inclusive)
protected Integer mostUpstreamWritableLoc;
protected static final int BEFORE_MOST_UPSTREAM_LOC = 0; // No real locus index is <= 0
// The set of chromosomes already passed over and to which it is forbidden to return
private Set<String> finishedChromosomes = null;
private final Set<String> finishedChromosomes;
// Should we call innerWriter.close() in close()
private boolean takeOwnershipOfInner;
private final boolean takeOwnershipOfInner;
// --------------------------------------------------------------------------------
//
// Constructors
//
// --------------------------------------------------------------------------------
/**
* create a local-sorting VCF writer, given an inner VCF writer to write to
@ -62,16 +66,27 @@ public abstract class SortingVCFWriterBase implements VCFWriter {
*/
public SortingVCFWriterBase(VCFWriter innerWriter, boolean takeOwnershipOfInner) {
this.innerWriter = innerWriter;
this.queue = new PriorityQueue<VCFRecord>(50, new VariantContextComparator());
this.mostUpstreamWritableLoc = BEFORE_MOST_UPSTREAM_LOC;
this.finishedChromosomes = new TreeSet<String>();
this.takeOwnershipOfInner = takeOwnershipOfInner;
// has to be PriorityBlockingQueue to be thread-safe
// see http://getsatisfaction.com/gsa/topics/missing_loci_output_in_multi_thread_mode_when_implement_sortingvcfwriterbase?utm_content=topic_link&utm_medium=email&utm_source=new_topic
this.queue = new PriorityBlockingQueue<VCFRecord>(50, new VariantContextComparator());
this.mostUpstreamWritableLoc = BEFORE_MOST_UPSTREAM_LOC;
}
/**
* create a local-sorting VCF writer that does not take ownership of the inner writer
*
* Delegates to the two-argument constructor with takeOwnershipOfInner set to false.
*
* @param innerWriter the VCFWriter passed on to the two-argument constructor
*/
public SortingVCFWriterBase(VCFWriter innerWriter) {
this(innerWriter, false); // by default, don't own inner
}
// --------------------------------------------------------------------------------
//
// public interface functions
//
// --------------------------------------------------------------------------------
/**
* write the VCF header by handing it directly to the inner writer
*
* @param header the header to forward to the inner writer
*/
@Override
public void writeHeader(VCFHeader header) {
innerWriter.writeHeader(header);
}
@ -79,6 +94,7 @@ public abstract class SortingVCFWriterBase implements VCFWriter {
/**
* attempt to close the VCF file; we need to flush the queue first
*/
@Override
public void close() {
stopWaitingToSort();
@ -86,27 +102,14 @@ public abstract class SortingVCFWriterBase implements VCFWriter {
innerWriter.close();
}
private void stopWaitingToSort() {
emitRecords(true);
mostUpstreamWritableLoc = BEFORE_MOST_UPSTREAM_LOC;
}
protected void emitSafeRecords() {
emitRecords(false);
}
protected void noteCurrentRecord(VariantContext vc) {
// did the user break the contract by giving a record too late?
if (mostUpstreamWritableLoc != null && vc.getStart() < mostUpstreamWritableLoc) // went too far back, since may have already written anything that is <= mostUpstreamWritableLoc
throw new IllegalArgumentException("Permitted to write any record upstream of position " + mostUpstreamWritableLoc + ", but a record at " + vc.getChr() + ":" + vc.getStart() + " was just added.");
}
/**
* add a record to the file
*
* @param vc the Variant Context object
*/
public void add(VariantContext vc) {
@Override
public synchronized void add(VariantContext vc) {
/* Note that the code below does not prevent the successive add()-ing of: (chr1, 10), (chr20, 200), (chr15, 100)
since there is no implicit ordering of chromosomes:
*/
@ -125,7 +128,43 @@ public abstract class SortingVCFWriterBase implements VCFWriter {
emitSafeRecords();
}
private void emitRecords(boolean emitUnsafe) {
/**
* Gets a string representation of this object.
*
* @return the name of this object's runtime class, as returned by getClass().getName()
*/
@Override
public String toString() {
return getClass().getName();
}
// --------------------------------------------------------------------------------
//
// protected interface functions for subclasses to use
//
// --------------------------------------------------------------------------------
/**
* Calls emitRecords with emitUnsafe set to true and then resets mostUpstreamWritableLoc
* to BEFORE_MOST_UPSTREAM_LOC. Runs while holding this object's monitor.
*/
private synchronized void stopWaitingToSort() {
emitRecords(true);
// reset only after the emit call above has returned
mostUpstreamWritableLoc = BEFORE_MOST_UPSTREAM_LOC;
}
/**
* Calls emitRecords with emitUnsafe set to false, while holding this object's monitor.
*/
protected synchronized void emitSafeRecords() {
emitRecords(false);
}
/**
* Checks that a newly supplied record does not start before mostUpstreamWritableLoc.
*
* @param vc the record that was just added
* @throws IllegalArgumentException if mostUpstreamWritableLoc is non-null and vc starts before it
*/
protected void noteCurrentRecord(VariantContext vc) {
    // nothing to verify when no limit is set, or when the record starts at or after the limit
    if (mostUpstreamWritableLoc == null || vc.getStart() >= mostUpstreamWritableLoc) {
        return;
    }

    // the caller went too far back: anything <= mostUpstreamWritableLoc may already have been written
    final String message = "Permitted to write any record upstream of position " + mostUpstreamWritableLoc + ", but a record at " + vc.getChr() + ":" + vc.getStart() + " was just added.";
    throw new IllegalArgumentException(message);
}
// --------------------------------------------------------------------------------
//
// private implementation functions
//
// --------------------------------------------------------------------------------
private synchronized void emitRecords(boolean emitUnsafe) {
while (!queue.isEmpty()) {
VCFRecord firstRec = queue.peek();
@ -140,15 +179,6 @@ public abstract class SortingVCFWriterBase implements VCFWriter {
}
}
/**
* Gets a string representation of this object.
* @return a string representation of this object
*/
@Override
public String toString() {
return getClass().getName();
}
private static class VariantContextComparator implements Comparator<VCFRecord> {
public int compare(VCFRecord r1, VCFRecord r2) {
return r1.vc.getStart() - r2.vc.getStart();