终于解决了最后的bug,并行结果与串行结果一致,与gatk一致。是index的search interval函数导致的,之前这个函数搜索的区间是包含大于等于start,而且大于等于end,正确的搜索应该是变异的end大于等于start

This commit is contained in:
zzh 2025-12-31 23:01:16 +08:00
parent 65878bbf96
commit 95c4a16151
4 changed files with 65 additions and 23 deletions

View File

@ -249,7 +249,7 @@ int SerialBQSR(AuxVar &aux) {
// 9. 计算这条read需要跳过的位置 // 9. 计算这条read需要跳过的位置
PROF_START(read_vcf); PROF_START(read_vcf);
RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips); RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips, 0);
for (int ii = 0; ii < sd.read_len; ++ii) { for (int ii = 0; ii < sd.read_len; ++ii) {
skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) ||
sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN;
@ -317,7 +317,9 @@ static void thread_worker(void* data, long idx, int tid, int steal) {
StableArray<double>&snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors; StableArray<double>&snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors;
StableArray<uint8_t>& skips = aux.skips; // 该位置是否是已知位点 StableArray<uint8_t>& skips = aux.skips; // 该位置是否是已知位点
auto &bams = *aux.bamArr; auto &bams = *aux.bamArr;
if (steal) for (auto& vcf : aux.vcfArr) vcf.knownSites.clear(); if (steal)
for (auto& vcf : aux.vcfArr) vcf.knownSites.clear();
int f = tid * 4;
#if 1 #if 1
int startIdx = idx * aux.BAM_BLOCK_NUM; int startIdx = idx * aux.BAM_BLOCK_NUM;
int stopIdx = std::min((size_t)(idx + 1) * aux.BAM_BLOCK_NUM, bams.size()); int stopIdx = std::min((size_t)(idx + 1) * aux.BAM_BLOCK_NUM, bams.size());
@ -326,6 +328,7 @@ static void thread_worker(void* data, long idx, int tid, int steal) {
int startIdx = idx * blockReadNums; int startIdx = idx * blockReadNums;
int stopIdx = std::min((size_t)(idx + 1) * blockReadNums, bams.size()); int stopIdx = std::min((size_t)(idx + 1) * blockReadNums, bams.size());
#endif #endif
// spdlog::info("tid {}, index {}, steal {}", tid, idx, steal);
aux.threadProcessedReads += stopIdx - startIdx; aux.threadProcessedReads += stopIdx - startIdx;
for (int i = startIdx; i < stopIdx; ++i) { for (int i = startIdx; i < stopIdx; ++i) {
// spdlog::info("Thread {} processing read idx: {}", tid, i); // spdlog::info("Thread {} processing read idx: {}", tid, i);
@ -344,7 +347,17 @@ static void thread_worker(void* data, long idx, int tid, int steal) {
// PROF_END(gprof[GP_clip_read], clip_read); // PROF_END(gprof[GP_clip_read], clip_read);
const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel); const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel);
#if 0
fprintf(gf[f + 0], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[f + 0], "%d ", isSNP[ii]);
fprintf(gf[f + 0], "\n");
fprintf(gf[f + 1], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[f + 1], "%d ", isIns[ii]);
fprintf(gf[f + 1], "\n");
fprintf(gf[f + 2], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[f + 2], "%d ", isDel[ii]);
fprintf(gf[f + 2], "\n");
#endif
bool baqCalculated = false; bool baqCalculated = false;
if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) { if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) {
baqCalculated = BAQ::flatBAQArray(sd, baqArray); baqCalculated = BAQ::flatBAQArray(sd, baqArray);
@ -358,12 +371,19 @@ static void thread_worker(void* data, long idx, int tid, int steal) {
// PROF_END(gprof[GP_covariate], covariate); // PROF_END(gprof[GP_covariate], covariate);
// PROF_START(read_vcf); // PROF_START(read_vcf);
RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips); RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips, tid);
for (int ii = 0; ii < sd.read_len; ++ii) { for (int ii = 0; ii < sd.read_len; ++ii) {
skips[ii] = skips[ii] =
skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN;
} }
// PROF_GP_END(read_vcf); // PROF_GP_END(read_vcf);
#if 0
fprintf(gf[f + 3], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[f + 3], "%d ", skips[ii] ? 1 : 0);
fprintf(gf[f + 3], "\n");
#endif
#if 0 #if 0
int fidx = 0 + 2 * tid; int fidx = 0 + 2 * tid;
//if (sd.rid % 2 == 0) fidx = 0 + 2 * tid; //if (sd.rid % 2 == 0) fidx = 0 + 2 * tid;
@ -446,7 +466,7 @@ int ParallelBQSR(vector<AuxVar>& auxArr) {
collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable); collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable);
roundTableValues(recalTables); roundTableValues(recalTables);
printRecalTables(recalTables); // printRecalTables(recalTables);
// 量化质量分数 // 量化质量分数
QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS);

View File

@ -15,6 +15,7 @@
#include "util/interval.h" #include "util/interval.h"
#include "util/vcf_parser.h" #include "util/vcf_parser.h"
#include "util/profiling.h" #include "util/profiling.h"
#include "util/debug.h"
struct RecalFuncs { struct RecalFuncs {
static constexpr int MAX_SITES_INTERVAL = 100000; static constexpr int MAX_SITES_INTERVAL = 100000;
@ -104,32 +105,39 @@ struct RecalFuncs {
} }
// 计算与read有交叉的已知位点信息 应该要判断一下是按照read的范围去读取vcf还是按照一个batch read的范围去读取 // 计算与read有交叉的已知位点信息 应该要判断一下是按照read的范围去读取vcf还是按照一个batch read的范围去读取
static void calculateKnownSites(SamData& sd, vector<VCFParser>& vcfs, sam_hdr_t* samHdr, StableArray<uint8_t>& knownSites) { static void calculateKnownSites(SamData& sd, vector<VCFParser>& vcfs, sam_hdr_t* samHdr, StableArray<uint8_t>& knownSites, int thid) {
BamWrap* bw = sd.bw; BamWrap* bw = sd.bw;
int tid = bw->contig_id(); int tid = bw->contig_id();
int64_t startPos = bw->start_pos(); // 闭区间 int64_t startPos = bw->start_pos(); // 闭区间使用clip之前的read匹配的范围
int64_t endPos = bw->end_pos(); // 闭区间 int64_t endPos = bw->end_pos(); // 闭区间
knownSites.resize_fill(sd.read_len, 0); knownSites.resize_fill(sd.read_len, 0);
// update vcfs // update vcfs
// int idx = 0;
for (auto& vcf : vcfs) { for (auto& vcf : vcfs) {
if (!vcf.knownSites.empty() && vcf.knownSites.back().right < startPos) // 为啥多线程环境会出现deque的front和[0]不一样?好像是调试的时候的问题,实际运行时没再出现
vcf.knownSites.clear(); // if (vcf.knownSites.front().left != vcf.knownSites[0].left || vcf.knownSites.front().right != vcf.knownSites[0].right)
// 清理旧的interval // spdlog::error("front is different with [0]: {} - {}", vcf.knownSites.front().left, vcf.knownSites[0].left);
while (!vcf.knownSites.empty()) {
auto& intv = vcf.knownSites.front(); if (!vcf.knownSites.empty() && vcf.knownSites.back().left > endPos) {// 此时vcf的区域包含bam不需要读取
if (intv.right < startPos) // 清理旧的interval只有此时才有清理的必要
vcf.knownSites.pop_front(); while (!vcf.knownSites.empty()) {
else auto& intv = vcf.knownSites.front();
break; if (intv.right < startPos)
} vcf.knownSites.pop_front();
if (!vcf.knownSites.empty() && vcf.knownSites.back().left > endPos) // 此时vcf的区域包含bam不需要读取 else
break;
}
continue; continue;
}
vcf.knownSites.clear(); // 清空因为后面会读入覆盖bam的所有vcf位点
// 读取新的interval // 读取新的interval
int64_t fpos, flen; int64_t fpos, flen;
endPos = std::max(startPos + MAX_SITES_INTERVAL, endPos); endPos = std::max(startPos + MAX_SITES_INTERVAL, endPos);
Interval readIntv(startPos, endPos); Interval readIntv(startPos, endPos);
vcf.index.SearchInterval(startPos, endPos, &fpos, &flen); vcf.index.SearchInterval(startPos, endPos, &fpos, &flen);
// fprintf(gf[thid * 2 + idx], "%s %d %ld %ld %ld\n", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid, fpos, flen);
if (flen > 0) { if (flen > 0) {
vcf.inStm.seekg(fpos, ios::beg); vcf.inStm.seekg(fpos, ios::beg);
if (flen > vcf.bufLen) { if (flen > vcf.bufLen) {
@ -149,22 +157,30 @@ struct RecalFuncs {
string id, ref; string id, ref;
ss_line >> stid >> pos >> id >> ref; ss_line >> stid >> pos >> id >> ref;
tid = sam_hdr_name2tid(samHdr, stid.c_str()); tid = sam_hdr_name2tid(samHdr, stid.c_str());
int64_t varStart = BamWrap::bam_global_pos(tid, pos); int64_t varStart = BamWrap::bam_global_pos(tid, pos - 1); // vcf中的pos是1-based程序中计算是0-based
Interval varIntv(varStart, varStart + ref.size() - 1); Interval varIntv(varStart, varStart + ref.size() - 1);
if (varIntv.overlaps(readIntv)) { if (varIntv.overlaps(readIntv)) {
vcf.knownSites.push_back(Interval(tid, pos - 1, pos - 1 + ref.size() - 1)); // 闭区间 vcf.knownSites.push_back(varIntv); // 闭区间
} }
get_line_from_buf(buf, flen, &cur, &line); get_line_from_buf(buf, flen, &cur, &line);
} }
} }
} }
// fprintf(gf[0], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
for (auto& vcf : vcfs) { for (auto& vcf : vcfs) {
for (auto& intv : vcf.knownSites) { for (auto& intv : vcf.knownSites) {
// knownSite is outside clipping window for the read, ignore // knownSite is outside clipping window for the read, ignore
#if 1
if (intv.right < sd.softStart()) if (intv.right < sd.softStart())
continue; continue;
if (intv.left > sd.softEnd()) if (intv.left > sd.softEnd())
break; break;
#else
if (intv.right < sd.softStart() || intv.left > sd.softEnd())
continue;
#endif
//fprintf(gf[0], "%ld %ld ", intv.left, intv.right);
// 计算对应的位点 // 计算对应的位点
ReadIdxCigar rcStart = sd.getReadIndexForReferenceCoordinate(intv.contigLeft()); ReadIdxCigar rcStart = sd.getReadIndexForReferenceCoordinate(intv.contigLeft());
int featureStartOnRead = rcStart.readIdx == SamData::READ_INDEX_NOT_FOUND ? 0 : rcStart.readIdx; int featureStartOnRead = rcStart.readIdx == SamData::READ_INDEX_NOT_FOUND ? 0 : rcStart.readIdx;
@ -180,7 +196,9 @@ struct RecalFuncs {
knownSites[i] = true; knownSites[i] = true;
} }
} }
//idx += 1;
} }
//fprintf(gf[0], "\n");
} }
// 应该是计算一段数据的平均值 // 应该是计算一段数据的平均值

View File

@ -20,7 +20,7 @@
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
#define DEBUG_FILE_NUM 5 #define DEBUG_FILE_NUM 8
extern FILE* gf[DEBUG_FILE_NUM]; extern FILE* gf[DEBUG_FILE_NUM];

View File

@ -30,7 +30,11 @@ void LinearIndex::SearchInterval(int64_t start, int64_t end, int64_t* file_pos,
} }
int sb_idx, eb_idx; int sb_idx, eb_idx;
if (stid == stid_origin) { if (stid == stid_origin) {
sb_idx = spos / idx_[stid].binWidth; // 下面修改的这两行代码与原版不同的,是查找所有包含[spos, epos]的变异信息
spos -= idx_[stid].longestFeature; // 保证起始位置小于spos但是结束位置大于spos的变异包含进来
spos = spos >= 0 ? spos : 0;
sb_idx = spos / idx_[stid].binWidth; // 找到spos对应的桶
sb_idx = sb_idx >= 0 ? sb_idx : 0;
if (sb_idx >= idx_[stid].size()) { // 开始区域没有对应的block if (sb_idx >= idx_[stid].size()) { // 开始区域没有对应的block
sb_idx = idx_[stid].size() - 1; sb_idx = idx_[stid].size() - 1;
Block& sb = idx_[stid][sb_idx]; Block& sb = idx_[stid][sb_idx];