Merge pull request #220 from broadinstitute/md_nano_hc
NanoScheduling for the HaplotypeCaller!
commit 349521e4c2
@@ -139,7 +139,7 @@ import java.util.*;
 @ActiveRegionTraversalParameters(extension=100, maxRegion=300)
 @ReadFilters({HCMappingQualityFilter.class})
 @Downsample(by= DownsampleType.BY_SAMPLE, toCoverage=250)
-public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implements AnnotatorCompatible {
+public class HaplotypeCaller extends ActiveRegionWalker<List<VariantContext>, Integer> implements AnnotatorCompatible, NanoSchedulable {
     // -----------------------------------------------------------------------------------------------
     // general haplotype caller arguments
     // -----------------------------------------------------------------------------------------------
@@ -645,15 +645,16 @@ public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implem
     //
     //---------------------------------------------------------------------------------------------------------------

+    private final static List<VariantContext> NO_CALLS = Collections.emptyList();
     @Override
-    public Integer map( final ActiveRegion originalActiveRegion, final RefMetaDataTracker metaDataTracker ) {
+    public List<VariantContext> map( final ActiveRegion originalActiveRegion, final RefMetaDataTracker metaDataTracker ) {
         if ( justDetermineActiveRegions )
             // we're benchmarking ART and/or the active region determination code in the HC, just leave without doing any work
-            return 1;
+            return NO_CALLS;

-        if( !originalActiveRegion.isActive() ) { return 0; } // Not active so nothing to do!
+        if( !originalActiveRegion.isActive() ) { return NO_CALLS; } // Not active so nothing to do!

-        final List<VariantContext> activeAllelesToGenotype = new ArrayList<VariantContext>();
+        final List<VariantContext> activeAllelesToGenotype = new ArrayList<>();
         if( UG_engine.getUAC().GenotypingMode == GenotypeLikelihoodsCalculationModel.GENOTYPING_MODE.GENOTYPE_GIVEN_ALLELES ) {
             for( final VariantContext vc : allelesToGenotype ) {
                 if( originalActiveRegion.getLocation().overlapsP( getToolkit().getGenomeLocParser().createGenomeLoc(vc) ) ) {
@@ -662,23 +663,23 @@ public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implem
             }
             allelesToGenotype.removeAll( activeAllelesToGenotype );
             // No alleles found in this region so nothing to do!
-            if ( activeAllelesToGenotype.isEmpty() ) { return 0; }
+            if ( activeAllelesToGenotype.isEmpty() ) { return NO_CALLS; }
         } else {
-            if( originalActiveRegion.size() == 0 ) { return 0; } // No reads here so nothing to do!
+            if( originalActiveRegion.size() == 0 ) { return NO_CALLS; } // No reads here so nothing to do!
         }

         // run the local assembler, getting back a collection of information on how we should proceed
         final AssemblyResult assemblyResult = assembleReads(originalActiveRegion, activeAllelesToGenotype);

         // abort early if something is out of the acceptable range
-        if( ! assemblyResult.isVariationPresent() ) { return 1; } // only the reference haplotype remains so nothing else to do!
-        if (dontGenotype) return 1; // user requested we not proceed
+        if( ! assemblyResult.isVariationPresent() ) { return NO_CALLS; } // only the reference haplotype remains so nothing else to do!
+        if (dontGenotype) return NO_CALLS; // user requested we not proceed

         // filter out reads from genotyping which fail mapping quality based criteria
         final List<GATKSAMRecord> filteredReads = filterNonPassingReads( assemblyResult.regionForGenotyping );
         final Map<String, List<GATKSAMRecord>> perSampleFilteredReadList = splitReadsBySample( filteredReads );

-        if( assemblyResult.regionForGenotyping.size() == 0 ) { return 1; } // no reads remain after filtering so nothing else to do!
+        if( assemblyResult.regionForGenotyping.size() == 0 ) { return NO_CALLS; } // no reads remain after filtering so nothing else to do!

         // evaluate each sample's reads against all haplotypes
         //logger.info("Computing read likelihoods with " + assemblyResult.regionForGenotyping.size() + " reads");
@@ -697,12 +698,7 @@ public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implem
                 getToolkit().getGenomeLocParser(),
                 activeAllelesToGenotype );

-        for( final VariantContext call : calledHaplotypes.getCalls() ) {
-            // TODO -- uncomment this line once ART-based walkers have a proper RefMetaDataTracker.
-            // annotationEngine.annotateDBs(metaDataTracker, getToolkit().getGenomeLocParser().createGenomeLoc(call), call);
-            vcfWriter.add( call );
-        }
-
+        // TODO -- must disable if we are doing NCT, or set the output type of ! presorted
         if ( bamWriter != null ) {
             haplotypeBAMWriter.writeReadsAlignedToHaplotypes(assemblyResult.haplotypes, assemblyResult.paddedReferenceLoc,
                     bestHaplotypes,
@@ -712,7 +708,7 @@ public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implem

         if( DEBUG ) { logger.info("----------------------------------------------------------------------------------"); }

-        return 1; // One active region was processed during this map call
+        return calledHaplotypes.getCalls();
     }

     private final static class AssemblyResult {
@@ -855,8 +851,13 @@ public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implem
     }

     @Override
-    public Integer reduce(Integer cur, Integer sum) {
-        return cur + sum;
+    public Integer reduce(List<VariantContext> callsInRegion, Integer numCalledRegions) {
+        for( final VariantContext call : callsInRegion ) {
+            // TODO -- uncomment this line once ART-based walkers have a proper RefMetaDataTracker.
+            // annotationEngine.annotateDBs(metaDataTracker, getToolkit().getGenomeLocParser().createGenomeLoc(call), call);
+            vcfWriter.add( call );
+        }
+        return (callsInRegion.isEmpty() ? 0 : 1) + numCalledRegions;
     }

     @Override
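Why the hunks above move all VCF writing out of map(): once the walker is NanoSchedulable, map() may be invoked from several worker threads at once, while reduce() is applied serially on a single thread, so the non-thread-safe vcfWriter can only be touched in reduce(). The following is a minimal, self-contained sketch of that "map in parallel, reduce serially" contract; the class name, pool size and the println stand-in for the VCF writer are invented for illustration and are not GATK API.

    import java.util.*;
    import java.util.concurrent.*;

    // Toy illustration of the walker contract under multi-threading: map() returns the
    // calls for one region and has no side effects, so it may run on worker threads;
    // reduce() owns the output "writer" and runs on a single thread.
    public class WalkerMapReduceSketch {
        // map(): per-region work only, safe to run concurrently
        static List<String> map(final String region) {
            return Collections.singletonList("variant call in " + region);
        }

        public static void main(String[] args) throws Exception {
            final List<String> regions = Arrays.asList("20:10000-10300", "20:10301-10600", "20:10601-10900");
            final ExecutorService pool = Executors.newFixedThreadPool(2);

            // submit map jobs in region order so reduce can consume results in that same order
            final List<Future<List<String>>> mapResults = new ArrayList<>();
            for (final String region : regions) {
                final Callable<List<String>> job = () -> map(region);
                mapResults.add(pool.submit(job));
            }

            // reduce(): single-threaded, mirroring vcfWriter.add(call) moving from map() into reduce()
            int calledRegions = 0;
            for (final Future<List<String>> f : mapResults) {
                final List<String> callsInRegion = f.get();           // consumed in submission order
                for (final String call : callsInRegion)
                    System.out.println(call);                         // stand-in for vcfWriter.add(call)
                calledRegions += callsInRegion.isEmpty() ? 0 : 1;     // same counting as the new reduce()
            }
            pool.shutdown();
            System.out.println("regions with calls: " + calledRegions);
        }
    }

Because the results are folded in submission order, the output is identical no matter how many worker threads ran the map calls.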
@@ -872,7 +873,7 @@ public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implem

     private void finalizeActiveRegion( final ActiveRegion activeRegion ) {
         if( DEBUG ) { logger.info("Assembling " + activeRegion.getLocation() + " with " + activeRegion.size() + " reads: (with overlap region = " + activeRegion.getExtendedLoc() + ")"); }
-        final List<GATKSAMRecord> finalizedReadList = new ArrayList<GATKSAMRecord>();
+        final List<GATKSAMRecord> finalizedReadList = new ArrayList<>();
         final FragmentCollection<GATKSAMRecord> fragmentCollection = FragmentUtils.create( activeRegion.getReads() );
         activeRegion.clearReads();

@@ -883,7 +884,7 @@ public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implem
         }

         // Loop through the reads hard clipping the adaptor and low quality tails
-        final List<GATKSAMRecord> readsToUse = new ArrayList<GATKSAMRecord>(finalizedReadList.size());
+        final List<GATKSAMRecord> readsToUse = new ArrayList<>(finalizedReadList.size());
         for( final GATKSAMRecord myRead : finalizedReadList ) {
             final GATKSAMRecord postAdapterRead = ( myRead.getReadUnmappedFlag() ? myRead : ReadClipper.hardClipAdaptorSequence( myRead ) );
             if( postAdapterRead != null && !postAdapterRead.isEmpty() && postAdapterRead.getCigar().getReadLength() > 0 ) {
@@ -937,7 +938,7 @@ public class HaplotypeCaller extends ActiveRegionWalker<Integer, Integer> implem
         for( final String sample : samplesList) {
             List<GATKSAMRecord> readList = returnMap.get( sample );
             if( readList == null ) {
-                readList = new ArrayList<GATKSAMRecord>();
+                readList = new ArrayList<>();
                 returnMap.put(sample, readList);
             }
         }
@@ -71,7 +71,20 @@ public class LikelihoodCalculationEngine {
     private final byte constantGCP;
     private final double log10globalReadMismappingRate;
     private final boolean DEBUG;
-    private final PairHMM pairHMM;
+    private final PairHMM.HMM_IMPLEMENTATION hmmType;
+
+    private final ThreadLocal<PairHMM> pairHMM = new ThreadLocal<PairHMM>() {
+        @Override
+        protected PairHMM initialValue() {
+            switch (hmmType) {
+                case EXACT: return new Log10PairHMM(true);
+                case ORIGINAL: return new Log10PairHMM(false);
+                case LOGLESS_CACHING: return new LoglessPairHMM();
+                default:
+                    throw new UserException.BadArgumentValue("pairHMM", "Specified pairHMM implementation is unrecognized or incompatible with the HaplotypeCaller. Acceptable options are ORIGINAL, EXACT, CACHING, and LOGLESS_CACHING.");
+            }
+        }
+    };

     /**
      * The expected rate of random sequencing errors for a read originating from its true haplotype.
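The reason for wrapping the PairHMM in a ThreadLocal rather than keeping a single field: a PairHMM instance carries large mutable scratch matrices, so one shared instance would be corrupted by concurrent map() calls under -nct. ThreadLocal.initialValue() builds one instance per worker thread, lazily, on that thread's first get(); later in this file every use goes through pairHMM.get(). A small self-contained illustration of the pattern, with a made-up Scratch class standing in for PairHMM:

    import java.util.concurrent.*;

    // Toy illustration of per-thread lazy initialization with ThreadLocal.
    public class ThreadLocalSketch {
        static class Scratch {
            final double[] buffer = new double[1024];                 // per-thread working memory
            final String owner = Thread.currentThread().getName();    // set on the thread that built it
        }

        // initialValue() runs at most once per thread, on that thread's first get()
        private static final ThreadLocal<Scratch> SCRATCH = new ThreadLocal<Scratch>() {
            @Override protected Scratch initialValue() { return new Scratch(); }
        };

        public static void main(String[] args) throws Exception {
            final ExecutorService pool = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 8; i++) {
                pool.submit(() -> {
                    final Scratch s = SCRATCH.get();                   // same object for this thread every time
                    System.out.println(Thread.currentThread().getName() + " uses scratch owned by " + s.owner);
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
    }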
@@ -96,22 +109,9 @@ public class LikelihoodCalculationEngine {
      * assigned a likelihood of -13.
      */
     public LikelihoodCalculationEngine( final byte constantGCP, final boolean debug, final PairHMM.HMM_IMPLEMENTATION hmmType, final double log10globalReadMismappingRate ) {
-        switch (hmmType) {
-            case EXACT:
-                pairHMM = new Log10PairHMM(true);
-                break;
-            case ORIGINAL:
-                pairHMM = new Log10PairHMM(false);
-                break;
-            case LOGLESS_CACHING:
-                pairHMM = new LoglessPairHMM();
-                break;
-            default:
-                throw new UserException.BadArgumentValue("pairHMM", "Specified pairHMM implementation is unrecognized or incompatible with the HaplotypeCaller. Acceptable options are ORIGINAL, EXACT, CACHING, and LOGLESS_CACHING.");
-        }
-
+        this.hmmType = hmmType;
         this.constantGCP = constantGCP;
-        DEBUG = debug;
+        this.DEBUG = debug;
         this.log10globalReadMismappingRate = log10globalReadMismappingRate;
     }

@@ -143,7 +143,7 @@ public class LikelihoodCalculationEngine {
         }

         // initialize arrays to hold the probabilities of being in the match, insertion and deletion cases
-        pairHMM.initialize(X_METRIC_LENGTH, Y_METRIC_LENGTH);
+        pairHMM.get().initialize(X_METRIC_LENGTH, Y_METRIC_LENGTH);
     }

     public Map<String, PerReadAlleleLikelihoodMap> computeReadLikelihoods( final List<Haplotype> haplotypes, final Map<String, List<GATKSAMRecord>> perSampleReadList ) {
@@ -151,7 +151,7 @@ public class LikelihoodCalculationEngine {
         initializePairHMM(haplotypes, perSampleReadList);

         // Add likelihoods for each sample's reads to our stratifiedReadMap
-        final Map<String, PerReadAlleleLikelihoodMap> stratifiedReadMap = new HashMap<String, PerReadAlleleLikelihoodMap>();
+        final Map<String, PerReadAlleleLikelihoodMap> stratifiedReadMap = new LinkedHashMap<>();
         for( final Map.Entry<String, List<GATKSAMRecord>> sampleEntry : perSampleReadList.entrySet() ) {
             // evaluate the likelihood of the reads given those haplotypes
             final PerReadAlleleLikelihoodMap map = computeReadLikelihoods(haplotypes, sampleEntry.getValue());
@@ -170,7 +170,7 @@ public class LikelihoodCalculationEngine {
     private PerReadAlleleLikelihoodMap computeReadLikelihoods( final List<Haplotype> haplotypes, final List<GATKSAMRecord> reads) {
         // first, a little set up to get copies of the Haplotypes that are Alleles (more efficient than creating them each time)
         final int numHaplotypes = haplotypes.size();
-        final Map<Haplotype, Allele> alleleVersions = new HashMap<>(numHaplotypes);
+        final Map<Haplotype, Allele> alleleVersions = new LinkedHashMap<>(numHaplotypes);
         Allele refAllele = null;
         for ( final Haplotype haplotype : haplotypes ) {
             final Allele allele = Allele.create(haplotype, true);
@@ -202,7 +202,7 @@ public class LikelihoodCalculationEngine {
             for( int jjj = 0; jjj < numHaplotypes; jjj++ ) {
                 final Haplotype haplotype = haplotypes.get(jjj);
                 final boolean isFirstHaplotype = jjj == 0;
-                final double log10l = pairHMM.computeReadLikelihoodGivenHaplotypeLog10(haplotype.getBases(),
+                final double log10l = pairHMM.get().computeReadLikelihoodGivenHaplotypeLog10(haplotype.getBases(),
                         read.getReadBases(), readQuals, readInsQuals, readDelQuals, overallGCP, isFirstHaplotype);

                 if ( haplotype.isNonReference() )
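The HashMap-to-LinkedHashMap swaps in this file are about reproducibility rather than speed: HashMap iteration order is unspecified and can vary between runs, while LinkedHashMap iterates in insertion order, so per-sample and per-haplotype maps are walked the same way no matter how many threads produced them. A tiny illustration (the sample names are arbitrary):

    import java.util.*;

    // LinkedHashMap preserves insertion order; HashMap makes no ordering guarantee at all.
    public class MapOrderSketch {
        public static void main(String[] args) {
            final List<String> samples = Arrays.asList("NA12878", "NA12891", "NA12892");

            final Map<String, Integer> hashed = new HashMap<>();
            final Map<String, Integer> linked = new LinkedHashMap<>();
            for (final String s : samples) {
                hashed.put(s, s.length());
                linked.put(s, s.length());
            }

            // order here is an implementation detail and may differ between JVMs and runs
            System.out.println("HashMap keys:       " + hashed.keySet());
            // order here is exactly the insertion order, every time
            System.out.println("LinkedHashMap keys: " + linked.keySet());
        }
    }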
@@ -49,6 +49,7 @@ package org.broadinstitute.sting.gatk.walkers.haplotypecaller.graphs;
 import com.google.java.contract.Requires;
 import org.broadinstitute.sting.utils.Utils;
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;

 /**
  * A graph vertex containing a sequence of bases and a unique ID that
@@ -71,8 +72,9 @@ import java.util.Arrays;
  * @since 03/2013
  */
 public final class SeqVertex extends BaseVertex {
-    private static int idCounter = 0;
-    public final int id;
+    // Note that using an AtomicInteger is critical to allow multi-threaded HaplotypeCaller
+    private static final AtomicInteger idCounter = new AtomicInteger(0);
+    private int id = idCounter.getAndIncrement();

     /**
      * Create a new SeqVertex with sequence and the next available id
@@ -80,7 +82,6 @@ public final class SeqVertex extends BaseVertex {
      */
     public SeqVertex(final byte[] sequence) {
         super(sequence);
-        this.id = idCounter++;
     }

     /**
@@ -89,7 +90,6 @@ public final class SeqVertex extends BaseVertex {
      */
     public SeqVertex(final String sequence) {
         super(sequence);
-        this.id = idCounter++;
     }

     /**
@@ -51,6 +51,7 @@ import org.broadinstitute.sting.utils.Utils;

 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;

 /**
  * A DeBruijnVertex that supports multiple copies of the same kmer
@@ -65,10 +66,12 @@ import java.util.List;
  */
 final class MultiDeBruijnVertex extends DeBruijnVertex {
     private final static boolean KEEP_TRACK_OF_READS = false;
-    private static int idCounter = 0;
+
+    // Note that using an AtomicInteger is critical to allow multi-threaded HaplotypeCaller
+    private static final AtomicInteger idCounter = new AtomicInteger(0);
+    private int id = idCounter.getAndIncrement();

     private final List<String> reads = new LinkedList<String>();
-    private int id = idCounter++; // TODO -- potential race condition problem here

     /**
      * Create a new MultiDeBruijnVertex with kmer sequence
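Both vertex classes replace a plain static int counter bumped with idCounter++ by an AtomicInteger. The post-increment is a separate read and write on shared state, so two assembly threads can observe the same value and hand out duplicate vertex ids; AtomicInteger.getAndIncrement() performs the same update atomically. A self-contained check of the idea (thread and iteration counts are arbitrary):

    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;

    // getAndIncrement() never hands out the same value twice, even under contention.
    public class IdCounterSketch {
        private static final AtomicInteger idCounter = new AtomicInteger(0);

        public static void main(String[] args) throws Exception {
            final int nThreads = 8;
            final int idsPerThread = 10000;
            final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
            final Set<Integer> seen = ConcurrentHashMap.newKeySet();

            for (int t = 0; t < nThreads; t++) {
                pool.submit(() -> {
                    for (int i = 0; i < idsPerThread; i++)
                        seen.add(idCounter.getAndIncrement());         // atomic: unique across threads
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);

            // every id is distinct, so the set size equals the number of increments
            System.out.println(seen.size() == nThreads * idsPerThread ? "all ids unique" : "collision!");
        }
    }

With the old idCounter++ the same check run on shared, unsynchronized state would occasionally report collisions, which is exactly the race the removed TODO comment warned about.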
@@ -0,0 +1,79 @@
+/*
+ * By downloading the PROGRAM you agree to the following terms of use:
+ *
+ * BROAD INSTITUTE - SOFTWARE LICENSE AGREEMENT - FOR ACADEMIC NON-COMMERCIAL RESEARCH PURPOSES ONLY
+ *
+ * This Agreement is made between the Broad Institute, Inc. with a principal address at 7 Cambridge Center, Cambridge, MA 02142 (BROAD) and the LICENSEE and is effective at the date the downloading is completed (EFFECTIVE DATE).
+ *
+ * WHEREAS, LICENSEE desires to license the PROGRAM, as defined hereinafter, and BROAD wishes to have this PROGRAM utilized in the public interest, subject only to the royalty-free, nonexclusive, nontransferable license rights of the United States Government pursuant to 48 CFR 52.227-14; and
+ * WHEREAS, LICENSEE desires to license the PROGRAM and BROAD desires to grant a license on the following terms and conditions.
+ * NOW, THEREFORE, in consideration of the promises and covenants made herein, the parties hereto agree as follows:
+ *
+ * 1. DEFINITIONS
+ * 1.1 PROGRAM shall mean copyright in the object code and source code known as GATK2 and related documentation, if any, as they exist on the EFFECTIVE DATE and can be downloaded from http://www.broadinstitute/GATK on the EFFECTIVE DATE.
+ *
+ * 2. LICENSE
+ * 2.1 Grant. Subject to the terms of this Agreement, BROAD hereby grants to LICENSEE, solely for academic non-commercial research purposes, a non-exclusive, non-transferable license to: (a) download, execute and display the PROGRAM and (b) create bug fixes and modify the PROGRAM.
+ * The LICENSEE may apply the PROGRAM in a pipeline to data owned by users other than the LICENSEE and provide these users the results of the PROGRAM provided LICENSEE does so for academic non-commercial purposes only. For clarification purposes, academic sponsored research is not a commercial use under the terms of this Agreement.
+ * 2.2 No Sublicensing or Additional Rights. LICENSEE shall not sublicense or distribute the PROGRAM, in whole or in part, without prior written permission from BROAD. LICENSEE shall ensure that all of its users agree to the terms of this Agreement. LICENSEE further agrees that it shall not put the PROGRAM on a network, server, or other similar technology that may be accessed by anyone other than the LICENSEE and its employees and users who have agreed to the terms of this agreement.
+ * 2.3 License Limitations. Nothing in this Agreement shall be construed to confer any rights upon LICENSEE by implication, estoppel, or otherwise to any computer software, trademark, intellectual property, or patent rights of BROAD, or of any other entity, except as expressly granted herein. LICENSEE agrees that the PROGRAM, in whole or part, shall not be used for any commercial purpose, including without limitation, as the basis of a commercial software or hardware product or to provide services. LICENSEE further agrees that the PROGRAM shall not be copied or otherwise adapted in order to circumvent the need for obtaining a license for use of the PROGRAM.
+ *
+ * 3. OWNERSHIP OF INTELLECTUAL PROPERTY
+ * LICENSEE acknowledges that title to the PROGRAM shall remain with BROAD. The PROGRAM is marked with the following BROAD copyright notice and notice of attribution to contributors. LICENSEE shall retain such notice on all copies. LICENSEE agrees to include appropriate attribution if any results obtained from use of the PROGRAM are included in any publication.
+ * Copyright 2012 Broad Institute, Inc.
+ * Notice of attribution: The GATK2 program was made available through the generosity of Medical and Population Genetics program at the Broad Institute, Inc.
+ * LICENSEE shall not use any trademark or trade name of BROAD, or any variation, adaptation, or abbreviation, of such marks or trade names, or any names of officers, faculty, students, employees, or agents of BROAD except as states above for attribution purposes.
+ *
+ * 4. INDEMNIFICATION
+ * LICENSEE shall indemnify, defend, and hold harmless BROAD, and their respective officers, faculty, students, employees, associated investigators and agents, and their respective successors, heirs and assigns, (Indemnitees), against any liability, damage, loss, or expense (including reasonable attorneys fees and expenses) incurred by or imposed upon any of the Indemnitees in connection with any claims, suits, actions, demands or judgments arising out of any theory of liability (including, without limitation, actions in the form of tort, warranty, or strict liability and regardless of whether such action has any factual basis) pursuant to any right or license granted under this Agreement.
+ *
+ * 5. NO REPRESENTATIONS OR WARRANTIES
+ * THE PROGRAM IS DELIVERED AS IS. BROAD MAKES NO REPRESENTATIONS OR WARRANTIES OF ANY KIND CONCERNING THE PROGRAM OR THE COPYRIGHT, EXPRESS OR IMPLIED, INCLUDING, WITHOUT LIMITATION, WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NONINFRINGEMENT, OR THE ABSENCE OF LATENT OR OTHER DEFECTS, WHETHER OR NOT DISCOVERABLE. BROAD EXTENDS NO WARRANTIES OF ANY KIND AS TO PROGRAM CONFORMITY WITH WHATEVER USER MANUALS OR OTHER LITERATURE MAY BE ISSUED FROM TIME TO TIME.
+ * IN NO EVENT SHALL BROAD OR ITS RESPECTIVE DIRECTORS, OFFICERS, EMPLOYEES, AFFILIATED INVESTIGATORS AND AFFILIATES BE LIABLE FOR INCIDENTAL OR CONSEQUENTIAL DAMAGES OF ANY KIND, INCLUDING, WITHOUT LIMITATION, ECONOMIC DAMAGES OR INJURY TO PROPERTY AND LOST PROFITS, REGARDLESS OF WHETHER BROAD SHALL BE ADVISED, SHALL HAVE OTHER REASON TO KNOW, OR IN FACT SHALL KNOW OF THE POSSIBILITY OF THE FOREGOING.
+ *
+ * 6. ASSIGNMENT
+ * This Agreement is personal to LICENSEE and any rights or obligations assigned by LICENSEE without the prior written consent of BROAD shall be null and void.
+ *
+ * 7. MISCELLANEOUS
+ * 7.1 Export Control. LICENSEE gives assurance that it will comply with all United States export control laws and regulations controlling the export of the PROGRAM, including, without limitation, all Export Administration Regulations of the United States Department of Commerce. Among other things, these laws and regulations prohibit, or require a license for, the export of certain types of software to specified countries.
+ * 7.2 Termination. LICENSEE shall have the right to terminate this Agreement for any reason upon prior written notice to BROAD. If LICENSEE breaches any provision hereunder, and fails to cure such breach within thirty (30) days, BROAD may terminate this Agreement immediately. Upon termination, LICENSEE shall provide BROAD with written assurance that the original and all copies of the PROGRAM have been destroyed, except that, upon prior written authorization from BROAD, LICENSEE may retain a copy for archive purposes.
+ * 7.3 Survival. The following provisions shall survive the expiration or termination of this Agreement: Articles 1, 3, 4, 5 and Sections 2.2, 2.3, 7.3, and 7.4.
+ * 7.4 Notice. Any notices under this Agreement shall be in writing, shall specifically refer to this Agreement, and shall be sent by hand, recognized national overnight courier, confirmed facsimile transmission, confirmed electronic mail, or registered or certified mail, postage prepaid, return receipt requested. All notices under this Agreement shall be deemed effective upon receipt.
+ * 7.5 Amendment and Waiver; Entire Agreement. This Agreement may be amended, supplemented, or otherwise modified only by means of a written instrument signed by all parties. Any waiver of any rights or failure to act in a specific instance shall relate only to such instance and shall not be construed as an agreement to waive any rights or fail to act in any other instance, whether or not similar. This Agreement constitutes the entire agreement among the parties with respect to its subject matter and supersedes prior agreements or understandings between the parties relating to its subject matter.
+ * 7.6 Binding Effect; Headings. This Agreement shall be binding upon and inure to the benefit of the parties and their respective permitted successors and assigns. All headings are for convenience only and shall not affect the meaning of any provision of this Agreement.
+ * 7.7 Governing Law. This Agreement shall be construed, governed, interpreted and applied in accordance with the internal laws of the Commonwealth of Massachusetts, U.S.A., without regard to conflict of laws principles.
+ */
+
+package org.broadinstitute.sting.gatk.walkers.haplotypecaller;
+
+import org.broadinstitute.sting.WalkerTest;
+import org.broadinstitute.sting.utils.haplotypeBAMWriter.HaplotypeBAMWriter;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class HaplotypeCallerParallelIntegrationTest extends WalkerTest {
+    @DataProvider(name = "NCTDataProvider")
+    public Object[][] makeNCTDataProvider() {
+        List<Object[]> tests = new ArrayList<Object[]>();
+
+        for ( final int nct : Arrays.asList(1, 2, 4) ) {
+            tests.add(new Object[]{nct, "c277fd65365d59b734260dd8423313bb"});
+        }
+
+        return tests.toArray(new Object[][]{});
+    }
+
+    @Test(dataProvider = "NCTDataProvider")
+    public void testHCNCT(final int nct, final String md5) {
+        WalkerTestSpec spec = new WalkerTestSpec(
+                "-T HaplotypeCaller -R " + b37KGReference + " --no_cmdline_in_header -I "
+                        + privateTestDir + "PCRFree.2x250.Illumina.20_10_11.bam -o %s " +
+                        " -L 20:10,000,000-10,100,000 -G none -A -contamination 0.0 -nct " + nct, 1,
+                Arrays.asList(md5));
+        executeTest("HC test parallel HC with NCT with nct " + nct, spec);
+    }
+}
@@ -570,9 +570,9 @@ public class GenomeAnalysisEngine {
             if (readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.coordinate)
                 throw new UserException.MissortedBAM(SAMFileHeader.SortOrder.coordinate, "Active region walkers can only traverse coordinate-sorted data. Please resort your input BAM file(s) or set the Sort Order tag in the header appropriately.");
             if(intervals == null)
-                return readsDataSource.createShardIteratorOverMappedReads(new LocusShardBalancer());
+                return readsDataSource.createShardIteratorOverMappedReads(new ActiveRegionShardBalancer());
             else
-                return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new LocusShardBalancer());
+                return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new ActiveRegionShardBalancer());
         }
         else if(walker instanceof ReadWalker || walker instanceof ReadPairWalker || walker instanceof DuplicateWalker) {
             // Apply special validation to read pair walkers.
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.gatk.datasources.reads;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * ActiveRegionShardBalancer
+ *
+ * Merges all of the file pointer information for a single contig index into a single
+ * combined shard. The purpose of doing this is to ensure that the HaplotypeCaller, which
+ * doesn't support TreeReduction by construction, gets all of the data on a single
+ * contig together so the the NanoSchedule runs efficiently
+ */
+public class ActiveRegionShardBalancer extends ShardBalancer {
+    /**
+     * Convert iterators of file pointers into balanced iterators of shards.
+     * @return An iterator over balanced shards.
+     */
+    public Iterator<Shard> iterator() {
+        return new Iterator<Shard>() {
+            public boolean hasNext() {
+                return filePointers.hasNext();
+            }
+
+            public Shard next() {
+                FilePointer current = getCombinedFilePointersOnSingleContig();
+
+                // FilePointers have already been combined as necessary at the IntervalSharder level. No
+                // need to do so again here.
+
+                return new LocusShard(parser,readsDataSource,current.getLocations(),current.fileSpans);
+            }
+
+            public void remove() {
+                throw new UnsupportedOperationException("Unable to remove from shard balancing iterator");
+            }
+        };
+    }
+
+    /**
+     * Combine all of the file pointers in the filePointers iterator into a single combined
+     * FilePointer that spans all of the file pointers on a single contig
+     * @return a non-null FilePointer
+     */
+    private FilePointer getCombinedFilePointersOnSingleContig() {
+        FilePointer current = filePointers.next();
+
+        final List<FilePointer> toCombine = new LinkedList<>();
+        toCombine.add(current);
+
+        while ( filePointers.hasNext() &&
+                current.isRegionUnmapped == filePointers.peek().isRegionUnmapped &&
+                (current.getContigIndex() == filePointers.peek().getContigIndex() || current.isRegionUnmapped) ) {
+            toCombine.add(filePointers.next());
+        }
+
+        return FilePointer.union(toCombine, parser);
+    }
+}
@@ -407,10 +407,10 @@ public class FilePointer {
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("FilePointer:%n");
+        builder.append("FilePointer:\n");
         builder.append("\tlocations = {");
         builder.append(Utils.join(";",locations));
-        builder.append("}%n\tregions = %n");
+        builder.append("}\n\tregions = \n");
         for(Map.Entry<SAMReaderID,SAMFileSpan> entry: fileSpans.entrySet()) {
             builder.append(entry.getKey());
             builder.append("= {");
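The %n-to-\n change in toString() fixes an output bug rather than anything threading-related: %n is a line-separator token that only the formatting APIs (String.format, Formatter, printf) expand, so appending it to a StringBuilder leaves the literal characters "%n" in the text. A short demonstration:

    // %n is only expanded by format methods; StringBuilder.append copies it verbatim.
    public class NewlineSketch {
        public static void main(String[] args) {
            final StringBuilder builder = new StringBuilder();
            builder.append("FilePointer:%n");                      // stays as the literal characters %n
            builder.append("FilePointer:\n");                      // a real newline character
            System.out.print(builder);

            System.out.print(String.format("FilePointer:%n"));     // here %n becomes the platform line separator
        }
    }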
@@ -245,7 +245,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
         } else if (walker instanceof ReadPairWalker) {
             return new TraverseReadPairs();
         } else if (walker instanceof ActiveRegionWalker) {
-            return new TraverseActiveRegions();
+            return new TraverseActiveRegions(threadAllocation.getNumCPUThreadsPerDataThread());
         } else {
             throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type.");
         }
@@ -41,12 +41,22 @@ import org.broadinstitute.sting.gatk.walkers.Walker;
 import org.broadinstitute.sting.utils.GenomeLoc;
 import org.broadinstitute.sting.utils.SampleUtils;
 import org.broadinstitute.sting.utils.Utils;
-import org.broadinstitute.sting.utils.activeregion.*;
+import org.broadinstitute.sting.utils.activeregion.ActiveRegion;
+import org.broadinstitute.sting.utils.activeregion.ActivityProfile;
+import org.broadinstitute.sting.utils.activeregion.ActivityProfileState;
+import org.broadinstitute.sting.utils.activeregion.BandPassActivityProfile;
+import org.broadinstitute.sting.utils.nanoScheduler.NSMapFunction;
+import org.broadinstitute.sting.utils.nanoScheduler.NSProgressFunction;
+import org.broadinstitute.sting.utils.nanoScheduler.NSReduceFunction;
+import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
 import org.broadinstitute.sting.utils.progressmeter.ProgressMeter;
 import org.broadinstitute.sting.utils.sam.GATKSAMRecord;

 import java.io.PrintStream;
-import java.util.*;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;

 /**
  * Implement active region traversal
@@ -67,7 +77,8 @@ import java.util.*;
  * variable spanOfLastReadSeen
  *
  */
-public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegionWalker<M,T>,LocusShardDataProvider> {
+public final class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegionWalker<M,T>,LocusShardDataProvider> {
+    private final static boolean DEBUG = false;
     protected final static Logger logger = Logger.getLogger(TraversalEngine.class);
     protected final static boolean LOG_READ_CARRYING = false;

@@ -84,7 +95,32 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
     private GenomeLoc spanOfLastReadSeen = null;
     private ActivityProfile activityProfile = null;
     int maxReadsInMemory = 0;
-    ActiveRegionWalker walker;
+    ActiveRegionWalker<M, T> walker;
+
+    final NanoScheduler<ActiveRegion, M, T> nanoScheduler;
+
+    /**
+     * Create a single threaded active region traverser
+     */
+    public TraverseActiveRegions() {
+        this(1);
+    }
+
+    /**
+     * Create an active region traverser that uses nThreads for getting its work done
+     * @param nThreads number of threads
+     */
+    public TraverseActiveRegions(final int nThreads) {
+        nanoScheduler = new NanoScheduler<>(nThreads);
+        nanoScheduler.setProgressFunction(new NSProgressFunction<ActiveRegion>() {
+            @Override
+            public void progress(ActiveRegion lastActiveRegion) {
+                if ( lastActiveRegion != null )
+                    // note, need to use getStopLocation so we don't give an interval to ProgressMeterDaemon
+                    printProgress(lastActiveRegion.getLocation().getStopLocation());
+            }
+        });
+    }

     /**
      * Have the debugging output streams been initialized already?
@@ -98,7 +134,7 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
     public void initialize(GenomeAnalysisEngine engine, Walker walker, ProgressMeter progressMeter) {
         super.initialize(engine, walker, progressMeter);

-        this.walker = (ActiveRegionWalker)walker;
+        this.walker = (ActiveRegionWalker<M,T>)walker;
         if ( this.walker.wantsExtendedReads() && ! this.walker.wantsNonPrimaryReads() ) {
             throw new IllegalArgumentException("Active region walker " + this.walker + " requested extended events but not " +
                     "non-primary reads, an inconsistent state. Please modify the walker");
@@ -217,58 +253,108 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
         if ( LOG_READ_CARRYING || logger.isDebugEnabled() )
             logger.info(String.format("TraverseActiveRegions.traverse: Shard is %s", dataProvider));

-        final LocusView locusView = new AllLocusView(dataProvider);
-        final LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider );
-        final ReferenceOrderedView referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView);
-
-        // We keep processing while the next reference location is within the interval
-        final GenomeLoc locOfLastReadAtTraversalStart = spanOfLastSeenRead();
-
-        while( locusView.hasNext() ) {
-            final AlignmentContext locus = locusView.next();
-            final GenomeLoc location = locus.getLocation();
-
-            rememberLastLocusLocation(location);
-
-            // get all of the new reads that appear in the current pileup, and them to our list of reads
-            // provided we haven't seen them before
-            final Collection<GATKSAMRecord> reads = locusView.getLIBS().transferReadsFromAllPreviousPileups();
-            for( final GATKSAMRecord read : reads ) {
-                if ( ! appearedInLastShard(locOfLastReadAtTraversalStart, read) ) {
-                    rememberLastReadLocation(read);
-                    myReads.add(read);
-                }
-            }
-
-            // skip this location -- it's not part of our engine intervals
-            if ( outsideEngineIntervals(location) )
-                continue;
-
-            // we've move across some interval boundary, restart profile
-            final boolean flushProfile = ! activityProfile.isEmpty()
-                    && ( activityProfile.getContigIndex() != location.getContigIndex()
-                    || location.getStart() != activityProfile.getStop() + 1);
-            sum = processActiveRegions(walker, sum, flushProfile, false);
-
-            dataProvider.getShard().getReadMetrics().incrementNumIterations();
-
-            // create reference context. Note that if we have a pileup of "extended events", the context will
-            // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
-            final ReferenceContext refContext = referenceView.getReferenceContext(location);
-
-            // Iterate forward to get all reference ordered data covering this location
-            final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext);
-
-            // Call the walkers isActive function for this locus and add them to the list to be integrated later
-            addIsActiveResult(walker, tracker, refContext, locus);
-
-            maxReadsInMemory = Math.max(myReads.size(), maxReadsInMemory);
-            printProgress(location);
-        }
+        nanoScheduler.setDebug(false);
+        final Iterator<ActiveRegion> activeRegionIterator = new ActiveRegionIterator(dataProvider);
+        final TraverseActiveRegionMap myMap = new TraverseActiveRegionMap();
+        final TraverseActiveRegionReduce myReduce = new TraverseActiveRegionReduce();
+        final T result = nanoScheduler.execute(activeRegionIterator, myMap, sum, myReduce);
+
         updateCumulativeMetrics(dataProvider.getShard());

-        return sum;
+        return result;
     }

+    private class ActiveRegionIterator implements Iterator<ActiveRegion> {
+        private final LocusShardDataProvider dataProvider;
+        private LinkedList<ActiveRegion> readyActiveRegions = new LinkedList<ActiveRegion>();
+        private boolean done = false;
+        private final LocusView locusView;
+        private final LocusReferenceView referenceView;
+        private final ReferenceOrderedView referenceOrderedDataView;
+        private final GenomeLoc locOfLastReadAtTraversalStart;
+
+        public ActiveRegionIterator( final LocusShardDataProvider dataProvider ) {
+            this.dataProvider = dataProvider;
+            locusView = new AllLocusView(dataProvider);
+            referenceView = new LocusReferenceView( walker, dataProvider );
+            referenceOrderedDataView = getReferenceOrderedView(walker, dataProvider, locusView);
+
+            // We keep processing while the next reference location is within the interval
+            locOfLastReadAtTraversalStart = spanOfLastSeenRead();
+        }
+
+        @Override public void remove() { throw new UnsupportedOperationException("Cannot remove from ActiveRegionIterator"); }
+
+        @Override
+        public ActiveRegion next() {
+            return readyActiveRegions.pop();
+        }
+        @Override
+        public boolean hasNext() {
+            if ( ! readyActiveRegions.isEmpty() )
+                return true;
+            if ( done )
+                return false;
+            else {
+
+                while( locusView.hasNext() ) {
+                    final AlignmentContext locus = locusView.next();
+                    final GenomeLoc location = locus.getLocation();
+
+                    rememberLastLocusLocation(location);
+
+                    // get all of the new reads that appear in the current pileup, and them to our list of reads
+                    // provided we haven't seen them before
+                    final Collection<GATKSAMRecord> reads = locusView.getLIBS().transferReadsFromAllPreviousPileups();
+                    for( final GATKSAMRecord read : reads ) {
+                        // note that ActiveRegionShards span entire contigs, so this check is in some
+                        // sense no longer necessary, as any read that appeared in the last shard would now
+                        // by definition be on a different contig. However, the logic here doesn't hurt anything
+                        // and makes us robust should we decided to provide shards that don't fully span
+                        // contigs at some point in the future
+                        if ( ! appearedInLastShard(locOfLastReadAtTraversalStart, read) ) {
+                            rememberLastReadLocation(read);
+                            myReads.add(read);
+                        }
+                    }
+
+                    // skip this location -- it's not part of our engine intervals
+                    if ( outsideEngineIntervals(location) )
+                        continue;
+
+                    // we've move across some interval boundary, restart profile
+                    final boolean flushProfile = ! activityProfile.isEmpty()
+                            && ( activityProfile.getContigIndex() != location.getContigIndex()
+                            || location.getStart() != activityProfile.getStop() + 1);
+                    final List<ActiveRegion> newActiveRegions = prepActiveRegionsForProcessing(walker, flushProfile, false);
+
+                    dataProvider.getShard().getReadMetrics().incrementNumIterations();
+
+                    // create reference context. Note that if we have a pileup of "extended events", the context will
+                    // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup).
+                    final ReferenceContext refContext = referenceView.getReferenceContext(location);
+
+                    // Iterate forward to get all reference ordered data covering this location
+                    final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext);
+
+                    // Call the walkers isActive function for this locus and add them to the list to be integrated later
+                    addIsActiveResult(walker, tracker, refContext, locus);
+
+                    maxReadsInMemory = Math.max(myReads.size(), maxReadsInMemory);
+                    printProgress(location);
+
+                    if ( ! newActiveRegions.isEmpty() ) {
+                        readyActiveRegions.addAll(newActiveRegions);
+                        if ( DEBUG )
+                            for ( final ActiveRegion region : newActiveRegions )
+                                logger.info("Adding region to queue for processing " + region);
+                        return true;
+                    }
+                }
+
+                return false;
+            }
+        }
+    }

     /**
@@ -276,7 +362,11 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
      * Ugly for now but will be cleaned up when we push this functionality more into the engine
      */
     public T endTraversal(final Walker<M, T> walker, T sum) {
-        return processActiveRegions((ActiveRegionWalker<M, T>)walker, sum, true, true);
+        for ( final ActiveRegion region : prepActiveRegionsForProcessing((ActiveRegionWalker<M, T>)walker, true, true) ) {
+            final M x = ((ActiveRegionWalker<M, T>) walker).map(region, null);
+            sum = walker.reduce( x, sum );
+        }
+        return sum;
     }

     // -------------------------------------------------------------------------------------
@@ -504,7 +594,7 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
      * add these blocks of work to the work queue
      * band-pass filter the list of isActive probabilities and turn into active regions
      */
-    private T processActiveRegions(final ActiveRegionWalker<M, T> walker, T sum, final boolean flushActivityProfile, final boolean forceAllRegionsToBeActive) {
+    private List<ActiveRegion> prepActiveRegionsForProcessing(final ActiveRegionWalker<M, T> walker, final boolean flushActivityProfile, final boolean forceAllRegionsToBeActive) {
         if ( ! walkerHasPresetRegions ) {
             // We don't have preset regions, so we get our regions from the activity profile
             final Collection<ActiveRegion> activeRegions = activityProfile.popReadyActiveRegions(getActiveRegionExtension(), getMinRegionSize(), getMaxRegionSize(), flushActivityProfile);
@@ -513,21 +603,23 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
         }

         // Since we've traversed sufficiently past this point (or this contig!) in the workQueue we can unload those regions and process them
+        final LinkedList<ActiveRegion> readyRegions = new LinkedList<ActiveRegion>();
         while( workQueue.peek() != null ) {
             final ActiveRegion activeRegion = workQueue.peek();
             if ( forceAllRegionsToBeActive || regionCompletelyWithinDeadZone(activeRegion) ) {
                 writeActivityProfile(activeRegion.getSupportingStates());
                 writeActiveRegion(activeRegion);
-                sum = processActiveRegion( workQueue.remove(), sum, walker );
+                readyRegions.add(prepActiveRegionForProcessing(workQueue.remove(), walker));
             } else {
                 break;
             }
         }

-        return sum;
+        return readyRegions;
+
     }

-    private T processActiveRegion(final ActiveRegion activeRegion, final T sum, final ActiveRegionWalker<M, T> walker) {
+    private ActiveRegion prepActiveRegionForProcessing(final ActiveRegion activeRegion, final ActiveRegionWalker<M, T> walker) {
         final List<GATKSAMRecord> stillLive = new LinkedList<GATKSAMRecord>();
         for ( final GATKSAMRecord read : myReads.popCurrentReads() ) {
             boolean killed = false;
@@ -561,7 +653,21 @@ public class TraverseActiveRegions<M, T> extends TraversalEngine<M,T,ActiveRegio
             logger.info(String.format("Processing region %20s span=%3d active?=%5b with %4d reads. Overall max reads carried is %s",
                     activeRegion.getLocation(), activeRegion.getLocation().size(), activeRegion.isActive(), activeRegion.size(), maxReadsInMemory));

-        final M x = walker.map(activeRegion, null);
-        return walker.reduce( x, sum );
+        return activeRegion;
     }
+
+    private class TraverseActiveRegionMap implements NSMapFunction<ActiveRegion, M> {
+        @Override
+        public M apply(final ActiveRegion activeRegion) {
+            if ( DEBUG ) logger.info("Executing walker.map for " + activeRegion + " in thread " + Thread.currentThread().getName());
+            return walker.map(activeRegion, null);
+        }
+    }
+
+    private class TraverseActiveRegionReduce implements NSReduceFunction<M, T> {
+        @Override
+        public T apply(M one, T sum) {
+            return walker.reduce(one, sum);
+        }
+    }
 }
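How the pieces above fit together: ActiveRegionIterator lazily pulls ready active regions out of the shard, TraverseActiveRegionMap forwards each region to walker.map, TraverseActiveRegionReduce forwards each map result to walker.reduce, and nanoScheduler.execute(iterator, map, sum, reduce) drives all of it. The sketch below shows the shape of such an execute loop; it is not the real NanoScheduler, just a toy with the same contract under the stated assumptions: map calls may run on worker threads, while reduce folds their results serially and in input order.

    import java.util.*;
    import java.util.concurrent.*;
    import java.util.function.*;

    // Toy stand-in for a nano-scheduler: reads inputs from an iterator, maps them on a
    // small thread pool, and folds the results serially, in input order, with reduce.
    public class NanoSchedulerSketch {
        static <I, M, T> T execute(final Iterator<I> inputs,
                                   final Function<I, M> map,
                                   T sum,
                                   final BiFunction<M, T, T> reduce) throws Exception {
            final ExecutorService pool = Executors.newFixedThreadPool(2);
            final List<Future<M>> pending = new ArrayList<>();
            while (inputs.hasNext()) {
                final I input = inputs.next();
                final Callable<M> job = () -> map.apply(input);    // map may run concurrently
                pending.add(pool.submit(job));
            }
            for (final Future<M> f : pending)
                sum = reduce.apply(f.get(), sum);                  // reduce runs on this thread, in order
            pool.shutdown();
            return sum;
        }

        public static void main(String[] args) throws Exception {
            final Iterator<Integer> regions = Arrays.asList(1, 2, 3, 4).iterator();
            final Integer total = execute(regions, r -> r * r, 0, (m, t) -> m + t);
            System.out.println("sum of squares = " + total);      // prints 30
        }
    }

Because reduce sees results in input order, the traversal produces the same answer with one worker thread or many, which is what the -nct integration test below checks by comparing MD5s across thread counts.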
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.gatk.datasources.reads;
+
+import net.sf.samtools.SAMFileHeader;
+import net.sf.samtools.SAMFileSpan;
+import net.sf.samtools.SAMSequenceRecord;
+import org.broadinstitute.sting.BaseTest;
+import org.broadinstitute.sting.utils.GenomeLoc;
+import org.broadinstitute.sting.utils.GenomeLocParser;
+import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.FileNotFoundException;
+import java.util.*;
+
+public class ActiveRegionShardBalancerUnitTest extends BaseTest {
+    // example genome loc parser for this test, can be deleted if you don't use the reference
+    private GenomeLocParser genomeLocParser;
+    protected SAMDataSource readsDataSource;
+
+    @BeforeClass
+    public void setup() throws FileNotFoundException {
+        // sequence
+        final SAMFileHeader header = ArtificialSAMUtils.createArtificialSamHeader(10, 0, 10000);
+        genomeLocParser = new GenomeLocParser(header.getSequenceDictionary());
+        readsDataSource = null;
+    }
+
+    @Test
+    public void testMergingManyContigs() {
+        executeTest(genomeLocParser.getContigs().getSequences());
+    }
+
+    @Test
+    public void testMergingAllPointersOnSingleContig() {
+        executeTest(Arrays.asList(genomeLocParser.getContigs().getSequences().get(1)));
+    }
+
+    @Test
+    public void testMergingMultipleDiscontinuousContigs() {
+        final List<SAMSequenceRecord> all = genomeLocParser.getContigs().getSequences();
+        executeTest(Arrays.asList(all.get(1), all.get(3)));
+    }
+
+    private void executeTest(final Collection<SAMSequenceRecord> records) {
+        final ActiveRegionShardBalancer balancer = new ActiveRegionShardBalancer();
+
+        final List<Set<GenomeLoc>> expectedLocs = new LinkedList<>();
+        final List<FilePointer> pointers = new LinkedList<>();
+
+        for ( final SAMSequenceRecord record : records ) {
+            final int size = 10;
+            int end = 0;
+            for ( int i = 0; i < record.getSequenceLength(); i += size) {
+                final int myEnd = i + size - 1;
+                end = myEnd;
+                final GenomeLoc loc = genomeLocParser.createGenomeLoc(record.getSequenceName(), i, myEnd);
+                final Map<SAMReaderID, SAMFileSpan> fileSpans = Collections.emptyMap();
+                final FilePointer fp = new FilePointer(fileSpans, Collections.singletonList(loc));
+                pointers.add(fp);
+            }
+            expectedLocs.add(Collections.singleton(genomeLocParser.createGenomeLoc(record.getSequenceName(), 0, end)));
+        }
+
+        balancer.initialize(readsDataSource, pointers.iterator(), genomeLocParser);
+
+        int i = 0;
+        int nShardsFound = 0;
+        for ( final Shard shard : balancer ) {
+            nShardsFound++;
+            Assert.assertEquals(new HashSet<>(shard.getGenomeLocs()), expectedLocs.get(i++));
+        }
+        Assert.assertEquals(nShardsFound, records.size(), "Didn't find exactly one shard for each contig in the sequence dictionary");
+    }
+}
@@ -77,7 +77,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
     @DataProvider(name = "TraversalEngineProvider")
     public Object[][] makeTraversals() {
        final List<Object[]> traversals = new LinkedList<Object[]>();
-       traversals.add(new Object[]{new TraverseActiveRegions<Integer, Integer>()});
+       traversals.add(new Object[]{new TraverseActiveRegions<>()});
        return traversals.toArray(new Object[][]{});
     }

@@ -490,7 +490,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {

        traverseActiveRegions.initialize(engine, walker);
        List<LocusShardDataProvider> providers = new ArrayList<LocusShardDataProvider>();
-       for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) {
+       for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new ActiveRegionShardBalancer())) {
            for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs(), samples)) {
                providers.add(new LocusShardDataProvider(shard, shard.getReadProperties(), genomeLocParser, window.getLocus(), window, reference, new ArrayList<ReferenceOrderedDataSource>()));
            }
@@ -523,8 +523,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {

        final int maxTests = Integer.MAX_VALUE;
        int nTests = 0;
-       for ( final int readLength : Arrays.asList(10, 100) ) {
-           for ( final int skips : Arrays.asList(0, 1, 10) ) {
+       for ( final int readLength : Arrays.asList(100) ) {
+           for ( final int skips : Arrays.asList(0, 10) ) {
                for ( final int start : starts ) {
                    for ( final int nReadsPerLocus : Arrays.asList(1, 2) ) {
                        for ( final int nLoci : Arrays.asList(1, 1000) ) {
@@ -536,7 +536,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
                            for ( final GenomeLocSortedSet activeRegions : enumerateActiveRegions(bamBuilder.getAlignmentStart(), bamBuilder.getAlignmentEnd())) {
                                nTests++;
                                if ( nTests < maxTests ) // && nTests == 1238 )
-                                   tests.add(new Object[]{nTests, activeRegions, readStates, bamBuilder});
+                                   tests.add(new Object[]{new TraverseActiveRegions<>(), nTests, activeRegions, readStates, bamBuilder});
                            }
                        }
                    }
@@ -586,7 +586,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {


    @Test(enabled = true && ! DEBUG, dataProvider = "CombinatorialARTTilingProvider")
-   public void testARTReadsInActiveRegions(final int id, final GenomeLocSortedSet activeRegions, final EnumSet<ActiveRegionReadState> readStates, final ArtificialBAMBuilder bamBuilder) {
+   public void testARTReadsInActiveRegions(final TraverseActiveRegions<Integer, Integer> traversal, final int id, final GenomeLocSortedSet activeRegions, final EnumSet<ActiveRegionReadState> readStates, final ArtificialBAMBuilder bamBuilder) {
        logger.warn("Running testARTReadsInActiveRegions id=" + id + " locs " + activeRegions + " against bam " + bamBuilder);
        final List<GenomeLoc> intervals = Arrays.asList(
                genomeLocParser.createGenomeLoc("1", bamBuilder.getAlignmentStart(), bamBuilder.getAlignmentEnd())
@@ -595,7 +595,6 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
        final DummyActiveRegionWalker walker = new DummyActiveRegionWalker(activeRegions, false);
        walker.setStates(readStates);

-       final TraverseActiveRegions traversal = new TraverseActiveRegions<Integer, Integer>();
        final Map<GenomeLoc, ActiveRegion> activeRegionsMap = getActiveRegions(traversal, walker, intervals, bamBuilder.makeTemporarilyBAMFile());

        final Set<String> alreadySeenReads = new HashSet<String>(); // for use with the primary / non-primary
@@ -640,8 +639,8 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
    //
    // ---------------------------------------------------------------------------------------------------------

-   @Test(enabled = true && ! DEBUG)
-   public void ensureAllInsertionReadsAreInActiveRegions() {
+   @Test(dataProvider = "TraversalEngineProvider", enabled = true && ! DEBUG)
+   public void ensureAllInsertionReadsAreInActiveRegions(final TraverseActiveRegions<Integer, Integer> traversal) {

        final int readLength = 10;
        final int start = 20;
@@ -667,7 +666,6 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {

        final DummyActiveRegionWalker walker = new DummyActiveRegionWalker(activeRegions, false);

-       final TraverseActiveRegions traversal = new TraverseActiveRegions<Integer, Integer>();
        final Map<GenomeLoc, ActiveRegion> activeRegionsMap = getActiveRegions(traversal, walker, intervals, bamBuilder.makeTemporarilyBAMFile());

        final ActiveRegion region = activeRegionsMap.values().iterator().next();