FastBQSR/src/bqsr/bqsr_entry.cpp

1059 lines
45 KiB
C++
Raw Normal View History

2025-11-23 23:03:37 +08:00
/*
Description:
bambambam
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2023/10/23
*/
#include <header.h>
#include <htslib/faidx.h>
#include <htslib/kstring.h>
2025-11-23 23:03:37 +08:00
#include <htslib/sam.h>
2025-12-04 22:26:13 +08:00
#include <htslib/synced_bcf_reader.h>
2025-11-23 23:03:37 +08:00
#include <htslib/thread_pool.h>
#include <spdlog/spdlog.h>
#include <iomanip>
#include <numeric>
#include <queue>
#include <vector>
2025-11-23 23:03:37 +08:00
#include "baq.h"
2025-11-23 23:03:37 +08:00
#include "bqsr_args.h"
#include "bqsr_funcs.h"
#include "bqsr_pipeline.h"
#include "covariate.h"
2025-12-04 22:26:13 +08:00
#include "dup_metrics.h"
#include "fastbqsr_version.h"
2025-11-23 23:03:37 +08:00
#include "read_name_parser.h"
#include "read_recal_info.h"
#include "recal_datum.h"
#include "recal_tables.h"
#include "recal_utils.h"
#include "util/interval.h"
#include "util/linear_index.h"
2025-11-23 23:03:37 +08:00
#include "util/profiling.h"
2025-12-04 22:26:13 +08:00
#include "util/utils.h"
#include "util/math/math_utils.h"
#include "quant_info.h"
#include "util/debug.h"
using std::deque;
2025-11-23 23:03:37 +08:00
#define BAM_BLOCK_SIZE 16L * 1024 * 1024
#define MAX_SITES_INTERVAL 100000
const uint8_t NO_BAQ_UNCERTAINTY = (uint8_t)'@';
2025-11-23 23:03:37 +08:00
const char cBaseToChar[16] = {'N', 'A', 'C', 'N', 'G', 'N', 'N', 'N', 'T', 'N', 'N', 'N', 'N', 'N', 'N', 'N'};
// 解析knownSites
struct VCFParser {
deque<Interval> knownSites; // 已知的变异位点
char* buf = nullptr; // // 数据buffer
uint32_t bufLen = 4 * 1024; // 数据buffer长度
LinearIndex index; // vcf文件索引
ifstream inStm; // vcf文件流
VCFParser() { Init(); }
VCFParser(const string& vcfFileName) { Init(vcfFileName); }
VCFParser(const string& vcfFileName, sam_hdr_t* samHeader) { Init(vcfFileName, samHeader); }
void Init() { buf = (char*)malloc(bufLen); }
void Init(const string& vcfFileName) {
Init();
inStm.open(vcfFileName, ifstream::in);
string idxFileName = vcfFileName + ".idx";
if (!index.ReadIndex(idxFileName))
error("[%s] fail to load the %s index file\n", __func__, idxFileName.c_str());
}
void Init(const string& vcfFileName, sam_hdr_t *samHeader) {
index.SetHeader(samHeader);
Init(vcfFileName);
}
};
// 解析后的一些参数,文件,数据等
struct AuxVar {
const static int REF_CONTEXT_PAD = 3; // 需要做一些填充
const static int REFERENCE_HALF_WINDOW_LENGTH = 150; // 需要额外多取出一些ref序列防止边界效应
sam_hdr_t* header = nullptr; // bam header
faidx_t* faidx = nullptr; // reference index
char* ref_seq = nullptr; // reference sequence
int ref_len = 0; // reference sequence length
int offset = 0; // 在要求的ref序列两边多余取出的碱基数量
vector<VCFParser> vcfArr; // 从vcf中获取已知位点
};
2025-11-23 23:03:37 +08:00
namespace nsgv {
// 全局变量 for bqsr
BQSRArg gBqsrArg; // bqsr arguments
samFile* gInBamFp; // input BAM file pointer
sam_hdr_t* gInBamHeader; // input BAM header
vector<AuxVar> gAuxVars; // auxiliary variables保存一些文件数据等每个线程对应一个
RecalTables gRecalTables; // 记录bqsr所有的数据输出table结果
vector<EventTypeValue> gEventTypes; // 需要真正计算的eventtype
2025-12-04 22:26:13 +08:00
// 下面是需要删除或修改的变量
2025-11-23 23:03:37 +08:00
std::vector<ReadNameParser> gNameParsers; // read name parser
2025-12-04 22:26:13 +08:00
DuplicationMetrics gMetrics; //
DupResult gDupRes;
PipelineArg gPipe(&gDupRes);
2025-11-23 23:03:37 +08:00
samFile *gOutBamFp; // , sambam
sam_hdr_t *gOutBamHeader; // header
2025-12-04 22:26:13 +08:00
vector <bcf_srs_t*> gKnownSitesVcfSrs; // known sites vcf srs
2025-11-23 23:03:37 +08:00
}; // namespace nsgv
//
struct ByteBuf {
uint8_t *buf = nullptr;
int size = 0; //
int capacity = 0; //
};
// 读进来的这一批bam总共占了几个染色体这个方案不行读取太多没必要
// 开区间
struct Region {
int64_t start;
int64_t end;
};
2025-11-23 23:03:37 +08:00
/*
*
*/
static string getFileExtension(const string &filename) {
auto last_dot = filename.find_last_of('.');
if (last_dot == string::npos) {
return "";
}
return filename.substr(last_dot + 1);
}
// 过滤掉bqsr过程不符合要求的bam数据
bool bqsrReadFilterOut(const bam1_t *b) {
// 过滤掉unmapped的read
if (b->core.qual == 0) // mapping quality 0
return true;
if (b->core.qual == 255) // mapping quality not available
return true;
if (b->core.flag & BAM_FUNMAP || b->core.tid == -1 || b->core.pos == -1) { // unmapped
return true;
}
if (b->core.flag & BAM_FSECONDARY) { // secondary alignment
return true;
}
if (b->core.flag & BAM_FDUP) { // secondary alignment
return true;
}
if (b->core.flag & BAM_FQCFAIL) { // Not passing quality controls
return true;
}
return false;
}
// 该操作符是否消耗read的碱基
bool consumeReadBases(char cigar) {
return cigar == 'M' || cigar == '=' || cigar == 'X' || cigar == 'I' || cigar == 'S';
}
// 该操作符是否消耗参考基因组的碱基
bool consumeRefBases(char cigar) {
return cigar == 'M' || cigar == '=' || cigar == 'X' || cigar == 'D' || cigar == 'N';
}
// 给定一个ref位置在read内部找到对应的位置和操作符
struct PosAndOperator {
int readPosAtRefCoord = -1; // read中的位置
char cigarOperator = '0'; // cigar操作符
int cigarIndex = -1; // cigar索引
int cigarLen = 0;
int preCigarLen = 0; // 截止cigar之前的消耗read base的长度
};
/**
* Find the 0-based index within a read base array corresponding to a given 0-based position in the reference, along with the cigar operator of
* the element containing that base. If the reference coordinate occurs within a deletion, the first index after the deletion is returned.
* Note that this treats soft-clipped bases as if they align with the reference, which is useful for hard-clipping reads with soft clips.
*
* @param alignmentStart The soft start of the read on the reference
* @param cigar The read's cigar
* @param refCoord The target reference coordinate
* @return If the reference coordinate occurs before the read start or after the read end {@code CLIPPING_GOAL_NOT_REACHED};
* if the reference coordinate falls within an alignment block of the read's cigar, the corresponding read coordinate;
* if the reference coordinate falls within a deletion, the first read coordinate after the deletion. Note: if the last
* cigar element is a deletion (which isn't meaningful), it returns {@code CLIPPING_GOAL_NOT_REACHED}.
*/
PosAndOperator getReadIndexForReferenceCoordinate(BamWrap *bw, int alignmentStart, int refCoord) {
PosAndOperator po;
if (refCoord < alignmentStart) {
return po;
}
int firstReadPosOfElement = 0; // inclusive
int firstRefPosOfElement = alignmentStart; // inclusive
int lastReadPosOfElement = 0; // exclusive
int lastRefPosOfElement = alignmentStart; // exclusive
// advance forward through all the cigar elements until we bracket the reference coordinate
const uint32_t* cigar = bam_get_cigar(bw->b);
const bam1_core_t& bc = bw->b->core;
const int idx = bc.n_cigar - 1;
if (idx < 0)
return po;
for (int i = 0; i < bc.n_cigar; ++i) {
const char c = bam_cigar_opchr(cigar[i]);
const int len = bam_cigar_oplen(cigar[i]);
firstReadPosOfElement = lastReadPosOfElement;
firstRefPosOfElement = lastRefPosOfElement;
lastReadPosOfElement += consumeReadBases(c) ? len : 0;
lastRefPosOfElement += (consumeRefBases(c) || c == 'S') ? len : 0;
if (firstRefPosOfElement <= refCoord && refCoord < lastRefPosOfElement) { // refCoord falls within this cigar element
int readPosAtRefCoord = firstReadPosOfElement + (consumeReadBases(c) ? (refCoord - firstRefPosOfElement) : 0);
return PosAndOperator{readPosAtRefCoord, c, i, len, firstReadPosOfElement};
}
}
return po;
}
// 根据adapter位置对read进行hardclip返回左侧或右侧减掉的base数量
void clipByReferenceCoordinates(BamWrap *bw, int refStart, int refStop, SamData &sd) {
int start, stop;
// Determine the read coordinate to start and stop hard clipping
if (refStart < 0) {
if (refStop < 0) return;
PosAndOperator stopPosAndOperator = getReadIndexForReferenceCoordinate(bw, bw->GetSoftStart(), refStop);
// if the refStop falls in a deletion, the above method returns the position after the deletion. Since the stop we return here
// is inclusive, we decrement the stop to avoid overclipping by one base. As a result we do not clip the deletion, which is fine.
stop = stopPosAndOperator.readPosAtRefCoord - (consumeReadBases(stopPosAndOperator.cigarOperator) ? 0 : 1);
sd.left_clip = stop + 1;
sd.cigar_start = stopPosAndOperator.cigarIndex;
sd.first_cigar_clip = sd.left_clip - stopPosAndOperator.preCigarLen;
} else {
if (refStop >= 0) return;
// unlike the above case where we clip the start fo the read, here we clip the end and returning the base to the right of a deletion avoids
// overclipping
PosAndOperator startPosAndOperator = getReadIndexForReferenceCoordinate(bw, bw->GetSoftStart(), refStart);
start = startPosAndOperator.readPosAtRefCoord;
sd.right_clip = bw->b->core.l_qseq - start;
sd.cigar_end = startPosAndOperator.cigarIndex + 1;
sd.last_cigar_clip = startPosAndOperator.preCigarLen + startPosAndOperator.cigarLen - start;
}
}
// 计算切掉adapter之后ref相对原始ref的偏移量
void calculateRefOffset(BamWrap *bw, SamData &sd) {
const uint32_t* cigar = bam_get_cigar(bw->b);
const bam1_core_t& bc = bw->b->core;
int i = 0;
for (i = 0; i < sd.cigar_start; ++i) {
const char c = bam_cigar_opchr(cigar[i]);
int len = bam_cigar_oplen(cigar[i]);
if (consumeRefBases(c)) {
sd.ref_offset += len;
}
}
const char c = bam_cigar_opchr(cigar[i]);
if (consumeRefBases(c)) {
sd.ref_offset += sd.first_cigar_clip;
}
}
// 计算clip处理之后剩余的碱基
void calculateReadBases(BamWrap* bw, SamData& sd) {
sd.bases.resize(sd.read_len);
sd.base_quals.resize(sd.read_len);
uint8_t* seq = bam_get_seq(bw->b);
uint8_t* quals = bam_get_qual(bw->b);
for (int i = 0; i < sd.read_len; ++i) {
sd.bases[i] = cBaseToChar[bam_seqi(seq, i + sd.left_clip)];
sd.base_quals[i] = quals[i + sd.left_clip];
}
}
// 计算read两端clip之后的softstart和softend
void calculateSoftStartEnd(BamWrap* bw, SamData& sd) {
int64_t softStart = bw->b->core.pos + sd.ref_offset;
int64_t softEnd = softStart - 1; // 闭区间
const uint32_t* cigar = bam_get_cigar(bw->b);
const bam1_core_t& bc = bw->b->core;
int cigar_start = sd.cigar_start;
int cigar_end = sd.cigar_end;
bool rightTail = false;
for (int i = sd.cigar_start; i < sd.cigar_end; ++i) {
const char c = bam_cigar_opchr(cigar[i]);
int len = bam_cigar_oplen(cigar[i]);
if (i == sd.cigar_start) len -= sd.first_cigar_clip;
if (i == sd.cigar_end - 1) len -= sd.last_cigar_clip;
// if (c == 'S' || c == 'I')
if (c == 'S')
softStart -= len;
else if (c != 'H')
rightTail = true;
if (rightTail) {
if (consumeRefBases(c) || c == 'S')
softEnd += len;
}
}
sd.softStart() = softStart;
sd.softEnd() = softEnd;
}
// 计算clip之后的cigar
void calculateCigar(BamWrap* bw, SamData& sd) {
sd.cigars.clear();
const uint32_t* cigar = bam_get_cigar(bw->b);
const bam1_core_t& bc = bw->b->core;
int cigar_start = sd.cigar_start;
int cigar_end = sd.cigar_end;
for (int i = sd.cigar_start; i < sd.cigar_end; ++i) {
char c = bam_cigar_opchr(cigar[i]);
int len = bam_cigar_oplen(cigar[i]);
if (i == sd.cigar_start)
len -= sd.first_cigar_clip;
if (i == sd.cigar_end - 1)
len -= sd.last_cigar_clip;
//if ((i == sd.cigar_start || i == sd.cigar_end - 1) && c == 'D') // 跳过开头的deletion
if (i == sd.cigar_start && c == 'D') { // 跳过开头的deletion
c = 'H';
// sd.ref_offset += len;
sd.start_pos += len;
}
sd.cigars.push_back({c, len});
}
//for(auto &cigar : sd.cigars) {
// spdlog::info("op: {}, len: {}", cigar.op, cigar.len);
//}
}
// 计算read两端softclip的碱基数量可能会修改ad里的clip值
void calculateSoftClip(BamWrap *bw, SamData &sd) {
const uint32_t* cigar = bam_get_cigar(bw->b);
const bam1_core_t& bc = bw->b->core;
int readIndex = sd.left_clip;
int cutLeft = -1; // first position to hard clip (inclusive)
int cutRight = -1; // first position to hard clip (inclusive)
int cigar_start = sd.cigar_start;
int cigar_end = sd.cigar_end;
bool rightTail = false; // trigger to stop clipping the left tail and start cutting the right tail
for (int i = sd.cigar_start; i < sd.cigar_end; ++i) {
const char c = bam_cigar_opchr(cigar[i]);
int len = bam_cigar_oplen(cigar[i]);
if (i == sd.cigar_start) len -= sd.first_cigar_clip;
if (i == sd.cigar_end - 1) len -= sd.last_cigar_clip;
if (c == 'S') {
if (rightTail) {
cutRight = readIndex;
cigar_end = i;
} else {
cutLeft = readIndex + len - 1;
cigar_start = i + 1;
}
} else if (c != 'H') {
rightTail = true;
}
if (consumeReadBases(c)) {
readIndex += len;
}
}
if (cutRight >= 0) {
sd.right_clip = bw->b->core.l_qseq - cutRight;
sd.cigar_end = cigar_end;
sd.last_cigar_clip = 0;
}
if (cutLeft >= 0) {
sd.left_clip = cutLeft + 1;
sd.cigar_start = cigar_start;
sd.first_cigar_clip = 0;
}
}
// 读取给定区间的reference
static inline void read_ref_base(AuxVar& aux, int64_t cur_pos, Interval& interval) {
if (aux.ref_seq != NULL)
free(aux.ref_seq);
int tid = BamWrap::bam_tid(cur_pos);
const char* chr = sam_hdr_tid2name(aux.header, tid);
int seq_begin = BamWrap::bam_pos(interval.left); //- aux.REFERENCE_HALF_WINDOW_LENGTH;
int seq_end = BamWrap::bam_pos(interval.right); //+ aux.REFERENCE_HALF_WINDOW_LENGTH;
aux.ref_seq = faidx_fetch_seq(aux.faidx, chr, seq_begin, seq_end, &aux.ref_len);
// aux.offset = aux.REFERENCE_HALF_WINDOW_LENGTH;
}
// 设置某个位置是indel
inline void updateIndel(vector<int> &isIndel, int index) {
if (index >=0 && index < isIndel.size()) {
isIndel[index] = 1;
}
}
// 计算该read的每个碱基位置是否是SNP或Indel
int calculateIsSNPOrIndel(AuxVar& aux, SamData &sd, vector<int> &isSNP, vector<int> &isIns, vector<int> &isDel) {
// 1. 读取参考基因组先看看串行运行性能稍后可以将读入ref和vcf合并起来做成一个并行流水线步骤
//Interval interval{bw->start_pos() + sd.ref_offset, bw->end_pos()}; // 闭区间
Interval interval{sd.start_pos, sd.end_pos}; // 闭区间
PROF_START(ref);
read_ref_base(aux, interval.left, interval);
PROF_END(gprof[GP_read_ref], ref);
string refBases(aux.ref_seq);
// spdlog::info("ref: {}, {}, {} - {}", aux.ref_seq, aux.ref_len, bw->contig_pos(), bw->contig_end_pos());
// 2. 遍历cigar计算每个碱基是否是SNP或Indel
int readPos = 0, refPos = 0, nEvents = 0;
for (int i = 0; i<sd.cigars.size(); ++i) {
const char c = sd.cigars[i].op;
int len = sd.cigars[i].len;
if (c == 'M' || c == '=' || c == 'X') {
for (int j = 0; j < len; ++j) {
// 按位置将read和ref碱基进行比较不同则是snp注意read起始位置要加上left_clip
int snpInt = sd.bases[readPos] == refBases[refPos] ? 0 : 1;
// if (snpInt > 0) { spdlog::info("snp {}, readpos: {}", snpInt, readPos); }
isSNP[readPos] = snpInt;
nEvents += snpInt;
readPos++;
refPos++;
}
} else if (c == 'D') {
// 应该是在上一个消耗碱基的cigar的最后一个位置标记Del
int index = sd.bw->GetReadNegativeStrandFlag() ? readPos : readPos - 1;
updateIndel(isDel, index);
refPos += len;
} else if (c == 'N') {
refPos += len;
} else if (c == 'I') {
// 与Del不同Ins应该是在下一个cigar开始的位置标记Ins
bool forwardStrandRead = !sd.bw->GetReadNegativeStrandFlag();
if (forwardStrandRead) {
updateIndel(isIns, readPos - 1);
}
readPos += len;
if (!forwardStrandRead) {
updateIndel(isIns, readPos);
}
} else if (c == 'S') {
readPos += len;
}
}
// const uint32_t* cigar = bam_get_cigar(bw->b);
// const bam1_core_t& bc = bw->b->core;
// uint8_t* seq = bam_get_seq(bw->b);
// for (int i = sd.cigar_start; i < sd.cigar_end; ++i) {
// const char c = bam_cigar_opchr(cigar[i]);
// int len = bam_cigar_oplen(cigar[i]);
// if (i == sd.cigar_start) len -= sd.first_cigar_clip;
// if (i == sd.cigar_end - 1) len -= sd.last_cigar_clip;
// if (c == 'M' || c == '=' || c == 'X') {
// for (int j = 0; j < len; ++j) {
// // 按位置将read和ref碱基进行比较不同则是snp注意read起始位置要加上left_clip
// int snpInt = cBaseToChar[bam_seqi(seq, readPos + sd.left_clip)] == refBases[refPos] ? 0 : 1;
// // if (snpInt > 0) { spdlog::info("snp {}, readpos: {}", snpInt, readPos); }
// isSNP[readPos] = snpInt;
// nEvents += snpInt;
// readPos++;
// refPos++;
// }
// } else if (c == 'D') {
// // 应该是在上一个消耗碱基的cigar的最后一个位置标记Del
// int index = bw->GetReadNegativeStrandFlag() ? readPos : readPos - 1;
// updateIndel(isDel, index);
// refPos += len;
// } else if (c == 'N') {
// refPos += len;
// } else if (c == 'I') {
// // 与Del不同Ins应该是在下一个cigar开始的位置标记Ins
// bool forwardStrandRead = !bw->GetReadNegativeStrandFlag();
// if (forwardStrandRead) {
// updateIndel(isIns, readPos - 1);
// }
// readPos += len;
// if (!forwardStrandRead) {
// updateIndel(isIns, readPos);
// }
// } else if (c == 'S') {
// readPos += len;
// }
// }
nEvents += std::accumulate(isIns.begin(), isIns.end(), 0) + std::accumulate(isDel.begin(), isDel.end(), 0);
// spdlog::info("nEvents: {}", nEvents);
//spdlog::info("SNPs: {}, Ins: {}, Del: {}, total events: {}", std::accumulate(isSNP.begin(), isSNP.end(), 0),
// std::accumulate(isIns.begin(), isIns.end(), 0), std::accumulate(isDel.begin(), isDel.end(), 0), nEvents);
// exit(0);
return nEvents;
}
// 简单计算baq数组就是全部赋值为'@' (64)
bool flatBAQArray(SamData& sd, vector<uint8_t>& baqArray) {
baqArray.resize(sd.read_len, (uint8_t)'@');
return true;
}
// 计算真实的baq数组耗时更多好像enable-baq参数默认是关闭的那就先不实现这个了
bool calculateBAQArray(AuxVar& aux, BAQ& baq, SamData& sd, vector<uint8_t>& baqArray) {
baqArray.resize(sd.read_len, 0);
return true;
}
// 获取一行字符串
static void get_line_from_buf(char* buf, int64_t total, int64_t* cur, string* line) {
line->clear();
if (*cur >= total)
return;
char b;
while (*cur < total && (b = buf[(*cur)++]) != '\n') {
line->push_back(b);
}
}
// 计算与read有交叉的已知位点信息 应该要判断一下是按照read的范围去读取vcf还是按照一个batch read的范围去读取
void calculateKnownSites(SamData& sd, vector<VCFParser> &vcfs, vector<bool> &knownSites) {
BamWrap* bw = sd.bw;
int tid = bw->contig_id();
uint64_t startPos = bw->start_pos(); // 闭区间
uint64_t endPos = bw->end_pos(); // 闭区间
// spdlog::info("bam {}, {}", startPos, endPos);
// update vcfs
for(auto &vcf : vcfs) {
// 清理旧的interval
while(!vcf.knownSites.empty()) {
auto& intv = vcf.knownSites.front();
// spdlog::info("intv bam {}, {}", intv.right, startPos);
if (intv.right < startPos)
vcf.knownSites.pop_front();
else
break;
}
if (!vcf.knownSites.empty() && vcf.knownSites.back().left > endPos) continue;
// spdlog::info("intv {}, {}, {}", vcf.knownSites.size(), vcf.knownSites.front().right, vcf.knownSites.front().right);
// exit(0);
//spdlog::info("before intervals : {}", vcf.knownSites.size());
// 读取新的interval
int64_t fpos, flen;
endPos = std::max(startPos + MAX_SITES_INTERVAL, endPos);
Interval readIntv(startPos, endPos);
vcf.index.SearchInterval(startPos, endPos, &fpos, &flen);
//spdlog::info("file index: {}, {}", fpos, flen);
if (flen > 0) {
vcf.inStm.seekg(fpos, ios::beg);
if (flen > vcf.bufLen) {
vcf.bufLen = flen;
vcf.buf = (char*)realloc(vcf.buf, flen);
}
char* buf = vcf.buf;
vcf.inStm.read(buf, flen);
string line;
int64_t cur = 0;
get_line_from_buf(buf, flen, &cur, &line);
while (line.size() > 0) {
stringstream ss_line(line);
string stid;
int tid, pos;
int64_t locus;
string id, ref;
ss_line >> stid >> pos >> id >> ref;
tid = sam_hdr_name2tid(nsgv::gInBamHeader, stid.c_str());
int64_t varStart = BamWrap::bam_global_pos(tid, pos);
Interval varIntv(varStart, varStart + ref.size() - 1);
if (readIntv.overlaps(varIntv)) {
vcf.knownSites.push_back(Interval(tid, pos - 1, pos - 1 + ref.size() - 1)); // 闭区间
//spdlog::info("intv-1 {}, {}, {}", tid, pos, ref.size());
}
get_line_from_buf(buf, flen, &cur, &line);
}
}
//spdlog::info("after intervals : {}", vcf.knownSites.size());
//for(auto &val : vcf.knownSites) {
// spdlog::info("intv {}, {}", val.left, val.right);
//}
}
//exit(0);
knownSites.resize(sd.read_len);
endPos = bw->end_pos();
for(auto &vcf : vcfs) {
for (auto &intv : vcf.knownSites) {
// knownSite is outside clipping window for the read, ignore
if (intv.right < sd.softStart())
continue;
if (intv.left > sd.softEnd())
break;
// 计算对应的位点
ReadIdxCigar rcStart = sd.getReadIndexForReferenceCoordinate(intv.left);
int featureStartOnRead = rcStart.readIdx == SamData::READ_INDEX_NOT_FOUND ? 0 : rcStart.readIdx;
if (rcStart.cigarOp == 'D') {
--featureStartOnRead;
}
ReadIdxCigar rcEnd = sd.getReadIndexForReferenceCoordinate(intv.right);
int featureEndOnRead = rcEnd.readIdx == SamData::READ_INDEX_NOT_FOUND ? sd.read_len : rcEnd.readIdx;
if (featureStartOnRead > sd.read_len) {
featureStartOnRead = featureEndOnRead = sd.read_len;
}
for (int i = max(0, featureStartOnRead); i < min(sd.read_len, featureEndOnRead + 1); ++i) {
knownSites[i] = true;
}
}
}
}
// 应该是计算一段数据的平均值
static void calculateAndStoreErrorsInBlock(int i, int blockStartIndex, vector<int>& errorArr, vector<double>& fracErrs) {
int totalErrors = 0;
for (int j = max(0, blockStartIndex - 1); j <= i; j++) {
totalErrors += errorArr[j];
}
for (int j = max(0, blockStartIndex - 1); j <= i; j++) {
fracErrs[j] = ((double)totalErrors) / ((double)(i - max(0, blockStartIndex - 1) + 1));
}
}
// 应该是用来处理BAQ的把不等于特定BAQ分数的碱基作为一段数据统一处理
void calculateFractionalErrorArray(vector<int>& errorArr, vector<uint8_t>& baqArr, vector<double>& fracErrs) {
// for (auto val : errorArr) { if (val > 0) spdlog::info("snp err val: {}", val); }
fracErrs.resize(baqArr.size());
// errorArray和baqArray必须长度相同
const int BLOCK_START_UNSET = -1;
bool inBlock = false;
int blockStartIndex = BLOCK_START_UNSET;
int i;
for (i = 0; i < fracErrs.size(); ++i) {
if (baqArr[i] == NO_BAQ_UNCERTAINTY) { // 不等于NO_BAQ_UNCERTAINTY的碱基当成一段数据来处理
if (!inBlock) {
fracErrs[i] = errorArr[i];
} else {
calculateAndStoreErrorsInBlock(i, blockStartIndex, errorArr, fracErrs);
inBlock = false; // reset state variables
blockStartIndex = BLOCK_START_UNSET; // reset state variables
}
} else {
inBlock = true;
if (blockStartIndex == BLOCK_START_UNSET) {
blockStartIndex = i;
}
}
}
// 处理最后一个block
if (inBlock) {
calculateAndStoreErrorsInBlock(i - 1, blockStartIndex, errorArr, fracErrs);
}
}
/**
* Update the recalibration statistics using the information in recalInfo.
*
* Implementation detail: we only populate the quality score table and the optional tables.
* The read group table will be populated later by collapsing the quality score table.
*
* @param recalInfo data structure holding information about the recalibration values for a single read
*/
void updateRecalTablesForRead(ReadRecalInfo &info) {
SamData &read = info.read;
PerReadCovariateMatrix& readCovars = info.covariates;
Array3D<RecalDatum>& qualityScoreTable = nsgv::gRecalTables.qualityScoreTable;
Array4D<RecalDatum>& contextTable = nsgv::gRecalTables.contextTable;
Array4D<RecalDatum>& cycleTable = nsgv::gRecalTables.cycleTable;
int readLength = read.read_len;
for (int offset = 0; offset < readLength; ++offset) {
//if (read.rid == 46114) {
// fprintf(gf[3], "%d %d\n", offset, info.skips[offset] ? 1 : 0);
//}
if (!info.skips[offset]) {
//if (true){ // 不跳过当前位置
for (int idx = 0; idx < nsgv::gEventTypes.size(); ++idx) {
// 获取四个值readgroup / qualityscore / context / cycle
EventTypeValue& event = nsgv::gEventTypes[idx];
vector<int>& covariatesAtOffset = readCovars[event.index][offset];
uint8_t qual = info.getQual(event, offset);
double isError = info.getErrorFraction(event, offset);
int readGroup = covariatesAtOffset[ReadGroupCovariate::index];
int baseQuality = covariatesAtOffset[BaseQualityCovariate::index];
// 处理base quality score协变量
// RecalUtils::IncrementDatum3keys(qualityScoreTable, qual, isError, readGroup, baseQuality, event.index);
if (read.rid == 46114) {
// fprintf(gf[3], "%d %d %f\n", offset, baseQuality, isError);
}
qualityScoreTable[readGroup][baseQuality][event.index].increment(1, isError, baseQuality);
auto &d = qualityScoreTable[readGroup][baseQuality][event.index];
// spdlog::info("isError {} : {}, mis {}, obs {}", isError, info.snp_errs[offset], d.numMismatches, d.numObservations);
// 处理context covariate
int contextCovariate = covariatesAtOffset[ContextCovariate::index];
if (contextCovariate >= 0)
contextTable[readGroup][baseQuality][contextCovariate][event.index].increment(1, isError, baseQuality);
// RecalUtils::IncrementDatum4keys(nsgv::gRecalTables.contextTable, qual, isError, readGroup, baseQuality, contextCovariate,
// event.index);
// 处理cycle covariate
int cycleCovariate = covariatesAtOffset[CycleCovariate::index];
if (cycleCovariate >= 0)
cycleTable[readGroup][baseQuality][cycleCovariate][event.index].increment(1, isError, baseQuality);
// RecalUtils::IncrementDatum4keys(nsgv::gRecalTables.cycleTable, qual, isError, readGroup, baseQuality, cycleCovariate, event.index);
}
}
}
// fprintf(gf[3], "%ld %d %ld\n", read.rid, read.read_len, read.start_pos+1);
// for (auto& arr1 : qualityScoreTable.data) {
// for (size_t si = 0; si < arr1.size(); ++si) {
// for (auto &val : arr1[si]) {
// if (val.numObservations > 0)
// fprintf(gf[3], "%ld %f %f ", val.numObservations, val.getNumMismatches(), val.reportedQuality);
// }
// }
// }
// fprintf(gf[3], "\n");
// fprintf(gf[3], "%ld %d %ld\n", read.rid, read.read_len, read.start_pos+1);
// for (auto& arr1 : contextTable.data) {
// for (size_t si = 0; si < arr1.size(); ++si) {
// for (auto &arr2 : arr1[si]) {
// for (auto& val : arr2) {
// if (val.numObservations > 0)
// fprintf(gf[3], "%ld %f %f ", val.numObservations, val.getNumMismatches(), val.reportedQuality);
// }
// }
// }
// }
// fprintf(gf[3], "\n");
}
// 数据总结
void collapseQualityScoreTableToReadGroupTable(Array2D<RecalDatum> &byReadGroupTable, Array3D<RecalDatum> &byQualTable) {
// 遍历quality table
for (int k1 = 0; k1 < byQualTable.data.size(); ++k1) {
for (int k2 = 0; k2 < byQualTable[k1].size(); ++k2) {
for (int k3 = 0; k3 < byQualTable[k1][k2].size(); ++k3) {
auto& qualDatum = byQualTable[k1][k2][k3];
if (qualDatum.numObservations > 0) {
int rgKey = k1;
int eventIndex = k3;
// spdlog::info("k1 {}, k2 {}, k3 {}, numMis {}", k1, k2, k3, qualDatum.numMismatches);
byReadGroupTable[rgKey][eventIndex].combine(qualDatum);
// spdlog::info("rg {} {}, k3 {}", byReadGroupTable[rgKey][eventIndex].numMismatches, byReadGroupTable[rgKey][eventIndex].reportedQuality, k3);
}
}
}
}
}
/**
* To replicate the results of BQSR whether or not we save tables to disk (which we need in Spark),
* we need to trim the numbers to a few decimal placed (that's what writing and reading does).
*/
void roundTableValues(RecalTables& rt) {
#define _round_val(val) \
do { \
if (val.numObservations > 0) { \
val.numMismatches = MathUtils::RoundToNDecimalPlaces(val.numMismatches, RecalUtils::NUMBER_ERRORS_DECIMAL_PLACES); \
val.reportedQuality = MathUtils::RoundToNDecimalPlaces(val.reportedQuality, RecalUtils::REPORTED_QUALITY_DECIMAL_PLACES); \
} \
} while (0)
_Foreach2D(rt.readGroupTable, val, { _round_val(val); });
_Foreach3D(rt.qualityScoreTable, val, { _round_val(val); });
_Foreach4D(rt.contextTable, val, { _round_val(val); });
_Foreach4D(rt.cycleTable, val, { _round_val(val); });
}
2025-12-04 22:26:13 +08:00
// 串行bqsr
int SerialBQSR() {
open_debug_files();
2025-12-04 22:26:13 +08:00
int round = 0;
BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO);
// inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM);
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut);
2025-12-04 22:26:13 +08:00
int64_t readNumSum = 0;
// 0. 初始化一些全局数据
// BAQ baq{BAQ::DEFAULT_GOP};
RecalDatum::StaticInit();
QualityUtils::StaticInit();
MathUtils::StaticInit();
// 1. 协变量数据相关初始化
PerReadCovariateMatrix readCovariates;
CovariateUtils::InitPerReadCovMat(readCovariates);
ContextCovariate::InitContextCovariate(nsgv::gBqsrArg);
CycleCovariate::InitCycleCovariate(nsgv::gBqsrArg);
// 注意初始化顺序,这个必须在协变量初始化之后
nsgv::gRecalTables.init(nsgv::gInBamHeader->hrecs->nrg);
nsgv::gEventTypes.push_back(EventType::BASE_SUBSTITUTION);
if (nsgv::gBqsrArg.computeIndelBQSRTables) {
nsgv::gEventTypes.push_back(EventType::BASE_INSERTION);
nsgv::gEventTypes.push_back(EventType::BASE_DELETION);
}
// 2. 读取bam的read group
if (nsgv::gInBamHeader->hrecs->nrg == 0) {
spdlog::error("No RG tag found in the header!");
return 1;
}
for (int i = 0; i < nsgv::gInBamHeader->hrecs->nrg; ++i) {
spdlog::info("rg: {}", nsgv::gInBamHeader->hrecs->rg[i].name);
ReadGroupCovariate::RgToId[nsgv::gInBamHeader->hrecs->rg[i].name] = i;
ReadGroupCovariate::IdToRg[i] = nsgv::gInBamHeader->hrecs->rg[i].name;
}
int test = 0;
2025-12-04 22:26:13 +08:00
while (1) {
++ round;
// 一. 读取bam数据
2025-12-04 22:26:13 +08:00
size_t readNum = 0;
if (inBamBuf.ReadStat() >= 0)
readNum = inBamBuf.ReadBam();
if (readNum < 1) {
break;
}
auto bams = inBamBuf.GetBamArr();
spdlog::info("{} reads processed in {} round, {}", readNum, round, test);
// 二. 遍历每个bamread记录进行处理
for (int i = 0; i < bams.size(); ++i) {
// 1. 对每个read需要检查cigar是否合法即没有两个连续的相同的cigar而且需要将首尾的deletion处理掉目前看好像没啥影响我们忽略这一步
// 2. 对质量分数长度跟碱基长度不匹配的read缺少的质量分数用默认值补齐先忽略后边有需要再处理
// 3. 如果bam文件之前做过bqsrtag中包含OQoriginnal quality原始质量分数检查用户参数里是否指定用原始质量分数进行bqsr如果是则将质量分数替换为OQ否则忽略OQ先忽略
// 4. 对read的两端进行检测去除hardclipadapter
// spdlog::info("bam idx: {}", i);
BamWrap* bw = bams[i];
SamData sd;
sd.rid = i + readNumSum;
sd.bw = bw;
sd.read_len = BamWrap::BamEffectiveLength(bw->b);
sd.cigar_end = bw->b->core.n_cigar;
if (sd.read_len <= 0) continue;
int adapter_boundary = bw->GetAdapterBoundary();
if (bw->IsAdapterInRead(adapter_boundary)) {
// adapter在read范围内
if (bw->GetReadNegativeStrandFlag()) { // 反链
clipByReferenceCoordinates(bw, -1, adapter_boundary, sd);
} else { // 正链
clipByReferenceCoordinates(bw, adapter_boundary, -1, sd);
}
}
sd.read_len = bw->b->core.l_qseq - sd.left_clip - sd.right_clip; // 更新read长度
// 5. 然后再去除softclip部分
calculateSoftClip(bw, sd);
sd.read_len = bw->b->core.l_qseq - sd.left_clip - sd.right_clip; // 更新read长度
if (sd.read_len <= 0) continue;
calculateRefOffset(bw, sd); // 计算ref_offset就是相对比对的position要将ref右移多少
calculateReadBases(bw, sd); // 计算clip处理之后剩余的碱基
// 计算clip之后两端的softstart和softend
calculateSoftStartEnd(bw, sd);
calculateCigar(bw, sd);
//spdlog::info("read-len {} - {}: clip left {}, right {}, ref offset: {}, cigar range: [{}, {}), cigar: {}", bw->b->core.l_qseq,
// sd.read_len, sd.left_clip, sd.right_clip, sd.ref_offset, sd.cigar_start, sd.cigar_end, bw->cigar_str());
// 6. 更新每个read的platform信息好像没啥用暂时忽略
vector<int> isSNP(sd.read_len, 0); // 该位置是否是SNP位置0不是1是
vector<int> isIns(sd.read_len, 0); // 该位置是否是插入位置0不是1是
vector<int> isDel(sd.read_len, 0); // 该位置是否是删除位置0不是1是
const int nErrors = calculateIsSNPOrIndel(nsgv::gAuxVars[0], sd, isSNP, isIns, isDel);
/*fprintf(gf[0], "%d\t", sd.read_len);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", isSNP[ii]);
fprintf(gf[0], "\n");
fprintf(gf[1], "%d\t", sd.read_len);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[1], "%d ", isIns[ii]);
fprintf(gf[1], "\n");
fprintf(gf[2], "%d\t", sd.read_len);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[2], "%d ", isDel[ii]);
fprintf(gf[2], "\n");
*/
// spdlog::info("nErrors: {}", nErrors);
// for (auto val : isSNP) { if (val > 0) spdlog::info("snp val: {}", val); }
//exit(0);
// 7. 计算baqArray
// BAQ = base alignment quality
// note for efficiency reasons we don't compute the BAQ array unless we actually have
// some error to marginalize over. For ILMN data ~85% of reads have no error
vector<uint8_t> baqArray;
bool baqCalculated = false;
if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) {
baqCalculated = flatBAQArray(sd, baqArray);
} else {
// baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, sd, baqArray);
}
if (!baqCalculated) continue;
// 到这里基本的数据都准备好了后续就是进行bqsr的统计了
// 8. 计算这条read对应的协变量
PROF_START(covariate);
CovariateUtils::ComputeCovariates(sd, nsgv::gInBamHeader, readCovariates, true);
PROF_END(gprof[GP_covariate], covariate);
// fprintf(gf[3], "%ld %ld %d %ld\n", sd.rid, readCovariates.size(), sd.read_len, readCovariates[0][0].size());
// for (auto &arr1 : readCovariates) {
// for (size_t si = 0; si < sd.read_len; ++si) {
// for (auto &val : arr1[si]) {
// fprintf(gf[3], "%d ", val);
// }
// }
// }
// fprintf(gf[3], "\n");
// fprintf(gf[3], "%ld %d\n", sd.rid, sd.read_len);
// {
// auto& arr1 = readCovariates[0];
// {
// for (int pos = 0; pos < sd.read_len; ++pos) {
// fprintf(gf[3], "%d %d\n", pos, arr1[pos][2]);
// }
// }
// }
// fprintf(gf[3], "\n");
//test = readCovariates[1][0][0] + readCovariates[2][1][3];
//int end_pos = bw->contig_end_pos();
//spdlog::info("adapter: {}, read: {}, {}, strand: {}", adapter_boundary, bw->contig_pos(), end_pos,
// bw->GetReadNegativeStrandFlag() ? "reverse" : "forward");
// for (auto val : isSNP) { if (val > 0) spdlog::info("snp err val-1: {}", val); }
// 9. 计算这条read需要跳过的位置
vector<bool> skips(sd.read_len, 0);
PROF_START(known_sites);
calculateKnownSites(sd, nsgv::gAuxVars[0].vcfArr, skips);
for (int ii = 0; ii < sd.read_len; ++ii) {
skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) ||
sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN;
}
PROF_END(gprof[GP_read_vcf], known_sites);
// fprintf(gf[0], "%d\t", sd.read_len);
// for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", skips[ii] ? 1 : 0);
// fprintf(gf[0], "\n");
// 10. 根据BAQ进一步处理snpindel得到处理后的数据
vector<double> snpErrors, insErrors, delErrors;
// for (auto val : isSNP) { if (val > 0) spdlog::info("snp err val-2: {}", val); }
calculateFractionalErrorArray(isSNP, baqArray, snpErrors);
calculateFractionalErrorArray(isIns, baqArray, insErrors);
calculateFractionalErrorArray(isDel, baqArray, delErrors);
// for (auto val : isSNP) { if (val > 0) spdlog::info("snp val: {}", val); }
//spdlog::info("snp errors size: {}, read len: {}", snpErrors.size(), sd.read_len);
//for (auto val : snpErrors) { if (val > 0) spdlog::info("snp err val: {}", val); }
// aggregate all of the info into our info object, and update the data
// 11. 合并之前计算的数据得到info并更新bqsr table数据
ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors);
int m = 0;
// for (auto err : snpErrors) { if (isSNP[m] > 0 || err > 0) spdlog::info("snp err: {} : {}", isSNP[m++], err); }
//exit(0);
PROF_START(update_info);
updateRecalTablesForRead(info);
PROF_END(gprof[GP_update_info], update_info);
}
// exit(0);
2025-12-04 22:26:13 +08:00
readNumSum += readNum;
inBamBuf.ClearAll(); //
2025-12-04 22:26:13 +08:00
}
spdlog::info("read count: {}", readNumSum);
// 12. 创建总结数据
collapseQualityScoreTableToReadGroupTable(nsgv::gRecalTables.readGroupTable, nsgv::gRecalTables.qualityScoreTable);
roundTableValues(nsgv::gRecalTables);
// 13. 量化质量分数
QuantizationInfo quantInfo(nsgv::gRecalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS);
// 14. 输出结果
RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, nsgv::gRecalTables);
close_debug_files();
2025-12-04 22:26:13 +08:00
return 0;
}
// 需要支持vcf idxtbicsi三种索引方式
// vcf和idx是一对
// vcf.gz和tbi或csi是一对
// entrance of mark BQSR
2025-12-04 22:26:13 +08:00
int BaseRecalibrator() {
2025-11-23 23:03:37 +08:00
PROF_START(whole_process);
/* bam */
nsgv::gInBamFp = sam_open_format(nsgv::gBqsrArg.INPUT_FILE.c_str(), "r", nullptr);
if (!nsgv::gInBamFp) {
spdlog::error("[{}] load sam/bam file failed.\n", __func__);
return -1;
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header
2025-12-04 22:26:13 +08:00
// 初始化AuxVar
nsgv::gAuxVars.resize(nsgv::gBqsrArg.NUM_THREADS);
for (int i = 0; i < nsgv::gBqsrArg.NUM_THREADS; ++i) {
nsgv::gAuxVars[i].header = nsgv::gInBamHeader;
nsgv::gAuxVars[i].faidx = fai_load(nsgv::gBqsrArg.REFERENCE_FILE.c_str());
if (nsgv::gAuxVars[i].faidx == 0)
error("[%s] fail to load the fasta index.\n", __func__);
for (auto &vcfFileName : nsgv::gBqsrArg.KNOWN_SITES_VCFS) {
nsgv::gAuxVars[i].vcfArr.push_back(VCFParser(vcfFileName, nsgv::gInBamHeader));
}
}
2025-11-23 23:03:37 +08:00
// (libraryId)
nsgv::gMetrics.LIBRARY = sam_hdr_line_name(nsgv::gInBamHeader, "RG", 0);
2025-12-04 22:26:13 +08:00
/* 并行读取bam数据 */
2025-11-23 23:03:37 +08:00
htsThreadPool htsPoolRead = {NULL, 0}; //
htsThreadPool htsPoolWrite = {NULL, 0}; //
htsPoolRead.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS);
htsPoolWrite.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS);
if (!htsPoolRead.pool || !htsPoolWrite.pool) {
spdlog::error("[{}] failed to set up thread pool", __LINE__);
sam_close(nsgv::gInBamFp);
return -1;
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead);
2025-12-04 22:26:13 +08:00
return SerialBQSR();
2025-11-23 23:03:37 +08:00
// 读取known sites vcfs
for (const auto& ks : nsgv::gBqsrArg.KNOWN_SITES_VCFS) {
spdlog::info(" {}", ks);
bcf_srs_t* srs = bcf_sr_init();
if (!bcf_sr_add_reader(srs, ks.c_str()))
error("Failed to read from %s: %s\n", !strcmp("-", ks.c_str()) ? "standard input" : ks.c_str(), bcf_sr_strerror(srs->errnum));
nsgv::gKnownSitesVcfSrs.push_back(srs);
while (bcf_sr_next_line(srs)) {
bcf1_t* line = srs->readers[0].buffer[0];
cout << line->pos << '\t' << line->rlen << '\t' << line->n_allele << '\t' << line->n_info << endl;
}
}
/* 先实现串行的bqsr-phase-1 */
2025-12-04 22:26:13 +08:00
2025-11-23 23:03:37 +08:00
sam_close(nsgv::gInBamFp);
PROF_END(gprof[GP_whole_process], whole_process);
return 0;
}