/* Description: bam,bam,bam Copyright : All right reserved by ICT Author : Zhang Zhonghai Date : 2023/10/23 */ #include #include #include #include #include #include #include #include #include #include #include #include "baq.h" #include "bqsr_args.h" #include "bqsr_funcs.h" #include "bqsr_pipeline.h" #include "covariate.h" #include "dup_metrics.h" #include "fastbqsr_version.h" #include "read_name_parser.h" #include "read_recal_info.h" #include "recal_datum.h" #include "recal_tables.h" #include "recal_utils.h" #include "util/interval.h" #include "util/linear_index.h" #include "util/profiling.h" #include "util/utils.h" #include "util/math/math_utils.h" #include "quant_info.h" #include "util/debug.h" using std::deque; #define BAM_BLOCK_SIZE 16L * 1024 * 1024 #define MAX_SITES_INTERVAL 100000 const uint8_t NO_BAQ_UNCERTAINTY = (uint8_t)'@'; const char cBaseToChar[16] = {'N', 'A', 'C', 'N', 'G', 'N', 'N', 'N', 'T', 'N', 'N', 'N', 'N', 'N', 'N', 'N'}; // 解析knownSites struct VCFParser { deque knownSites; // 已知的变异位点 char* buf = nullptr; // // 数据buffer uint32_t bufLen = 4 * 1024; // 数据buffer长度 LinearIndex index; // vcf文件索引 ifstream inStm; // vcf文件流 VCFParser() { Init(); } VCFParser(const string& vcfFileName) { Init(vcfFileName); } VCFParser(const string& vcfFileName, sam_hdr_t* samHeader) { Init(vcfFileName, samHeader); } void Init() { buf = (char*)malloc(bufLen); } void Init(const string& vcfFileName) { Init(); inStm.open(vcfFileName, ifstream::in); string idxFileName = vcfFileName + ".idx"; if (!index.ReadIndex(idxFileName)) error("[%s] fail to load the %s index file\n", __func__, idxFileName.c_str()); } void Init(const string& vcfFileName, sam_hdr_t *samHeader) { index.SetHeader(samHeader); Init(vcfFileName); } }; // 解析后的一些参数,文件,数据等 struct AuxVar { const static int REF_CONTEXT_PAD = 3; // 需要做一些填充 const static int REFERENCE_HALF_WINDOW_LENGTH = 150; // 需要额外多取出一些ref序列,防止边界效应 sam_hdr_t* header = nullptr; // bam header faidx_t* faidx = nullptr; // reference index char* ref_seq = nullptr; // reference sequence int ref_len = 0; // reference sequence length int offset = 0; // 在要求的ref序列两边,多余取出的碱基数量 vector vcfArr; // 从vcf中获取已知位点 }; namespace nsgv { // 全局变量 for bqsr BQSRArg gBqsrArg; // bqsr arguments samFile* gInBamFp; // input BAM file pointer sam_hdr_t* gInBamHeader; // input BAM header vector gAuxVars; // auxiliary variables,保存一些文件,数据等,每个线程对应一个 RecalTables gRecalTables; // 记录bqsr所有的数据,输出table结果 vector gEventTypes; // 需要真正计算的eventtype // 下面是需要删除或修改的变量 std::vector gNameParsers; // read name parser DuplicationMetrics gMetrics; // DupResult gDupRes; PipelineArg gPipe(&gDupRes); samFile *gOutBamFp; // , sambam sam_hdr_t *gOutBamHeader; // header vector gKnownSitesVcfSrs; // known sites vcf srs }; // namespace nsgv // struct ByteBuf { uint8_t *buf = nullptr; int size = 0; // int capacity = 0; // }; // 读进来的这一批bam总共占了几个染色体,这个方案不行,读取太多,没必要 // 开区间 struct Region { int64_t start; int64_t end; }; /* * */ static string getFileExtension(const string &filename) { auto last_dot = filename.find_last_of('.'); if (last_dot == string::npos) { return ""; } return filename.substr(last_dot + 1); } // 过滤掉bqsr过程不符合要求的bam数据 bool bqsrReadFilterOut(const bam1_t *b) { // 过滤掉unmapped的read if (b->core.qual == 0) // mapping quality 0 return true; if (b->core.qual == 255) // mapping quality not available return true; if (b->core.flag & BAM_FUNMAP || b->core.tid == -1 || b->core.pos == -1) { // unmapped return true; } if (b->core.flag & BAM_FSECONDARY) { // secondary alignment return true; } if (b->core.flag & BAM_FDUP) { // secondary alignment return true; } if (b->core.flag & BAM_FQCFAIL) { // Not passing quality controls return true; } return false; } // 该操作符是否消耗read的碱基 bool consumeReadBases(char cigar) { return cigar == 'M' || cigar == '=' || cigar == 'X' || cigar == 'I' || cigar == 'S'; } // 该操作符是否消耗参考基因组的碱基 bool consumeRefBases(char cigar) { return cigar == 'M' || cigar == '=' || cigar == 'X' || cigar == 'D' || cigar == 'N'; } // 给定一个ref位置,在read内部找到对应的位置和操作符 struct PosAndOperator { int readPosAtRefCoord = -1; // read中的位置 char cigarOperator = '0'; // cigar操作符 int cigarIndex = -1; // cigar索引 int cigarLen = 0; int preCigarLen = 0; // 截止cigar之前的,消耗read base的长度 }; /** * Find the 0-based index within a read base array corresponding to a given 0-based position in the reference, along with the cigar operator of * the element containing that base. If the reference coordinate occurs within a deletion, the first index after the deletion is returned. * Note that this treats soft-clipped bases as if they align with the reference, which is useful for hard-clipping reads with soft clips. * * @param alignmentStart The soft start of the read on the reference * @param cigar The read's cigar * @param refCoord The target reference coordinate * @return If the reference coordinate occurs before the read start or after the read end {@code CLIPPING_GOAL_NOT_REACHED}; * if the reference coordinate falls within an alignment block of the read's cigar, the corresponding read coordinate; * if the reference coordinate falls within a deletion, the first read coordinate after the deletion. Note: if the last * cigar element is a deletion (which isn't meaningful), it returns {@code CLIPPING_GOAL_NOT_REACHED}. */ PosAndOperator getReadIndexForReferenceCoordinate(BamWrap *bw, int alignmentStart, int refCoord) { PosAndOperator po; if (refCoord < alignmentStart) { return po; } int firstReadPosOfElement = 0; // inclusive int firstRefPosOfElement = alignmentStart; // inclusive int lastReadPosOfElement = 0; // exclusive int lastRefPosOfElement = alignmentStart; // exclusive // advance forward through all the cigar elements until we bracket the reference coordinate const uint32_t* cigar = bam_get_cigar(bw->b); const bam1_core_t& bc = bw->b->core; const int idx = bc.n_cigar - 1; if (idx < 0) return po; for (int i = 0; i < bc.n_cigar; ++i) { const char c = bam_cigar_opchr(cigar[i]); const int len = bam_cigar_oplen(cigar[i]); firstReadPosOfElement = lastReadPosOfElement; firstRefPosOfElement = lastRefPosOfElement; lastReadPosOfElement += consumeReadBases(c) ? len : 0; lastRefPosOfElement += (consumeRefBases(c) || c == 'S') ? len : 0; if (firstRefPosOfElement <= refCoord && refCoord < lastRefPosOfElement) { // refCoord falls within this cigar element int readPosAtRefCoord = firstReadPosOfElement + (consumeReadBases(c) ? (refCoord - firstRefPosOfElement) : 0); return PosAndOperator{readPosAtRefCoord, c, i, len, firstReadPosOfElement}; } } return po; } // 根据adapter位置,对read进行hardclip,返回左侧或右侧减掉的base数量 void clipByReferenceCoordinates(BamWrap *bw, int refStart, int refStop, SamData &ad) { int start, stop; // Determine the read coordinate to start and stop hard clipping if (refStart < 0) { if (refStop < 0) return; PosAndOperator stopPosAndOperator = getReadIndexForReferenceCoordinate(bw, bw->GetSoftStart(), refStop); // if the refStop falls in a deletion, the above method returns the position after the deletion. Since the stop we return here // is inclusive, we decrement the stop to avoid overclipping by one base. As a result we do not clip the deletion, which is fine. stop = stopPosAndOperator.readPosAtRefCoord - (consumeReadBases(stopPosAndOperator.cigarOperator) ? 0 : 1); ad.left_clip = stop + 1; ad.cigar_start = stopPosAndOperator.cigarIndex; ad.first_cigar_clip = ad.left_clip - stopPosAndOperator.preCigarLen; } else { if (refStop >= 0) return; // unlike the above case where we clip the start fo the read, here we clip the end and returning the base to the right of a deletion avoids // overclipping PosAndOperator startPosAndOperator = getReadIndexForReferenceCoordinate(bw, bw->GetSoftStart(), refStart); start = startPosAndOperator.readPosAtRefCoord; ad.right_clip = bw->b->core.l_qseq - start; ad.cigar_end = startPosAndOperator.cigarIndex + 1; ad.last_cigar_clip = startPosAndOperator.preCigarLen + startPosAndOperator.cigarLen - start; } } // 计算切掉adapter之后,ref相对原始ref的偏移量 void calculateRefOffset(BamWrap *bw, SamData &ad) { const uint32_t* cigar = bam_get_cigar(bw->b); const bam1_core_t& bc = bw->b->core; int i = 0; for (i = 0; i < ad.cigar_start; ++i) { const char c = bam_cigar_opchr(cigar[i]); int len = bam_cigar_oplen(cigar[i]); if (consumeRefBases(c)) { ad.ref_offset += len; } } const char c = bam_cigar_opchr(cigar[i]); if (consumeRefBases(c)) { ad.ref_offset += ad.first_cigar_clip; } } // 计算clip处理之后,剩余的碱基 void calculateReadBases(BamWrap* bw, SamData& ad) { ad.bases.resize(ad.read_len); ad.base_quals.resize(ad.read_len); uint8_t* seq = bam_get_seq(bw->b); uint8_t* quals = bam_get_qual(bw->b); for (int i = 0; i < ad.read_len; ++i) { ad.bases[i] = cBaseToChar[bam_seqi(seq, i + ad.left_clip)]; ad.base_quals[i] = quals[i + ad.left_clip]; } } // 计算read两端clip之后的softstart和softend void calculateSoftStartEnd(BamWrap* bw, SamData& ad) { int64_t softStart = bw->b->core.pos + ad.ref_offset; int64_t softEnd = softStart - 1; // 闭区间 const uint32_t* cigar = bam_get_cigar(bw->b); const bam1_core_t& bc = bw->b->core; int cigar_start = ad.cigar_start; int cigar_end = ad.cigar_end; bool rightTail = false; for (int i = ad.cigar_start; i < ad.cigar_end; ++i) { const char c = bam_cigar_opchr(cigar[i]); int len = bam_cigar_oplen(cigar[i]); if (i == ad.cigar_start) len -= ad.first_cigar_clip; if (i == ad.cigar_end - 1) len -= ad.last_cigar_clip; // if (c == 'S' || c == 'I') if (c == 'S') softStart -= len; else if (c != 'H') rightTail = true; if (rightTail) { if (consumeRefBases(c) || c == 'S') softEnd += len; } } ad.softStart() = softStart; ad.softEnd() = softEnd; } // 计算clip之后的cigar void calculateCigar(BamWrap* bw, SamData& ad) { ad.cigars.clear(); const uint32_t* cigar = bam_get_cigar(bw->b); const bam1_core_t& bc = bw->b->core; int cigar_start = ad.cigar_start; int cigar_end = ad.cigar_end; for (int i = ad.cigar_start; i < ad.cigar_end; ++i) { const char c = bam_cigar_opchr(cigar[i]); int len = bam_cigar_oplen(cigar[i]); if (i == ad.cigar_start) len -= ad.first_cigar_clip; if (i == ad.cigar_end - 1) len -= ad.last_cigar_clip; ad.cigars.push_back({c, len}); } //for(auto &cigar : ad.cigars) { // spdlog::info("op: {}, len: {}", cigar.op, cigar.len); //} } // 计算read两端softclip的碱基数量,可能会修改ad里的clip值 void calculateSoftClip(BamWrap *bw, SamData &ad) { const uint32_t* cigar = bam_get_cigar(bw->b); const bam1_core_t& bc = bw->b->core; int readIndex = ad.left_clip; int cutLeft = -1; // first position to hard clip (inclusive) int cutRight = -1; // first position to hard clip (inclusive) int cigar_start = ad.cigar_start; int cigar_end = ad.cigar_end; bool rightTail = false; // trigger to stop clipping the left tail and start cutting the right tail for (int i = ad.cigar_start; i < ad.cigar_end; ++i) { const char c = bam_cigar_opchr(cigar[i]); int len = bam_cigar_oplen(cigar[i]); if (i == ad.cigar_start) len -= ad.first_cigar_clip; if (i == ad.cigar_end - 1) len -= ad.last_cigar_clip; if (c == 'S') { if (rightTail) { cutRight = readIndex; cigar_end = i; } else { cutLeft = readIndex + len - 1; cigar_start = i + 1; } } else if (c != 'H') { rightTail = true; } if (consumeReadBases(c)) { readIndex += len; } } if (cutRight >= 0) { ad.right_clip = bw->b->core.l_qseq - cutRight; ad.cigar_end = cigar_end; ad.last_cigar_clip = 0; } if (cutLeft >= 0) { ad.left_clip = cutLeft + 1; ad.cigar_start = cigar_start; ad.first_cigar_clip = 0; } } // 读取给定区间的reference static inline void read_ref_base(AuxVar& aux, int64_t cur_pos, Interval& interval) { if (aux.ref_seq != NULL) free(aux.ref_seq); int tid = BamWrap::bam_tid(cur_pos); const char* chr = sam_hdr_tid2name(aux.header, tid); int seq_begin = BamWrap::bam_pos(interval.left); //- aux.REFERENCE_HALF_WINDOW_LENGTH; int seq_end = BamWrap::bam_pos(interval.right); //+ aux.REFERENCE_HALF_WINDOW_LENGTH; aux.ref_seq = faidx_fetch_seq(aux.faidx, chr, seq_begin, seq_end, &aux.ref_len); // aux.offset = aux.REFERENCE_HALF_WINDOW_LENGTH; } // 设置某个位置是indel inline void updateIndel(vector &isIndel, int index) { if (index >=0 && index < isIndel.size()) { isIndel[index] = 1; } } // 计算该read的每个碱基位置是否是SNP或Indel int calculateIsSNPOrIndel(AuxVar& aux, BamWrap *bw, SamData &ad, vector &isSNP, vector &isIns, vector &isDel) { // 1. 读取参考基因组,先看看串行运行性能,稍后可以将读入ref和vcf合并起来做成一个并行流水线步骤 Interval interval{bw->start_pos() + ad.ref_offset, bw->end_pos()}; // 闭区间 PROF_START(ref); read_ref_base(aux, interval.left, interval); PROF_END(gprof[GP_read_ref], ref); string refBases(aux.ref_seq); // spdlog::info("ref: {}, {}, {} - {}", aux.ref_seq, aux.ref_len, bw->contig_pos(), bw->contig_end_pos()); // 2. 遍历cigar,计算每个碱基是否是SNP或Indel int readPos = 0, refPos = 0, nEvents = 0; const uint32_t* cigar = bam_get_cigar(bw->b); const bam1_core_t& bc = bw->b->core; uint8_t* seq = bam_get_seq(bw->b); for (int i = ad.cigar_start; i < ad.cigar_end; ++i) { const char c = bam_cigar_opchr(cigar[i]); int len = bam_cigar_oplen(cigar[i]); if (i == ad.cigar_start) len -= ad.first_cigar_clip; if (i == ad.cigar_end - 1) len -= ad.last_cigar_clip; if (c == 'M' || c == '=' || c == 'X') { for (int j = 0; j < len; ++j) { // 按位置将read和ref碱基进行比较,不同则是snp,注意read起始位置要加上left_clip int snpInt = cBaseToChar[bam_seqi(seq, readPos + ad.left_clip)] == refBases[refPos] ? 0 : 1; // if (snpInt > 0) { spdlog::info("snp {}, readpos: {}", snpInt, readPos); } isSNP[readPos] = snpInt; nEvents += snpInt; readPos++; refPos++; } } else if (c == 'D') { // 应该是在上一个消耗碱基的cigar的最后一个位置,标记Del int index = bw->GetReadNegativeStrandFlag() ? readPos : readPos - 1; updateIndel(isDel, index); refPos += len; } else if (c == 'N') { refPos += len; } else if (c == 'I') { // 与Del不同,Ins应该是在下一个cigar开始的位置,标记Ins bool forwardStrandRead = !bw->GetReadNegativeStrandFlag(); if (forwardStrandRead) { updateIndel(isIns, readPos - 1); } readPos += len; if (!forwardStrandRead) { updateIndel(isIns, readPos); } } else if (c == 'S') { readPos += len; } } nEvents += std::accumulate(isIns.begin(), isIns.end(), 0) + std::accumulate(isDel.begin(), isDel.end(), 0); // spdlog::info("nEvents: {}", nEvents); //spdlog::info("SNPs: {}, Ins: {}, Del: {}, total events: {}", std::accumulate(isSNP.begin(), isSNP.end(), 0), // std::accumulate(isIns.begin(), isIns.end(), 0), std::accumulate(isDel.begin(), isDel.end(), 0), nEvents); // exit(0); return nEvents; } // 简单计算baq数组,就是全部赋值为'@' (64) bool flatBAQArray(BamWrap* bw, SamData& ad, vector& baqArray) { baqArray.resize(ad.read_len, (uint8_t)'@'); return true; } // 计算真实的baq数组,耗时更多,好像enable-baq参数默认是关闭的,那就先不实现这个了 bool calculateBAQArray(AuxVar& aux, BAQ& baq, BamWrap* bw, SamData& ad, vector& baqArray) { baqArray.resize(ad.read_len, 0); return true; } // 获取一行字符串 static void get_line_from_buf(char* buf, int64_t total, int64_t* cur, string* line) { line->clear(); if (*cur >= total) return; char b; while (*cur < total && (b = buf[(*cur)++]) != '\n') { line->push_back(b); } } // 计算与read有交叉的已知位点信息, 应该要判断一下,是按照read的范围去读取vcf,还是按照一个batch read的范围去读取 void calculateKnownSites(BamWrap* bw, SamData& ad, vector &vcfs, vector &knownSites) { int tid = bw->contig_id(); uint64_t startPos = bw->start_pos(); // 闭区间 uint64_t endPos = bw->end_pos(); // 闭区间 // spdlog::info("bam {}, {}", startPos, endPos); // update vcfs for(auto &vcf : vcfs) { // 清理旧的interval while(!vcf.knownSites.empty()) { auto& intv = vcf.knownSites.front(); // spdlog::info("intv bam {}, {}", intv.right, startPos); if (intv.right < startPos) vcf.knownSites.pop_front(); else break; } if (!vcf.knownSites.empty() && vcf.knownSites.back().left > endPos) continue; // spdlog::info("intv {}, {}, {}", vcf.knownSites.size(), vcf.knownSites.front().right, vcf.knownSites.front().right); // exit(0); //spdlog::info("before intervals : {}", vcf.knownSites.size()); // 读取新的interval int64_t fpos, flen; endPos = std::max(startPos + MAX_SITES_INTERVAL, endPos); Interval readIntv(startPos, endPos); vcf.index.SearchInterval(startPos, endPos, &fpos, &flen); //spdlog::info("file index: {}, {}", fpos, flen); if (flen > 0) { vcf.inStm.seekg(fpos, ios::beg); if (flen > vcf.bufLen) { vcf.bufLen = flen; vcf.buf = (char*)realloc(vcf.buf, flen); } char* buf = vcf.buf; vcf.inStm.read(buf, flen); string line; int64_t cur = 0; get_line_from_buf(buf, flen, &cur, &line); while (line.size() > 0) { stringstream ss_line(line); string stid; int tid, pos; int64_t locus; string id, ref; ss_line >> stid >> pos >> id >> ref; tid = sam_hdr_name2tid(nsgv::gInBamHeader, stid.c_str()); int64_t varStart = BamWrap::bam_global_pos(tid, pos); Interval varIntv(varStart, varStart + ref.size() - 1); if (readIntv.overlaps(varIntv)) { vcf.knownSites.push_back(Interval(tid, pos - 1, pos - 1 + ref.size() - 1)); // 闭区间 //spdlog::info("intv-1 {}, {}, {}", tid, pos, ref.size()); } get_line_from_buf(buf, flen, &cur, &line); } } //spdlog::info("after intervals : {}", vcf.knownSites.size()); //for(auto &val : vcf.knownSites) { // spdlog::info("intv {}, {}", val.left, val.right); //} } //exit(0); knownSites.resize(ad.read_len); endPos = bw->end_pos(); for(auto &vcf : vcfs) { for (auto &intv : vcf.knownSites) { // knownSite is outside clipping window for the read, ignore if (intv.right < ad.softStart()) continue; if (intv.left > ad.softEnd()) break; // 计算对应的位点 ReadIdxCigar rcStart = ad.getReadIndexForReferenceCoordinate(intv.left); int featureStartOnRead = rcStart.readIdx == SamData::READ_INDEX_NOT_FOUND ? 0 : rcStart.readIdx; if (rcStart.cigarOp == 'D') { --featureStartOnRead; } ReadIdxCigar rcEnd = ad.getReadIndexForReferenceCoordinate(intv.right); int featureEndOnRead = rcEnd.readIdx == SamData::READ_INDEX_NOT_FOUND ? ad.read_len : rcEnd.readIdx; if (featureStartOnRead > ad.read_len) { featureStartOnRead = featureEndOnRead = ad.read_len; } for (int i = max(0, featureStartOnRead); i < min(ad.read_len, featureEndOnRead + 1); ++i) { knownSites[i] = true; } } } } // 应该是计算一段数据的平均值 static void calculateAndStoreErrorsInBlock(int i, int blockStartIndex, vector& errorArr, vector& fracErrs) { int totalErrors = 0; for (int j = max(0, blockStartIndex - 1); j <= i; j++) { totalErrors += errorArr[j]; } for (int j = max(0, blockStartIndex - 1); j <= i; j++) { fracErrs[j] = ((double)totalErrors) / ((double)(i - max(0, blockStartIndex - 1) + 1)); } } // 应该是用来处理BAQ的,把不等于特定BAQ分数的碱基作为一段数据统一处理 void calculateFractionalErrorArray(vector& errorArr, vector& baqArr, vector& fracErrs) { // for (auto val : errorArr) { if (val > 0) spdlog::info("snp err val: {}", val); } fracErrs.resize(baqArr.size()); // errorArray和baqArray必须长度相同 const int BLOCK_START_UNSET = -1; bool inBlock = false; int blockStartIndex = BLOCK_START_UNSET; int i; for (i = 0; i < fracErrs.size(); ++i) { if (baqArr[i] == NO_BAQ_UNCERTAINTY) { // 不等于NO_BAQ_UNCERTAINTY的碱基当成一段数据来处理 if (!inBlock) { fracErrs[i] = errorArr[i]; } else { calculateAndStoreErrorsInBlock(i, blockStartIndex, errorArr, fracErrs); inBlock = false; // reset state variables blockStartIndex = BLOCK_START_UNSET; // reset state variables } } else { inBlock = true; if (blockStartIndex == BLOCK_START_UNSET) { blockStartIndex = i; } } } // 处理最后一个block if (inBlock) { calculateAndStoreErrorsInBlock(i - 1, blockStartIndex, errorArr, fracErrs); } } /** * Update the recalibration statistics using the information in recalInfo. * * Implementation detail: we only populate the quality score table and the optional tables. * The read group table will be populated later by collapsing the quality score table. * * @param recalInfo data structure holding information about the recalibration values for a single read */ void updateRecalTablesForRead(ReadRecalInfo &info) { SamData &read = info.read; PerReadCovariateMatrix& readCovars = info.covariates; Array3D& qualityScoreTable = nsgv::gRecalTables.qualityScoreTable; Array4D& contextTable = nsgv::gRecalTables.contextTable; Array4D& cycleTable = nsgv::gRecalTables.cycleTable; int readLength = read.read_len; for (int offset = 0; offset < readLength; ++offset) { if (!info.skips[offset]) { //if (true){ // 不跳过当前位置 for (int idx = 0; idx < nsgv::gEventTypes.size(); ++idx) { // 获取四个值,readgroup / qualityscore / context / cycle EventTypeValue& event = nsgv::gEventTypes[idx]; vector& covariatesAtOffset = readCovars[event.index][offset]; uint8_t qual = info.getQual(event, offset); double isError = info.getErrorFraction(event, offset); int readGroup = covariatesAtOffset[ReadGroupCovariate::index]; int baseQuality = covariatesAtOffset[BaseQualityCovariate::index]; // 处理base quality score协变量 // RecalUtils::IncrementDatum3keys(qualityScoreTable, qual, isError, readGroup, baseQuality, event.index); qualityScoreTable[readGroup][baseQuality][event.index].increment(1, isError, baseQuality); auto &d = qualityScoreTable[readGroup][baseQuality][event.index]; // spdlog::info("isError {} : {}, mis {}, obs {}", isError, info.snp_errs[offset], d.numMismatches, d.numObservations); // 处理context covariate int contextCovariate = covariatesAtOffset[ContextCovariate::index]; if (contextCovariate >= 0) contextTable[readGroup][baseQuality][contextCovariate][event.index].increment(1, isError, baseQuality); // RecalUtils::IncrementDatum4keys(nsgv::gRecalTables.contextTable, qual, isError, readGroup, baseQuality, contextCovariate, // event.index); // 处理cycle covariate int cycleCovariate = covariatesAtOffset[CycleCovariate::index]; if (cycleCovariate >= 0) cycleTable[readGroup][baseQuality][cycleCovariate][event.index].increment(1, isError, baseQuality); // RecalUtils::IncrementDatum4keys(nsgv::gRecalTables.cycleTable, qual, isError, readGroup, baseQuality, cycleCovariate, event.index); } } } } // 数据总结 void collapseQualityScoreTableToReadGroupTable(Array2D &byReadGroupTable, Array3D &byQualTable) { // 遍历quality table for (int k1 = 0; k1 < byQualTable.data.size(); ++k1) { for (int k2 = 0; k2 < byQualTable[k1].size(); ++k2) { for (int k3 = 0; k3 < byQualTable[k1][k2].size(); ++k3) { auto& qualDatum = byQualTable[k1][k2][k3]; if (qualDatum.numObservations > 0) { int rgKey = k1; int eventIndex = k3; // spdlog::info("k1 {}, k2 {}, k3 {}, numMis {}", k1, k2, k3, qualDatum.numMismatches); byReadGroupTable[rgKey][eventIndex].combine(qualDatum); // spdlog::info("rg {} {}, k3 {}", byReadGroupTable[rgKey][eventIndex].numMismatches, byReadGroupTable[rgKey][eventIndex].reportedQuality, k3); } } } } } /** * To replicate the results of BQSR whether or not we save tables to disk (which we need in Spark), * we need to trim the numbers to a few decimal placed (that's what writing and reading does). */ void roundTableValues(RecalTables& rt) { #define _round_val(val) \ do { \ if (val.numObservations > 0) { \ val.numMismatches = MathUtils::RoundToNDecimalPlaces(val.numMismatches, RecalUtils::NUMBER_ERRORS_DECIMAL_PLACES); \ val.reportedQuality = MathUtils::RoundToNDecimalPlaces(val.reportedQuality, RecalUtils::REPORTED_QUALITY_DECIMAL_PLACES); \ } \ } while (0) _Foreach2D(rt.readGroupTable, val, { _round_val(val); }); _Foreach3D(rt.qualityScoreTable, val, { _round_val(val); }); _Foreach4D(rt.contextTable, val, { _round_val(val); }); _Foreach4D(rt.cycleTable, val, { _round_val(val); }); } // 串行bqsr int SerialBQSR() { open_debug_files(); int round = 0; BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO); // inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM); inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut); int64_t readNumSum = 0; // 0. 初始化一些全局数据 // BAQ baq{BAQ::DEFAULT_GOP}; RecalDatum::StaticInit(); QualityUtils::StaticInit(); MathUtils::StaticInit(); // 1. 协变量数据相关初始化 PerReadCovariateMatrix readCovariates; CovariateUtils::InitPerReadCovMat(readCovariates); ContextCovariate::InitContextCovariate(nsgv::gBqsrArg); CycleCovariate::InitCycleCovariate(nsgv::gBqsrArg); // 注意初始化顺序,这个必须在协变量初始化之后 nsgv::gRecalTables.init(nsgv::gInBamHeader->hrecs->nrg); nsgv::gEventTypes.push_back(EventType::BASE_SUBSTITUTION); if (nsgv::gBqsrArg.computeIndelBQSRTables) { nsgv::gEventTypes.push_back(EventType::BASE_INSERTION); nsgv::gEventTypes.push_back(EventType::BASE_DELETION); } // 2. 读取bam的read group if (nsgv::gInBamHeader->hrecs->nrg == 0) { spdlog::error("No RG tag found in the header!"); return 1; } for (int i = 0; i < nsgv::gInBamHeader->hrecs->nrg; ++i) { spdlog::info("rg: {}", nsgv::gInBamHeader->hrecs->rg[i].name); ReadGroupCovariate::RgToId[nsgv::gInBamHeader->hrecs->rg[i].name] = i; ReadGroupCovariate::IdToRg[i] = nsgv::gInBamHeader->hrecs->rg[i].name; } int test = 0; while (1) { ++ round; // 一. 读取bam数据 size_t readNum = 0; if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam(); if (readNum < 1) { break; } auto bams = inBamBuf.GetBamArr(); spdlog::info("{} reads processed in {} round, {}", readNum, round, test); // 二. 遍历每个bam(read)记录,进行处理 for (int i = 0; i < bams.size(); ++i) { // 1. 对每个read,需要检查cigar是否合法,即没有两个连续的相同的cigar,而且需要将首尾的deletion处理掉,目前看好像没啥影响,我们忽略这一步 // 2. 对质量分数长度跟碱基长度不匹配的read,缺少的质量分数用默认值补齐,先忽略,后边有需要再处理 // 3. 如果bam文件之前做过bqsr,tag中包含OQ(originnal quality,原始质量分数),检查用户参数里是否指定用原始质量分数进行bqsr,如果是则将质量分数替换为OQ,否则忽略OQ,先忽略 // 4. 对read的两端进行检测,去除(hardclip)adapter // spdlog::info("bam idx: {}", i); BamWrap* bw = bams[i]; SamData ad; ad.rid = i; ad.bw = bw; ad.read_len = BamWrap::BamEffectiveLength(bw->b); ad.cigar_end = bw->b->core.n_cigar; if (ad.read_len <= 0) continue; int adapter_boundary = bw->GetAdapterBoundary(); if (bw->IsAdapterInRead(adapter_boundary)) { // adapter在read范围内 if (bw->GetReadNegativeStrandFlag()) { // 反链 clipByReferenceCoordinates(bw, -1, adapter_boundary, ad); } else { // 正链 clipByReferenceCoordinates(bw, adapter_boundary, -1, ad); } } ad.read_len = bw->b->core.l_qseq - ad.left_clip - ad.right_clip; // 更新read长度 // 5. 然后再去除softclip部分 calculateSoftClip(bw, ad); ad.read_len = bw->b->core.l_qseq - ad.left_clip - ad.right_clip; // 更新read长度 if (ad.read_len <= 0) continue; calculateRefOffset(bw, ad); // 计算ref_offset,就是相对比对的position,要将ref右移多少 calculateReadBases(bw, ad); // 计算clip处理之后,剩余的碱基 // 计算clip之后,两端的softstart和softend calculateSoftStartEnd(bw, ad); calculateCigar(bw, ad); //spdlog::info("read-len {} - {}: clip left {}, right {}, ref offset: {}, cigar range: [{}, {}), cigar: {}", bw->b->core.l_qseq, // ad.read_len, ad.left_clip, ad.right_clip, ad.ref_offset, ad.cigar_start, ad.cigar_end, bw->cigar_str()); // 6. 更新每个read的platform信息,好像没啥用,暂时忽略 vector isSNP(ad.read_len, 0); // 该位置是否是SNP位置,0不是,1是 vector isIns(ad.read_len, 0); // 该位置是否是插入位置,0不是,1是 vector isDel(ad.read_len, 0); // 该位置是否是删除位置,0不是,1是 const int nErrors = calculateIsSNPOrIndel(nsgv::gAuxVars[0], bw, ad, isSNP, isIns, isDel); /*fprintf(gf[0], "%d\t", ad.read_len); for (int ii = 0; ii < ad.read_len; ++ii) fprintf(gf[0], "%d ", isSNP[ii]); fprintf(gf[0], "\n"); fprintf(gf[1], "%d\t", ad.read_len); for (int ii = 0; ii < ad.read_len; ++ii) fprintf(gf[1], "%d ", isIns[ii]); fprintf(gf[1], "\n"); fprintf(gf[2], "%d\t", ad.read_len); for (int ii = 0; ii < ad.read_len; ++ii) fprintf(gf[2], "%d ", isDel[ii]); fprintf(gf[2], "\n"); */ // spdlog::info("nErrors: {}", nErrors); // for (auto val : isSNP) { if (val > 0) spdlog::info("snp val: {}", val); } //exit(0); // 7. 计算baqArray // BAQ = base alignment quality // note for efficiency reasons we don't compute the BAQ array unless we actually have // some error to marginalize over. For ILMN data ~85% of reads have no error vector baqArray; bool baqCalculated = false; if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) { baqCalculated = flatBAQArray(bw, ad, baqArray); } else { // baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, bw, ad, baqArray); } if (!baqCalculated) continue; // 到这里,基本的数据都准备好了,后续就是进行bqsr的统计了 // 8. 计算这条read对应的协变量 PROF_START(covariate); CovariateUtils::ComputeCovariates(bw, ad, nsgv::gInBamHeader, readCovariates, true); PROF_END(gprof[GP_covariate], covariate); // fprintf(gf[3], "%ld %ld %d %ld\n", ad.rid, readCovariates.size(), ad.read_len, readCovariates[0][0].size()); // for (auto &arr1 : readCovariates) { // for (size_t si = 0; si < ad.read_len; ++si) { // for (auto &val : arr1[si]) { // fprintf(gf[3], "%d ", val); // } // } // } // fprintf(gf[3], "\n"); //test = readCovariates[1][0][0] + readCovariates[2][1][3]; //int end_pos = bw->contig_end_pos(); //spdlog::info("adapter: {}, read: {}, {}, strand: {}", adapter_boundary, bw->contig_pos(), end_pos, // bw->GetReadNegativeStrandFlag() ? "reverse" : "forward"); // for (auto val : isSNP) { if (val > 0) spdlog::info("snp err val-1: {}", val); } // 9. 计算这条read需要跳过的位置 vector skips(ad.read_len, 0); PROF_START(known_sites); calculateKnownSites(bw, ad, nsgv::gAuxVars[0].vcfArr, skips); for (int ii = 0; ii < ad.read_len; ++ii) { skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[ad.bases[ii]] == -1) || ad.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; } PROF_END(gprof[GP_read_vcf], known_sites); // fprintf(gf[0], "%d\t", ad.read_len); // for (int ii = 0; ii < ad.read_len; ++ii) fprintf(gf[0], "%d ", skips[ii] ? 1 : 0); // fprintf(gf[0], "\n"); // 10. 根据BAQ进一步处理snp,indel,得到处理后的数据 vector snpErrors, insErrors, delErrors; // for (auto val : isSNP) { if (val > 0) spdlog::info("snp err val-2: {}", val); } calculateFractionalErrorArray(isSNP, baqArray, snpErrors); calculateFractionalErrorArray(isIns, baqArray, insErrors); calculateFractionalErrorArray(isDel, baqArray, delErrors); // for (auto val : isSNP) { if (val > 0) spdlog::info("snp val: {}", val); } //spdlog::info("snp errors size: {}, read len: {}", snpErrors.size(), ad.read_len); //for (auto val : snpErrors) { if (val > 0) spdlog::info("snp err val: {}", val); } // aggregate all of the info into our info object, and update the data // 11. 合并之前计算的数据,得到info,并更新bqsr table数据 ReadRecalInfo info(ad, readCovariates, skips, snpErrors, insErrors, delErrors); int m = 0; // for (auto err : snpErrors) { if (isSNP[m] > 0 || err > 0) spdlog::info("snp err: {} : {}", isSNP[m++], err); } //exit(0); PROF_START(update_info); updateRecalTablesForRead(info); PROF_END(gprof[GP_update_info], update_info); } // exit(0); readNumSum += readNum; inBamBuf.ClearAll(); // } spdlog::info("read count: {}", readNumSum); // 12. 创建总结数据 collapseQualityScoreTableToReadGroupTable(nsgv::gRecalTables.readGroupTable, nsgv::gRecalTables.qualityScoreTable); roundTableValues(nsgv::gRecalTables); // 13. 量化质量分数 QuantizationInfo quantInfo(nsgv::gRecalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); // 14. 输出结果 RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, nsgv::gRecalTables); close_debug_files(); return 0; } // 需要支持vcf idx,tbi,csi三种索引方式 // vcf和idx是一对 // vcf.gz和tbi或csi是一对 // entrance of mark BQSR int BaseRecalibrator() { PROF_START(whole_process); /* bam */ nsgv::gInBamFp = sam_open_format(nsgv::gBqsrArg.INPUT_FILE.c_str(), "r", nullptr); if (!nsgv::gInBamFp) { spdlog::error("[{}] load sam/bam file failed.\n", __func__); return -1; } hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header // 初始化AuxVar nsgv::gAuxVars.resize(nsgv::gBqsrArg.NUM_THREADS); for (int i = 0; i < nsgv::gBqsrArg.NUM_THREADS; ++i) { nsgv::gAuxVars[i].header = nsgv::gInBamHeader; nsgv::gAuxVars[i].faidx = fai_load(nsgv::gBqsrArg.REFERENCE_FILE.c_str()); if (nsgv::gAuxVars[i].faidx == 0) error("[%s] fail to load the fasta index.\n", __func__); for (auto &vcfFileName : nsgv::gBqsrArg.KNOWN_SITES_VCFS) { nsgv::gAuxVars[i].vcfArr.push_back(VCFParser(vcfFileName, nsgv::gInBamHeader)); } } // (libraryId) nsgv::gMetrics.LIBRARY = sam_hdr_line_name(nsgv::gInBamHeader, "RG", 0); /* 并行读取bam数据 */ htsThreadPool htsPoolRead = {NULL, 0}; // , htsThreadPool htsPoolWrite = {NULL, 0}; // htsPoolRead.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS); htsPoolWrite.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS); if (!htsPoolRead.pool || !htsPoolWrite.pool) { spdlog::error("[{}] failed to set up thread pool", __LINE__); sam_close(nsgv::gInBamFp); return -1; } hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); return SerialBQSR(); // 读取known sites vcfs for (const auto& ks : nsgv::gBqsrArg.KNOWN_SITES_VCFS) { spdlog::info(" {}", ks); bcf_srs_t* srs = bcf_sr_init(); if (!bcf_sr_add_reader(srs, ks.c_str())) error("Failed to read from %s: %s\n", !strcmp("-", ks.c_str()) ? "standard input" : ks.c_str(), bcf_sr_strerror(srs->errnum)); nsgv::gKnownSitesVcfSrs.push_back(srs); while (bcf_sr_next_line(srs)) { bcf1_t* line = srs->readers[0].buffer[0]; cout << line->pos << '\t' << line->rlen << '\t' << line->n_allele << '\t' << line->n_info << endl; } } /* 先实现串行的bqsr-phase-1 */ sam_close(nsgv::gInBamFp); PROF_END(gprof[GP_whole_process], whole_process); return 0; }