From bd7ed0d02889e683dd4494d73357d95299fd837d Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Tue, 14 Aug 2012 22:08:49 -0400 Subject: [PATCH] Enable efficient parallel output of BCF2 -- Previous IO stub was hardcoded to write VCF. So when you ran -nt 2 -o my.bcf you actually created intermediate VCF files that were then encoded single threaded as BCF. Now we emit natively per thread BCF, and use the fast mergeInfo code to read BCF -> write BCF. Upcoming optimizations to avoid decoding genotype data unnecessarily will enable us to really quickly process BCF2 in parallel -- VariantContextWriterStub forces BCF output for intermediate files -- Nicer debug log message in BCF2Codec -- Turn off debug logging of BCF2LazyGenotypesDecoder -- BCF2FieldWriterManager now uses .debug not .info, so you won't see all of that field manager debugging info with BCF2 any longer -- VariantContextWriterFactory.isBCFOutput now has version that accepts just a file path, not path + options --- .../storage/VariantContextWriterStorage.java | 46 +++++++++++-------- .../io/stubs/VariantContextWriterStub.java | 4 ++ .../sting/utils/codecs/bcf2/BCF2Codec.java | 2 +- .../codecs/bcf2/BCF2LazyGenotypesDecoder.java | 5 +- .../writer/BCF2FieldWriterManager.java | 2 +- .../writer/VariantContextWriterFactory.java | 10 ++++ 6 files changed, 45 insertions(+), 24 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/io/storage/VariantContextWriterStorage.java b/public/java/src/org/broadinstitute/sting/gatk/io/storage/VariantContextWriterStorage.java index fb05a6b04..161179f84 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/io/storage/VariantContextWriterStorage.java +++ b/public/java/src/org/broadinstitute/sting/gatk/io/storage/VariantContextWriterStorage.java @@ -27,9 +27,10 @@ package org.broadinstitute.sting.gatk.io.storage; import net.sf.samtools.util.BlockCompressedOutputStream; import org.apache.log4j.Logger; import org.broad.tribble.AbstractFeatureReader; +import 
org.broad.tribble.FeatureCodec; import org.broadinstitute.sting.gatk.io.stubs.VariantContextWriterStub; +import org.broadinstitute.sting.gatk.refdata.tracks.FeatureManager; import org.broadinstitute.sting.utils.codecs.bcf2.BCF2Utils; -import org.broadinstitute.sting.utils.codecs.vcf.VCFCodec; import org.broadinstitute.sting.utils.codecs.vcf.VCFHeader; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; @@ -81,6 +82,18 @@ public class VariantContextWriterStorage implements Storage source = AbstractFeatureReader.getFeatureReader(file.getAbsolutePath(), new VCFCodec(), false); + final String targetFilePath = target.file != null ? target.file.getAbsolutePath() : "/dev/stdin"; + logger.debug(String.format("Merging %s into %s",file.getAbsolutePath(),targetFilePath)); + + // use the feature manager to determine the right codec for the tmp file + // that way we don't assume it's a specific type + final FeatureManager.FeatureDescriptor fd = new FeatureManager().getByFiletype(file); + if ( fd == null ) + throw new ReviewedStingException("Unexpectedly couldn't find valid codec for temporary output file " + file); + + final FeatureCodec codec = fd.getCodec(); + final AbstractFeatureReader source = + AbstractFeatureReader.getFeatureReader(file.getAbsolutePath(), codec, false); - for ( VariantContext vc : source.iterator() ) { + for ( final VariantContext vc : source.iterator() ) { target.writer.add(vc); } diff --git a/public/java/src/org/broadinstitute/sting/gatk/io/stubs/VariantContextWriterStub.java b/public/java/src/org/broadinstitute/sting/gatk/io/stubs/VariantContextWriterStub.java index 6ed889eb6..bea7172ea 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/io/stubs/VariantContextWriterStub.java +++ b/public/java/src/org/broadinstitute/sting/gatk/io/stubs/VariantContextWriterStub.java @@ -35,6 +35,7 @@ import org.broadinstitute.sting.utils.codecs.vcf.VCFUtils; import 
org.broadinstitute.sting.utils.variantcontext.writer.Options; import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriter; import org.broadinstitute.sting.utils.variantcontext.VariantContext; +import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriterFactory; import java.io.File; import java.io.OutputStream; @@ -186,6 +187,9 @@ public class VariantContextWriterStub implements Stub, Var if ( engine.lenientVCFProcessing() ) options.add(Options.ALLOW_MISSING_FIELDS_IN_HEADER); if ( indexOnTheFly && ! isCompressed() ) options.add(Options.INDEX_ON_THE_FLY); + if ( getFile() != null && VariantContextWriterFactory.isBCFOutput(getFile()) ) + options.add(Options.FORCE_BCF); + return options.isEmpty() ? EnumSet.noneOf(Options.class) : EnumSet.copyOf(options); } diff --git a/public/java/src/org/broadinstitute/sting/utils/codecs/bcf2/BCF2Codec.java b/public/java/src/org/broadinstitute/sting/utils/codecs/bcf2/BCF2Codec.java index 570ca7c1c..67e189d11 100644 --- a/public/java/src/org/broadinstitute/sting/utils/codecs/bcf2/BCF2Codec.java +++ b/public/java/src/org/broadinstitute/sting/utils/codecs/bcf2/BCF2Codec.java @@ -149,7 +149,7 @@ public final class BCF2Codec implements FeatureCodec { if ( bcfVersion.getMinorVersion() < MIN_MINOR_VERSION ) error("BCF2Codec can only process BCF2 files with minor version >= " + MIN_MINOR_VERSION + " but this file has minor version " + bcfVersion.getMinorVersion()); - logger.info("BCF version " + bcfVersion); + logger.info("Parsing data stream with BCF version " + bcfVersion); final int headerSizeInBytes = BCF2Type.INT32.read(inputStream); diff --git a/public/java/src/org/broadinstitute/sting/utils/codecs/bcf2/BCF2LazyGenotypesDecoder.java b/public/java/src/org/broadinstitute/sting/utils/codecs/bcf2/BCF2LazyGenotypesDecoder.java index cf34a8b48..513b9fcb5 100644 --- a/public/java/src/org/broadinstitute/sting/utils/codecs/bcf2/BCF2LazyGenotypesDecoder.java +++ 
b/public/java/src/org/broadinstitute/sting/utils/codecs/bcf2/BCF2LazyGenotypesDecoder.java @@ -63,9 +63,8 @@ class BCF2LazyGenotypesDecoder implements LazyGenotypesContext.LazyParser { @Override public LazyGenotypesContext.LazyData parse(final Object data) { - if ( logger.isDebugEnabled() ) - logger.debug("Decoding BCF genotypes for " + nSamples + " samples with " + nFields + " fields each"); - +// if ( logger.isDebugEnabled() ) +// logger.debug("Decoding BCF genotypes for " + nSamples + " samples with " + nFields + " fields each"); try { // load our byte[] data into the decoder diff --git a/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/BCF2FieldWriterManager.java b/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/BCF2FieldWriterManager.java index 219daf315..7b8224568 100644 --- a/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/BCF2FieldWriterManager.java +++ b/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/BCF2FieldWriterManager.java @@ -76,7 +76,7 @@ public class BCF2FieldWriterManager { if ( map.containsKey(field) ) throw new ReviewedStingException("BUG: field " + field + " already seen in VCFHeader while building BCF2 field encoders"); map.put(field, writer); - logger.info(writer); + if ( logger.isDebugEnabled() ) logger.debug(writer); } // ----------------------------------------------------------------- diff --git a/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/VariantContextWriterFactory.java b/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/VariantContextWriterFactory.java index f23166a02..035aff7d6 100644 --- a/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/VariantContextWriterFactory.java +++ b/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/VariantContextWriterFactory.java @@ -84,6 +84,16 @@ public class VariantContextWriterFactory { } } + /** + * Should we output a BCF file based 
solely on the name of the file at location? + * + * @param location the output file whose name is checked; may be null, in which case the result is false + * @return true if location is non-null and its name contains ".bcf", false otherwise + */ + public static boolean isBCFOutput(final File location) { + return isBCFOutput(location, EnumSet.noneOf(Options.class)); + } + public static boolean isBCFOutput(final File location, final EnumSet options) { return options.contains(Options.FORCE_BCF) || (location != null && location.getName().contains(".bcf")); }