FastBQSR/src/bqsr/bqsr_entry.cpp

702 lines
28 KiB
C++
Raw Normal View History

2025-11-23 23:03:37 +08:00
/*
Description:
bambambam
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2023/10/23
*/
#include <htslib/faidx.h>
#include <htslib/kstring.h>
2025-11-23 23:03:37 +08:00
#include <htslib/sam.h>
2025-12-04 22:26:13 +08:00
#include <htslib/synced_bcf_reader.h>
2025-11-23 23:03:37 +08:00
#include <htslib/thread_pool.h>
#include <header.h>
2025-11-23 23:03:37 +08:00
#include <spdlog/spdlog.h>
#include <iomanip>
#include <numeric>
2025-11-23 23:03:37 +08:00
#include <queue>
#include <utility>
#include <vector>
2025-11-23 23:03:37 +08:00
#include "baq.h"
2025-11-23 23:03:37 +08:00
#include "bqsr_args.h"
#include "bqsr_funcs.h"
#include "bqsr_pipeline.h"
#include "covariate.h"
2025-12-04 22:26:13 +08:00
#include "dup_metrics.h"
#include "fastbqsr_version.h"
2025-11-23 23:03:37 +08:00
#include "read_name_parser.h"
#include "util/interval.h"
2025-11-23 23:03:37 +08:00
#include "util/profiling.h"
2025-12-04 22:26:13 +08:00
#include "util/utils.h"
#include "util/linear_index.h"
using std::deque;
2025-11-23 23:03:37 +08:00
// I/O block size (16 MiB) passed to hts_set_opt(..., HTS_OPT_BLOCK_SIZE, ...) in this file.
// The expansion is parenthesized so the macro behaves as one value inside larger expressions
// (e.g. `x % BAM_BLOCK_SIZE` or `n / BAM_BLOCK_SIZE`).
#define BAM_BLOCK_SIZE (16L * 1024 * 1024)
// Lookup table from a 4-bit packed base code (as extracted with bam_seqi) to a base character.
// Codes 1, 2, 4 and 8 map to 'A', 'C', 'G' and 'T'; every other code maps to 'N'.
const char cBaseToChar[16] = {'N', 'A', 'C', 'N', 'G', 'N', 'N', 'N', 'T', 'N', 'N', 'N', 'N', 'N', 'N', 'N'};
// 解析knownSites
struct VCFParser {
deque<Interval> knownSites; // 已知的变异位点
char* buf = nullptr; // // 数据buffer
uint32_t bufLen = 4 * 1024; // 数据buffer长度
LinearIndex index; // vcf文件索引
ifstream inStm; // vcf文件流
VCFParser() { Init(); }
VCFParser(const string& vcfFileName) { Init(vcfFileName); }
VCFParser(const string& vcfFileName, sam_hdr_t* samHeader) { Init(vcfFileName, samHeader); }
void Init() { buf = (char*)malloc(bufLen); }
void Init(const string& vcfFileName) {
Init();
inStm.open(vcfFileName, ifstream::in);
string idxFileName = vcfFileName + ".idx";
if (!index.ReadIndex(idxFileName))
error("[%s] fail to load the %s index file\n", __func__, idxFileName.c_str());
}
void Init(const string& vcfFileName, sam_hdr_t *samHeader) {
index.SetHeader(samHeader);
Init(vcfFileName);
}
};
// 解析后的一些参数,文件,数据等
/**
 * Parsed arguments, open files and cached data used while processing reads.
 * The global vector of these objects is documented in this file as holding one entry per thread.
 */
struct AuxVar {
    static constexpr int REF_CONTEXT_PAD = 3;                 // some padding is required
    static constexpr int REFERENCE_HALF_WINDOW_LENGTH = 150;  // extra reference bases fetched to avoid boundary effects

    sam_hdr_t* header{nullptr};  // BAM header
    faidx_t* faidx{nullptr};     // reference index
    char* ref_seq{nullptr};      // reference sequence
    int ref_len{0};              // reference sequence length
    int offset{0};               // number of extra bases fetched on each side of the requested reference range

    vector<VCFParser> vcfArr;    // known sites obtained from the VCF files
};
2025-11-23 23:03:37 +08:00
namespace nsgv {
// 全局变量 for bqsr
BQSRArg gBqsrArg; // bqsr arguments
samFile* gInBamFp; // input BAM file pointer
sam_hdr_t* gInBamHeader; // input BAM header
vector<AuxVar> gAuxVars; // auxiliary variables保存一些文件数据等每个线程对应一个
2025-12-04 22:26:13 +08:00
// 下面是需要删除或修改的变量
2025-11-23 23:03:37 +08:00
std::vector<ReadNameParser> gNameParsers; // read name parser
2025-12-04 22:26:13 +08:00
DuplicationMetrics gMetrics; //
DupResult gDupRes;
PipelineArg gPipe(&gDupRes);
2025-11-23 23:03:37 +08:00
samFile *gOutBamFp; // , sambam
sam_hdr_t *gOutBamHeader; // header
2025-12-04 22:26:13 +08:00
vector <bcf_srs_t*> gKnownSitesVcfSrs; // known sites vcf srs
2025-11-23 23:03:37 +08:00
}; // namespace nsgv
//
// Plain byte buffer descriptor; all members start out empty/zero.
// NOTE(review): presumably `size` is the number of bytes in use and `capacity` the allocated size — confirm.
struct ByteBuf {
    uint8_t* buf{nullptr};
    int size{0};
    int capacity{0};
};
// 读进来的这一批bam总共占了几个染色体这个方案不行读取太多没必要
// 开区间
// A [start, end) index range.
// Members are zero-initialized so a default-constructed Region is a well-defined empty range;
// brace initialization such as Region{start, stop} keeps working.
// NOTE(review): the original comment calls this an "open interval"; the only use in this file
// (a disabled prototype) treats `end` as exclusive — confirm.
struct Region {
    int64_t start = 0;
    int64_t end = 0;
};
2025-11-23 23:03:37 +08:00
/*
*
*/
// Returns the text after the last '.' in `filename`, or an empty string when there is no '.'.
static string getFileExtension(const string &filename) {
    const string::size_type dotPos = filename.rfind('.');
    return dotPos == string::npos ? string() : filename.substr(dotPos + 1);
}
// 过滤掉bqsr过程不符合要求的bam数据
bool bqsrReadFilterOut(const bam1_t *b) {
// 过滤掉unmapped的read
if (b->core.qual == 0) // mapping quality 0
return true;
if (b->core.qual == 255) // mapping quality not available
return true;
if (b->core.flag & BAM_FUNMAP || b->core.tid == -1 || b->core.pos == -1) { // unmapped
return true;
}
if (b->core.flag & BAM_FSECONDARY) { // secondary alignment
return true;
}
if (b->core.flag & BAM_FDUP) { // secondary alignment
return true;
}
if (b->core.flag & BAM_FQCFAIL) { // Not passing quality controls
return true;
}
return false;
}
// 该操作符是否消耗read的碱基
// Returns true when the CIGAR operator character consumes read bases (M, =, X, I, S).
bool consumeReadBases(char cigar) {
    switch (cigar) {
        case 'M':
        case '=':
        case 'X':
        case 'I':
        case 'S':
            return true;
        default:
            return false;
    }
}
// 该操作符是否消耗参考基因组的碱基
// Returns true when the CIGAR operator character consumes reference bases (M, =, X, D, N).
bool consumeRefBases(char cigar) {
    switch (cigar) {
        case 'M':
        case '=':
        case 'X':
        case 'D':
        case 'N':
            return true;
        default:
            return false;
    }
}
// 给定一个ref位置在read内部找到对应的位置和操作符
// Result of locating a reference coordinate inside a read.
// The defaults (-1 / '0' / -1 / 0 / 0) are what the lookup in this file returns when nothing was found.
struct PosAndOperator {
    int readPosAtRefCoord{-1};  // position within the read
    char cigarOperator{'0'};    // CIGAR operator character of the containing element
    int cigarIndex{-1};         // index of that CIGAR element
    int cigarLen{0};            // length of that CIGAR element
    int preCigarLen{0};         // read bases consumed by the elements before that one
};
/**
* Find the 0-based index within a read base array corresponding to a given 0-based position in the reference, along with the cigar operator of
* the element containing that base. If the reference coordinate occurs within a deletion, the first index after the deletion is returned.
* Note that this treats soft-clipped bases as if they align with the reference, which is useful for hard-clipping reads with soft clips.
*
* @param alignmentStart The soft start of the read on the reference
* @param cigar The read's cigar
* @param refCoord The target reference coordinate
* @return If the reference coordinate occurs before the read start or after the read end {@code CLIPPING_GOAL_NOT_REACHED};
* if the reference coordinate falls within an alignment block of the read's cigar, the corresponding read coordinate;
* if the reference coordinate falls within a deletion, the first read coordinate after the deletion. Note: if the last
* cigar element is a deletion (which isn't meaningful), it returns {@code CLIPPING_GOAL_NOT_REACHED}.
*/
// Walks the read's CIGAR from `alignmentStart` and returns the read position, operator, element index,
// element length and read offset of the element that contains `refCoord`.
// Soft clips are counted as if they consumed reference bases. A default-constructed PosAndOperator
// (readPosAtRefCoord == -1, cigarIndex == -1) is returned when `refCoord` lies before `alignmentStart`
// or is not covered by any element.
PosAndOperator getReadIndexForReferenceCoordinate(BamWrap *bw, int alignmentStart, int refCoord) {
    const PosAndOperator notFound;
    if (refCoord < alignmentStart) {
        return notFound;
    }

    const bam1_core_t& core = bw->b->core;
    const uint32_t* cigarOps = bam_get_cigar(bw->b);

    int readCursor = 0;              // read position just past the elements seen so far (exclusive end)
    int refCursor = alignmentStart;  // reference position just past the elements seen so far (exclusive end)

    for (int k = 0; k < static_cast<int>(core.n_cigar); ++k) {
        const char op = bam_cigar_opchr(cigarOps[k]);
        const int opLen = bam_cigar_oplen(cigarOps[k]);
        const bool usesRead = consumeReadBases(op);

        // Inclusive start of this element on the read and on the reference.
        const int readBegin = readCursor;
        const int refBegin = refCursor;

        if (usesRead) {
            readCursor += opLen;
        }
        if (consumeRefBases(op) || op == 'S') {
            refCursor += opLen;
        }

        // Does this element bracket the requested reference coordinate?
        if (refCoord >= refBegin && refCoord < refCursor) {
            const int offsetInElement = usesRead ? (refCoord - refBegin) : 0;
            return PosAndOperator{readBegin + offsetInElement, op, k, opLen, readBegin};
        }
    }
    return notFound;
}
// 根据adapter位置对read进行hardclip返回左侧或右侧减掉的base数量
/**
 * Records in `ad` how much of the read must be clipped away for an adapter located by reference coordinate.
 *
 * Exactly one of `refStart` / `refStop` must be non-negative:
 *  - refStart < 0, refStop >= 0: clip the start of the read up to and including `refStop`
 *    (sets ad.left_clip, ad.cigar_start, ad.first_cigar_clip);
 *  - refStart >= 0, refStop < 0: clip from `refStart` to the end of the read
 *    (sets ad.right_clip, ad.cigar_end, ad.last_cigar_clip).
 * Any other combination leaves `ad` untouched. `ad` is also left untouched when the coordinate is not
 * covered by the read's CIGAR; without that check the "not found" sentinel (-1) would be turned into
 * negative clip lengths and a cigar index of -1.
 */
void clipByReferenceCoordinates(BamWrap *bw, int refStart, int refStop, ReadAdditionData &ad) {
    if (refStart < 0) {
        if (refStop < 0) return;
        const PosAndOperator stopPos = getReadIndexForReferenceCoordinate(bw, bw->GetSoftStart(), refStop);
        if (stopPos.cigarIndex < 0) return;  // coordinate not covered by the read: nothing to clip
        // If refStop falls in a deletion, the lookup returns the position after the deletion. Since the stop
        // used here is inclusive, step back by one to avoid over-clipping by one base. As a result the
        // deletion itself is not clipped, which is fine.
        const int stop = stopPos.readPosAtRefCoord - (consumeReadBases(stopPos.cigarOperator) ? 0 : 1);
        ad.left_clip = stop + 1;
        ad.cigar_start = stopPos.cigarIndex;
        ad.first_cigar_clip = ad.left_clip - stopPos.preCigarLen;
    } else {
        if (refStop >= 0) return;
        // Unlike the case above, the end of the read is clipped here, so getting the base to the right of
        // a deletion avoids over-clipping.
        const PosAndOperator startPos = getReadIndexForReferenceCoordinate(bw, bw->GetSoftStart(), refStart);
        if (startPos.cigarIndex < 0) return;  // coordinate not covered by the read: nothing to clip
        const int start = startPos.readPosAtRefCoord;
        ad.right_clip = bw->b->core.l_qseq - start;
        ad.cigar_end = startPos.cigarIndex + 1;
        ad.last_cigar_clip = startPos.preCigarLen + startPos.cigarLen - start;
    }
}
// 计算切掉adapter之后ref相对原始ref的偏移量
/**
 * Adds to ad.ref_offset the number of reference bases consumed by the clipped-away start of the read:
 * the full length of every reference-consuming CIGAR element before ad.cigar_start, plus
 * ad.first_cigar_clip when the element at ad.cigar_start itself consumes reference bases.
 *
 * All CIGAR accesses are bounded by the record's n_cigar, so an out-of-range ad.cigar_start
 * (negative, or >= n_cigar) no longer reads outside the CIGAR array.
 */
void calculateRefOffset(BamWrap *bw, ReadAdditionData &ad) {
    const uint32_t* cigar = bam_get_cigar(bw->b);
    const int nCigar = static_cast<int>(bw->b->core.n_cigar);
    const int first = ad.cigar_start;

    // Elements that were removed entirely.
    for (int i = 0; i < first && i < nCigar; ++i) {
        if (consumeRefBases(bam_cigar_opchr(cigar[i]))) {
            ad.ref_offset += bam_cigar_oplen(cigar[i]);
        }
    }
    // The first kept element may be partially clipped.
    if (first >= 0 && first < nCigar && consumeRefBases(bam_cigar_opchr(cigar[first]))) {
        ad.ref_offset += ad.first_cigar_clip;
    }
}
// 计算clip处理之后剩余的碱基
// Fills ad.bases with the ad.read_len bases that remain after clipping: the packed sequence of the record
// is decoded through cBaseToChar starting at read index ad.left_clip.
void calculateReadBases(BamWrap* bw, ReadAdditionData& ad) {
    const uint8_t* packedSeq = bam_get_seq(bw->b);
    ad.bases.resize(ad.read_len);
    int src = ad.left_clip;
    for (int dst = 0; dst < ad.read_len; ++dst, ++src) {
        ad.bases[dst] = cBaseToChar[bam_seqi(packedSeq, src)];
    }
}
// 计算read两端softclip的碱基数量可能会修改ad里的clip值
/**
 * Finds soft-clipped bases at both ends of the (already adapter-clipped) read and extends the clip
 * bookkeeping in `ad` to cover them.
 *
 * Only the CIGAR elements in [ad.cigar_start, ad.cigar_end) are scanned, with the first and last element
 * shortened by ad.first_cigar_clip / ad.last_cigar_clip. When a leading soft clip is found, ad.left_clip,
 * ad.cigar_start and ad.first_cigar_clip are updated; when a trailing soft clip is found, ad.right_clip,
 * ad.cigar_end and ad.last_cigar_clip are updated. Otherwise the corresponding fields are left as they were.
 */
void calculateSoftClip(BamWrap *bw, ReadAdditionData &ad) {
    const uint32_t* cigarOps = bam_get_cigar(bw->b);
    const int firstOp = ad.cigar_start;
    const int endOp = ad.cigar_end;

    int readCursor = ad.left_clip;  // read index of the first base of the current element
    int leftCut = -1;               // last read index of the leading soft clip (inclusive), -1 if none
    int rightCut = -1;              // first read index of the trailing soft clip (inclusive), -1 if none
    int newFirstOp = firstOp;
    int newEndOp = endOp;
    bool inRightTail = false;       // becomes true once an element other than S/H has been seen

    for (int k = firstOp; k < endOp; ++k) {
        const char op = bam_cigar_opchr(cigarOps[k]);
        int opLen = bam_cigar_oplen(cigarOps[k]);
        if (k == firstOp) {
            opLen -= ad.first_cigar_clip;
        }
        if (k == endOp - 1) {
            opLen -= ad.last_cigar_clip;
        }

        if (op == 'S') {
            if (inRightTail) {
                rightCut = readCursor;
                newEndOp = k;
            } else {
                leftCut = readCursor + opLen - 1;
                newFirstOp = k + 1;
            }
        } else if (op != 'H') {
            inRightTail = true;
        }

        if (consumeReadBases(op)) {
            readCursor += opLen;
        }
    }

    if (leftCut >= 0) {
        ad.left_clip = leftCut + 1;
        ad.cigar_start = newFirstOp;
        ad.first_cigar_clip = 0;
    }
    if (rightCut >= 0) {
        ad.right_clip = bw->b->core.l_qseq - rightCut;
        ad.cigar_end = newEndOp;
        ad.last_cigar_clip = 0;
    }
}
// 读取给定区间的reference
/**
 * Loads the reference bases for `interval` into aux.ref_seq / aux.ref_len, releasing the previous sequence.
 *
 * @param aux       holder of the header, the fasta index and the sequence buffer.
 * @param cur_pos   encoded position used only to determine the contig (via BamWrap::bam_tid).
 * @param interval  range to fetch; its left/right members are decoded with BamWrap::bam_pos.
 *
 * On failure (contig id unknown to the header, or the fetch fails) aux.ref_seq is nullptr and
 * aux.ref_len is 0, so callers can test either field. Previously an unknown contig passed a null
 * name to faidx_fetch_seq and a failed fetch left a negative length behind.
 */
static inline void read_ref_base(AuxVar& aux, int64_t cur_pos, Interval& interval) {
    free(aux.ref_seq);  // free(nullptr) is a no-op
    aux.ref_seq = nullptr;
    aux.ref_len = 0;

    const int tid = BamWrap::bam_tid(cur_pos);
    const char* chr = sam_hdr_tid2name(aux.header, tid);
    if (chr == nullptr) {
        return;
    }

    const int seq_begin = BamWrap::bam_pos(interval.left);   // no extra flanking window is fetched for now
    const int seq_end = BamWrap::bam_pos(interval.right);
    aux.ref_seq = faidx_fetch_seq(aux.faidx, chr, seq_begin, seq_end, &aux.ref_len);
    if (aux.ref_seq == nullptr) {
        aux.ref_len = 0;  // faidx_fetch_seq reports errors through negative lengths
    }
}
// 设置某个位置是indel
// Marks position `index` as an indel (value 1); indices outside the vector are silently ignored.
inline void updateIndel(vector<int> &isIndel, int index) {
    if (index < 0) {
        return;
    }
    if (static_cast<size_t>(index) >= isIndel.size()) {
        return;
    }
    isIndel[index] = 1;
}
// 计算该read的每个碱基位置是否是SNP或Indel
// Upper-cases an ASCII letter; any other character is returned unchanged.
static inline char asciiToUpper(char c) {
    return (c >= 'a' && c <= 'z') ? static_cast<char>(c - 'a' + 'A') : c;
}

/**
 * Flags, for every base of the clipped read, whether it is a SNP, an insertion site or a deletion site.
 *
 * The reference for [bw->start_pos() + ad.ref_offset, bw->end_pos()] is loaded through read_ref_base and
 * the CIGAR elements in [ad.cigar_start, ad.cigar_end) are walked (first/last element shortened by
 * ad.first_cigar_clip / ad.last_cigar_clip). Indices into isSNP/isIns/isDel are positions in the clipped
 * read; the matching base in the record is at that position plus ad.left_clip.
 *
 * Changes relative to the previous implementation:
 *  - the reference is read in place instead of being copied into a std::string, which also removes the
 *    crash when the fetch failed and aux.ref_seq was null;
 *  - read and reference indices are bounds-checked; positions without a reference base or outside the
 *    output vector are left unflagged;
 *  - reference bases are compared case-insensitively, so lower-case (soft-masked) reference sequence is
 *    no longer reported as a mismatch.
 *
 * @return number of events: mismatching bases plus the number of flagged entries in isIns and isDel.
 */
int calculateIsSNPOrIndel(AuxVar& aux, BamWrap *bw, ReadAdditionData &ad, vector<int> &isSNP, vector<int> &isIns, vector<int> &isDel) {
    // 1. Load the reference. This runs serially for now; reading ref and vcf could later be merged into a
    //    parallel pipeline stage.
    Interval interval{bw->start_pos() + ad.ref_offset, bw->end_pos()};  // closed interval
    read_ref_base(aux, interval.left, interval);
    const char* refBases = aux.ref_seq;
    const int refLen = (refBases != nullptr && aux.ref_len > 0) ? aux.ref_len : 0;

    // 2. Walk the CIGAR and classify each base.
    int readPos = 0, refPos = 0, nEvents = 0;
    const uint32_t* cigar = bam_get_cigar(bw->b);
    const bam1_core_t& bc = bw->b->core;
    const uint8_t* seq = bam_get_seq(bw->b);
    const bool reverseStrand = bw->GetReadNegativeStrandFlag();
    const int nSnpSlots = static_cast<int>(isSNP.size());

    for (int i = ad.cigar_start; i < ad.cigar_end; ++i) {
        const char c = bam_cigar_opchr(cigar[i]);
        int len = bam_cigar_oplen(cigar[i]);
        if (i == ad.cigar_start) len -= ad.first_cigar_clip;
        if (i == ad.cigar_end - 1) len -= ad.last_cigar_clip;

        if (c == 'M' || c == '=' || c == 'X') {
            for (int j = 0; j < len; ++j) {
                // Compare read and reference base by position; the read index is shifted by left_clip.
                const int seqIdx = readPos + ad.left_clip;
                const bool inRange = readPos >= 0 && readPos < nSnpSlots && seqIdx < bc.l_qseq && refPos < refLen;
                if (inRange) {
                    const char readBase = cBaseToChar[bam_seqi(seq, seqIdx)];
                    const int snpInt = (readBase == asciiToUpper(refBases[refPos])) ? 0 : 1;
                    isSNP[readPos] = snpInt;
                    nEvents += snpInt;
                }
                ++readPos;
                ++refPos;
            }
        } else if (c == 'D') {
            // The deletion is marked on the last base of the preceding read-consuming element
            // (on the following base for reverse-strand reads).
            updateIndel(isDel, reverseStrand ? readPos : readPos - 1);
            refPos += len;
        } else if (c == 'N') {
            refPos += len;
        } else if (c == 'I') {
            // Forward strand: mark the base before the insertion; reverse strand: the base after it.
            if (!reverseStrand) {
                updateIndel(isIns, readPos - 1);
            }
            readPos += len;
            if (reverseStrand) {
                updateIndel(isIns, readPos);
            }
        } else if (c == 'S') {
            readPos += len;
        }
    }
    nEvents += std::accumulate(isIns.begin(), isIns.end(), 0) + std::accumulate(isDel.begin(), isDel.end(), 0);
    return nEvents;
}
// 简单计算baq数组就是全部赋值为'@' (64)
/**
 * Produces the "flat" BAQ array: ad.read_len entries, every one equal to '@' (64).
 *
 * assign() is used instead of resize() so that a vector that already holds data is fully overwritten
 * rather than only extended, and a negative read length yields an empty array instead of an exception.
 *
 * @return always true.
 */
bool flatBAQArray(BamWrap* bw, ReadAdditionData& ad, vector<int>& baqArray) {
    const size_t n = ad.read_len > 0 ? static_cast<size_t>(ad.read_len) : 0;
    baqArray.assign(n, static_cast<int>('@'));
    return true;
}
// 计算真实的baq数组耗时更多好像enable-baq参数默认是关闭的那就先不实现这个了
// Placeholder for the real BAQ (base alignment quality) computation. According to the original note the
// real computation is more expensive and the enable-baq option appears to be off by default, so it has
// not been implemented yet.
// For now this only resizes baqArray to ad.read_len (newly added elements are 0) and returns true;
// aux, baq and bw are not used.
bool calculateBAQArray(AuxVar& aux, BAQ& baq, BamWrap* bw, ReadAdditionData& ad, vector<int>& baqArray) {
baqArray.resize(ad.read_len, 0);
return true;
}
// 获取一行字符串
// Extracts the next line from buf[*cur, total) into *line (without the '\n') and advances *cur past the
// terminating newline, or to `total` when the buffer ends first. *line is empty when *cur is already at
// or beyond `total`.
static void get_line_from_buf(char* buf, int64_t total, int64_t* cur, string* line) {
    line->clear();
    while (*cur < total) {
        const char ch = buf[(*cur)++];
        if (ch == '\n') {
            break;
        }
        line->push_back(ch);
    }
}
// 计算与read有交叉的已知位点信息 应该要判断一下是按照read的范围去读取vcf还是按照一个batch read的范围去读取
/**
 * Refreshes, for every known-sites VCF, the queue of intervals that may overlap the given read.
 *
 * For each parser: intervals ending before the read start are dropped from the front of the queue; if the
 * last buffered interval already starts after the read end nothing is loaded; otherwise the index is asked
 * for the file block covering [start, end] of the read, the block is read and every record whose contig is
 * known to the input BAM header is appended as Interval(tid, pos - 1, pos - 1 + ref.size()).
 *
 * Fixes relative to the previous implementation:
 *  - the stream state is cleared before seeking and only the bytes actually read (gcount) are parsed, so
 *    a previous EOF/failure no longer causes stale buffer content to be parsed;
 *  - every line of the block is parsed; an empty line no longer ends parsing early, and header ('#') or
 *    malformed lines are skipped;
 *  - buffer growth is checked (allocation failure, sizes beyond the 32-bit bufLen) and fpos/flen are
 *    initialized before the index lookup.
 *
 * NOTE(review): each call appends every record of the block again, so sites that are still buffered from
 * a previous read can be added a second time — confirm whether de-duplication is wanted here.
 * NOTE(review): whether Interval's third argument is inclusive or exclusive is not visible here — confirm
 * that pos - 1 + ref.size() is the intended right bound.
 *
 * @param bw   read whose span selects the sites.
 * @param ad   currently unused.
 * @param vcfs known-sites parsers to update.
 */
void calculateKnownSites(BamWrap* bw, ReadAdditionData& ad, vector<VCFParser> &vcfs) {
    const uint64_t startPos = bw->start_pos();  // closed interval
    const uint64_t endPos = bw->end_pos();      // closed interval

    for (auto& vcf : vcfs) {
        // Drop intervals that end before this read starts.
        while (!vcf.knownSites.empty() && vcf.knownSites.front().right < startPos) {
            vcf.knownSites.pop_front();
        }
        // Buffered sites already reach beyond this read: nothing new to load.
        if (!vcf.knownSites.empty() && vcf.knownSites.back().left > endPos) continue;

        // Locate the file block that covers the read.
        int64_t fpos = 0, flen = 0;
        vcf.index.SearchInterval(startPos, endPos, &fpos, &flen);
        if (flen <= 0) continue;

        // Grow the scratch buffer when the block does not fit.
        if (flen > static_cast<int64_t>(vcf.bufLen)) {
            if (flen > static_cast<int64_t>(UINT32_MAX)) {
                spdlog::error("[{}] known-sites block of {} bytes is too large to buffer", __func__, flen);
                continue;
            }
            char* grown = static_cast<char*>(realloc(vcf.buf, static_cast<size_t>(flen)));
            if (grown == nullptr) {
                spdlog::error("[{}] failed to grow the known-sites buffer to {} bytes", __func__, flen);
                continue;  // the old buffer is still valid and still owned by vcf
            }
            vcf.buf = grown;
            vcf.bufLen = static_cast<uint32_t>(flen);
        }

        // A previous read that hit EOF leaves eofbit/failbit set; clear them or seek/read silently fail.
        vcf.inStm.clear();
        vcf.inStm.seekg(fpos, ios::beg);
        vcf.inStm.read(vcf.buf, flen);
        const int64_t nRead = static_cast<int64_t>(vcf.inStm.gcount());
        if (nRead <= 0) continue;

        // Parse the block line by line: CHROM POS ID REF ...
        string line;
        int64_t cur = 0;
        while (cur < nRead) {
            get_line_from_buf(vcf.buf, nRead, &cur, &line);
            if (line.empty() || line[0] == '#') continue;

            stringstream ss_line(line);
            string stid, id, ref;
            int pos = 0;
            ss_line >> stid >> pos >> id >> ref;
            if (ss_line.fail()) continue;  // fewer than four fields or a non-numeric POS

            const int tid = sam_hdr_name2tid(nsgv::gInBamHeader, stid.c_str());
            if (tid >= 0 && pos > 0) {
                vcf.knownSites.push_back(Interval(tid, pos - 1, pos - 1 + ref.size()));
            }
        }
    }
}
2025-12-04 22:26:13 +08:00
// 串行bqsr
int SerialBQSR() {
int round = 0;
BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO);
// inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM);
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut);
2025-12-04 22:26:13 +08:00
int64_t readNumSum = 0;
// 0. 初始化一些全局数据
// BAQ baq{BAQ::DEFAULT_GOP};
// 1. 协变量数据相关初始化
PerReadCovariateMatrix readCovariates;
CovariateUtils::InitPerReadCovMat(readCovariates);
ContextCovariate::InitContextCovariate(nsgv::gBqsrArg);
CycleCovariate::InitCycleCovariate(nsgv::gBqsrArg);
// 2. 读取bam的read group
if (nsgv::gInBamHeader->hrecs->nrg == 0) {
spdlog::error("No RG tag found in the header!");
return 1;
}
for (int i = 0; i < nsgv::gInBamHeader->hrecs->nrg; ++i) {
spdlog::info("rg: {}", nsgv::gInBamHeader->hrecs->rg[i].name);
ReadGroupCovariate::RgToId[nsgv::gInBamHeader->hrecs->rg[i].name] = i;
ReadGroupCovariate::IdToRg[i] = nsgv::gInBamHeader->hrecs->rg[i].name;
}
int test = 0;
2025-12-04 22:26:13 +08:00
while (1) {
++ round;
// 一. 读取bam数据
2025-12-04 22:26:13 +08:00
size_t readNum = 0;
if (inBamBuf.ReadStat() >= 0)
readNum = inBamBuf.ReadBam();
if (readNum < 1) {
break;
}
auto bams = inBamBuf.GetBamArr();
spdlog::info("{} reads processed in {} round, {}", readNum, round, test);
// 二. 遍历每个bamread记录进行处理
for (int i = 0; i < bams.size(); ++i) {
// 1. 对每个read需要检查cigar是否合法即没有两个连续的相同的cigar而且需要将首尾的deletion处理掉目前看好像没啥影响我们忽略这一步
// 2. 对质量分数长度跟碱基长度不匹配的read缺少的质量分数用默认值补齐先忽略后边有需要再处理
// 3. 如果bam文件之前做过bqsrtag中包含OQoriginnal quality原始质量分数检查用户参数里是否指定用原始质量分数进行bqsr如果是则将质量分数替换为OQ否则忽略OQ先忽略
// 4. 对read的两端进行检测去除hardclipadapter
BamWrap *bw = bams[i];
ReadAdditionData ad;
ad.read_len = BamWrap::BamEffectiveLength(bw->b);
ad.cigar_end = bw->b->core.n_cigar;
if (ad.read_len <= 0) continue;
int adapter_boundary = bw->GetAdapterBoundary();
if (bw->IsAdapterInRead(adapter_boundary)) {
// adapter在read范围内
if (bw->GetReadNegativeStrandFlag()) { // 反链
clipByReferenceCoordinates(bw, -1, adapter_boundary, ad);
} else { // 正链
clipByReferenceCoordinates(bw, adapter_boundary, -1, ad);
}
}
ad.read_len = bw->b->core.l_qseq - ad.left_clip - ad.right_clip; // 更新read长度
// 5. 然后再去除softclip部分
calculateSoftClip(bw, ad);
ad.read_len = bw->b->core.l_qseq - ad.left_clip - ad.right_clip; // 更新read长度
if (ad.read_len <= 0) continue;
calculateRefOffset(bw, ad); // 计算ref_offset就是相对比对的position要将ref右移多少
calculateReadBases(bw, ad); // 计算clip处理之后剩余的碱基
//spdlog::info("read-len {} - {}: clip left {}, right {}, ref offset: {}, cigar range: [{}, {}), cigar: {}", bw->b->core.l_qseq,
// ad.read_len, ad.left_clip, ad.right_clip, ad.ref_offset, ad.cigar_start, ad.cigar_end, bw->cigar_str());
// 6. 更新每个read的platform信息好像没啥用暂时忽略
vector<int> isSNP(ad.read_len, 0); // 该位置是否是SNP位置0不是1是
vector<int> isIns(ad.read_len, 0); // 该位置是否是插入位置0不是1是
vector<int> isDel(ad.read_len, 0); // 该位置是否是删除位置0不是1是
const int nErrors = calculateIsSNPOrIndel(nsgv::gAuxVars[0], bw, ad, isSNP, isIns, isDel);
// 7. 计算baqArray
// BAQ = base alignment quality
// note for efficiency reasons we don't compute the BAQ array unless we actually have
// some error to marginalize over. For ILMN data ~85% of reads have no error
vector<int> baqArray;
bool baqCalculated = false;
if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) {
baqCalculated = flatBAQArray(bw, ad, baqArray);
} else {
// baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, bw, ad, baqArray);
}
if (!baqCalculated) continue;
// 到这里基本的数据都准备好了后续就是进行bqsr的统计了
// 8. 计算这条read对应的协变量
CovariateUtils::ComputeCovariates(bw, ad, nsgv::gInBamHeader, readCovariates, true);
test = readCovariates[1][0][0] + readCovariates[2][1][3];
int end_pos = bw->contig_end_pos();
//spdlog::info("adapter: {}, read: {}, {}, strand: {}", adapter_boundary, bw->contig_pos(), end_pos,
// bw->GetReadNegativeStrandFlag() ? "reverse" : "forward");
// 9. 计算这条read需要跳过的位置
vector<bool> skip(ad.read_len, 0);
calculateKnownSites(bw, ad, nsgv::gAuxVars[0].vcfArr);
}
#if 0
// spdlog::info("region: {} - {}", bams[0]->global_softclip_start(), bams.back()->global_softclip_end());
2025-12-04 22:26:13 +08:00
// 1. 获取bams数组覆盖的region范围
// 如果读取的bam数组跨越了不同的染色体咋搞还是按照每个线程都有独立的vcf文件来做吧
int64_t region_start = bams[0]->global_softclip_start();
vector<Region> contig_bams;
int contig_id = bams[0]->contig_id();
int64_t start = 0, stop = 0;
while (true) {
stop = start;
while (stop < bams.size() && bams[stop]->contig_id() == contig_id) ++stop;
if (stop > start) contig_bams.push_back(Region{start, stop});
if (stop >= bams.size()) break;
contig_id = bams[stop]->contig_id();
start = stop;
}
2025-12-04 22:26:13 +08:00
spdlog::info("{}, {} contig regions", contig_id, contig_bams.size());
for (int i = 0; i < bams.size();) {
int64_t a1 = bams[i]->contig_pos();
int64_t b1 = bams[i]->contig_end_pos();
int64_t a = bams[i]->softclip_start();
int64_t b = bams[i]->softclip_end();
spdlog::info("{}: ({}, {}), ({}, {})", bams[i]->query_name(), a1, b1, a, b);
++i;
}
// 依次处理每个contig的bams
vector<uint32_t> bitmap(100, 0); // 用来表示known sites覆盖情况的bitmap
for (const auto& cr : contig_bams) {
spdlog::info(" contig id: {}, bam count: {}, bitmap size: {}", contig_id, cr.end - cr.start, bitmap.size());
// 当前处理的contig
int contig_id = bams[cr.start]->contig_id();
int64_t region_start = bams[cr.start]->softclip_start();
int64_t region_end = bams[cr.end - 1]->softclip_end();
if ((bitmap.size() << 5)) {
}
}
#endif
2025-12-04 22:26:13 +08:00
// 2. 开辟一个uint32_t的数组作为bitmap如果上一轮的不够就重开用来表示region的每个位点是否有known sites覆盖每轮使用前需清零
// 3. 读取在region范围内的所有known sites并为对应的bitmap设定0 or 1 (作为skip标识)
// 4. 遍历bams数组中的每一条记录并进行处理
readNumSum += readNum;
inBamBuf.ClearAll(); //
2025-12-04 22:26:13 +08:00
}
spdlog::info("read count: {}", readNumSum);
return 0;
}
// 需要支持vcf idxtbicsi三种索引方式
// vcf和idx是一对
// vcf.gz和tbi或csi是一对
// entrance of mark BQSR
2025-12-04 22:26:13 +08:00
int BaseRecalibrator() {
2025-11-23 23:03:37 +08:00
PROF_START(whole_process);
/* bam */
nsgv::gInBamFp = sam_open_format(nsgv::gBqsrArg.INPUT_FILE.c_str(), "r", nullptr);
if (!nsgv::gInBamFp) {
spdlog::error("[{}] load sam/bam file failed.\n", __func__);
return -1;
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header
2025-12-04 22:26:13 +08:00
// 初始化AuxVar
nsgv::gAuxVars.resize(nsgv::gBqsrArg.NUM_THREADS);
for (int i = 0; i < nsgv::gBqsrArg.NUM_THREADS; ++i) {
nsgv::gAuxVars[i].header = nsgv::gInBamHeader;
nsgv::gAuxVars[i].faidx = fai_load(nsgv::gBqsrArg.REFERENCE_FILE.c_str());
if (nsgv::gAuxVars[i].faidx == 0)
error("[%s] fail to load the fasta index.\n", __func__);
for (auto &vcfFileName : nsgv::gBqsrArg.KNOWN_SITES_VCFS) {
nsgv::gAuxVars[i].vcfArr.push_back(VCFParser(vcfFileName, nsgv::gInBamHeader));
}
}
2025-11-23 23:03:37 +08:00
// (libraryId)
nsgv::gMetrics.LIBRARY = sam_hdr_line_name(nsgv::gInBamHeader, "RG", 0);
2025-12-04 22:26:13 +08:00
/* 并行读取bam数据 */
2025-11-23 23:03:37 +08:00
htsThreadPool htsPoolRead = {NULL, 0}; //
htsThreadPool htsPoolWrite = {NULL, 0}; //
htsPoolRead.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS);
htsPoolWrite.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS);
if (!htsPoolRead.pool || !htsPoolWrite.pool) {
spdlog::error("[{}] failed to set up thread pool", __LINE__);
sam_close(nsgv::gInBamFp);
return -1;
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead);
2025-12-04 22:26:13 +08:00
return SerialBQSR();
2025-11-23 23:03:37 +08:00
// 读取known sites vcfs
for (const auto& ks : nsgv::gBqsrArg.KNOWN_SITES_VCFS) {
spdlog::info(" {}", ks);
bcf_srs_t* srs = bcf_sr_init();
if (!bcf_sr_add_reader(srs, ks.c_str()))
error("Failed to read from %s: %s\n", !strcmp("-", ks.c_str()) ? "standard input" : ks.c_str(), bcf_sr_strerror(srs->errnum));
nsgv::gKnownSitesVcfSrs.push_back(srs);
while (bcf_sr_next_line(srs)) {
bcf1_t* line = srs->readers[0].buffer[0];
cout << line->pos << '\t' << line->rlen << '\t' << line->n_allele << '\t' << line->n_info << endl;
}
}
/* 先实现串行的bqsr-phase-1 */
2025-12-04 22:26:13 +08:00
2025-11-23 23:03:37 +08:00
sam_close(nsgv::gInBamFp);
PROF_END(gprof[GP_whole_process], whole_process);
return 0;
}