From b4f482a4212984b9b2927063261917f50b76634f Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Mon, 15 Apr 2013 08:20:28 -0400 Subject: [PATCH 1/2] NanoScheduled ActiveRegionTraversal and HaplotypeCaller -- Made CountReadsInActiveRegions Nano schedulable, confirming identical results for linear and nano results -- Made Haplotype NanoScheduled, requiring misc. changes in the map/reduce type so that the map() function returns a List and reduce actually prints out the results to disk -- Tests for NanoScheduling -- CountReadsInActiveRegionsIntegrationTest now does NCT 1, 2, 4 with CountReadsInActiveRegions -- HaplotypeCallerParallelIntegrationTest does NCT 1,2,4 calling on 100kb of PCR free data -- Some misc. code cleanup of HaplotypeCaller -- Analysis scripts to assess performance of nano scheduled HC -- In order to make the haplotype caller thread safe we needed to use an AtomicInteger for the class-specific static ID counter in SeqVertex and MultiDebrujinVertex, avoiding a race condition where multiple new Vertex() could end up with the same id. --- .../haplotypecaller/HaplotypeCaller.java | 45 ++-- .../LikelihoodCalculationEngine.java | 40 +-- .../haplotypecaller/graphs/SeqVertex.java | 8 +- .../readthreading/MultiDeBruijnVertex.java | 7 +- ...aplotypeCallerParallelIntegrationTest.java | 79 ++++++ .../sting/gatk/executive/MicroScheduler.java | 2 +- .../traversals/TraverseActiveRegions.java | 228 +++++++++++++----- .../TraverseActiveRegionsUnitTest.java | 16 +- 8 files changed, 306 insertions(+), 119 deletions(-) create mode 100644 protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerParallelIntegrationTest.java diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java index 33d1104bc..f065a0d7d 100644 --- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java +++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCaller.java @@ -139,7 +139,7 @@ import java.util.*; @ActiveRegionTraversalParameters(extension=100, maxRegion=300) @ReadFilters({HCMappingQualityFilter.class}) @Downsample(by= DownsampleType.BY_SAMPLE, toCoverage=250) -public class HaplotypeCaller extends ActiveRegionWalker implements AnnotatorCompatible { +public class HaplotypeCaller extends ActiveRegionWalker, Integer> implements AnnotatorCompatible, NanoSchedulable { // ----------------------------------------------------------------------------------------------- // general haplotype caller arguments // ----------------------------------------------------------------------------------------------- @@ -645,15 +645,16 @@ public class HaplotypeCaller extends ActiveRegionWalker implem // //--------------------------------------------------------------------------------------------------------------- + private final static List NO_CALLS = Collections.emptyList(); @Override - public Integer map( final ActiveRegion originalActiveRegion, final RefMetaDataTracker metaDataTracker ) { + public List map( final ActiveRegion originalActiveRegion, final RefMetaDataTracker metaDataTracker ) { if ( justDetermineActiveRegions ) // we're benchmarking ART and/or the active region determination code in the HC, just leave without doing any work - return 1; + return NO_CALLS; - if( !originalActiveRegion.isActive() ) { return 0; } // Not active so nothing to do! + if( !originalActiveRegion.isActive() ) { return NO_CALLS; } // Not active so nothing to do! - final List activeAllelesToGenotype = new ArrayList(); + final List activeAllelesToGenotype = new ArrayList<>(); if( UG_engine.getUAC().GenotypingMode == GenotypeLikelihoodsCalculationModel.GENOTYPING_MODE.GENOTYPE_GIVEN_ALLELES ) { for( final VariantContext vc : allelesToGenotype ) { if( originalActiveRegion.getLocation().overlapsP( getToolkit().getGenomeLocParser().createGenomeLoc(vc) ) ) { @@ -662,23 +663,23 @@ public class HaplotypeCaller extends ActiveRegionWalker implem } allelesToGenotype.removeAll( activeAllelesToGenotype ); // No alleles found in this region so nothing to do! - if ( activeAllelesToGenotype.isEmpty() ) { return 0; } + if ( activeAllelesToGenotype.isEmpty() ) { return NO_CALLS; } } else { - if( originalActiveRegion.size() == 0 ) { return 0; } // No reads here so nothing to do! + if( originalActiveRegion.size() == 0 ) { return NO_CALLS; } // No reads here so nothing to do! } // run the local assembler, getting back a collection of information on how we should proceed final AssemblyResult assemblyResult = assembleReads(originalActiveRegion, activeAllelesToGenotype); // abort early if something is out of the acceptable range - if( ! assemblyResult.isVariationPresent() ) { return 1; } // only the reference haplotype remains so nothing else to do! - if (dontGenotype) return 1; // user requested we not proceed + if( ! assemblyResult.isVariationPresent() ) { return NO_CALLS; } // only the reference haplotype remains so nothing else to do! + if (dontGenotype) return NO_CALLS; // user requested we not proceed // filter out reads from genotyping which fail mapping quality based criteria final List filteredReads = filterNonPassingReads( assemblyResult.regionForGenotyping ); final Map> perSampleFilteredReadList = splitReadsBySample( filteredReads ); - if( assemblyResult.regionForGenotyping.size() == 0 ) { return 1; } // no reads remain after filtering so nothing else to do! + if( assemblyResult.regionForGenotyping.size() == 0 ) { return NO_CALLS; } // no reads remain after filtering so nothing else to do! // evaluate each sample's reads against all haplotypes //logger.info("Computing read likelihoods with " + assemblyResult.regionForGenotyping.size() + " reads"); @@ -697,12 +698,7 @@ public class HaplotypeCaller extends ActiveRegionWalker implem getToolkit().getGenomeLocParser(), activeAllelesToGenotype ); - for( final VariantContext call : calledHaplotypes.getCalls() ) { - // TODO -- uncomment this line once ART-based walkers have a proper RefMetaDataTracker. - // annotationEngine.annotateDBs(metaDataTracker, getToolkit().getGenomeLocParser().createGenomeLoc(call), call); - vcfWriter.add( call ); - } - + // TODO -- must disable if we are doing NCT, or set the output type of ! presorted if ( bamWriter != null ) { haplotypeBAMWriter.writeReadsAlignedToHaplotypes(assemblyResult.haplotypes, assemblyResult.paddedReferenceLoc, bestHaplotypes, @@ -712,7 +708,7 @@ public class HaplotypeCaller extends ActiveRegionWalker implem if( DEBUG ) { logger.info("----------------------------------------------------------------------------------"); } - return 1; // One active region was processed during this map call + return calledHaplotypes.getCalls(); } private final static class AssemblyResult { @@ -855,8 +851,13 @@ public class HaplotypeCaller extends ActiveRegionWalker implem } @Override - public Integer reduce(Integer cur, Integer sum) { - return cur + sum; + public Integer reduce(List callsInRegion, Integer numCalledRegions) { + for( final VariantContext call : callsInRegion ) { + // TODO -- uncomment this line once ART-based walkers have a proper RefMetaDataTracker. + // annotationEngine.annotateDBs(metaDataTracker, getToolkit().getGenomeLocParser().createGenomeLoc(call), call); + vcfWriter.add( call ); + } + return (callsInRegion.isEmpty() ? 0 : 1) + numCalledRegions; } @Override @@ -872,7 +873,7 @@ public class HaplotypeCaller extends ActiveRegionWalker implem private void finalizeActiveRegion( final ActiveRegion activeRegion ) { if( DEBUG ) { logger.info("Assembling " + activeRegion.getLocation() + " with " + activeRegion.size() + " reads: (with overlap region = " + activeRegion.getExtendedLoc() + ")"); } - final List finalizedReadList = new ArrayList(); + final List finalizedReadList = new ArrayList<>(); final FragmentCollection fragmentCollection = FragmentUtils.create( activeRegion.getReads() ); activeRegion.clearReads(); @@ -883,7 +884,7 @@ public class HaplotypeCaller extends ActiveRegionWalker implem } // Loop through the reads hard clipping the adaptor and low quality tails - final List readsToUse = new ArrayList(finalizedReadList.size()); + final List readsToUse = new ArrayList<>(finalizedReadList.size()); for( final GATKSAMRecord myRead : finalizedReadList ) { final GATKSAMRecord postAdapterRead = ( myRead.getReadUnmappedFlag() ? myRead : ReadClipper.hardClipAdaptorSequence( myRead ) ); if( postAdapterRead != null && !postAdapterRead.isEmpty() && postAdapterRead.getCigar().getReadLength() > 0 ) { @@ -937,7 +938,7 @@ public class HaplotypeCaller extends ActiveRegionWalker implem for( final String sample : samplesList) { List readList = returnMap.get( sample ); if( readList == null ) { - readList = new ArrayList(); + readList = new ArrayList<>(); returnMap.put(sample, readList); } } diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java index fbd9b29d5..d5d5f3c09 100644 --- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java +++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java @@ -71,7 +71,20 @@ public class LikelihoodCalculationEngine { private final byte constantGCP; private final double log10globalReadMismappingRate; private final boolean DEBUG; - private final PairHMM pairHMM; + private final PairHMM.HMM_IMPLEMENTATION hmmType; + + private final ThreadLocal pairHMM = new ThreadLocal() { + @Override + protected PairHMM initialValue() { + switch (hmmType) { + case EXACT: return new Log10PairHMM(true); + case ORIGINAL: return new Log10PairHMM(false); + case LOGLESS_CACHING: return new LoglessPairHMM(); + default: + throw new UserException.BadArgumentValue("pairHMM", "Specified pairHMM implementation is unrecognized or incompatible with the HaplotypeCaller. Acceptable options are ORIGINAL, EXACT, CACHING, and LOGLESS_CACHING."); + } + } + }; /** * The expected rate of random sequencing errors for a read originating from its true haplotype. @@ -96,22 +109,9 @@ public class LikelihoodCalculationEngine { * assigned a likelihood of -13. */ public LikelihoodCalculationEngine( final byte constantGCP, final boolean debug, final PairHMM.HMM_IMPLEMENTATION hmmType, final double log10globalReadMismappingRate ) { - switch (hmmType) { - case EXACT: - pairHMM = new Log10PairHMM(true); - break; - case ORIGINAL: - pairHMM = new Log10PairHMM(false); - break; - case LOGLESS_CACHING: - pairHMM = new LoglessPairHMM(); - break; - default: - throw new UserException.BadArgumentValue("pairHMM", "Specified pairHMM implementation is unrecognized or incompatible with the HaplotypeCaller. Acceptable options are ORIGINAL, EXACT, CACHING, and LOGLESS_CACHING."); - } - + this.hmmType = hmmType; this.constantGCP = constantGCP; - DEBUG = debug; + this.DEBUG = debug; this.log10globalReadMismappingRate = log10globalReadMismappingRate; } @@ -143,7 +143,7 @@ public class LikelihoodCalculationEngine { } // initialize arrays to hold the probabilities of being in the match, insertion and deletion cases - pairHMM.initialize(X_METRIC_LENGTH, Y_METRIC_LENGTH); + pairHMM.get().initialize(X_METRIC_LENGTH, Y_METRIC_LENGTH); } public Map computeReadLikelihoods( final List haplotypes, final Map> perSampleReadList ) { @@ -151,7 +151,7 @@ public class LikelihoodCalculationEngine { initializePairHMM(haplotypes, perSampleReadList); // Add likelihoods for each sample's reads to our stratifiedReadMap - final Map stratifiedReadMap = new HashMap(); + final Map stratifiedReadMap = new LinkedHashMap<>(); for( final Map.Entry> sampleEntry : perSampleReadList.entrySet() ) { // evaluate the likelihood of the reads given those haplotypes final PerReadAlleleLikelihoodMap map = computeReadLikelihoods(haplotypes, sampleEntry.getValue()); @@ -170,7 +170,7 @@ public class LikelihoodCalculationEngine { private PerReadAlleleLikelihoodMap computeReadLikelihoods( final List haplotypes, final List reads) { // first, a little set up to get copies of the Haplotypes that are Alleles (more efficient than creating them each time) final int numHaplotypes = haplotypes.size(); - final Map alleleVersions = new HashMap<>(numHaplotypes); + final Map alleleVersions = new LinkedHashMap<>(numHaplotypes); Allele refAllele = null; for ( final Haplotype haplotype : haplotypes ) { final Allele allele = Allele.create(haplotype, true); @@ -202,7 +202,7 @@ public class LikelihoodCalculationEngine { for( int jjj = 0; jjj < numHaplotypes; jjj++ ) { final Haplotype haplotype = haplotypes.get(jjj); final boolean isFirstHaplotype = jjj == 0; - final double log10l = pairHMM.computeReadLikelihoodGivenHaplotypeLog10(haplotype.getBases(), + final double log10l = pairHMM.get().computeReadLikelihoodGivenHaplotypeLog10(haplotype.getBases(), read.getReadBases(), readQuals, readInsQuals, readDelQuals, overallGCP, isFirstHaplotype); if ( haplotype.isNonReference() ) diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/graphs/SeqVertex.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/graphs/SeqVertex.java index f192b54aa..083747db4 100644 --- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/graphs/SeqVertex.java +++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/graphs/SeqVertex.java @@ -49,6 +49,7 @@ package org.broadinstitute.sting.gatk.walkers.haplotypecaller.graphs; import com.google.java.contract.Requires; import org.broadinstitute.sting.utils.Utils; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; /** * A graph vertex containing a sequence of bases and a unique ID that @@ -71,8 +72,9 @@ import java.util.Arrays; * @since 03/2013 */ public final class SeqVertex extends BaseVertex { - private static int idCounter = 0; - public final int id; + // Note that using an AtomicInteger is critical to allow multi-threaded HaplotypeCaller + private static final AtomicInteger idCounter = new AtomicInteger(0); + private int id = idCounter.getAndIncrement(); /** * Create a new SeqVertex with sequence and the next available id @@ -80,7 +82,6 @@ public final class SeqVertex extends BaseVertex { */ public SeqVertex(final byte[] sequence) { super(sequence); - this.id = idCounter++; } /** @@ -89,7 +90,6 @@ public final class SeqVertex extends BaseVertex { */ public SeqVertex(final String sequence) { super(sequence); - this.id = idCounter++; } /** diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/readthreading/MultiDeBruijnVertex.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/readthreading/MultiDeBruijnVertex.java index 814b3b9a7..5752583c7 100644 --- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/readthreading/MultiDeBruijnVertex.java +++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/readthreading/MultiDeBruijnVertex.java @@ -51,6 +51,7 @@ import org.broadinstitute.sting.utils.Utils; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** * A DeBruijnVertex that supports multiple copies of the same kmer @@ -65,10 +66,12 @@ import java.util.List; */ final class MultiDeBruijnVertex extends DeBruijnVertex { private final static boolean KEEP_TRACK_OF_READS = false; - private static int idCounter = 0; + + // Note that using an AtomicInteger is critical to allow multi-threaded HaplotypeCaller + private static final AtomicInteger idCounter = new AtomicInteger(0); + private int id = idCounter.getAndIncrement(); private final List reads = new LinkedList(); - private int id = idCounter++; // TODO -- potential race condition problem here /** * Create a new MultiDeBruijnVertex with kmer sequence diff --git a/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerParallelIntegrationTest.java b/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerParallelIntegrationTest.java new file mode 100644 index 000000000..ff5a501cc --- /dev/null +++ b/protected/java/test/org/broadinstitute/sting/gatk/walkers/haplotypecaller/HaplotypeCallerParallelIntegrationTest.java @@ -0,0 +1,79 @@ +/* +* By downloading the PROGRAM you agree to the following terms of use: +* +* BROAD INSTITUTE - SOFTWARE LICENSE AGREEMENT - FOR ACADEMIC NON-COMMERCIAL RESEARCH PURPOSES ONLY +* +* This Agreement is made between the Broad Institute, Inc. with a principal address at 7 Cambridge Center, Cambridge, MA 02142 (BROAD) and the LICENSEE and is effective at the date the downloading is completed (EFFECTIVE DATE). +* +* WHEREAS, LICENSEE desires to license the PROGRAM, as defined hereinafter, and BROAD wishes to have this PROGRAM utilized in the public interest, subject only to the royalty-free, nonexclusive, nontransferable license rights of the United States Government pursuant to 48 CFR 52.227-14; and +* WHEREAS, LICENSEE desires to license the PROGRAM and BROAD desires to grant a license on the following terms and conditions. +* NOW, THEREFORE, in consideration of the promises and covenants made herein, the parties hereto agree as follows: +* +* 1. DEFINITIONS +* 1.1 PROGRAM shall mean copyright in the object code and source code known as GATK2 and related documentation, if any, as they exist on the EFFECTIVE DATE and can be downloaded from http://www.broadinstitute/GATK on the EFFECTIVE DATE. +* +* 2. LICENSE +* 2.1 Grant. Subject to the terms of this Agreement, BROAD hereby grants to LICENSEE, solely for academic non-commercial research purposes, a non-exclusive, non-transferable license to: (a) download, execute and display the PROGRAM and (b) create bug fixes and modify the PROGRAM. +* The LICENSEE may apply the PROGRAM in a pipeline to data owned by users other than the LICENSEE and provide these users the results of the PROGRAM provided LICENSEE does so for academic non-commercial purposes only. For clarification purposes, academic sponsored research is not a commercial use under the terms of this Agreement. +* 2.2 No Sublicensing or Additional Rights. LICENSEE shall not sublicense or distribute the PROGRAM, in whole or in part, without prior written permission from BROAD. LICENSEE shall ensure that all of its users agree to the terms of this Agreement. LICENSEE further agrees that it shall not put the PROGRAM on a network, server, or other similar technology that may be accessed by anyone other than the LICENSEE and its employees and users who have agreed to the terms of this agreement. +* 2.3 License Limitations. Nothing in this Agreement shall be construed to confer any rights upon LICENSEE by implication, estoppel, or otherwise to any computer software, trademark, intellectual property, or patent rights of BROAD, or of any other entity, except as expressly granted herein. LICENSEE agrees that the PROGRAM, in whole or part, shall not be used for any commercial purpose, including without limitation, as the basis of a commercial software or hardware product or to provide services. LICENSEE further agrees that the PROGRAM shall not be copied or otherwise adapted in order to circumvent the need for obtaining a license for use of the PROGRAM. +* +* 3. OWNERSHIP OF INTELLECTUAL PROPERTY +* LICENSEE acknowledges that title to the PROGRAM shall remain with BROAD. The PROGRAM is marked with the following BROAD copyright notice and notice of attribution to contributors. LICENSEE shall retain such notice on all copies. LICENSEE agrees to include appropriate attribution if any results obtained from use of the PROGRAM are included in any publication. +* Copyright 2012 Broad Institute, Inc. +* Notice of attribution: The GATK2 program was made available through the generosity of Medical and Population Genetics program at the Broad Institute, Inc. +* LICENSEE shall not use any trademark or trade name of BROAD, or any variation, adaptation, or abbreviation, of such marks or trade names, or any names of officers, faculty, students, employees, or agents of BROAD except as states above for attribution purposes. +* +* 4. INDEMNIFICATION +* LICENSEE shall indemnify, defend, and hold harmless BROAD, and their respective officers, faculty, students, employees, associated investigators and agents, and their respective successors, heirs and assigns, (Indemnitees), against any liability, damage, loss, or expense (including reasonable attorneys fees and expenses) incurred by or imposed upon any of the Indemnitees in connection with any claims, suits, actions, demands or judgments arising out of any theory of liability (including, without limitation, actions in the form of tort, warranty, or strict liability and regardless of whether such action has any factual basis) pursuant to any right or license granted under this Agreement. +* +* 5. NO REPRESENTATIONS OR WARRANTIES +* THE PROGRAM IS DELIVERED AS IS. BROAD MAKES NO REPRESENTATIONS OR WARRANTIES OF ANY KIND CONCERNING THE PROGRAM OR THE COPYRIGHT, EXPRESS OR IMPLIED, INCLUDING, WITHOUT LIMITATION, WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NONINFRINGEMENT, OR THE ABSENCE OF LATENT OR OTHER DEFECTS, WHETHER OR NOT DISCOVERABLE. BROAD EXTENDS NO WARRANTIES OF ANY KIND AS TO PROGRAM CONFORMITY WITH WHATEVER USER MANUALS OR OTHER LITERATURE MAY BE ISSUED FROM TIME TO TIME. +* IN NO EVENT SHALL BROAD OR ITS RESPECTIVE DIRECTORS, OFFICERS, EMPLOYEES, AFFILIATED INVESTIGATORS AND AFFILIATES BE LIABLE FOR INCIDENTAL OR CONSEQUENTIAL DAMAGES OF ANY KIND, INCLUDING, WITHOUT LIMITATION, ECONOMIC DAMAGES OR INJURY TO PROPERTY AND LOST PROFITS, REGARDLESS OF WHETHER BROAD SHALL BE ADVISED, SHALL HAVE OTHER REASON TO KNOW, OR IN FACT SHALL KNOW OF THE POSSIBILITY OF THE FOREGOING. +* +* 6. ASSIGNMENT +* This Agreement is personal to LICENSEE and any rights or obligations assigned by LICENSEE without the prior written consent of BROAD shall be null and void. +* +* 7. MISCELLANEOUS +* 7.1 Export Control. LICENSEE gives assurance that it will comply with all United States export control laws and regulations controlling the export of the PROGRAM, including, without limitation, all Export Administration Regulations of the United States Department of Commerce. Among other things, these laws and regulations prohibit, or require a license for, the export of certain types of software to specified countries. +* 7.2 Termination. LICENSEE shall have the right to terminate this Agreement for any reason upon prior written notice to BROAD. If LICENSEE breaches any provision hereunder, and fails to cure such breach within thirty (30) days, BROAD may terminate this Agreement immediately. Upon termination, LICENSEE shall provide BROAD with written assurance that the original and all copies of the PROGRAM have been destroyed, except that, upon prior written authorization from BROAD, LICENSEE may retain a copy for archive purposes. +* 7.3 Survival. The following provisions shall survive the expiration or termination of this Agreement: Articles 1, 3, 4, 5 and Sections 2.2, 2.3, 7.3, and 7.4. +* 7.4 Notice. Any notices under this Agreement shall be in writing, shall specifically refer to this Agreement, and shall be sent by hand, recognized national overnight courier, confirmed facsimile transmission, confirmed electronic mail, or registered or certified mail, postage prepaid, return receipt requested. All notices under this Agreement shall be deemed effective upon receipt. +* 7.5 Amendment and Waiver; Entire Agreement. This Agreement may be amended, supplemented, or otherwise modified only by means of a written instrument signed by all parties. Any waiver of any rights or failure to act in a specific instance shall relate only to such instance and shall not be construed as an agreement to waive any rights or fail to act in any other instance, whether or not similar. This Agreement constitutes the entire agreement among the parties with respect to its subject matter and supersedes prior agreements or understandings between the parties relating to its subject matter. +* 7.6 Binding Effect; Headings. This Agreement shall be binding upon and inure to the benefit of the parties and their respective permitted successors and assigns. All headings are for convenience only and shall not affect the meaning of any provision of this Agreement. +* 7.7 Governing Law. This Agreement shall be construed, governed, interpreted and applied in accordance with the internal laws of the Commonwealth of Massachusetts, U.S.A., without regard to conflict of laws principles. +*/ + +package org.broadinstitute.sting.gatk.walkers.haplotypecaller; + +import org.broadinstitute.sting.WalkerTest; +import org.broadinstitute.sting.utils.haplotypeBAMWriter.HaplotypeBAMWriter; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class HaplotypeCallerParallelIntegrationTest extends WalkerTest { + @DataProvider(name = "NCTDataProvider") + public Object[][] makeNCTDataProvider() { + List tests = new ArrayList(); + + for ( final int nct : Arrays.asList(1, 2, 4) ) { + tests.add(new Object[]{nct, "c277fd65365d59b734260dd8423313bb"}); + } + + return tests.toArray(new Object[][]{}); + } + + @Test(dataProvider = "NCTDataProvider") + public void testHCNCT(final int nct, final String md5) { + WalkerTestSpec spec = new WalkerTestSpec( + "-T HaplotypeCaller -R " + b37KGReference + " --no_cmdline_in_header -I " + + privateTestDir + "PCRFree.2x250.Illumina.20_10_11.bam -o %s " + + " -L 20:10,000,000-10,100,000 -G none -A -contamination 0.0 -nct " + nct, 1, + Arrays.asList(md5)); + executeTest("HC test parallel HC with NCT with nct " + nct, spec); + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index dc9dfd77e..23b084d66 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -245,7 +245,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } else if (walker instanceof ReadPairWalker) { return new TraverseReadPairs(); } else if (walker instanceof ActiveRegionWalker) { - return new TraverseActiveRegions(); + return new TraverseActiveRegions(threadAllocation.getNumCPUThreadsPerDataThread()); } else { throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java index f9a4fcdbb..b47a355be 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java @@ -41,12 +41,22 @@ import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.SampleUtils; import org.broadinstitute.sting.utils.Utils; -import org.broadinstitute.sting.utils.activeregion.*; +import org.broadinstitute.sting.utils.activeregion.ActiveRegion; +import org.broadinstitute.sting.utils.activeregion.ActivityProfile; +import org.broadinstitute.sting.utils.activeregion.ActivityProfileState; +import org.broadinstitute.sting.utils.activeregion.BandPassActivityProfile; +import org.broadinstitute.sting.utils.nanoScheduler.NSMapFunction; +import org.broadinstitute.sting.utils.nanoScheduler.NSProgressFunction; +import org.broadinstitute.sting.utils.nanoScheduler.NSReduceFunction; +import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; import org.broadinstitute.sting.utils.progressmeter.ProgressMeter; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; import java.io.PrintStream; -import java.util.*; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; /** * Implement active region traversal @@ -67,7 +77,8 @@ import java.util.*; * variable spanOfLastReadSeen * */ -public class TraverseActiveRegions extends TraversalEngine,LocusShardDataProvider> { +public final class TraverseActiveRegions extends TraversalEngine,LocusShardDataProvider> { + private final static boolean DEBUG = false; protected final static Logger logger = Logger.getLogger(TraversalEngine.class); protected final static boolean LOG_READ_CARRYING = false; @@ -84,7 +95,32 @@ public class TraverseActiveRegions extends TraversalEngine walker; + + final NanoScheduler nanoScheduler; + + /** + * Create a single threaded active region traverser + */ + public TraverseActiveRegions() { + this(1); + } + + /** + * Create an active region traverser that uses nThreads for getting its work done + * @param nThreads number of threads + */ + public TraverseActiveRegions(final int nThreads) { + nanoScheduler = new NanoScheduler<>(nThreads); + nanoScheduler.setProgressFunction(new NSProgressFunction() { + @Override + public void progress(ActiveRegion lastActiveRegion) { + if ( lastActiveRegion != null ) + // note, need to use getStopLocation so we don't give an interval to ProgressMeterDaemon + printProgress(lastActiveRegion.getLocation().getStopLocation()); + } + }); + } /** * Have the debugging output streams been initialized already? @@ -98,7 +134,7 @@ public class TraverseActiveRegions extends TraversalEngine)walker; if ( this.walker.wantsExtendedReads() && ! this.walker.wantsNonPrimaryReads() ) { throw new IllegalArgumentException("Active region walker " + this.walker + " requested extended events but not " + "non-primary reads, an inconsistent state. Please modify the walker"); @@ -217,58 +253,108 @@ public class TraverseActiveRegions extends TraversalEngine reads = locusView.getLIBS().transferReadsFromAllPreviousPileups(); - for( final GATKSAMRecord read : reads ) { - if ( ! appearedInLastShard(locOfLastReadAtTraversalStart, read) ) { - rememberLastReadLocation(read); - myReads.add(read); - } - } - - // skip this location -- it's not part of our engine intervals - if ( outsideEngineIntervals(location) ) - continue; - - // we've move across some interval boundary, restart profile - final boolean flushProfile = ! activityProfile.isEmpty() - && ( activityProfile.getContigIndex() != location.getContigIndex() - || location.getStart() != activityProfile.getStop() + 1); - sum = processActiveRegions(walker, sum, flushProfile, false); - - dataProvider.getShard().getReadMetrics().incrementNumIterations(); - - // create reference context. Note that if we have a pileup of "extended events", the context will - // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup). - final ReferenceContext refContext = referenceView.getReferenceContext(location); - - // Iterate forward to get all reference ordered data covering this location - final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext); - - // Call the walkers isActive function for this locus and add them to the list to be integrated later - addIsActiveResult(walker, tracker, refContext, locus); - - maxReadsInMemory = Math.max(myReads.size(), maxReadsInMemory); - printProgress(location); - } + nanoScheduler.setDebug(false); + final Iterator activeRegionIterator = new ActiveRegionIterator(dataProvider); + final TraverseActiveRegionMap myMap = new TraverseActiveRegionMap(); + final TraverseActiveRegionReduce myReduce = new TraverseActiveRegionReduce(); + final T result = nanoScheduler.execute(activeRegionIterator, myMap, sum, myReduce); updateCumulativeMetrics(dataProvider.getShard()); - return sum; + return result; + } + + private class ActiveRegionIterator implements Iterator { + private final LocusShardDataProvider dataProvider; + private LinkedList readyActiveRegions = new LinkedList(); + private boolean done = false; + private final LocusView locusView; + private final LocusReferenceView referenceView; + private final ReferenceOrderedView referenceOrderedDataView; + private final GenomeLoc locOfLastReadAtTraversalStart; + + public ActiveRegionIterator( final LocusShardDataProvider dataProvider ) { + this.dataProvider = dataProvider; + locusView = new AllLocusView(dataProvider); + referenceView = new LocusReferenceView( walker, dataProvider ); + referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView); + + // We keep processing while the next reference location is within the interval + locOfLastReadAtTraversalStart = spanOfLastSeenRead(); + } + + @Override public void remove() { throw new UnsupportedOperationException("Cannot remove from ActiveRegionIterator"); } + + @Override + public ActiveRegion next() { + return readyActiveRegions.pop(); + } + @Override + public boolean hasNext() { + if ( ! readyActiveRegions.isEmpty() ) + return true; + if ( done ) + return false; + else { + + while( locusView.hasNext() ) { + final AlignmentContext locus = locusView.next(); + final GenomeLoc location = locus.getLocation(); + + rememberLastLocusLocation(location); + + // get all of the new reads that appear in the current pileup, and them to our list of reads + // provided we haven't seen them before + final Collection reads = locusView.getLIBS().transferReadsFromAllPreviousPileups(); + for( final GATKSAMRecord read : reads ) { + // note that ActiveRegionShards span entire contigs, so this check is in some + // sense no longer necessary, as any read that appeared in the last shard would now + // by definition be on a different contig. However, the logic here doesn't hurt anything + // and makes us robust should we decided to provide shards that don't fully span + // contigs at some point in the future + if ( ! appearedInLastShard(locOfLastReadAtTraversalStart, read) ) { + rememberLastReadLocation(read); + myReads.add(read); + } + } + + // skip this location -- it's not part of our engine intervals + if ( outsideEngineIntervals(location) ) + continue; + + // we've move across some interval boundary, restart profile + final boolean flushProfile = ! activityProfile.isEmpty() + && ( activityProfile.getContigIndex() != location.getContigIndex() + || location.getStart() != activityProfile.getStop() + 1); + final List newActiveRegions = prepActiveRegionsForProcessing(walker, flushProfile, false); + + dataProvider.getShard().getReadMetrics().incrementNumIterations(); + + // create reference context. Note that if we have a pileup of "extended events", the context will + // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup). + final ReferenceContext refContext = referenceView.getReferenceContext(location); + + // Iterate forward to get all reference ordered data covering this location + final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext); + + // Call the walkers isActive function for this locus and add them to the list to be integrated later + addIsActiveResult(walker, tracker, refContext, locus); + + maxReadsInMemory = Math.max(myReads.size(), maxReadsInMemory); + printProgress(location); + + if ( ! newActiveRegions.isEmpty() ) { + readyActiveRegions.addAll(newActiveRegions); + if ( DEBUG ) + for ( final ActiveRegion region : newActiveRegions ) + logger.info("Adding region to queue for processing " + region); + return true; + } + } + + return false; + } + } } /** @@ -276,7 +362,11 @@ public class TraverseActiveRegions extends TraversalEngine walker, T sum) { - return processActiveRegions((ActiveRegionWalker)walker, sum, true, true); + for ( final ActiveRegion region : prepActiveRegionsForProcessing((ActiveRegionWalker)walker, true, true) ) { + final M x = ((ActiveRegionWalker) walker).map(region, null); + sum = walker.reduce( x, sum ); + } + return sum; } // ------------------------------------------------------------------------------------- @@ -504,7 +594,7 @@ public class TraverseActiveRegions extends TraversalEngine walker, T sum, final boolean flushActivityProfile, final boolean forceAllRegionsToBeActive) { + private List prepActiveRegionsForProcessing(final ActiveRegionWalker walker, final boolean flushActivityProfile, final boolean forceAllRegionsToBeActive) { if ( ! walkerHasPresetRegions ) { // We don't have preset regions, so we get our regions from the activity profile final Collection activeRegions = activityProfile.popReadyActiveRegions(getActiveRegionExtension(), getMinRegionSize(), getMaxRegionSize(), flushActivityProfile); @@ -513,21 +603,23 @@ public class TraverseActiveRegions extends TraversalEngine readyRegions = new LinkedList(); while( workQueue.peek() != null ) { final ActiveRegion activeRegion = workQueue.peek(); if ( forceAllRegionsToBeActive || regionCompletelyWithinDeadZone(activeRegion) ) { writeActivityProfile(activeRegion.getSupportingStates()); writeActiveRegion(activeRegion); - sum = processActiveRegion( workQueue.remove(), sum, walker ); + readyRegions.add(prepActiveRegionForProcessing(workQueue.remove(), walker)); } else { break; } } - return sum; + return readyRegions; + } - private T processActiveRegion(final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker walker) { + private ActiveRegion prepActiveRegionForProcessing(final ActiveRegion activeRegion, final ActiveRegionWalker walker) { final List stillLive = new LinkedList(); for ( final GATKSAMRecord read : myReads.popCurrentReads() ) { boolean killed = false; @@ -561,7 +653,21 @@ public class TraverseActiveRegions extends TraversalEngine { + @Override + public M apply(final ActiveRegion activeRegion) { + if ( DEBUG ) logger.info("Executing walker.map for " + activeRegion + " in thread " + Thread.currentThread().getName()); + return walker.map(activeRegion, null); + } + } + + private class TraverseActiveRegionReduce implements NSReduceFunction { + @Override + public T apply(M one, T sum) { + return walker.reduce(one, sum); + } } } diff --git a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java index b6106d4bc..2e6705d77 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java @@ -77,7 +77,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { @DataProvider(name = "TraversalEngineProvider") public Object[][] makeTraversals() { final List traversals = new LinkedList(); - traversals.add(new Object[]{new TraverseActiveRegions()}); + traversals.add(new Object[]{new TraverseActiveRegions<>()}); return traversals.toArray(new Object[][]{}); } @@ -523,8 +523,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { final int maxTests = Integer.MAX_VALUE; int nTests = 0; - for ( final int readLength : Arrays.asList(10, 100) ) { - for ( final int skips : Arrays.asList(0, 1, 10) ) { + for ( final int readLength : Arrays.asList(100) ) { + for ( final int skips : Arrays.asList(0, 10) ) { for ( final int start : starts ) { for ( final int nReadsPerLocus : Arrays.asList(1, 2) ) { for ( final int nLoci : Arrays.asList(1, 1000) ) { @@ -536,7 +536,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { for ( final GenomeLocSortedSet activeRegions : enumerateActiveRegions(bamBuilder.getAlignmentStart(), bamBuilder.getAlignmentEnd())) { nTests++; if ( nTests < maxTests ) // && nTests == 1238 ) - tests.add(new Object[]{nTests, activeRegions, readStates, bamBuilder}); + tests.add(new Object[]{new TraverseActiveRegions<>(), nTests, activeRegions, readStates, bamBuilder}); } } } @@ -586,7 +586,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { @Test(enabled = true && ! DEBUG, dataProvider = "CombinatorialARTTilingProvider") - public void testARTReadsInActiveRegions(final int id, final GenomeLocSortedSet activeRegions, final EnumSet readStates, final ArtificialBAMBuilder bamBuilder) { + public void testARTReadsInActiveRegions(final TraverseActiveRegions traversal, final int id, final GenomeLocSortedSet activeRegions, final EnumSet readStates, final ArtificialBAMBuilder bamBuilder) { logger.warn("Running testARTReadsInActiveRegions id=" + id + " locs " + activeRegions + " against bam " + bamBuilder); final List intervals = Arrays.asList( genomeLocParser.createGenomeLoc("1", bamBuilder.getAlignmentStart(), bamBuilder.getAlignmentEnd()) @@ -595,7 +595,6 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { final DummyActiveRegionWalker walker = new DummyActiveRegionWalker(activeRegions, false); walker.setStates(readStates); - final TraverseActiveRegions traversal = new TraverseActiveRegions(); final Map activeRegionsMap = getActiveRegions(traversal, walker, intervals, bamBuilder.makeTemporarilyBAMFile()); final Set alreadySeenReads = new HashSet(); // for use with the primary / non-primary @@ -640,8 +639,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { // // --------------------------------------------------------------------------------------------------------- - @Test(enabled = true && ! DEBUG) - public void ensureAllInsertionReadsAreInActiveRegions() { + @Test(dataProvider = "TraversalEngineProvider", enabled = true && ! DEBUG) + public void ensureAllInsertionReadsAreInActiveRegions(final TraverseActiveRegions traversal) { final int readLength = 10; final int start = 20; @@ -667,7 +666,6 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { final DummyActiveRegionWalker walker = new DummyActiveRegionWalker(activeRegions, false); - final TraverseActiveRegions traversal = new TraverseActiveRegions(); final Map activeRegionsMap = getActiveRegions(traversal, walker, intervals, bamBuilder.makeTemporarilyBAMFile()); final ActiveRegion region = activeRegionsMap.values().iterator().next(); From 39e4396de0189f0acd39af66f08b0932028906cb Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 9 May 2013 11:39:19 -0400 Subject: [PATCH 2/2] New ActiveRegionShardBalancer allows efficient NanoScheduling -- Previously we used the LocusShardBalancer for the haplotype caller, which meant that TraverseActiveRegions saw its shards grouped in chunks of 16kb bits on the genome. These locus shards are useful when you want to use the HierarchicalMicroScheduler, as they provide fine-grained accessed to the underlying BAM, but they have two major drawbacks (1) we have to fairly frequently reset our state in TAR to handle moving between shard boundaries and (2) with the nano scheduled TAR we end up blocking at the end of each shard while our threads all finish processing. -- This commit changes the system over to using an ActiveRegionShardBalancers, that combines all of the shard data for a single contig into a single combined shard. This ensures that TAR, and by extensions the HaplotypeCaller, gets all of the data on a single contig together so the the NanoSchedule runs efficiently instead of blocking over and over at shard boundaries. This simple change allows us to scale efficiently to around 8 threads in the nano scheduler: -- See https://www.dropbox.com/s/k7f280pd2zt0lyh/hc_nano_linear_scale.pdf -- See https://www.dropbox.com/s/fflpnan802m2906/hc_nano_log_scale.pdf -- Misc. changes throughout the codebase so we Use the ActiveRegionShardBalancer where appropriate. -- Added unit tests for ActiveRegionShardBalancer to confirm it does the merging as expected. -- Fix bad toString in FilePointer --- .../sting/gatk/GenomeAnalysisEngine.java | 4 +- .../reads/ActiveRegionShardBalancer.java | 85 +++++++++++++++ .../gatk/datasources/reads/FilePointer.java | 4 +- .../ActiveRegionShardBalancerUnitTest.java | 101 ++++++++++++++++++ .../TraverseActiveRegionsUnitTest.java | 2 +- 5 files changed, 191 insertions(+), 5 deletions(-) create mode 100644 public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancer.java create mode 100644 public/java/test/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancerUnitTest.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 82bee7826..9dcba25ff 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -570,9 +570,9 @@ public class GenomeAnalysisEngine { if (readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.coordinate) throw new UserException.MissortedBAM(SAMFileHeader.SortOrder.coordinate, "Active region walkers can only traverse coordinate-sorted data. Please resort your input BAM file(s) or set the Sort Order tag in the header appropriately."); if(intervals == null) - return readsDataSource.createShardIteratorOverMappedReads(new LocusShardBalancer()); + return readsDataSource.createShardIteratorOverMappedReads(new ActiveRegionShardBalancer()); else - return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new LocusShardBalancer()); + return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new ActiveRegionShardBalancer()); } else if(walker instanceof ReadWalker || walker instanceof ReadPairWalker || walker instanceof DuplicateWalker) { // Apply special validation to read pair walkers. diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancer.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancer.java new file mode 100644 index 000000000..febdc788e --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancer.java @@ -0,0 +1,85 @@ +/* +* Copyright (c) 2012 The Broad Institute +* +* Permission is hereby granted, free of charge, to any person +* obtaining a copy of this software and associated documentation +* files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, +* copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following +* conditions: +* +* The above copyright notice and this permission notice shall be +* included in all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package org.broadinstitute.sting.gatk.datasources.reads; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +/** + * ActiveRegionShardBalancer + * + * Merges all of the file pointer information for a single contig index into a single + * combined shard. The purpose of doing this is to ensure that the HaplotypeCaller, which + * doesn't support TreeReduction by construction, gets all of the data on a single + * contig together so the the NanoSchedule runs efficiently + */ +public class ActiveRegionShardBalancer extends ShardBalancer { + /** + * Convert iterators of file pointers into balanced iterators of shards. + * @return An iterator over balanced shards. + */ + public Iterator iterator() { + return new Iterator() { + public boolean hasNext() { + return filePointers.hasNext(); + } + + public Shard next() { + FilePointer current = getCombinedFilePointersOnSingleContig(); + + // FilePointers have already been combined as necessary at the IntervalSharder level. No + // need to do so again here. + + return new LocusShard(parser,readsDataSource,current.getLocations(),current.fileSpans); + } + + public void remove() { + throw new UnsupportedOperationException("Unable to remove from shard balancing iterator"); + } + }; + } + + /** + * Combine all of the file pointers in the filePointers iterator into a single combined + * FilePointer that spans all of the file pointers on a single contig + * @return a non-null FilePointer + */ + private FilePointer getCombinedFilePointersOnSingleContig() { + FilePointer current = filePointers.next(); + + final List toCombine = new LinkedList<>(); + toCombine.add(current); + + while ( filePointers.hasNext() && + current.isRegionUnmapped == filePointers.peek().isRegionUnmapped && + (current.getContigIndex() == filePointers.peek().getContigIndex() || current.isRegionUnmapped) ) { + toCombine.add(filePointers.next()); + } + + return FilePointer.union(toCombine, parser); + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/FilePointer.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/FilePointer.java index 56bf5197d..517903da3 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/FilePointer.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/FilePointer.java @@ -407,10 +407,10 @@ public class FilePointer { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("FilePointer:%n"); + builder.append("FilePointer:\n"); builder.append("\tlocations = {"); builder.append(Utils.join(";",locations)); - builder.append("}%n\tregions = %n"); + builder.append("}\n\tregions = \n"); for(Map.Entry entry: fileSpans.entrySet()) { builder.append(entry.getKey()); builder.append("= {"); diff --git a/public/java/test/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancerUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancerUnitTest.java new file mode 100644 index 000000000..e768faba4 --- /dev/null +++ b/public/java/test/org/broadinstitute/sting/gatk/datasources/reads/ActiveRegionShardBalancerUnitTest.java @@ -0,0 +1,101 @@ +/* +* Copyright (c) 2012 The Broad Institute +* +* Permission is hereby granted, free of charge, to any person +* obtaining a copy of this software and associated documentation +* files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, +* copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following +* conditions: +* +* The above copyright notice and this permission notice shall be +* included in all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package org.broadinstitute.sting.gatk.datasources.reads; + +import net.sf.samtools.SAMFileHeader; +import net.sf.samtools.SAMFileSpan; +import net.sf.samtools.SAMSequenceRecord; +import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.GenomeLocParser; +import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.FileNotFoundException; +import java.util.*; + +public class ActiveRegionShardBalancerUnitTest extends BaseTest { + // example genome loc parser for this test, can be deleted if you don't use the reference + private GenomeLocParser genomeLocParser; + protected SAMDataSource readsDataSource; + + @BeforeClass + public void setup() throws FileNotFoundException { + // sequence + final SAMFileHeader header = ArtificialSAMUtils.createArtificialSamHeader(10, 0, 10000); + genomeLocParser = new GenomeLocParser(header.getSequenceDictionary()); + readsDataSource = null; + } + + @Test + public void testMergingManyContigs() { + executeTest(genomeLocParser.getContigs().getSequences()); + } + + @Test + public void testMergingAllPointersOnSingleContig() { + executeTest(Arrays.asList(genomeLocParser.getContigs().getSequences().get(1))); + } + + @Test + public void testMergingMultipleDiscontinuousContigs() { + final List all = genomeLocParser.getContigs().getSequences(); + executeTest(Arrays.asList(all.get(1), all.get(3))); + } + + private void executeTest(final Collection records) { + final ActiveRegionShardBalancer balancer = new ActiveRegionShardBalancer(); + + final List> expectedLocs = new LinkedList<>(); + final List pointers = new LinkedList<>(); + + for ( final SAMSequenceRecord record : records ) { + final int size = 10; + int end = 0; + for ( int i = 0; i < record.getSequenceLength(); i += size) { + final int myEnd = i + size - 1; + end = myEnd; + final GenomeLoc loc = genomeLocParser.createGenomeLoc(record.getSequenceName(), i, myEnd); + final Map fileSpans = Collections.emptyMap(); + final FilePointer fp = new FilePointer(fileSpans, Collections.singletonList(loc)); + pointers.add(fp); + } + expectedLocs.add(Collections.singleton(genomeLocParser.createGenomeLoc(record.getSequenceName(), 0, end))); + } + + balancer.initialize(readsDataSource, pointers.iterator(), genomeLocParser); + + int i = 0; + int nShardsFound = 0; + for ( final Shard shard : balancer ) { + nShardsFound++; + Assert.assertEquals(new HashSet<>(shard.getGenomeLocs()), expectedLocs.get(i++)); + } + Assert.assertEquals(nShardsFound, records.size(), "Didn't find exactly one shard for each contig in the sequence dictionary"); + } +} diff --git a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java index 2e6705d77..1f5cd6d0e 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegionsUnitTest.java @@ -490,7 +490,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest { traverseActiveRegions.initialize(engine, walker); List providers = new ArrayList(); - for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) { + for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new ActiveRegionShardBalancer())) { for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs(), samples)) { providers.add(new LocusShardDataProvider(shard, shard.getReadProperties(), genomeLocParser, window.getLocus(), window, reference, new ArrayList())); }