Enable efficient parallel output of BCF2

-- The previous IO stub was hardcoded to write VCF, so running with -nt 2 -o my.bcf actually created intermediate VCF files that were then encoded single-threaded as BCF.  We now emit native per-thread BCF files and use the fast mergeInfo code to read BCF -> write BCF.  Upcoming optimizations that avoid unnecessarily decoding genotype data will enable truly fast parallel processing of BCF2
-- VariantContextWriterStub forces BCF output for intermediate files
-- Nicer debug log message in BCF2Codec
-- Turn off debug logging of BCF2LazyGenotypesDecoder
-- BCF2FieldWriterManager now uses .debug not .info, so you won't see all of that field manager debugging info with BCF2 any longer
-- VariantContextWriterFactory.isBCFOutput now has an overload that accepts just a file path, rather than a path plus options
This commit is contained in:
Mark DePristo 2012-08-14 22:08:49 -04:00
parent 290fd33f3b
commit bd7ed0d028
6 changed files with 45 additions and 24 deletions

View File

@ -27,9 +27,10 @@ package org.broadinstitute.sting.gatk.io.storage;
import net.sf.samtools.util.BlockCompressedOutputStream;
import org.apache.log4j.Logger;
import org.broad.tribble.AbstractFeatureReader;
import org.broad.tribble.FeatureCodec;
import org.broadinstitute.sting.gatk.io.stubs.VariantContextWriterStub;
import org.broadinstitute.sting.gatk.refdata.tracks.FeatureManager;
import org.broadinstitute.sting.utils.codecs.bcf2.BCF2Utils;
import org.broadinstitute.sting.utils.codecs.vcf.VCFCodec;
import org.broadinstitute.sting.utils.codecs.vcf.VCFHeader;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
@ -81,6 +82,18 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
throw new ReviewedStingException("Unable to create target to which to write; storage was provided with neither a file nor a stream.");
}
/**
 * Constructs a storage object that redirects VariantContext output into a separate temporary file.
 *
 * The temp file format (VCF vs. BCF) is decided by vcfWriterToFile from the stub's settings,
 * and the header is written immediately so the temp file is a valid, self-contained variant file.
 *
 * @param stub Stub to use when synthesizing file / header info.
 * @param tempFile File into which to direct the output data.
 */
public VariantContextWriterStorage(VariantContextWriterStub stub, File tempFile) {
// debug level so routine per-temp-file creation doesn't clutter normal output
logger.debug("Creating temporary output file " + tempFile.getAbsolutePath() + " for VariantContext output.");
this.file = tempFile;
// third argument 'false' — presumably disables on-the-fly indexing for temp files; TODO confirm against vcfWriterToFile
this.writer = vcfWriterToFile(stub, file, false);
writer.writeHeader(stub.getVCFHeader());
}
/**
* common initialization routine for multiple constructors
* @param stub Stub to use when constructing the output file.
@ -139,19 +152,6 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
}
}
/**
 * Constructs an object which will redirect into a different file.
 *
 * NOTE(review): this is the pre-change copy of the temp-file constructor shown as the deleted
 * side of this diff — its log message still assumes VCF output. It duplicates the constructor
 * above and was removed by this commit.
 *
 * @param stub Stub to use when synthesizing file / header info.
 * @param tempFile File into which to direct the output data.
 */
public VariantContextWriterStorage(VariantContextWriterStub stub, File tempFile) {
logger.debug("Creating temporary VCF file " + tempFile.getAbsolutePath() + " for VCF output.");
this.file = tempFile;
this.writer = vcfWriterToFile(stub, file, false);
writer.writeHeader(stub.getVCFHeader());
}
/**
 * Appends a single variant record to the underlying writer.
 *
 * @param vc the VariantContext to write
 */
public void add(VariantContext vc) {
writer.add(vc);
}
@ -176,12 +176,20 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
public void mergeInto(VariantContextWriterStorage target) {
try {
String sourceFilePath = file.getAbsolutePath();
String targetFilePath = target.file != null ? target.file.getAbsolutePath() : "/dev/stdin";
logger.debug(String.format("Merging %s into %s",sourceFilePath,targetFilePath));
AbstractFeatureReader<VariantContext> source = AbstractFeatureReader.getFeatureReader(file.getAbsolutePath(), new VCFCodec(), false);
final String targetFilePath = target.file != null ? target.file.getAbsolutePath() : "/dev/stdin";
logger.debug(String.format("Merging %s into %s",file.getAbsolutePath(),targetFilePath));
// use the feature manager to determine the right codec for the tmp file
// that way we don't assume it's a specific type
final FeatureManager.FeatureDescriptor fd = new FeatureManager().getByFiletype(file);
if ( fd == null )
throw new ReviewedStingException("Unexpectedly couldn't find valid codec for temporary output file " + file);
final FeatureCodec<VariantContext> codec = fd.getCodec();
final AbstractFeatureReader<VariantContext> source =
AbstractFeatureReader.getFeatureReader(file.getAbsolutePath(), codec, false);
for ( VariantContext vc : source.iterator() ) {
for ( final VariantContext vc : source.iterator() ) {
target.writer.add(vc);
}

View File

@ -35,6 +35,7 @@ import org.broadinstitute.sting.utils.codecs.vcf.VCFUtils;
import org.broadinstitute.sting.utils.variantcontext.writer.Options;
import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriter;
import org.broadinstitute.sting.utils.variantcontext.VariantContext;
import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriterFactory;
import java.io.File;
import java.io.OutputStream;
@ -186,6 +187,9 @@ public class VariantContextWriterStub implements Stub<VariantContextWriter>, Var
if ( engine.lenientVCFProcessing() ) options.add(Options.ALLOW_MISSING_FIELDS_IN_HEADER);
if ( indexOnTheFly && ! isCompressed() ) options.add(Options.INDEX_ON_THE_FLY);
if ( getFile() != null && VariantContextWriterFactory.isBCFOutput(getFile()) )
options.add(Options.FORCE_BCF);
return options.isEmpty() ? EnumSet.noneOf(Options.class) : EnumSet.copyOf(options);
}

View File

@ -149,7 +149,7 @@ public final class BCF2Codec implements FeatureCodec<VariantContext> {
if ( bcfVersion.getMinorVersion() < MIN_MINOR_VERSION )
error("BCF2Codec can only process BCF2 files with minor version >= " + MIN_MINOR_VERSION + " but this file has minor version " + bcfVersion.getMinorVersion());
logger.info("BCF version " + bcfVersion);
logger.info("Parsing data stream with BCF version " + bcfVersion);
final int headerSizeInBytes = BCF2Type.INT32.read(inputStream);

View File

@ -63,9 +63,8 @@ class BCF2LazyGenotypesDecoder implements LazyGenotypesContext.LazyParser {
@Override
public LazyGenotypesContext.LazyData parse(final Object data) {
if ( logger.isDebugEnabled() )
logger.debug("Decoding BCF genotypes for " + nSamples + " samples with " + nFields + " fields each");
// if ( logger.isDebugEnabled() )
// logger.debug("Decoding BCF genotypes for " + nSamples + " samples with " + nFields + " fields each");
try {
// load our byte[] data into the decoder

View File

@ -76,7 +76,7 @@ public class BCF2FieldWriterManager {
if ( map.containsKey(field) )
throw new ReviewedStingException("BUG: field " + field + " already seen in VCFHeader while building BCF2 field encoders");
map.put(field, writer);
logger.info(writer);
if ( logger.isDebugEnabled() ) logger.debug(writer);
}
// -----------------------------------------------------------------

View File

@ -84,6 +84,16 @@ public class VariantContextWriterFactory {
}
}
/**
 * Should we output a BCF file based solely on the name of the file at location?
 *
 * Convenience overload of isBCFOutput(File, EnumSet) with no forcing options, so the
 * decision rests entirely on whether the file name contains ".bcf".
 *
 * @param location the candidate output file; may be null, in which case this returns false
 * @return true if the file name indicates BCF output should be produced
 */
public static boolean isBCFOutput(final File location) {
return isBCFOutput(location, EnumSet.noneOf(Options.class));
}
/**
 * Should we output a BCF file, given the target file and the writer options?
 *
 * BCF output is selected either because the FORCE_BCF option is set, or because the
 * (non-null) file's name contains ".bcf".
 *
 * @param location the candidate output file; may be null
 * @param options  writer options; FORCE_BCF forces BCF regardless of the file name
 * @return true if BCF output should be produced
 */
public static boolean isBCFOutput(final File location, final EnumSet<Options> options) {
    // explicit request always wins over the filename heuristic
    if ( options.contains(Options.FORCE_BCF) )
        return true;
    final String fileName = location == null ? null : location.getName();
    return fileName != null && fileName.contains(".bcf");
}