Actual working version of unflushing VCFWriter

-- Uses a high-performance local writer backed by a byte array, which writes each entire VCF line to the underlying output stream in a single write operation.
-- Fixes problems with indexing of unflushed writes while still allowing efficient block zipping
-- Same (or better) IO performance as previous implementation
-- IndexingVariantContextWriter now properly closes the underlying output stream when it's closed
-- Updated the expected MD5 for the compressed VCF output file test
This commit is contained in:
Mark DePristo 2012-12-13 12:24:08 -05:00
parent 5e66109268
commit aeab932c63
3 changed files with 97 additions and 47 deletions

View File

@ -96,7 +96,7 @@ public class UnifiedGenotyperIntegrationTest extends WalkerTest {
// //
// -------------------------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------------------------
private final static String COMPRESSED_OUTPUT_MD5 = "3eba6c309514d1e9ee06a20a112b68e6"; private final static String COMPRESSED_OUTPUT_MD5 = "af8187e2baf516dde1cddea787a52b8a";
@Test @Test
public void testCompressedOutput() { public void testCompressedOutput() {

View File

@ -93,18 +93,21 @@ abstract class IndexingVariantContextWriter implements VariantContextWriter {
* attempt to close the VCF file * attempt to close the VCF file
*/ */
public void close() { public void close() {
try {
// try to close the index stream (keep it separate to help debugging efforts) // try to close the index stream (keep it separate to help debugging efforts)
if ( indexer != null ) { if ( indexer != null ) {
try {
Index index = indexer.finalizeIndex(positionalOutputStream.getPosition()); Index index = indexer.finalizeIndex(positionalOutputStream.getPosition());
IndexDictionaryUtils.setIndexSequenceDictionary(index, refDict); IndexDictionaryUtils.setIndexSequenceDictionary(index, refDict);
index.write(idxStream); index.write(idxStream);
idxStream.close(); idxStream.close();
}
// close the underlying output stream as well
outputStream.close();
} catch (IOException e) { } catch (IOException e) {
throw new ReviewedStingException("Unable to close index for " + getStreamName(), e); throw new ReviewedStingException("Unable to close index for " + getStreamName(), e);
} }
} }
}
/** /**
* @return the reference sequence dictionary used for the variant contexts being written * @return the reference sequence dictionary used for the variant contexts being written

View File

@ -34,6 +34,7 @@ import org.broadinstitute.sting.utils.variantcontext.*;
import java.io.*; import java.io.*;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.nio.charset.Charset;
import java.util.*; import java.util.*;
/** /**
@ -42,9 +43,6 @@ import java.util.*;
class VCFWriter extends IndexingVariantContextWriter { class VCFWriter extends IndexingVariantContextWriter {
private final static String VERSION_LINE = VCFHeader.METADATA_INDICATOR + VCFHeaderVersion.VCF4_1.getFormatString() + "=" + VCFHeaderVersion.VCF4_1.getVersionString(); private final static String VERSION_LINE = VCFHeader.METADATA_INDICATOR + VCFHeaderVersion.VCF4_1.getFormatString() + "=" + VCFHeaderVersion.VCF4_1.getVersionString();
// the print stream we're writing to
final protected BufferedWriter mWriter;
// should we write genotypes or just sites? // should we write genotypes or just sites?
final protected boolean doNotWriteGenotypes; final protected boolean doNotWriteGenotypes;
@ -53,15 +51,33 @@ class VCFWriter extends IndexingVariantContextWriter {
final private boolean allowMissingFieldsInHeader; final private boolean allowMissingFieldsInHeader;
/**
 * Starting capacity, in bytes, of {@link #lineBuffer} (16 KB).
 */
private static final int INITIAL_BUFFER_SIZE = 16 * 1024;

/**
 * In-memory staging area for the header and for each per-site record.
 *
 * Output is accumulated here and then handed to getOutputStream() in one go
 * per line. That gives high performance and proper encoding, and it means the
 * destination stream never has to be flushed explicitly, which allows VCFs to
 * be compressed properly in gz format without breaking on-the-fly indexing of
 * uncompressed streams.
 */
private final ByteArrayOutputStream lineBuffer = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);

/**
 * Internal character writer backed by {@link #lineBuffer}; all textual output
 * goes through it so that it is encoded with {@link #charset}.
 */
private final Writer writer;

/**
 * The encoding used for VCF files: ISO-8859-1.
 */
private final Charset charset;
private IntGenotypeFieldAccessors intGenotypeFieldAccessors = new IntGenotypeFieldAccessors(); private IntGenotypeFieldAccessors intGenotypeFieldAccessors = new IntGenotypeFieldAccessors();
public VCFWriter(final File location, final OutputStream output, final SAMSequenceDictionary refDict, public VCFWriter(final File location, final OutputStream output, final SAMSequenceDictionary refDict,
final boolean enableOnTheFlyIndexing, boolean doNotWriteGenotypes, final boolean enableOnTheFlyIndexing, boolean doNotWriteGenotypes,
final boolean allowMissingFieldsInHeader ) { final boolean allowMissingFieldsInHeader ) {
super(writerName(location, output), location, output, refDict, enableOnTheFlyIndexing); super(writerName(location, output), location, output, refDict, enableOnTheFlyIndexing);
mWriter = new BufferedWriter(new OutputStreamWriter(getOutputStream())); // todo -- fix buffer size
this.doNotWriteGenotypes = doNotWriteGenotypes; this.doNotWriteGenotypes = doNotWriteGenotypes;
this.allowMissingFieldsInHeader = allowMissingFieldsInHeader; this.allowMissingFieldsInHeader = allowMissingFieldsInHeader;
this.charset = Charset.forName("ISO-8859-1");
this.writer = new OutputStreamWriter(lineBuffer, charset);
} }
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
@ -70,14 +86,44 @@ class VCFWriter extends IndexingVariantContextWriter {
// //
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
/**
 * Appends the given text to the internal writer that feeds the line buffer.
 *
 * Nothing reaches the real output stream until flushBuffer() is called.
 *
 * @param s the string to write
 * @throws IOException if the internal writer rejects the write
 */
private void write(final String s) throws IOException {
    // explicit-range form of Writer.write(String): the whole string, start to end
    writer.write(s, 0, s.length());
}
/**
 * Actually write the line buffer contents to the destination output stream.
 *
 * The internal writer is flushed first so that every pending character has
 * been encoded into the line buffer. The destination stream itself is not
 * flushed here. After this call the line buffer is reset, so it can be
 * reused for the next line.
 *
 * @throws IOException if flushing the internal writer or writing to the
 *                     destination output stream fails
 */
private void flushBuffer() throws IOException {
    // push any characters still held by the encoder into lineBuffer
    writer.flush();
    // writeTo() passes the buffer's backing array directly to the stream,
    // avoiding the full copy that toByteArray() would allocate on every line
    lineBuffer.writeTo(getOutputStream());
    lineBuffer.reset();
}
@Override @Override
public void writeHeader(VCFHeader header) { public void writeHeader(VCFHeader header) {
// note we need to update the mHeader object after this call because they header // note we need to update the mHeader object after this call because they header
// may have genotypes trimmed out of it, if doNotWriteGenotypes is true // may have genotypes trimmed out of it, if doNotWriteGenotypes is true
mHeader = writeHeader(header, mWriter, doNotWriteGenotypes, getVersionLine(), getStreamName()); try {
mHeader = writeHeader(header, writer, doNotWriteGenotypes, getVersionLine(), getStreamName());
flushBuffer();
} catch ( IOException e ) {
throw new UserException.CouldNotCreateOutputFile(getStreamName(), e);
}
} }
public static final String getVersionLine() { public static String getVersionLine() {
return VERSION_LINE; return VERSION_LINE;
} }
@ -138,8 +184,8 @@ class VCFWriter extends IndexingVariantContextWriter {
public void close() { public void close() {
// try to close the vcf stream // try to close the vcf stream
try { try {
mWriter.flush(); // TODO -- would it be useful to null out the line buffer so we don't have it around unnecessarily?
mWriter.close(); writer.close();
} catch (IOException e) { } catch (IOException e) {
throw new ReviewedStingException("Unable to close " + getStreamName(), e); throw new ReviewedStingException("Unable to close " + getStreamName(), e);
} }
@ -166,51 +212,51 @@ class VCFWriter extends IndexingVariantContextWriter {
Map<Allele, String> alleleMap = buildAlleleMap(vc); Map<Allele, String> alleleMap = buildAlleleMap(vc);
// CHROM // CHROM
mWriter.write(vc.getChr()); write(vc.getChr());
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
// POS // POS
mWriter.write(String.valueOf(vc.getStart())); write(String.valueOf(vc.getStart()));
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
// ID // ID
String ID = vc.getID(); String ID = vc.getID();
mWriter.write(ID); write(ID);
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
// REF // REF
String refString = vc.getReference().getDisplayString(); String refString = vc.getReference().getDisplayString();
mWriter.write(refString); write(refString);
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
// ALT // ALT
if ( vc.isVariant() ) { if ( vc.isVariant() ) {
Allele altAllele = vc.getAlternateAllele(0); Allele altAllele = vc.getAlternateAllele(0);
String alt = altAllele.getDisplayString(); String alt = altAllele.getDisplayString();
mWriter.write(alt); write(alt);
for (int i = 1; i < vc.getAlternateAlleles().size(); i++) { for (int i = 1; i < vc.getAlternateAlleles().size(); i++) {
altAllele = vc.getAlternateAllele(i); altAllele = vc.getAlternateAllele(i);
alt = altAllele.getDisplayString(); alt = altAllele.getDisplayString();
mWriter.write(","); write(",");
mWriter.write(alt); write(alt);
} }
} else { } else {
mWriter.write(VCFConstants.EMPTY_ALTERNATE_ALLELE_FIELD); write(VCFConstants.EMPTY_ALTERNATE_ALLELE_FIELD);
} }
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
// QUAL // QUAL
if ( !vc.hasLog10PError() ) if ( !vc.hasLog10PError() )
mWriter.write(VCFConstants.MISSING_VALUE_v4); write(VCFConstants.MISSING_VALUE_v4);
else else
mWriter.write(formatQualValue(vc.getPhredScaledQual())); write(formatQualValue(vc.getPhredScaledQual()));
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
// FILTER // FILTER
String filters = getFilterString(vc); String filters = getFilterString(vc);
mWriter.write(filters); write(filters);
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
// INFO // INFO
Map<String, String> infoFields = new TreeMap<String, String>(); Map<String, String> infoFields = new TreeMap<String, String>();
@ -229,8 +275,8 @@ class VCFWriter extends IndexingVariantContextWriter {
// FORMAT // FORMAT
final GenotypesContext gc = vc.getGenotypes(); final GenotypesContext gc = vc.getGenotypes();
if ( gc.isLazyWithData() && ((LazyGenotypesContext)gc).getUnparsedGenotypeData() instanceof String ) { if ( gc.isLazyWithData() && ((LazyGenotypesContext)gc).getUnparsedGenotypeData() instanceof String ) {
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
mWriter.write(((LazyGenotypesContext)gc).getUnparsedGenotypeData().toString()); write(((LazyGenotypesContext) gc).getUnparsedGenotypeData().toString());
} else { } else {
List<String> genotypeAttributeKeys = calcVCFGenotypeKeys(vc, mHeader); List<String> genotypeAttributeKeys = calcVCFGenotypeKeys(vc, mHeader);
if ( ! genotypeAttributeKeys.isEmpty() ) { if ( ! genotypeAttributeKeys.isEmpty() ) {
@ -240,16 +286,17 @@ class VCFWriter extends IndexingVariantContextWriter {
final String genotypeFormatString = ParsingUtils.join(VCFConstants.GENOTYPE_FIELD_SEPARATOR, genotypeAttributeKeys); final String genotypeFormatString = ParsingUtils.join(VCFConstants.GENOTYPE_FIELD_SEPARATOR, genotypeAttributeKeys);
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
mWriter.write(genotypeFormatString); write(genotypeFormatString);
addGenotypeData(vc, alleleMap, genotypeAttributeKeys); addGenotypeData(vc, alleleMap, genotypeAttributeKeys);
} }
} }
mWriter.write("\n"); write("\n");
// note that we cannot call flush here if we want block gzipping to work properly // note that we cannot call flush here if we want block gzipping to work properly
// calling flush results in all gzipped blocks for each variant // calling flush results in all gzipped blocks for each variant
flushBuffer();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Unable to write the VCF object to " + getStreamName(), e); throw new RuntimeException("Unable to write the VCF object to " + getStreamName(), e);
} }
@ -305,7 +352,7 @@ class VCFWriter extends IndexingVariantContextWriter {
*/ */
private void writeInfoString(Map<String, String> infoFields) throws IOException { private void writeInfoString(Map<String, String> infoFields) throws IOException {
if ( infoFields.isEmpty() ) { if ( infoFields.isEmpty() ) {
mWriter.write(VCFConstants.EMPTY_INFO_FIELD); write(VCFConstants.EMPTY_INFO_FIELD);
return; return;
} }
@ -314,16 +361,16 @@ class VCFWriter extends IndexingVariantContextWriter {
if ( isFirst ) if ( isFirst )
isFirst = false; isFirst = false;
else else
mWriter.write(VCFConstants.INFO_FIELD_SEPARATOR); write(VCFConstants.INFO_FIELD_SEPARATOR);
String key = entry.getKey(); String key = entry.getKey();
mWriter.write(key); write(key);
if ( !entry.getValue().equals("") ) { if ( !entry.getValue().equals("") ) {
VCFInfoHeaderLine metaData = mHeader.getInfoHeaderLine(key); VCFInfoHeaderLine metaData = mHeader.getInfoHeaderLine(key);
if ( metaData == null || metaData.getCountType() != VCFHeaderLineCount.INTEGER || metaData.getCount() != 0 ) { if ( metaData == null || metaData.getCountType() != VCFHeaderLineCount.INTEGER || metaData.getCount() != 0 ) {
mWriter.write("="); write("=");
mWriter.write(entry.getValue()); write(entry.getValue());
} }
} }
} }
@ -342,7 +389,7 @@ class VCFWriter extends IndexingVariantContextWriter {
final int ploidy = vc.getMaxPloidy(2); final int ploidy = vc.getMaxPloidy(2);
for ( String sample : mHeader.getGenotypeSamples() ) { for ( String sample : mHeader.getGenotypeSamples() ) {
mWriter.write(VCFConstants.FIELD_SEPARATOR); write(VCFConstants.FIELD_SEPARATOR);
Genotype g = vc.getGenotype(sample); Genotype g = vc.getGenotype(sample);
if ( g == null ) g = GenotypeBuilder.createMissing(sample, ploidy); if ( g == null ) g = GenotypeBuilder.createMissing(sample, ploidy);
@ -356,7 +403,7 @@ class VCFWriter extends IndexingVariantContextWriter {
writeAllele(g.getAllele(0), alleleMap); writeAllele(g.getAllele(0), alleleMap);
for (int i = 1; i < g.getPloidy(); i++) { for (int i = 1; i < g.getPloidy(); i++) {
mWriter.write(g.isPhased() ? VCFConstants.PHASED : VCFConstants.UNPHASED); write(g.isPhased() ? VCFConstants.PHASED : VCFConstants.UNPHASED);
writeAllele(g.getAllele(i), alleleMap); writeAllele(g.getAllele(i), alleleMap);
} }
@ -420,8 +467,8 @@ class VCFWriter extends IndexingVariantContextWriter {
for (int i = 0; i < attrs.size(); i++) { for (int i = 0; i < attrs.size(); i++) {
if ( i > 0 || genotypeFormatKeys.contains(VCFConstants.GENOTYPE_KEY) ) if ( i > 0 || genotypeFormatKeys.contains(VCFConstants.GENOTYPE_KEY) )
mWriter.write(VCFConstants.GENOTYPE_FIELD_SEPARATOR); write(VCFConstants.GENOTYPE_FIELD_SEPARATOR);
mWriter.write(attrs.get(i)); write(attrs.get(i));
} }
} }
} }
@ -435,7 +482,7 @@ class VCFWriter extends IndexingVariantContextWriter {
String encoding = alleleMap.get(allele); String encoding = alleleMap.get(allele);
if ( encoding == null ) if ( encoding == null )
throw new TribbleException.InternalCodecException("Allele " + allele + " is not an allele in the variant context"); throw new TribbleException.InternalCodecException("Allele " + allele + " is not an allele in the variant context");
mWriter.write(encoding); write(encoding);
} }
/** /**