RBP now supports operating on a subset of samples, outputting a much reduced VCF file appropriate for merging later. Also, general optimization to avoid printing enormous amounts of data to logger.debug by using a glocal static variable DEBUG that conditionally allows writing to the variable. Passes integration tests

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4880 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2010-12-20 16:03:28 +00:00
parent 4ca1da1d07
commit ff90c24f28
1 changed files with 57 additions and 39 deletions

View File

@ -63,6 +63,7 @@ import static org.broadinstitute.sting.utils.vcf.VCFUtils.getVCFHeadersFromRods;
// Filter out all reads with zero mapping quality
public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, PhasingStats> {
private static final boolean DEBUG = false;
@Output(doc = "File to which variants should be written", required = true)
protected VCFWriter writer = null;
@ -87,6 +88,9 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
@Argument(fullName = "min_mapping_quality_score", shortName = "mmq", doc = "Minimum read mapping quality required to consider a read for phasing [default: 20]", required = false)
public int MIN_MAPPING_QUALITY_SCORE = 20;
@Argument(fullName = "sampleToPhase", shortName = "sampleToPhase", doc = "Only include these samples when phasing", required = false)
protected List<String> samplesToPhase = null;
private GenomeLoc mostDownstreamLocusReached = null;
private LinkedList<VariantAndReads> unphasedSiteQueue = null;
@ -170,6 +174,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
hInfo.add(new VCFFormatHeaderLine(PQ_KEY, 1, VCFHeaderLineType.Float, "Read-backed phasing quality"));
hInfo.add(new VCFInfoHeaderLine(PHASING_INCONSISTENT_KEY, 0, VCFHeaderLineType.Flag, "Are the reads significantly haplotype-inconsistent?"));
// todo -- fix samplesToPhase
Map<String, VCFHeader> rodNameToHeader = getVCFHeadersFromRods(getToolkit(), rodNames);
writer.writeHeader(new VCFHeader(hInfo, new TreeSet<String>(rodNameToHeader.get(rodNames.get(0)).getGenotypeSamples())));
}
@ -195,8 +200,9 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
if (tracker == null)
return null;
// todo -- potential performance problem here with unconditional, frequent writes to debug
mostDownstreamLocusReached = ref.getLocus();
logger.debug("map() at: " + mostDownstreamLocusReached);
if ( DEBUG ) logger.debug("map() at: " + mostDownstreamLocusReached);
PhasingStats phaseStats = new PhasingStats();
List<VariantContext> unprocessedList = new LinkedList<VariantContext>();
@ -204,10 +210,14 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
boolean requireStartHere = true; // only see each VariantContext once
boolean takeFirstOnly = false; // take as many entries as the VCF file has
for (VariantContext vc : tracker.getVariantContexts(ref, rodNames, null, context.getLocation(), requireStartHere, takeFirstOnly)) {
if ( samplesToPhase != null ) vc = reduceVCToSamples(vc, samplesToPhase);
if (ReadBackedPhasingWalker.processVariantInPhasing(vc)) {
VariantAndReads vr = new VariantAndReads(vc, context);
unphasedSiteQueue.add(vr);
logger.debug("Added variant to queue = " + VariantContextUtils.getLocation(getToolkit().getGenomeLocParser(),vr.variant));
// todo -- potential performance problem here with unconditional, frequent writes to debug
if ( DEBUG ) logger.debug("Added variant to queue = " + VariantContextUtils.getLocation(getToolkit().getGenomeLocParser(),vr.variant));
}
else {
unprocessedList.add(vc); // Finished with the unprocessed variant, and writer can enforce sorting on-the-fly
@ -230,6 +240,15 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
return new PhasingStatsAndOutput(phaseStats, completedList);
}
private VariantContext reduceVCToSamples(VariantContext vc, List<String> samplesToPhase) {
for ( String sample : samplesToPhase )
logger.debug(String.format(" Sample %s has genotype %s, het = %s", sample, vc.getGenotype(sample), vc.getGenotype(sample).isHet() ));
VariantContext subvc = vc.subContextFromGenotypes(vc.getGenotypes(samplesToPhase).values());
logger.debug("original VC = " + vc);
logger.debug("sub VC = " + subvc);
return subvc;
}
private List<VariantContext> processQueue(PhasingStats phaseStats, boolean processAll) {
List<VariantContext> oldPhasedList = new LinkedList<VariantContext>();
@ -247,16 +266,16 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
}
// Update partiallyPhasedSites before it's used in phaseSite:
oldPhasedList.addAll(discardIrrelevantPhasedSites());
logger.debug("oldPhasedList(1st) = " + toStringVCL(oldPhasedList));
if ( DEBUG ) logger.debug("oldPhasedList(1st) = " + toStringVCL(oldPhasedList));
VariantAndReads vr = unphasedSiteQueue.remove();
logger.debug("Performing phasing for " + VariantContextUtils.getLocation(getToolkit().getGenomeLocParser(),vr.variant));
if ( DEBUG ) logger.debug("Performing phasing for " + VariantContextUtils.getLocation(getToolkit().getGenomeLocParser(),vr.variant));
phaseSite(vr, phaseStats);
}
// Update partiallyPhasedSites after phaseSite is done:
oldPhasedList.addAll(discardIrrelevantPhasedSites());
logger.debug("oldPhasedList(2nd) = " + toStringVCL(oldPhasedList));
if ( DEBUG ) logger.debug("oldPhasedList(2nd) = " + toStringVCL(oldPhasedList));
if (outputMultipleBaseCountsWriter != null)
outputMultipleBaseCountsWriter.outputMultipleBaseCounts();
@ -306,7 +325,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
String samp = sampGtEntry.getKey();
Genotype gt = sampGtEntry.getValue();
logger.debug("sample = " + samp);
if ( DEBUG ) logger.debug("sample = " + samp);
if (isUnfilteredCalledDiploidGenotype(gt)) {
if (gt.isHom()) { // Note that this Genotype may be replaced later to contain the PQ of a downstream het site that was phased relative to a het site lying upstream of this hom site:
// true <-> can trivially phase a hom site relative to ANY previous site:
@ -317,7 +336,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
PhasingWindow phaseWindow = new PhasingWindow(vr, samp);
if (phaseWindow.hasPreviousHets()) { // Otherwise, nothing to phase this against
SNPallelePair allelePair = new SNPallelePair(gt);
logger.debug("Want to phase TOP vs. BOTTOM for: " + "\n" + allelePair);
if ( DEBUG ) logger.debug("Want to phase TOP vs. BOTTOM for: " + "\n" + allelePair);
DoublyLinkedList.BidirectionalIterator<UnfinishedVariantAndReads> prevHetAndInteriorIt = phaseWindow.prevHetAndInteriorIt;
/* Notes:
@ -331,15 +350,15 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
boolean genotypesArePhased = passesPhasingThreshold(pr.phaseQuality);
if (pr.phasingContainsInconsistencies) {
logger.debug("MORE than " + (MAX_FRACTION_OF_INCONSISTENT_READS * 100) + "% of the reads are inconsistent for phasing of " + VariantContextUtils.getLocation(getToolkit().getGenomeLocParser(),vc));
if ( DEBUG ) logger.debug("MORE than " + (MAX_FRACTION_OF_INCONSISTENT_READS * 100) + "% of the reads are inconsistent for phasing of " + VariantContextUtils.getLocation(getToolkit().getGenomeLocParser(),vc));
uvc.setPhasingInconsistent();
}
if (genotypesArePhased) {
SNPallelePair prevAllelePair = new SNPallelePair(prevHetGenotype);
logger.debug("THE PHASE PREVIOUSLY CHOSEN FOR PREVIOUS:\n" + prevAllelePair + "\n");
logger.debug("THE PHASE CHOSEN HERE:\n" + allelePair + "\n\n");
if ( DEBUG ) logger.debug("THE PHASE PREVIOUSLY CHOSEN FOR PREVIOUS:\n" + prevAllelePair + "\n");
if ( DEBUG ) logger.debug("THE PHASE CHOSEN HERE:\n" + allelePair + "\n\n");
ensurePhasing(allelePair, prevAllelePair, pr.haplotype);
Map<String, Object> gtAttribs = new HashMap<String, Object>(gt.getAttributes());
@ -439,7 +458,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
else if (gt.isHet()) {
GenotypeAndReadBases grb = new GenotypeAndReadBases(gt, phasedVr.sampleReadBases.get(sample), phasedVr.unfinishedVariant.getLocation());
listHetGenotypes.add(grb);
logger.debug("Using UPSTREAM het site = " + grb.loc);
if ( DEBUG ) logger.debug("Using UPSTREAM het site = " + grb.loc);
prevHetAndInteriorIt = phasedIt.clone();
}
}
@ -455,7 +474,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
GenomeLoc phaseLocus = VariantContextUtils.getLocation(getToolkit().getGenomeLocParser(),vr.variant);
GenotypeAndReadBases grbPhase = new GenotypeAndReadBases(vr.variant.getGenotype(sample), vr.sampleReadBases.get(sample), phaseLocus);
listHetGenotypes.add(grbPhase);
logger.debug("PHASING het site = " + grbPhase.loc + " [phasingSiteIndex = " + phasingSiteIndex + "]");
if ( DEBUG ) logger.debug("PHASING het site = " + grbPhase.loc + " [phasingSiteIndex = " + phasingSiteIndex + "]");
// Include as-of-yet unphased sites in the phasing computation:
for (VariantAndReads nextVr : unphasedSiteQueue) {
@ -468,7 +487,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
else if (gt.isHet()) {
GenotypeAndReadBases grb = new GenotypeAndReadBases(gt, nextVr.sampleReadBases.get(sample), VariantContextUtils.getLocation(getToolkit().getGenomeLocParser(),nextVr.variant));
listHetGenotypes.add(grb);
logger.debug("Using DOWNSTREAM het site = " + grb.loc);
if ( DEBUG ) logger.debug("Using DOWNSTREAM het site = " + grb.loc);
}
}
@ -496,7 +515,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
buildReadsAtHetSites(listHetGenotypes, onlyKeepReads);
// Copy to a fixed-size array:
logger.debug("FINAL phasing window of " + listHetGenotypes.size() + " sites:\n" + toStringGRL(listHetGenotypes));
if ( DEBUG ) logger.debug("FINAL phasing window of " + listHetGenotypes.size() + " sites:\n" + toStringGRL(listHetGenotypes));
hetGenotypes = new Genotype[listHetGenotypes.size()];
int index = 0;
for (GenotypeAndReadBases copyGrb : listHetGenotypes)
@ -539,9 +558,9 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
}
index++;
}
logger.debug("Number of sites in window = " + index);
if ( DEBUG ) logger.debug("Number of sites in window = " + index);
if (logger.isDebugEnabled()) {
if ( DEBUG && logger.isDebugEnabled()) {
logger.debug("ALL READS [phasingSiteIndex = " + phasingSiteIndex + "]:");
for (Map.Entry<String, Read> nameToReads : readsAtHetSites.entrySet()) {
String rdName = nameToReads.getKey();
@ -610,7 +629,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
for (int i = 0; i < siteInds.length; i++) {
for (int j = i + 1; j < siteInds.length; j++) {
GraphEdge e = new GraphEdge(siteInds[i], siteInds[j]);
logger.debug("Read = " + rdName + " is adding edge: " + e);
if ( DEBUG ) logger.debug("Read = " + rdName + " is adding edge: " + e);
readGraph.addEdge(e);
edgeToReads.addRead(e, rdName);
@ -620,7 +639,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
}
}
}
logger.debug("Read graph:\n" + readGraph);
if ( DEBUG ) logger.debug("Read graph:\n" + readGraph);
Set<String> keepReads = new HashSet<String>();
/* Check which Reads are involved in acyclic paths from (phasingSiteIndex - 1) to (phasingSiteIndex):
@ -673,7 +692,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
int cur = phasingSiteIndex;
if (!readGraph.getConnectedComponents().inSameSet(prev, cur)) { // There is NO path between cur and prev
logger.debug("NO READ PATH between PHASE site [" + cur + "] and UPSTREAM site [" + prev + "]");
if ( DEBUG ) logger.debug("NO READ PATH between PHASE site [" + cur + "] and UPSTREAM site [" + prev + "]");
readsAtHetSites.clear();
return keepReads;
}
@ -684,7 +703,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
IntegerSet[] removedSiteSameCCAsPrev = new IntegerSet[numHetSites];
IntegerSet[] removedSiteSameCCAsCur = new IntegerSet[numHetSites];
for (int i : sitesWithEdges) {
logger.debug("Calculating CC after removing edges of site: " + i);
if ( DEBUG ) logger.debug("Calculating CC after removing edges of site: " + i);
// Remove all edges incident to i and see which positions have paths to prev and cur:
Collection<GraphEdge> removedEdges = readGraph.removeAllIncidentEdges(i);
@ -694,15 +713,15 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
removedSiteSameCCAsPrev[i] = new IntegerSet(ccAfterRemove.inSameSetAs(prev, sitesWithEdges));
removedSiteSameCCAsCur[i] = new IntegerSet(ccAfterRemove.inSameSetAs(cur, sitesWithEdges));
logger.debug("Same CC as previous [" + prev + "]: " + removedSiteSameCCAsPrev[i]);
logger.debug("Same CC as current [" + cur + "]: " + removedSiteSameCCAsCur[i]);
if ( DEBUG ) logger.debug("Same CC as previous [" + prev + "]: " + removedSiteSameCCAsPrev[i]);
if ( DEBUG ) logger.debug("Same CC as current [" + cur + "]: " + removedSiteSameCCAsCur[i]);
// Add the removed edges back in:
readGraph.addEdges(removedEdges);
}
for (GraphEdge e : readGraph) {
logger.debug("Testing the path-connectivity of Edge: " + e);
if ( DEBUG ) logger.debug("Testing the path-connectivity of Edge: " + e);
/* Edge e={v1,v2} contributes a path between prev and cur for testRead iff:
testRead[v1] != null, testRead[v2] != null, and there is a path from prev ---> v1 -> v2 ---> cur [or vice versa].
@ -716,7 +735,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
for (String readName : edgeToReads.getReads(e)) {
keepReads.add(readName);
if (logger.isDebugEnabled()) {
if (DEBUG && logger.isDebugEnabled()) {
if (prevTo2and1ToCur)
logger.debug("Keep read " + readName + " due to path: " + prev + " ---> " + e.getV2() + " -> " + e.getV1() + " ---> " + cur);
else
@ -733,7 +752,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
String rdName = nameToReads.getKey();
if (!keepReads.contains(rdName)) {
readIt.remove();
logger.debug("Removing extraneous read: " + rdName);
if ( DEBUG ) logger.debug("Removing extraneous read: " + rdName);
}
}
@ -754,13 +773,13 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
int numPrecedingRemoved = 0;
for (GenotypeAndReadBases grb : listHetGenotypes) {
boolean keepSite = sitesWithReads.contains(index);
if (logger.isDebugEnabled() && !keepSite)
if (DEBUG && logger.isDebugEnabled() && !keepSite)
logger.debug("Removing read-less site " + grb.loc);
if (keepSite || index == phasingSiteIndex || index == phasingSiteIndex - 1) {
keepHetSites.add(grb);
if (!keepSite)
logger.debug("Although current or previous sites have no relevant reads, continuing empty attempt to phase them [for sake of program flow]...");
if ( DEBUG ) logger.debug("Although current or previous sites have no relevant reads, continuing empty attempt to phase them [for sake of program flow]...");
}
else if (index <= phasingSiteIndex)
numPrecedingRemoved++;
@ -809,8 +828,8 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
HaplotypeTableCreator tabCreator = new TableCreatorOfHaplotypeAndComplementForDiploidAlleles(phaseWindow.hetGenotypes, phaseWindow.phasingSiteIndex - 1, 2);
PhasingTable sampleHaps = tabCreator.getNewTable();
logger.debug("Number of USED reads [connecting the two positions to be phased] at sites: " + phaseWindow.readsAtHetSites.size());
if (logger.isDebugEnabled()) {
if (DEBUG && logger.isDebugEnabled()) {
logger.debug("Number of USED reads [connecting the two positions to be phased] at sites: " + phaseWindow.readsAtHetSites.size());
logger.debug("USED READS:");
for (Map.Entry<String, Read> nameToReads : phaseWindow.readsAtHetSites.entrySet()) {
String rdName = nameToReads.getKey();
@ -830,17 +849,17 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
for (Map.Entry<String, Read> nameToReads : phaseWindow.readsAtHetSites.entrySet()) {
Read rd = nameToReads.getValue();
logger.debug("\nrd = " + rd + "\tname = " + nameToReads.getKey());
if ( DEBUG ) logger.debug("\nrd = " + rd + "\tname = " + nameToReads.getKey());
for (PhasingTable.PhasingTableEntry pte : sampleHaps) {
PhasingScore score = rd.matchHaplotypeClassScore(pte.getHaplotypeClass());
pte.getScore().integrateReadScore(score);
logger.debug("score(" + rd + ", " + pte.getHaplotypeClass() + ") = " + score);
if ( DEBUG ) logger.debug("score(" + rd + ", " + pte.getHaplotypeClass() + ") = " + score);
}
// Check the current best haplotype assignment and compare it to the previous one:
MaxHaplotypeAndQuality curMaxHapAndQual = new MaxHaplotypeAndQuality(sampleHaps, false);
logger.debug("CUR MAX hap:\t" + curMaxHapAndQual.maxEntry.getHaplotypeClass() + "\tcurPhaseQuality:\t" + curMaxHapAndQual.phaseQuality);
if ( DEBUG ) logger.debug("CUR MAX hap:\t" + curMaxHapAndQual.maxEntry.getHaplotypeClass() + "\tcurPhaseQuality:\t" + curMaxHapAndQual.phaseQuality);
if (prevMaxHapAndQual != null) {
double changeInPQ = prevMaxHapAndQual.phaseQuality - curMaxHapAndQual.phaseQuality;
@ -848,7 +867,7 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
numHighQualityIterations++;
if (!curMaxHapAndQual.hasSameRepresentativeHaplotype(prevMaxHapAndQual) || // switched phase
(numPQchangesObserved > 0 && changeInPQ > FRACTION_OF_MEAN_PQ_CHANGES * (totalAbsPQchange / numPQchangesObserved))) { // a "significant" decrease in PQ
logger.debug("Inconsistent read found!");
if ( DEBUG ) logger.debug("Inconsistent read found!");
numInconsistentIterations++;
}
}
@ -859,12 +878,12 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
prevMaxHapAndQual = curMaxHapAndQual;
}
logger.debug("\nPhasing table [AFTER CALCULATION]:\n" + sampleHaps + "\n");
if ( DEBUG ) logger.debug("\nPhasing table [AFTER CALCULATION]:\n" + sampleHaps + "\n");
MaxHaplotypeAndQuality maxHapQual = new MaxHaplotypeAndQuality(sampleHaps, true);
double posteriorProb = maxHapQual.maxEntry.getScore().getValue();
logger.debug("MAX hap:\t" + maxHapQual.maxEntry.getHaplotypeClass() + "\tposteriorProb:\t" + posteriorProb + "\tphaseQuality:\t" + maxHapQual.phaseQuality);
logger.debug("Number of used reads " + phaseWindow.readsAtHetSites.size() + "; number of high PQ iterations " + numHighQualityIterations + "; number of inconsistencies " + numInconsistentIterations);
if ( DEBUG ) logger.debug("MAX hap:\t" + maxHapQual.maxEntry.getHaplotypeClass() + "\tposteriorProb:\t" + posteriorProb + "\tphaseQuality:\t" + maxHapQual.phaseQuality);
if ( DEBUG ) logger.debug("Number of used reads " + phaseWindow.readsAtHetSites.size() + "; number of high PQ iterations " + numHighQualityIterations + "; number of inconsistencies " + numInconsistentIterations);
boolean phasingContainsInconsistencies = false;
if (numInconsistentIterations / (double) numHighQualityIterations > MAX_FRACTION_OF_INCONSISTENT_READS)
@ -957,8 +976,6 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
* @param result the number of reads and VariantContexts seen.
*/
public void onTraversalDone(PhasingStats result) {
logger.debug("Entering onTraversalDone()");
List<VariantContext> finalList = processQueue(result, true); // process all remaining data
writeVcList(finalList);
writer.close();
@ -989,7 +1006,8 @@ public class ReadBackedPhasingWalker extends RodWalker<PhasingStatsAndOutput, Ph
}
private void writeVCF(VariantContext vc) {
WriteVCF.writeVCF(vc, writer, logger);
if ( samplesToPhase == null || vc.isVariant() ) // if we are only operating on specific samples, don't write out all sites, just those where the VC is variant
WriteVCF.writeVCF(vc, writer, logger);
}
public static boolean processVariantInPhasing(VariantContext vc) {