FastBQSR/src/bqsr/bqsr_entry.cpp

358 lines
14 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/*
Description:
bambambam
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2023/10/23
*/
#include <header.h>
#include <htslib/faidx.h>
#include <htslib/kstring.h>
#include <htslib/sam.h>
#include <htslib/synced_bcf_reader.h>
#include <htslib/thread_pool.h>
#include <spdlog/spdlog.h>
#include <iomanip>
#include <numeric>
#include <queue>
#include <vector>
#include "aux_arg.h"
#include "baq.h"
#include "bqsr_args.h"
#include "covariate.h"
#include "fastbqsr_version.h"
#include "quant_info.h"
#include "read_recal_info.h"
#include "recal_datum.h"
#include "recal_funcs.h"
#include "recal_tables.h"
#include "recal_utils.h"
#include "util/bam_buf.h"
#include "util/base_utils.h"
#include "util/debug.h"
#include "util/interval.h"
#include "util/linear_index.h"
#include "util/math/math_utils.h"
#include "util/profiling.h"
#include "util/read_transformer.h"
#include "util/sam_data.h"
#include "util/stable_array.h"
#include "util/utils.h"
#include "util/vcf_parser.h"
using std::deque;
#define BAM_BLOCK_SIZE 16L * 1024 * 1024 // 16M
namespace nsgv {
// 全局变量 for bqsr
BQSRArg gBqsrArg; // bqsr arguments
samFile* gInBamFp; // input BAM file pointer
sam_hdr_t* gInBamHeader; // input BAM header
vector<AuxVar> gAuxVars; // auxiliary variables保存一些文件数据等每个线程对应一个
}; // namespace nsgv
// 过滤掉bqsr过程不符合要求的bam数据
bool bqsrReadFilterOut(const bam1_t *b) {
// 过滤掉unmapped的read
if (b->core.qual == 0) // mapping quality 0
return true;
if (b->core.qual == 255) // mapping quality not available
return true;
if (b->core.flag & BAM_FUNMAP || b->core.tid == -1 || b->core.pos == -1) { // unmapped
return true;
}
if (b->core.flag & BAM_FSECONDARY) { // secondary alignment
return true;
}
if (b->core.flag & BAM_FDUP) { // secondary alignment
return true;
}
if (b->core.flag & BAM_FQCFAIL) { // Not passing quality controls
return true;
}
return false;
}
// 数据总结
void collapseQualityScoreTableToReadGroupTable(Array2D<RecalDatum> &byReadGroupTable, Array3D<RecalDatum> &byQualTable) {
// 遍历quality table
_Foreach3DK(byQualTable, qualDatum, {
if (qualDatum.numObservations > 0) {
byReadGroupTable(k1, k2).combine(qualDatum);
}
});
}
/**
* To replicate the results of BQSR whether or not we save tables to disk (which we need in Spark),
* we need to trim the numbers to a few decimal placed (that's what writing and reading does).
*/
void roundTableValues(RecalTables& rt) {
#define _round_val(val) \
do { \
if (val.numObservations > 0) { \
val.numMismatches = MathUtils::RoundToNDecimalPlaces(val.numMismatches, RecalUtils::NUMBER_ERRORS_DECIMAL_PLACES); \
val.reportedQuality = MathUtils::RoundToNDecimalPlaces(val.reportedQuality, RecalUtils::REPORTED_QUALITY_DECIMAL_PLACES); \
} \
} while (0)
_Foreach2D(rt.readGroupTable, val, { _round_val(val); });
_Foreach3D(rt.qualityScoreTable, val, { _round_val(val); });
_Foreach4D(rt.contextTable, val, { _round_val(val); });
_Foreach4D(rt.cycleTable, val, { _round_val(val); });
}
// 串行bqsr
int SerialBQSR() {
BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO);
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut);
int64_t readNumSum = 0;
int round = 0;
PerReadCovariateMatrix readCovariates;
CovariateUtils::InitPerReadCovMat(readCovariates);
RecalTables& recalTables = nsgv::gAuxVars[0].recalTables;
while (true) {
++round;
// 一. 读取bam数据
size_t readNum = 0;
if (inBamBuf.ReadStat() >= 0)
readNum = inBamBuf.ReadBam();
if (readNum < 1) {
break;
}
auto bams = inBamBuf.GetBamArr();
spdlog::info("{} reads processed in {} round", readNum, round);
// 二. 遍历每个bamread记录进行处理
SamData sd;
StableArray<int> isSNP, isIns, isDel; // 该位置是否是SNP, indel位置0不是1是
StableArray<uint8_t> baqArray;
StableArray<double> snpErrors, insErrors, delErrors;
StableArray<uint8_t> skips; // 该位置是否是已知位点
for (int i = 0; i < bams.size(); ++i) {
// 1. 对每个read需要检查cigar是否合法即没有两个连续的相同的cigar而且需要将首尾的deletion处理掉目前看好像没啥影响我们忽略这一步
// 2. 对质量分数长度跟碱基长度不匹配的read缺少的质量分数用默认值补齐先忽略后边有需要再处理
// 3. 如果bam文件之前做过bqsrtag中包含OQoriginnal
// quality原始质量分数检查用户参数里是否指定用原始质量分数进行bqsr如果是则将质量分数替换为OQ否则忽略OQ先忽略 spdlog::info("bam
// idx: {}", i);
BamWrap* bw = bams[i];
sd.init();
sd.parseBasic(bw);
sd.rid = i + readNumSum;
if (sd.read_len <= 0)
continue;
PROF_START(clip_read);
// 4. 对read的两端进行检测去除hardclipadapter
ReadTransformer::hardClipAdaptorSequence(bw, sd);
if (sd.read_len <= 0)
continue;
// 5. 然后再去除softclip部分
ReadTransformer::hardClipSoftClippedBases(bw, sd);
if (sd.read_len <= 0)
continue;
// 应用所有的变换计算samdata的相关信息
sd.applyTransformations();
PROF_END(gprof[GP_clip_read], clip_read);
// 6. 更新每个read的platform信息好像没啥用暂时忽略
const int nErrors = RecalFuncs::calculateIsSNPOrIndel(nsgv::gAuxVars[0], sd, isSNP, isIns, isDel);
/*fprintf(gf[0], "%d\t", sd.read_len);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", isSNP[ii]);
fprintf(gf[0], "\n");
fprintf(gf[1], "%d\t", sd.read_len);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[1], "%d ", isIns[ii]);
fprintf(gf[1], "\n");
fprintf(gf[2], "%d\t", sd.read_len);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[2], "%d ", isDel[ii]);
fprintf(gf[2], "\n");
*/
// 7. 计算baqArray
// BAQ = base alignment quality
// note for efficiency reasons we don't compute the BAQ array unless we actually have
// some error to marginalize over. For ILMN data ~85% of reads have no error
// vector<uint8_t> baqArray;
bool baqCalculated = false;
if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) {
baqCalculated = BAQ::flatBAQArray(sd, baqArray);
} else {
// baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, sd, baqArray);
}
if (!baqCalculated)
continue;
// 到这里基本的数据都准备好了后续就是进行bqsr的统计了
// 8. 计算这条read对应的协变量
PROF_START(covariate);
CovariateUtils::ComputeCovariates(sd, nsgv::gInBamHeader, readCovariates, true);
PROF_END(gprof[GP_covariate], covariate);
// fprintf(gf[3], "%ld %ld %d %ld\n", sd.rid, readCovariates.size(), sd.read_len, readCovariates[0][0].size());
// for (auto &arr1 : readCovariates) {
// for (size_t si = 0; si < sd.read_len; ++si) {
// for (auto &val : arr1[si]) {
// fprintf(gf[3], "%d ", val);
// }
// }
// }
// fprintf(gf[3], "\n");
// fprintf(gf[3], "%ld %d\n", sd.rid, sd.read_len);
// {
// auto& arr1 = readCovariates[0];
// {
// for (int pos = 0; pos < sd.read_len; ++pos) {
// fprintf(gf[3], "%d %d\n", pos, arr1[pos][2]);
// }
// }
// }
// fprintf(gf[3], "\n");
// 9. 计算这条read需要跳过的位置
PROF_START(read_vcf);
RecalFuncs::calculateKnownSites(sd, nsgv::gAuxVars[0].vcfArr, nsgv::gInBamHeader, skips);
for (int ii = 0; ii < sd.read_len; ++ii) {
skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) ||
sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN;
}
PROF_GP_END(read_vcf);
// fprintf(gf[0], "%ld %d\t", sd.rid, sd.read_len);
// for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", skips[ii] ? 1 : 0);
// fprintf(gf[0], "\n");
// 10. 根据BAQ进一步处理snpindel得到处理后的数据
PROF_START(frac_err);
RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors);
RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors);
RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors);
PROF_GP_END(frac_err);
// aggregate all of the info into our info object, and update the data
// 11. 合并之前计算的数据得到info并更新bqsr table数据
ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors);
PROF_START(update_info);
RecalUtils::updateRecalTablesForRead(info, recalTables);
PROF_END(gprof[GP_update_info], update_info);
}
readNumSum += readNum;
inBamBuf.ClearAll(); //
}
spdlog::info("read count: {}", readNumSum);
// 12. 创建总结数据
collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable);
roundTableValues(recalTables);
// 13. 量化质量分数
QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS);
// 14. 输出结果
RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, recalTables);
return 0;
}
// 在进行数据处理之前,初始化一些全局数据
static void globalInit() {
open_debug_files();
/* bam */
nsgv::gInBamFp = sam_open_format(nsgv::gBqsrArg.INPUT_FILE.c_str(), "r", nullptr);
if (!nsgv::gInBamFp) {
spdlog::error("[{}] load sam/bam file failed.\n", __func__);
exit(1);
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header
/* 并行读取bam数据 */
htsThreadPool htsPoolRead = {NULL, 0}; //
htsPoolRead.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS);
if (!htsPoolRead.pool ) {
spdlog::error("[{}] failed to set up thread pool", __LINE__);
sam_close(nsgv::gInBamFp);
exit(1);
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead);
if (!nsgv::gInBamHeader->hrecs) {
if (sam_hdr_fill_hrecs(nsgv::gInBamHeader) != 0) {
spdlog::error("[{}] failed to read sam header", __LINE__);
sam_close(nsgv::gInBamFp);
exit(1);
}
}
// 1. 协变量数据相关初始化
ContextCovariate::InitContextCovariate(nsgv::gBqsrArg);
CycleCovariate::InitCycleCovariate(nsgv::gBqsrArg);
// 注意初始化顺序这个必须在协变量初始化之后因为需要用到MaximumKeyValue
// nsgv::gRecalTables.init(nsgv::gInBamHeader->hrecs->nrg);
// 初始化AuxVar
nsgv::gAuxVars.resize(nsgv::gBqsrArg.NUM_THREADS);
for (int i = 0; i < nsgv::gBqsrArg.NUM_THREADS; ++i) {
nsgv::gAuxVars[i].header = nsgv::gInBamHeader;
nsgv::gAuxVars[i].faidx = fai_load(nsgv::gBqsrArg.REFERENCE_FILE.c_str());
if (nsgv::gAuxVars[i].faidx == 0)
error("[%s] fail to load the fasta index.\n", __func__);
for (auto& vcfFileName : nsgv::gBqsrArg.KNOWN_SITES_VCFS) {
nsgv::gAuxVars[i].vcfArr.push_back(VCFParser(vcfFileName, nsgv::gInBamHeader));
}
// 注意初始化顺序这个必须在协变量初始化之后因为需要用到MaximumKeyValue
nsgv::gAuxVars[i].recalTables.init(nsgv::gInBamHeader->hrecs->nrg);
}
// 0. 初始化一些全局数据
// BAQ baq{BAQ::DEFAULT_GOP};
BAQ::StaticInit();
RecalDatum::StaticInit();
QualityUtils::StaticInit();
MathUtils::StaticInit();
// 初始化需要计算的event types
RecalUtils::initEventTypes(nsgv::gBqsrArg.computeIndelBQSRTables);
// 2. 读取bam的read group
if (nsgv::gInBamHeader->hrecs->nrg == 0) {
spdlog::error("No RG tag found in the header!");
exit(1);
}
for (int i = 0; i < nsgv::gInBamHeader->hrecs->nrg; ++i) {
// spdlog::info("rg: {}", nsgv::gInBamHeader->hrecs->rg[i].name);
ReadGroupCovariate::RgToId[nsgv::gInBamHeader->hrecs->rg[i].name] = i;
ReadGroupCovariate::IdToRg[i] = nsgv::gInBamHeader->hrecs->rg[i].name;
}
}
// 全局资源释放
static void globalDestroy() {
close_debug_files();
}
// entrance of mark BQSR
int BaseRecalibrator() {
int ret = 0;
PROF_START(whole_process);
globalInit();
ret = SerialBQSR(); // 串行处理数据生成recal table
globalDestroy();
sam_close(nsgv::gInBamFp);
PROF_END(gprof[GP_whole_process], whole_process);
return ret;
}