/* Description: bam,bam,bam Copyright : All right reserved by ICT Author : Zhang Zhonghai Date : 2023/10/23 */ #include #include #include #include #include #include #include #include #include #include #include #include #include "aux_arg.h" #include "baq.h" #include "bqsr_args.h" #include "covariate.h" #include "fastbqsr_version.h" #include "quant_info.h" #include "read_recal_info.h" #include "recal_datum.h" #include "recal_funcs.h" #include "recal_tables.h" #include "recal_utils.h" #include "util/bam_buf.h" #include "util/base_utils.h" #include "util/debug.h" #include "util/interval.h" #include "util/linear_index.h" #include "util/math/math_utils.h" #include "util/profiling.h" #include "util/read_transformer.h" #include "util/sam_data.h" #include "util/stable_array.h" #include "util/utils.h" #include "util/vcf_parser.h" using std::deque; #define BAM_BLOCK_SIZE 16L * 1024 * 1024 // 16M namespace nsgv { // 全局变量 for bqsr BQSRArg gBqsrArg; // bqsr arguments samFile* gInBamFp; // input BAM file pointer sam_hdr_t* gInBamHeader; // input BAM header vector gAuxVars; // auxiliary variables,保存一些文件,数据等,每个线程对应一个 }; // namespace nsgv // 过滤掉bqsr过程不符合要求的bam数据 bool bqsrReadFilterOut(const bam1_t *b) { // 过滤掉unmapped的read if (b->core.qual == 0) // mapping quality 0 return true; if (b->core.qual == 255) // mapping quality not available return true; if (b->core.flag & BAM_FUNMAP || b->core.tid == -1 || b->core.pos == -1) { // unmapped return true; } if (b->core.flag & BAM_FSECONDARY) { // secondary alignment return true; } if (b->core.flag & BAM_FDUP) { // secondary alignment return true; } if (b->core.flag & BAM_FQCFAIL) { // Not passing quality controls return true; } return false; } // 数据总结 void collapseQualityScoreTableToReadGroupTable(Array2D &byReadGroupTable, Array3D &byQualTable) { // 遍历quality table _Foreach3DK(byQualTable, qualDatum, { if (qualDatum.numObservations > 0) { byReadGroupTable(k1, k2).combine(qualDatum); } }); } /** * To replicate the results of BQSR whether or not we save tables to disk (which we need in Spark), * we need to trim the numbers to a few decimal placed (that's what writing and reading does). */ void roundTableValues(RecalTables& rt) { #define _round_val(val) \ do { \ if (val.numObservations > 0) { \ val.numMismatches = MathUtils::RoundToNDecimalPlaces(val.numMismatches, RecalUtils::NUMBER_ERRORS_DECIMAL_PLACES); \ val.reportedQuality = MathUtils::RoundToNDecimalPlaces(val.reportedQuality, RecalUtils::REPORTED_QUALITY_DECIMAL_PLACES); \ } \ } while (0) _Foreach2D(rt.readGroupTable, val, { _round_val(val); }); _Foreach3D(rt.qualityScoreTable, val, { _round_val(val); }); _Foreach4D(rt.contextTable, val, { _round_val(val); }); _Foreach4D(rt.cycleTable, val, { _round_val(val); }); } // 打印recal tables,用于调试 static void printRecalTables(const RecalTables& rt) { _Foreach2D(rt.readGroupTable, val, { if (val.numObservations > 0) { fprintf(gf[0], "%ld %f %f\n", val.numObservations, val.getNumMismatches(), val.reportedQuality); } }); _Foreach3D(rt.qualityScoreTable, val, { if (val.numObservations > 0) { fprintf(gf[1], "%ld %f %f\n", val.numObservations, val.getNumMismatches(), val.reportedQuality); } }); _Foreach4D(rt.contextTable, val, { if (val.numObservations > 0) { fprintf(gf[2], "%ld %f %f\n", val.numObservations, val.getNumMismatches(), val.reportedQuality); } }); _Foreach4D(rt.cycleTable, val, { if (val.numObservations > 0) { fprintf(gf[3], "%ld %f %f\n", val.numObservations, val.getNumMismatches(), val.reportedQuality); } }); } // 串行bqsr int SerialBQSR(AuxVar &aux) { BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO); inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut); int64_t readNumSum = 0; int round = 0; PerReadCovariateMatrix &readCovariates = aux.readCovariates; RecalTables& recalTables = aux.recalTables; SamData& sd = aux.sd; StableArray&isSNP = aux.isSNP, &isIns = aux.isIns, &isDel = aux.isDel; // 该位置是否是SNP, indel位置,0不是,1是 StableArray &baqArray = aux.baqArray; StableArray &snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors; StableArray &skips = aux.skips; // 该位置是否是已知位点 while (true) { ++round; // 一. 读取bam数据 size_t readNum = 0; if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam(); if (readNum < 1) { break; } auto bams = inBamBuf.GetBamArr(); spdlog::info("{} reads processed in {} round", readNum, round); // 二. 遍历每个bam(read)记录,进行处理 for (int i = 0; i < bams.size(); ++i) { // 1. // 对每个read,需要检查cigar是否合法,即没有两个连续的相同的cigar,而且需要将首尾的deletion处理掉,目前看好像没啥影响,我们忽略这一步 // 2. 对质量分数长度跟碱基长度不匹配的read,缺少的质量分数用默认值补齐,先忽略,后边有需要再处理 // 3. 如果bam文件之前做过bqsr,tag中包含OQ(originnal // quality,原始质量分数),检查用户参数里是否指定用原始质量分数进行bqsr,如果是则将质量分数替换为OQ,否则忽略OQ,先忽略 // spdlog::info("bam idx: {}", i); BamWrap* bw = bams[i]; sd.init(); sd.parseBasic(bw); sd.rid = i + readNumSum; if (sd.read_len <= 0) continue; PROF_START(clip_read); // 4. 对read的两端进行检测,去除(hardclip)adapter ReadTransformer::hardClipAdaptorSequence(bw, sd); if (sd.read_len <= 0) continue; // 5. 然后再去除softclip部分 ReadTransformer::hardClipSoftClippedBases(bw, sd); if (sd.read_len <= 0) continue; // 应用所有的变换,计算samdata的相关信息 sd.applyTransformations(); PROF_END(gprof[GP_clip_read], clip_read); //const char* qname = bam_get_qname(sd.bw->b); // fprintf(gf[4], "%ld %d %d %d\n", sd.rid, sd.read_len, 1 + BamWrap::bam_pos(sd.start_pos), 1 + BamWrap::bam_pos(sd.end_pos)); //fprintf(gf[4], "%s %d %d %d %d\n", qname, sd.bw->b->core.flag, sd.read_len, 1 + BamWrap::bam_pos(sd.start_pos), 1 + BamWrap::bam_pos(sd.end_pos)); // 6. 更新每个read的platform信息,好像没啥用,暂时忽略 // 这里计算snp和indel有点问题,snp和del结果不对,白天调试一下 const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel); // fprintf(gf[4], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid); // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[4], "%d ", skips[ii] ? 1 : 0); // fprintf(gf[4], "\n"); // fprintf(gf[0], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid); // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", isSNP[ii]); fprintf(gf[0], "\n"); // fprintf(gf[1], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid); // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[1], "%d ", isIns[ii]); fprintf(gf[1], "\n"); // fprintf(gf[2], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid); // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[2], "%d ", isDel[ii]); fprintf(gf[2], "\n"); // 7. 计算baqArray // BAQ = base alignment quality // note for efficiency reasons we don't compute the BAQ array unless we actually have // some error to marginalize over. For ILMN data ~85% of reads have no error // vector baqArray; bool baqCalculated = false; if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) { baqCalculated = BAQ::flatBAQArray(sd, baqArray); } else { // baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, sd, baqArray); } if (!baqCalculated) continue; // 到这里,基本的数据都准备好了,后续就是进行bqsr的统计了 // 8. 计算这条read对应的协变量 PROF_START(covariate); CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true); PROF_END(gprof[GP_covariate], covariate); // fprintf(gf[4], "%ld %d\n", sd.rid, sd.read_len); // for (auto &arr1 : readCovariates) { // for (size_t si = 0; si < sd.read_len; ++si) { // fprintf(gf[4], "%d %d %d %d ", arr1[si].readGroup, arr1[si].baseQuality, arr1[si].context, arr1[si].cycle); // } // } // fprintf(gf[4], "\n"); // fprintf(gf[3], "%ld %d\n", sd.rid, sd.read_len); // { // auto& arr1 = readCovariates[0]; // { // for (int pos = 0; pos < sd.read_len; ++pos) { // fprintf(gf[3], "%d %d\n", pos, arr1[pos][2]); // } // } // } // fprintf(gf[3], "\n"); // 9. 计算这条read需要跳过的位置 PROF_START(read_vcf); RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips); for (int ii = 0; ii < sd.read_len; ++ii) { skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; } //stringstream ss; //for (auto s : skips) ss << (int)s << ' '; //spdlog::info("{}", ss.str()); PROF_GP_END(read_vcf); // fprintf(gf[3], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid); // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[3], "%d ", skips[ii] ? 1 : 0); // fprintf(gf[3], "\n"); // fprintf(gf[0], "%ld %d\t", sd.rid, sd.read_len); // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", skips[ii] ? 1 : 0); // fprintf(gf[0], "\n"); // 10. 根据BAQ进一步处理snp,indel,得到处理后的数据 PROF_START(frac_err); RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors); RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors); RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors); PROF_GP_END(frac_err); // aggregate all of the info into our info object, and update the data // 11. 合并之前计算的数据,得到info,并更新bqsr table数据 ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors); PROF_START(update_info); RecalUtils::updateRecalTablesForRead(info, recalTables); PROF_END(gprof[GP_update_info], update_info); } readNumSum += readNum; inBamBuf.ClearAll(); // } spdlog::info("read count: {}", readNumSum); // 12. 创建总结数据 collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable); roundTableValues(recalTables); #if 0 printRecalTables(recalTables); #endif // 13. 量化质量分数 QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); // 14. 输出结果 RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, recalTables); return 0; } // 多线程处理bam数据, tmd是乱序的? static void thread_worker(void* data, long idx, int tid, int steal) { // static void thread_worker(void* data, long idx, int tid) { AuxVar& aux = (*(vector*)data)[tid]; auto& readCovariates = aux.readCovariates; RecalTables& recalTables = aux.recalTables; SamData& sd = aux.sd; StableArray&isSNP = aux.isSNP, &isIns = aux.isIns, &isDel = aux.isDel; // 该位置是否是SNP, indel位置,0不是,1是 StableArray& baqArray = aux.baqArray; StableArray&snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors; StableArray& skips = aux.skips; // 该位置是否是已知位点 auto &bams = *aux.bamArr; if (steal) for (auto& vcf : aux.vcfArr) vcf.knownSites.clear(); #if 1 int startIdx = idx * aux.BAM_BLOCK_NUM; int stopIdx = std::min((size_t)(idx + 1) * aux.BAM_BLOCK_NUM, bams.size()); #else int blockReadNums = (bams.size() + nsgv::gAuxVars.size() - 1) / nsgv::gAuxVars.size(); int startIdx = idx * blockReadNums; int stopIdx = std::min((size_t)(idx + 1) * blockReadNums, bams.size()); #endif aux.threadProcessedReads += stopIdx - startIdx; for (int i = startIdx; i < stopIdx; ++i) { // spdlog::info("Thread {} processing read idx: {}", tid, i); BamWrap* bw = bams[i]; sd.init(); sd.parseBasic(bw); sd.rid = i + aux.processedReads; if (sd.read_len <= 0) continue; //PROF_START(clip_read); ReadTransformer::hardClipAdaptorSequence(bw, sd); if (sd.read_len <= 0) continue; ReadTransformer::hardClipSoftClippedBases(bw, sd); if (sd.read_len <= 0) continue; sd.applyTransformations(); // PROF_END(gprof[GP_clip_read], clip_read); const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel); bool baqCalculated = false; if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) { baqCalculated = BAQ::flatBAQArray(sd, baqArray); } else { // baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, sd, baqArray); } if (!baqCalculated) continue; // PROF_START(covariate); CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true); // PROF_END(gprof[GP_covariate], covariate); // PROF_START(read_vcf); RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips); for (int ii = 0; ii < sd.read_len; ++ii) { skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; } // PROF_GP_END(read_vcf); #if 0 int fidx = 0 + 2 * tid; //if (sd.rid % 2 == 0) fidx = 0 + 2 * tid; //else fidx = 1 + 2 * tid; fprintf(gf[fidx], "%ld %d\t", sd.rid, sd.read_len); for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[fidx], "%d ", skips[ii] ? 1 : 0); fprintf(gf[fidx], "\n"); #endif // PROF_START(frac_err); RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors); RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors); RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors); // PROF_GP_END(frac_err); ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors); //PROF_START(update_info); RecalUtils::updateRecalTablesForRead(info, recalTables); //PROF_END(gprof[GP_update_info], update_info); } } // 并行bqsr int ParallelBQSR(vector& auxArr) { BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO); inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut); int64_t readNumSum = 0; int round = 0; while (true) { ++round; // 一. 读取bam数据 size_t readNum = 0; if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam(); if (readNum < 1) { break; } auto bams = inBamBuf.GetBamArr(); for_each(auxArr.begin(), auxArr.end(), [&](AuxVar& aux) { aux.bamArr = &bams; }); spdlog::info("{} reads processed in {} round", readNum, round); #if 1 kt_for_steal(auxArr.size(), thread_worker, &auxArr, (readNum + AuxVar::BAM_BLOCK_NUM - 1) / AuxVar::BAM_BLOCK_NUM); #else kt_for_steal(auxArr.size(), thread_worker, &auxArr, auxArr.size()); #endif readNumSum += readNum; AuxVar::processedReads += readNum; inBamBuf.ClearAll(); // } spdlog::info("read count: {}", readNumSum); // 合并各个线程的结果 RecalTables& recalTables = auxArr[0].recalTables; // printRecalTables(recalTables); for (int i = 0; i < auxArr.size(); ++i) spdlog::info("thread {} processed reads {}.", i, auxArr[i].threadProcessedReads); for (int i = 1; i < auxArr.size(); ++i) { auxArr[0].threadProcessedReads += auxArr[i].threadProcessedReads; _Foreach3DK(auxArr[i].recalTables.qualityScoreTable, qualDatum, { if (qualDatum.numObservations > 0) { recalTables.qualityScoreTable(k1, k2, k3).increment(qualDatum); } }); _Foreach4DK(auxArr[i].recalTables.contextTable, contextDatum, { if (contextDatum.numObservations > 0) { recalTables.contextTable(k1, k2, k3, k4).increment(contextDatum); } }); _Foreach4DK(auxArr[i].recalTables.cycleTable, cycleDatum, { if (cycleDatum.numObservations > 0) { recalTables.cycleTable(k1, k2, k3, k4).increment(cycleDatum); } }); } spdlog::info("All processed reads {}.", auxArr[0].threadProcessedReads); // 创建总结数据 collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable); roundTableValues(recalTables); printRecalTables(recalTables); // 量化质量分数 QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); // 输出结果 RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, recalTables); return 0; } // 在进行数据处理之前,初始化一些全局数据 static void globalInit() { open_debug_files(); /* bam */ nsgv::gInBamFp = sam_open_format(nsgv::gBqsrArg.INPUT_FILE.c_str(), "r", nullptr); if (!nsgv::gInBamFp) { spdlog::error("[{}] load sam/bam file failed.\n", __func__); exit(1); } hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header /* 并行读取bam数据 */ htsThreadPool htsPoolRead = {NULL, 0}; // int readThreadNum = min(nsgv::gBqsrArg.NUM_THREADS, 8); htsPoolRead.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS); if (!htsPoolRead.pool ) { spdlog::error("[{}] failed to set up thread pool", __LINE__); sam_close(nsgv::gInBamFp); exit(1); } hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); if (!nsgv::gInBamHeader->hrecs) { if (sam_hdr_fill_hrecs(nsgv::gInBamHeader) != 0) { spdlog::error("[{}] failed to read sam header", __LINE__); sam_close(nsgv::gInBamFp); exit(1); } } // 1. 协变量数据相关初始化 ContextCovariate::InitContextCovariate(nsgv::gBqsrArg); CycleCovariate::InitCycleCovariate(nsgv::gBqsrArg); // 注意初始化顺序,这个必须在协变量初始化之后,因为需要用到MaximumKeyValue // nsgv::gRecalTables.init(nsgv::gInBamHeader->hrecs->nrg); // 初始化AuxVar nsgv::gAuxVars.resize(nsgv::gBqsrArg.NUM_THREADS); for (int i = 0; i < nsgv::gBqsrArg.NUM_THREADS; ++i) { nsgv::gAuxVars[i].header = nsgv::gInBamHeader; nsgv::gAuxVars[i].faidx = fai_load(nsgv::gBqsrArg.REFERENCE_FILE.c_str()); if (nsgv::gAuxVars[i].faidx == 0) error("[%s] fail to load the fasta index.\n", __func__); for (auto& vcfFileName : nsgv::gBqsrArg.KNOWN_SITES_VCFS) { nsgv::gAuxVars[i].vcfArr.push_back(VCFParser(vcfFileName, nsgv::gInBamHeader)); } // 注意初始化顺序,这个必须在协变量初始化之后,因为需要用到MaximumKeyValue nsgv::gAuxVars[i].recalTables.init(nsgv::gInBamHeader->hrecs->nrg); CovariateUtils::InitPerReadCovMat(nsgv::gAuxVars[i].readCovariates); } // 0. 初始化一些全局数据 // BAQ baq{BAQ::DEFAULT_GOP}; BAQ::StaticInit(); RecalDatum::StaticInit(); QualityUtils::StaticInit(); MathUtils::StaticInit(); BaseUtils::StaticInit(); // 初始化需要计算的event types RecalUtils::initEventTypes(nsgv::gBqsrArg.computeIndelBQSRTables); // 2. 读取bam的read group if (nsgv::gInBamHeader->hrecs->nrg == 0) { spdlog::error("No RG tag found in the header!"); exit(1); } for (int i = 0; i < nsgv::gInBamHeader->hrecs->nrg; ++i) { // spdlog::info("rg: {}", nsgv::gInBamHeader->hrecs->rg[i].name); ReadGroupCovariate::RgToId[nsgv::gInBamHeader->hrecs->rg[i].name] = i; ReadGroupCovariate::IdToRg[i] = nsgv::gInBamHeader->hrecs->rg[i].name; } } // 全局资源释放 static void globalDestroy() { close_debug_files(); } // entrance of mark BQSR int BaseRecalibrator() { int ret = 0; PROF_START(whole_process); globalInit(); if (nsgv::gBqsrArg.NUM_THREADS == 1) ret = SerialBQSR(nsgv::gAuxVars[0]); // 串行处理数据,生成recal table else ret = ParallelBQSR(nsgv::gAuxVars); // 并行处理数据,生成recal table globalDestroy(); sam_close(nsgv::gInBamFp); PROF_END(gprof[GP_whole_process], whole_process); return ret; }