FastBQSR/src/bqsr/apply_bqsr_entry.cpp

336 lines
14 KiB
C++
Raw Normal View History

// 一. 读取并解析recalibrate tables
// 二. 循环处理每一个read
// 1. 计算read的协变量covs
// 2. 根据read的协变量信息从recal tables获取对应的数据
// 3. 根据recal tables数据对read的每个碱基分别计算新的质量分数
// 4. 将计算后的质量分数赋值给read
#include <header.h> // in htslib
#include <htslib/sam.h>
#include <htslib/thread_pool.h>
#include "aux_arg.h"
#include "common_data.h"
#include "util/debug.h"
#include "util/profiling.h"
#include "recal_utils.h"
#include "recal_funcs.h"
#include "read_utils.h"
using std::vector;
namespace nsgv {
// 全局变量 for apply bqsr
samFile* gOutBamFp; // 输出文件, sam或者bam格式
sam_hdr_t* gOutBamHeader; // 输出文件的header
RecalTables gRecalTables; // 保留一个全局的recalibrate tables就行了
vector<uint8_t> gQuantizedQuals; // 读取quantized info table信息得到的第三列数据
StableArray<bam1_t*> gRecalBams[2]; // 保存已经校正过质量分数的bam数据双buffer
}; // namespace nsgv
// 串行apply bqsr
int SerialApplyBQSR(AuxVar &aux) {
BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO);
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, RecalFuncs::applyBqsrReadFilterOut);
int64_t readNumSum = 0;
int round = 0;
PerReadCovariateMatrix& readCovariates = aux.readCovariates;
// 一. 读取并解析recalibrate tables
auto& recalTables = *aux.bqsrTable;
// 全局的校正后的bam数组
auto& recalBams = nsgv::gRecalBams[0];
auto& sd = aux.sd;
while (true) {
++round;
// 一. 读取bam数据
size_t readNum = 0;
PROF_START(GP_read);
if (inBamBuf.ReadStat() >= 0)
readNum = inBamBuf.ReadBam();
PROF_GP_END(GP_read);
if (readNum < 1) {
break;
}
auto bams = inBamBuf.GetBamArr();
spdlog::info("{} reads processed in {} round", readNum, round);
if (recalBams.size() < bams.size()) {
int start = recalBams.size();
recalBams.resize(bams.size());
for (int i = start; i < recalBams.size(); ++i) {
recalBams[i] = bam_init1();
}
}
// 二. 遍历每个bamread记录进行处理
for (int i = 0; i < bams.size(); ++i) {
if (bam_copy1(recalBams[i], bams[i]->b) == NULL) {
spdlog::error("Copy bam error");
exit(1);
}
BamWrap* bw = bams[i];
bw->b = recalBams[i]; // 注意这里的赋值然后就可以对b进行更改了
sd.init();
sd.parseForApplyBQSR(bw);
sd.rid = i + aux.processedReads;
// 1. 是否使用original quality来代替当前的quality
if (nsgv::gBqsrArg.useOriginalBaseQualities)
ReadUtils::resetOriginalBaseQualities(bw->b);
// 2. 是否将当前的quality保存在tag OQ中
if (nsgv::gBqsrArg.emitOriginalQuals)
ReadUtils::setOriginalBaseQualsIfNoOQ(bw->b);
// 3. 计算read的协变量covs
PROF_START(GP_covariate);
CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, nsgv::gBqsrArg.computeIndelBQSRTables, 0);
PROF_GP_END(GP_covariate);
// clear indel qualities
ReadUtils::removeAttribute(bw->b, "BI");
ReadUtils::removeAttribute(bw->b, "BD");
// 4. 检查read的readGroup tag是否包含在bqsr table里
const char* readGroupId = ReadUtils::getReadGroupId(bw->b);
auto& covaritesForRead = readCovariates[EventType::BASE_SUBSTITUTION.index];
uint8_t* recalibratedQuals = bam_get_qual(bw->b);
auto& preUpdateQuals = sd.base_quals;
int rgKey = -1;
if (ReadGroupCovariate::RgToId.find(std::string(readGroupId)) != ReadGroupCovariate::RgToId.end())
rgKey = ReadGroupCovariate::RgToId[std::string(readGroupId)];
if (rgKey == -1) {
if (nsgv::gBqsrArg.allowMissingReadGroups) {
// Given the way the recalibration code is implemented below, we cannot recalibrate a read with a
// read group that's not in the recal table.
for (int i = 0; i < sd.read_len; i++) {
//recalibratedQuals[i] = staticQuantizedMapping != null ? staticQuantizedMapping[preUpdateQuals[i]] : quantizedQuals.get(preUpdateQuals[i]);
recalibratedQuals[i] = nsgv::gQuantizedQuals[preUpdateQuals[i]];
}
} else {
spdlog::error(
"Read group {} not found in the recalibration table. Use \"--allow-missing-read-group\" command line argument to ignore this "
"error.",
readGroupId);
exit(1);
}
}
// 5. 根据recal tables数据对read的每个碱基分别计算新的质量分数
auto& readGroupDatum = recalTables.readGroupTable(EventType::BASE_SUBSTITUTION.index, rgKey);
// Note: this loop is under very heavy use in applyBQSR. Keep it slim.
for (int offset = 0; offset < sd.read_len; offset++) { // recalibrate all bases in the read
// only recalibrate usable qualities (default: >= 6) (the original quality will come from the instrument -- reported quality)
if (recalibratedQuals[offset] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN) {
continue;
}
auto& covs = readCovariates[EventType::BASE_SUBSTITUTION.index][offset];
// 根据read的协变量数据获取对应的bqsr数据
auto& qualityScoreDatum = recalTables.qualityScoreTable(EventType::BASE_SUBSTITUTION.index, rgKey, covs.baseQuality);
auto& contextDatum = recalTables.contextTable(EventType::BASE_SUBSTITUTION.index, rgKey, covs.baseQuality, covs.context);
auto& cycleDatum = recalTables.cycleTable(EventType::BASE_SUBSTITUTION.index, rgKey, covs.baseQuality, covs.cycle);
// 计算校正后的质量分数
double priorQualityScore =
nsgv::gBqsrArg.globalQScorePrior > 0.0 ? nsgv::gBqsrArg.globalQScorePrior : readGroupDatum.getReportedQuality();
double rawRecalibratedQualityScore =
RecalFuncs::hierarchicalBayesianQualityEstimate(priorQualityScore, readGroupDatum, qualityScoreDatum, contextDatum, cycleDatum);
uint8_t qualIdx = RecalFuncs::getBoundedIntegerQual(rawRecalibratedQualityScore);
uint8_t quantizedQualityScore = qualIdx; // nsgv::gQuantizedQuals.at(qualIdx);
// TODO: as written the code quantizes *twice* if the static binning is enabled (first time to the dynamic bin). It should be
// quantized once.
// recalibratedQuals[offset] = staticQuantizedMapping == null ? quantizedQualityScore : staticQuantizedMapping[quantizedQualityScore];
recalibratedQuals[offset] = quantizedQualityScore;
}
if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) {
spdlog::error("failed writing sam record to \"{}\"", nsgv::gBqsrArg.OUTPUT_FILE.c_str());
sam_close(nsgv::gInBamFp);
sam_close(nsgv::gOutBamFp);
exit(1);
}
//break;
}
// break;
readNumSum += readNum;
AuxVar::processedReads += readNum;
inBamBuf.ClearAll(); // 清空bam buf
}
spdlog::info("read count: {}", readNumSum);
return 0;
}
// 并行 apply bqsr
int ParallelApplyBQSR(vector<AuxVar> &auxArr) {
return 0;
}
// 在进行数据处理之前,初始化一些全局数据
static void globalInit() {
open_debug_files();
/* bam */
nsgv::gInBamFp = sam_open_format(nsgv::gBqsrArg.INPUT_FILE.c_str(), "r", nullptr);
if (!nsgv::gInBamFp) {
spdlog::error("[{}] load sam/bam file failed.\n", __func__);
exit(1);
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header
/* 并行读取bam数据 */
htsThreadPool htsPoolRead = {NULL, 0}; //
int readThreadNum = min(nsgv::gBqsrArg.NUM_THREADS, BAM_READ_MAX_THREAD);
htsPoolRead.pool = hts_tpool_init(readThreadNum);
if (!htsPoolRead.pool) {
spdlog::error("[{}] failed to set up thread pool", __LINE__);
sam_close(nsgv::gInBamFp);
exit(1);
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead);
if (!nsgv::gInBamHeader->hrecs) {
if (sam_hdr_fill_hrecs(nsgv::gInBamHeader) != 0) {
spdlog::error("[{}] failed to read sam header", __LINE__);
sam_close(nsgv::gInBamFp);
exit(1);
}
}
/* 并行写入bam文件 */
char modeout[12] = "wb";
sam_open_mode(modeout + 1, nsgv::gBqsrArg.OUTPUT_FILE.c_str(), NULL);
nsgv::gOutBamFp = sam_open(nsgv::gBqsrArg.OUTPUT_FILE.c_str(), modeout);
if (!nsgv::gOutBamFp) {
spdlog::error("[{}] create output sam/bam file failed.\n", __func__);
exit(1);
}
nsgv::gOutBamHeader = sam_hdr_dup(nsgv::gInBamHeader);
// 修改输出文件的header
sam_hdr_add_line(nsgv::gOutBamHeader, "PG", "ID", PROGRAM_NAME, "VN", FASTBQSR_VERSION, "CL", nsgv::gBqsrArg.CLI_STR.c_str(), NULL);
// 用不同的线程数量处理输出文件
htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池
int writeThreadNum = min(nsgv::gBqsrArg.NUM_THREADS, BAM_WRITE_MAX_THREAD);
htsPoolWrite.pool = hts_tpool_init(writeThreadNum);
if (!htsPoolWrite.pool) {
spdlog::error("[{}] failed to set up thread pool", __LINE__);
sam_close(nsgv::gInBamFp);
exit(1);
}
hts_set_opt(nsgv::gOutBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
hts_set_opt(nsgv::gOutBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite);
if (sam_hdr_write(nsgv::gOutBamFp, nsgv::gOutBamHeader) != 0) {
spdlog::error("failed writing header to \"{}\"", nsgv::gBqsrArg.OUTPUT_FILE);
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);
exit(1);
}
// 输出index文件
nsgv::gBqsrArg.INDEX_FILE = nsgv::gBqsrArg.OUTPUT_FILE + ".bai"; // min_shift = 0 是bai格式
if ("sam" == Utils::getFileExtension(nsgv::gBqsrArg.OUTPUT_FILE) || !nsgv::gBqsrArg.CREATE_INDEX) {
nsgv::gBqsrArg.INDEX_FILE = "";
}
if (!nsgv::gBqsrArg.INDEX_FILE.empty()) {
int index_min_shift = 0;
if (nsgv::gBqsrArg.INDEX_FORMAT == nsbqsr::IndexFormat::CSI) {
nsgv::gBqsrArg.INDEX_FILE = nsgv::gBqsrArg.OUTPUT_FILE + ".csi";
index_min_shift = 14;
}
if (sam_idx_init(nsgv::gOutBamFp, nsgv::gOutBamHeader, 0 /*csi 14*/, nsgv::gBqsrArg.INDEX_FILE.c_str()) < 0) {
spdlog::error("failed to open index \"{}\" for writing", nsgv::gBqsrArg.INDEX_FILE);
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);
exit(1);
}
}
// 1. 协变量数据相关初始化
ContextCovariate::InitContextCovariate(nsgv::gBqsrArg);
CycleCovariate::InitCycleCovariate(nsgv::gBqsrArg);
// 注意初始化顺序这个必须在协变量初始化之后因为需要用到MaximumKeyValue
// nsgv::gRecalTables.init(nsgv::gInBamHeader->hrecs->nrg);
// 初始化AuxVar
nsgv::gAuxVars.resize(nsgv::gBqsrArg.NUM_THREADS);
for (int i = 0; i < nsgv::gBqsrArg.NUM_THREADS; ++i) {
nsgv::gAuxVars[i].header = nsgv::gInBamHeader;
nsgv::gAuxVars[i].faidx = fai_load(nsgv::gBqsrArg.REFERENCE_FILE.c_str());
if (nsgv::gAuxVars[i].faidx == 0)
error("[%s] fail to load the fasta index.\n", __func__);
// 注意初始化顺序这个必须在协变量初始化之后因为需要用到MaximumKeyValue
nsgv::gAuxVars[i].bqsrTable = &nsgv::gRecalTables;
CovariateUtils::InitPerReadCovMat(nsgv::gAuxVars[i].readCovariates);
}
// 0. 初始化一些全局数据
RecalDatum::StaticInit();
QualityUtils::StaticInit();
MathUtils::StaticInit();
BaseUtils::StaticInit();
// 初始化需要计算的event types
RecalUtils::initEventTypes(nsgv::gBqsrArg.computeIndelBQSRTables);
// 2. 读取bam的read group
// if (nsgv::gInBamHeader->hrecs->nrg == 0) {
// spdlog::error("No RG tag found in the header!");
// exit(1);
// }
/*
// 应该是从bqsr table里读取read group信息
for (int i = 0; i < nsgv::gInBamHeader->hrecs->nrg; ++i) {
// spdlog::info("rg: {}", nsgv::gInBamHeader->hrecs->rg[i].name);
ReadGroupCovariate::RgToId[nsgv::gInBamHeader->hrecs->rg[i].name] = i;
ReadGroupCovariate::IdToRg[i] = nsgv::gInBamHeader->hrecs->rg[i].name;
}
*/
// 读取并解析recalibrate tables
RecalUtils::readRecalTables(nsgv::gBqsrArg.BQSR_FILE, nsgv::gBqsrArg, nsgv::gQuantizedQuals, nsgv::gRecalTables);
}
// 全局资源释放
static void globalDestroy() {
close_debug_files();
if (!nsgv::gBqsrArg.INDEX_FILE.empty() && sam_idx_save(nsgv::gOutBamFp) < 0) {
spdlog::error("writing index failed");
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);
exit(1);
}
/* 关闭文件,收尾清理 */
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);
}
// entrance of BQSR phase-1
int ApplyBQSR() {
int ret = 0;
PROF_START(GP_whole_process);
globalInit();
// if (nsgv::gBqsrArg.NUM_THREADS == 1)
ret = SerialApplyBQSR(nsgv::gAuxVars[0]); // 串行处理数据生成recal bams
// else
// ret = ParallelApplyBQSR(nsgv::gAuxVars); // 并行处理数据生成recal bams
globalDestroy();
PROF_GP_END(GP_whole_process);
return ret;
}