diff --git a/src/bqsr/aux_arg.h b/src/bqsr/aux_arg.h new file mode 100644 index 0000000..1649443 --- /dev/null +++ b/src/bqsr/aux_arg.h @@ -0,0 +1,35 @@ +/* + Description: 解析后的一些参数,文件,数据等,主要供线程用,每个线程一个实例 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2025/12/29 +*/ +#pragma once + +#include +#include + +#include + +#include "util/vcf_parser.h" +#include "recal_tables.h" + +using std::vector; + +// 解析后的一些参数,文件,数据等 +struct AuxVar { + const static int REF_CONTEXT_PAD = 3; // 需要做一些填充 + const static int REFERENCE_HALF_WINDOW_LENGTH = 150; // 需要额外多取出一些ref序列,防止边界效应 + + sam_hdr_t* header = nullptr; // bam header + faidx_t* faidx = nullptr; // reference index + char* ref_seq = nullptr; // reference sequence + int ref_len = 0; // reference sequence length + int offset = 0; // 在要求的ref序列两边,多余取出的碱基数量 + + vector vcfArr; // 从vcf中获取已知位点 + + RecalTables recalTables; // 每个线程对应一个recal tables +}; \ No newline at end of file diff --git a/src/bqsr/baq.h b/src/bqsr/baq.h index a57b722..0d43d5a 100644 --- a/src/bqsr/baq.h +++ b/src/bqsr/baq.h @@ -16,6 +16,8 @@ #include "util/bam_wrap.h" #include "util/sam_data.h" +#include "util/stable_array.h" +#include "aux_arg.h" using std::vector; using std::string; @@ -23,6 +25,8 @@ using std::string; // base alignment quality (BAQ) struct BAQ { + static void StaticInit() { } + static vector qual2prob; // 质量分数转化概率 static constexpr double EM = 0.33333333333; static constexpr double EI = 0.25; @@ -86,4 +90,16 @@ struct BAQ { // 计算baq数组,返回成功与否 bool calcBAQFromHMM(SamData& ad, string ref, int refOffset, vector& baqArray); + + // 简单计算baq数组,就是全部赋值为'@' (64) + static bool flatBAQArray(SamData& sd, StableArray& baqArray) { + baqArray.resize_fill(sd.read_len, (uint8_t)'@'); + return true; + } + + // 计算真实的baq数组,耗时更多,好像enable-baq参数默认是关闭的,那就先不实现这个了 + static bool calculateBAQArray(AuxVar& aux, BAQ& baq, SamData& sd, vector& baqArray) { + baqArray.resize(sd.read_len, 0); + return true; + } }; \ No newline at end of file diff --git 
a/src/bqsr/bqsr_entry.cpp b/src/bqsr/bqsr_entry.cpp index b3bc42a..f0ce7f8 100644 --- a/src/bqsr/bqsr_entry.cpp +++ b/src/bqsr/bqsr_entry.cpp @@ -20,76 +20,33 @@ Date : 2023/10/23 #include #include +#include "aux_arg.h" #include "baq.h" #include "bqsr_args.h" -#include "bqsr_funcs.h" -#include "bqsr_pipeline.h" #include "covariate.h" -#include "dup_metrics.h" #include "fastbqsr_version.h" -#include "read_name_parser.h" +#include "quant_info.h" #include "read_recal_info.h" #include "recal_datum.h" +#include "recal_funcs.h" #include "recal_tables.h" #include "recal_utils.h" +#include "util/bam_buf.h" +#include "util/base_utils.h" +#include "util/debug.h" #include "util/interval.h" #include "util/linear_index.h" -#include "util/profiling.h" -#include "util/utils.h" #include "util/math/math_utils.h" -#include "quant_info.h" -#include "util/debug.h" -#include "util/stable_array.h" -#include "util/sam_data.h" +#include "util/profiling.h" #include "util/read_transformer.h" -#include "util/base_utils.h" +#include "util/sam_data.h" +#include "util/stable_array.h" +#include "util/utils.h" +#include "util/vcf_parser.h" using std::deque; -#define BAM_BLOCK_SIZE 16L * 1024 * 1024 -#define MAX_SITES_INTERVAL 100000 - -const uint8_t NO_BAQ_UNCERTAINTY = (uint8_t)'@'; - -// 解析knownSites -struct VCFParser { - deque knownSites; // 已知的变异位点 - char* buf = nullptr; // // 数据buffer - uint32_t bufLen = 4 * 1024; // 数据buffer长度 - LinearIndex index; // vcf文件索引 - ifstream inStm; // vcf文件流 - - VCFParser() { Init(); } - VCFParser(const string& vcfFileName) { Init(vcfFileName); } - VCFParser(const string& vcfFileName, sam_hdr_t* samHeader) { Init(vcfFileName, samHeader); } - - void Init() { buf = (char*)malloc(bufLen); } - void Init(const string& vcfFileName) { - Init(); - inStm.open(vcfFileName, ifstream::in); - string idxFileName = vcfFileName + ".idx"; - if (!index.ReadIndex(idxFileName)) - error("[%s] fail to load the %s index file\n", __func__, idxFileName.c_str()); - } - void Init(const 
string& vcfFileName, sam_hdr_t *samHeader) { - index.SetHeader(samHeader); - Init(vcfFileName); - } -}; - -// 解析后的一些参数,文件,数据等 -struct AuxVar { - const static int REF_CONTEXT_PAD = 3; // 需要做一些填充 - const static int REFERENCE_HALF_WINDOW_LENGTH = 150; // 需要额外多取出一些ref序列,防止边界效应 - - sam_hdr_t* header = nullptr; // bam header - faidx_t* faidx = nullptr; // reference index - char* ref_seq = nullptr; // reference sequence - int ref_len = 0; // reference sequence length - int offset = 0; // 在要求的ref序列两边,多余取出的碱基数量 - - vector vcfArr; // 从vcf中获取已知位点 -}; +#define BAM_BLOCK_SIZE 16L * 1024 * 1024 // 16M namespace nsgv { @@ -98,44 +55,8 @@ BQSRArg gBqsrArg; // bqsr arguments samFile* gInBamFp; // input BAM file pointer sam_hdr_t* gInBamHeader; // input BAM header vector gAuxVars; // auxiliary variables,保存一些文件,数据等,每个线程对应一个 -RecalTables gRecalTables; // 记录bqsr所有的数据,输出table结果 -vector gEventTypes; // 需要真正计算的eventtype - -// 下面是需要删除或修改的变量 -std::vector gNameParsers; // read name parser -DuplicationMetrics gMetrics; // -DupResult gDupRes; -PipelineArg gPipe(&gDupRes); - -samFile *gOutBamFp; // , sambam -sam_hdr_t *gOutBamHeader; // header -vector gKnownSitesVcfSrs; // known sites vcf srs }; // namespace nsgv -// -struct ByteBuf { - uint8_t *buf = nullptr; - int size = 0; // - int capacity = 0; // -}; - -// 读进来的这一批bam总共占了几个染色体,这个方案不行,读取太多,没必要 -// 开区间 -struct Region { - int64_t start; - int64_t end; -}; - -/* - * - */ -static string getFileExtension(const string &filename) { - auto last_dot = filename.find_last_of('.'); - if (last_dot == string::npos) { - return ""; - } - return filename.substr(last_dot + 1); -} // 过滤掉bqsr过程不符合要求的bam数据 bool bqsrReadFilterOut(const bam1_t *b) { @@ -159,318 +80,16 @@ bool bqsrReadFilterOut(const bam1_t *b) { return false; } -// 读取给定区间的reference -static inline void read_ref_base(AuxVar& aux, int64_t cur_pos, Interval& interval) { - if (aux.ref_seq != NULL) - free(aux.ref_seq); - int tid = BamWrap::bam_tid(cur_pos); - const char* chr = 
sam_hdr_tid2name(aux.header, tid); - int seq_begin = BamWrap::bam_pos(interval.left); //- aux.REFERENCE_HALF_WINDOW_LENGTH; - int seq_end = BamWrap::bam_pos(interval.right); //+ aux.REFERENCE_HALF_WINDOW_LENGTH; - aux.ref_seq = faidx_fetch_seq(aux.faidx, chr, seq_begin, seq_end, &aux.ref_len); - // aux.offset = aux.REFERENCE_HALF_WINDOW_LENGTH; -} - -// 设置某个位置是indel -inline void updateIndel(StableArray& isIndel, int index) { - if (index >=0 && index < isIndel.size()) { - isIndel[index] = 1; - } -} - -// 计算该read的每个碱基位置是否是SNP或Indel -int calculateIsSNPOrIndel(AuxVar& aux, SamData& sd, StableArray& isSNP, StableArray& isIns, StableArray& isDel) { - isSNP.resize(sd.read_len, 0); - isIns.resize(sd.read_len, 0); - isDel.resize(sd.read_len, 0); - // 1. 读取参考基因组,先看看串行运行性能,稍后可以将读入ref和vcf合并起来做成一个并行流水线步骤 - Interval interval{sd.start_pos, sd.end_pos}; // 闭区间 - PROF_START(ref); - read_ref_base(aux, interval.left, interval); - PROF_END(gprof[GP_read_ref], ref); - string refBases(aux.ref_seq); - - // 2. 遍历cigar,计算每个碱基是否是SNP或Indel - int readPos = 0, refPos = 0, nEvents = 0; - for (int i = 0; iGetReadNegativeStrandFlag() ? 
readPos : readPos - 1; - updateIndel(isDel, index); - refPos += len; - } else if (c == 'N') { - refPos += len; - } else if (c == 'I') { - // 与Del不同,Ins应该是在下一个cigar开始的位置,标记Ins - bool forwardStrandRead = !sd.bw->GetReadNegativeStrandFlag(); - if (forwardStrandRead) { - updateIndel(isIns, readPos - 1); - } - readPos += len; - if (!forwardStrandRead) { - updateIndel(isIns, readPos); - } - } else if (c == 'S') { - readPos += len; - } - } - nEvents += std::accumulate(isIns.begin(), isIns.end(), 0) + std::accumulate(isDel.begin(), isDel.end(), 0); - - return nEvents; -} - -// 简单计算baq数组,就是全部赋值为'@' (64) -bool flatBAQArray(SamData& sd, StableArray& baqArray) { - baqArray.resize(sd.read_len, (uint8_t)'@'); - return true; -} - -// 计算真实的baq数组,耗时更多,好像enable-baq参数默认是关闭的,那就先不实现这个了 -bool calculateBAQArray(AuxVar& aux, BAQ& baq, SamData& sd, vector& baqArray) { - baqArray.resize(sd.read_len, 0); - return true; -} - -// 获取一行字符串 -static void get_line_from_buf(char* buf, int64_t total, int64_t* cur, string* line) { - line->clear(); - if (*cur >= total) - return; - char b; - while (*cur < total && (b = buf[(*cur)++]) != '\n') { - line->push_back(b); - } -} - -// 计算与read有交叉的已知位点信息, 应该要判断一下,是按照read的范围去读取vcf,还是按照一个batch read的范围去读取 -void calculateKnownSites(SamData& sd, vector& vcfs, StableArray& knownSites) { - BamWrap* bw = sd.bw; - int tid = bw->contig_id(); - uint64_t startPos = bw->start_pos(); // 闭区间 - uint64_t endPos = bw->end_pos(); // 闭区间 - knownSites.resize(sd.read_len, 0); - - // update vcfs - for(auto &vcf : vcfs) { - // 清理旧的interval - while(!vcf.knownSites.empty()) { - auto& intv = vcf.knownSites.front(); - // spdlog::info("intv bam {}, {}", intv.right, startPos); - if (intv.right < startPos) - vcf.knownSites.pop_front(); - else - break; - } - if (!vcf.knownSites.empty() && vcf.knownSites.back().left > endPos) continue; - - // 读取新的interval - int64_t fpos, flen; - endPos = std::max(startPos + MAX_SITES_INTERVAL, endPos); - Interval readIntv(startPos, endPos); - 
vcf.index.SearchInterval(startPos, endPos, &fpos, &flen); - if (flen > 0) { - vcf.inStm.seekg(fpos, ios::beg); - if (flen > vcf.bufLen) { - vcf.bufLen = flen; - vcf.buf = (char*)realloc(vcf.buf, flen); - } - char* buf = vcf.buf; - vcf.inStm.read(buf, flen); - string line; - int64_t cur = 0; - get_line_from_buf(buf, flen, &cur, &line); - while (line.size() > 0) { - stringstream ss_line(line); - string stid; - int tid, pos; - int64_t locus; - string id, ref; - ss_line >> stid >> pos >> id >> ref; - tid = sam_hdr_name2tid(nsgv::gInBamHeader, stid.c_str()); - int64_t varStart = BamWrap::bam_global_pos(tid, pos); - Interval varIntv(varStart, varStart + ref.size() - 1); - if (readIntv.overlaps(varIntv)) { - vcf.knownSites.push_back(Interval(tid, pos - 1, pos - 1 + ref.size() - 1)); // 闭区间 - } - get_line_from_buf(buf, flen, &cur, &line); - } - } - } - for(auto &vcf : vcfs) { - for (auto &intv : vcf.knownSites) { - // knownSite is outside clipping window for the read, ignore - if (intv.right < sd.softStart()) - continue; - if (intv.left > sd.softEnd()) - break; - // 计算对应的位点 - ReadIdxCigar rcStart = sd.getReadIndexForReferenceCoordinate(intv.left); - int featureStartOnRead = rcStart.readIdx == SamData::READ_INDEX_NOT_FOUND ? 0 : rcStart.readIdx; - if (rcStart.cigarOp == 'D') { - --featureStartOnRead; - } - ReadIdxCigar rcEnd = sd.getReadIndexForReferenceCoordinate(intv.right); - int featureEndOnRead = rcEnd.readIdx == SamData::READ_INDEX_NOT_FOUND ? 
sd.read_len : rcEnd.readIdx; - if (featureStartOnRead > sd.read_len) { - featureStartOnRead = featureEndOnRead = sd.read_len; - } - for (int i = max(0, featureStartOnRead); i < min(sd.read_len, featureEndOnRead + 1); ++i) { - knownSites[i] = true; - } - } - } -} - -// 应该是计算一段数据的平均值 -static void calculateAndStoreErrorsInBlock(int i, int blockStartIndex, StableArray& errorArr, StableArray& fracErrs) { - int totalErrors = 0; - for (int j = max(0, blockStartIndex - 1); j <= i; j++) { - totalErrors += errorArr[j]; - } - for (int j = max(0, blockStartIndex - 1); j <= i; j++) { - fracErrs[j] = ((double)totalErrors) / ((double)(i - max(0, blockStartIndex - 1) + 1)); - } -} - -// 应该是用来处理BAQ的,把不等于特定BAQ分数的碱基作为一段数据统一处理 -void calculateFractionalErrorArray(StableArray& errorArr, StableArray& baqArr, StableArray& fracErrs) { - fracErrs.resize(baqArr.size(), 0.0); - // errorArray和baqArray必须长度相同 - const int BLOCK_START_UNSET = -1; - bool inBlock = false; - int blockStartIndex = BLOCK_START_UNSET; - int i; - for (i = 0; i < fracErrs.size(); ++i) { - if (baqArr[i] == NO_BAQ_UNCERTAINTY) { // 不等于NO_BAQ_UNCERTAINTY的碱基当成一段数据来处理 - if (!inBlock) { - fracErrs[i] = errorArr[i]; - } else { - calculateAndStoreErrorsInBlock(i, blockStartIndex, errorArr, fracErrs); - inBlock = false; // reset state variables - blockStartIndex = BLOCK_START_UNSET; // reset state variables - } - } else { - inBlock = true; - if (blockStartIndex == BLOCK_START_UNSET) { - blockStartIndex = i; - } - } - } - // 处理最后一个block - if (inBlock) { - calculateAndStoreErrorsInBlock(i - 1, blockStartIndex, errorArr, fracErrs); - } -} - -/** - * Update the recalibration statistics using the information in recalInfo. - * - * Implementation detail: we only populate the quality score table and the optional tables. - * The read group table will be populated later by collapsing the quality score table. 
- * - * @param recalInfo data structure holding information about the recalibration values for a single read - */ -void updateRecalTablesForRead(ReadRecalInfo &info) { - SamData &read = info.read; - PerReadCovariateMatrix& readCovars = info.covariates; - Array3D& qualityScoreTable = nsgv::gRecalTables.qualityScoreTable; - Array4D& contextTable = nsgv::gRecalTables.contextTable; - Array4D& cycleTable = nsgv::gRecalTables.cycleTable; - - int readLength = read.read_len; - for (int offset = 0; offset < readLength; ++offset) { - //if (read.rid == 46114) { - // fprintf(gf[3], "%d %d\n", offset, info.skips[offset] ? 1 : 0); - //} - if (!info.skips[offset]) { - //if (true){ // 不跳过当前位置 - for (int idx = 0; idx < nsgv::gEventTypes.size(); ++idx) { - // 获取四个值,readgroup / qualityscore / context / cycle - EventTypeValue& event = nsgv::gEventTypes[idx]; - vector& covariatesAtOffset = readCovars[event.index][offset]; - uint8_t qual = info.getQual(event, offset); - double isError = info.getErrorFraction(event, offset); - - int readGroup = covariatesAtOffset[ReadGroupCovariate::index]; - int baseQuality = covariatesAtOffset[BaseQualityCovariate::index]; - - // 处理base quality score协变量 - // RecalUtils::IncrementDatum3keys(qualityScoreTable, qual, isError, readGroup, baseQuality, event.index); - if (read.rid == 46114) { - // fprintf(gf[3], "%d %d %f\n", offset, baseQuality, isError); - } - qualityScoreTable[readGroup][baseQuality][event.index].increment(1, isError, baseQuality); - - auto &d = qualityScoreTable[readGroup][baseQuality][event.index]; - // spdlog::info("isError {} : {}, mis {}, obs {}", isError, info.snp_errs[offset], d.numMismatches, d.numObservations); - - // 处理context covariate - int contextCovariate = covariatesAtOffset[ContextCovariate::index]; - if (contextCovariate >= 0) - contextTable[readGroup][baseQuality][contextCovariate][event.index].increment(1, isError, baseQuality); - // RecalUtils::IncrementDatum4keys(nsgv::gRecalTables.contextTable, qual, isError, 
readGroup, baseQuality, contextCovariate, - // event.index); - // 处理cycle covariate - int cycleCovariate = covariatesAtOffset[CycleCovariate::index]; - if (cycleCovariate >= 0) - cycleTable[readGroup][baseQuality][cycleCovariate][event.index].increment(1, isError, baseQuality); - // RecalUtils::IncrementDatum4keys(nsgv::gRecalTables.cycleTable, qual, isError, readGroup, baseQuality, cycleCovariate, event.index); - } - } - } - // fprintf(gf[3], "%ld %d %ld\n", read.rid, read.read_len, read.start_pos+1); - // for (auto& arr1 : qualityScoreTable.data) { - // for (size_t si = 0; si < arr1.size(); ++si) { - // for (auto &val : arr1[si]) { - // if (val.numObservations > 0) - // fprintf(gf[3], "%ld %f %f ", val.numObservations, val.getNumMismatches(), val.reportedQuality); - // } - // } - // } - // fprintf(gf[3], "\n"); - - // fprintf(gf[3], "%ld %d %ld\n", read.rid, read.read_len, read.start_pos+1); - // for (auto& arr1 : contextTable.data) { - // for (size_t si = 0; si < arr1.size(); ++si) { - // for (auto &arr2 : arr1[si]) { - // for (auto& val : arr2) { - // if (val.numObservations > 0) - // fprintf(gf[3], "%ld %f %f ", val.numObservations, val.getNumMismatches(), val.reportedQuality); - // } - // } - // } - // } - // fprintf(gf[3], "\n"); -} - // 数据总结 void collapseQualityScoreTableToReadGroupTable(Array2D &byReadGroupTable, Array3D &byQualTable) { // 遍历quality table - for (int k1 = 0; k1 < byQualTable.data.size(); ++k1) { - for (int k2 = 0; k2 < byQualTable[k1].size(); ++k2) { - for (int k3 = 0; k3 < byQualTable[k1][k2].size(); ++k3) { - auto& qualDatum = byQualTable[k1][k2][k3]; - if (qualDatum.numObservations > 0) { - int rgKey = k1; - int eventIndex = k3; - // spdlog::info("k1 {}, k2 {}, k3 {}, numMis {}", k1, k2, k3, qualDatum.numMismatches); - byReadGroupTable[rgKey][eventIndex].combine(qualDatum); - // spdlog::info("rg {} {}, k3 {}", byReadGroupTable[rgKey][eventIndex].numMismatches, byReadGroupTable[rgKey][eventIndex].reportedQuality, k3); - } - } + 
_Foreach3DK(byQualTable, qualDatum, { + if (qualDatum.numObservations > 0) { + int rgKey = k1; + int eventIndex = k3; + byReadGroupTable[rgKey][eventIndex].combine(qualDatum); } - } + }); } /** @@ -494,45 +113,15 @@ void roundTableValues(RecalTables& rt) { // 串行bqsr int SerialBQSR() { - open_debug_files(); - int round = 0; BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO); - // inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM); inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut); int64_t readNumSum = 0; - // 0. 初始化一些全局数据 - // BAQ baq{BAQ::DEFAULT_GOP}; - RecalDatum::StaticInit(); - QualityUtils::StaticInit(); - MathUtils::StaticInit(); - - // 1. 协变量数据相关初始化 + int round = 0; PerReadCovariateMatrix readCovariates; CovariateUtils::InitPerReadCovMat(readCovariates); - ContextCovariate::InitContextCovariate(nsgv::gBqsrArg); - CycleCovariate::InitCycleCovariate(nsgv::gBqsrArg); - - // 注意初始化顺序,这个必须在协变量初始化之后 - nsgv::gRecalTables.init(nsgv::gInBamHeader->hrecs->nrg); - nsgv::gEventTypes.push_back(EventType::BASE_SUBSTITUTION); - if (nsgv::gBqsrArg.computeIndelBQSRTables) { - nsgv::gEventTypes.push_back(EventType::BASE_INSERTION); - nsgv::gEventTypes.push_back(EventType::BASE_DELETION); - } - - // 2. 读取bam的read group - if (nsgv::gInBamHeader->hrecs->nrg == 0) { - spdlog::error("No RG tag found in the header!"); - return 1; - } - for (int i = 0; i < nsgv::gInBamHeader->hrecs->nrg; ++i) { - spdlog::info("rg: {}", nsgv::gInBamHeader->hrecs->rg[i].name); - ReadGroupCovariate::RgToId[nsgv::gInBamHeader->hrecs->rg[i].name] = i; - ReadGroupCovariate::IdToRg[i] = nsgv::gInBamHeader->hrecs->rg[i].name; - } - + RecalTables& recalTables = nsgv::gAuxVars[0].recalTables; while (true) { - ++ round; + ++round; // 一. 读取bam数据 size_t readNum = 0; if (inBamBuf.ReadStat() >= 0) @@ -553,27 +142,31 @@ int SerialBQSR() { for (int i = 0; i < bams.size(); ++i) { // 1. 
对每个read,需要检查cigar是否合法,即没有两个连续的相同的cigar,而且需要将首尾的deletion处理掉,目前看好像没啥影响,我们忽略这一步 // 2. 对质量分数长度跟碱基长度不匹配的read,缺少的质量分数用默认值补齐,先忽略,后边有需要再处理 - // 3. 如果bam文件之前做过bqsr,tag中包含OQ(originnal quality,原始质量分数),检查用户参数里是否指定用原始质量分数进行bqsr,如果是则将质量分数替换为OQ,否则忽略OQ,先忽略 - // spdlog::info("bam idx: {}", i); + // 3. 如果bam文件之前做过bqsr,tag中包含OQ(originnal + // quality,原始质量分数),检查用户参数里是否指定用原始质量分数进行bqsr,如果是则将质量分数替换为OQ,否则忽略OQ,先忽略 spdlog::info("bam + // idx: {}", i); BamWrap* bw = bams[i]; sd.init(); sd.parseBasic(bw); sd.rid = i + readNumSum; - if (sd.read_len <= 0) continue; + if (sd.read_len <= 0) + continue; PROF_START(clip_read); // 4. 对read的两端进行检测,去除(hardclip)adapter - ReadTransformer::hardClipAdaptorSequence(bw, sd); - if (sd.read_len <= 0) continue; + ReadTransformer::hardClipAdaptorSequence(bw, sd); + if (sd.read_len <= 0) + continue; // 5. 然后再去除softclip部分 ReadTransformer::hardClipSoftClippedBases(bw, sd); - if (sd.read_len <= 0) continue; + if (sd.read_len <= 0) + continue; // 应用所有的变换,计算samdata的相关信息 sd.applyTransformations(); PROF_END(gprof[GP_clip_read], clip_read); // 6. 更新每个read的platform信息,好像没啥用,暂时忽略 - const int nErrors = calculateIsSNPOrIndel(nsgv::gAuxVars[0], sd, isSNP, isIns, isDel); + const int nErrors = RecalFuncs::calculateIsSNPOrIndel(nsgv::gAuxVars[0], sd, isSNP, isIns, isDel); /*fprintf(gf[0], "%d\t", sd.read_len); for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", isSNP[ii]); @@ -593,11 +186,12 @@ int SerialBQSR() { // vector baqArray; bool baqCalculated = false; if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) { - baqCalculated = flatBAQArray(sd, baqArray); + baqCalculated = BAQ::flatBAQArray(sd, baqArray); } else { // baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, sd, baqArray); } - if (!baqCalculated) continue; + if (!baqCalculated) + continue; // 到这里,基本的数据都准备好了,后续就是进行bqsr的统计了 // 8. 计算这条read对应的协变量 @@ -627,70 +221,88 @@ int SerialBQSR() { // fprintf(gf[3], "\n"); // 9. 
计算这条read需要跳过的位置 - PROF_START(known_sites); - calculateKnownSites(sd, nsgv::gAuxVars[0].vcfArr, skips); + PROF_START(read_vcf); + RecalFuncs::calculateKnownSites(sd, nsgv::gAuxVars[0].vcfArr, nsgv::gInBamHeader, skips); for (int ii = 0; ii < sd.read_len; ++ii) { skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; } - PROF_END(gprof[GP_read_vcf], known_sites); + PROF_GP_END(read_vcf); // fprintf(gf[0], "%ld %d\t", sd.rid, sd.read_len); // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", skips[ii] ? 1 : 0); // fprintf(gf[0], "\n"); // 10. 根据BAQ进一步处理snp,indel,得到处理后的数据 - calculateFractionalErrorArray(isSNP, baqArray, snpErrors); - calculateFractionalErrorArray(isIns, baqArray, insErrors); - calculateFractionalErrorArray(isDel, baqArray, delErrors); + PROF_START(frac_err); + RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors); + RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors); + RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors); + PROF_GP_END(frac_err); // aggregate all of the info into our info object, and update the data // 11. 合并之前计算的数据,得到info,并更新bqsr table数据 ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors); PROF_START(update_info); - updateRecalTablesForRead(info); + RecalUtils::updateRecalTablesForRead(info, recalTables); PROF_END(gprof[GP_update_info], update_info); } - // exit(0); - readNumSum += readNum; - inBamBuf.ClearAll(); // + inBamBuf.ClearAll(); // } spdlog::info("read count: {}", readNumSum); // 12. 创建总结数据 - collapseQualityScoreTableToReadGroupTable(nsgv::gRecalTables.readGroupTable, nsgv::gRecalTables.qualityScoreTable); - roundTableValues(nsgv::gRecalTables); + collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable); + roundTableValues(recalTables); // 13. 
量化质量分数 - QuantizationInfo quantInfo(nsgv::gRecalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); + QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); // 14. 输出结果 - RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, nsgv::gRecalTables); - - close_debug_files(); + RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, recalTables); return 0; } -// 需要支持vcf idx,tbi,csi三种索引方式 -// vcf和idx是一对 -// vcf.gz和tbi或csi是一对 - -// entrance of mark BQSR -int BaseRecalibrator() { - - PROF_START(whole_process); +// 在进行数据处理之前,初始化一些全局数据 +static void globalInit() { + open_debug_files(); /* bam */ nsgv::gInBamFp = sam_open_format(nsgv::gBqsrArg.INPUT_FILE.c_str(), "r", nullptr); if (!nsgv::gInBamFp) { spdlog::error("[{}] load sam/bam file failed.\n", __func__); - return -1; + exit(1); } hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header - + + /* 并行读取bam数据 */ + htsThreadPool htsPoolRead = {NULL, 0}; // , + htsPoolRead.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS); + if (!htsPoolRead.pool ) { + spdlog::error("[{}] failed to set up thread pool", __LINE__); + sam_close(nsgv::gInBamFp); + exit(1); + } + hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); + + if (!nsgv::gInBamHeader->hrecs) { + if (sam_hdr_fill_hrecs(nsgv::gInBamHeader) != 0) { + spdlog::error("[{}] failed to read sam header", __LINE__); + sam_close(nsgv::gInBamFp); + exit(1); + } + } + + // 1. 
协变量数据相关初始化 + ContextCovariate::InitContextCovariate(nsgv::gBqsrArg); + CycleCovariate::InitCycleCovariate(nsgv::gBqsrArg); + + // 注意初始化顺序,这个必须在协变量初始化之后,因为需要用到MaximumKeyValue + // nsgv::gRecalTables.init(nsgv::gInBamHeader->hrecs->nrg); + // 初始化AuxVar nsgv::gAuxVars.resize(nsgv::gBqsrArg.NUM_THREADS); for (int i = 0; i < nsgv::gBqsrArg.NUM_THREADS; ++i) { @@ -698,51 +310,50 @@ int BaseRecalibrator() { nsgv::gAuxVars[i].faidx = fai_load(nsgv::gBqsrArg.REFERENCE_FILE.c_str()); if (nsgv::gAuxVars[i].faidx == 0) error("[%s] fail to load the fasta index.\n", __func__); - for (auto &vcfFileName : nsgv::gBqsrArg.KNOWN_SITES_VCFS) { + for (auto& vcfFileName : nsgv::gBqsrArg.KNOWN_SITES_VCFS) { nsgv::gAuxVars[i].vcfArr.push_back(VCFParser(vcfFileName, nsgv::gInBamHeader)); } + // 注意初始化顺序,这个必须在协变量初始化之后,因为需要用到MaximumKeyValue + nsgv::gAuxVars[i].recalTables.init(nsgv::gInBamHeader->hrecs->nrg); } - // (libraryId) - nsgv::gMetrics.LIBRARY = sam_hdr_line_name(nsgv::gInBamHeader, "RG", 0); + // 0. 初始化一些全局数据 + // BAQ baq{BAQ::DEFAULT_GOP}; + BAQ::StaticInit(); + RecalDatum::StaticInit(); + QualityUtils::StaticInit(); + MathUtils::StaticInit(); - /* 并行读取bam数据 */ - htsThreadPool htsPoolRead = {NULL, 0}; // , - htsThreadPool htsPoolWrite = {NULL, 0}; // - htsPoolRead.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS); - htsPoolWrite.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS); - if (!htsPoolRead.pool || !htsPoolWrite.pool) { - spdlog::error("[{}] failed to set up thread pool", __LINE__); - sam_close(nsgv::gInBamFp); - return -1; + // 初始化需要计算的event types + RecalUtils::initEventTypes(nsgv::gBqsrArg.computeIndelBQSRTables); + + // 2. 
读取bam的read group + if (nsgv::gInBamHeader->hrecs->nrg == 0) { + spdlog::error("No RG tag found in the header!"); + exit(1); } - hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); - - return SerialBQSR(); - - // 读取known sites vcfs - for (const auto& ks : nsgv::gBqsrArg.KNOWN_SITES_VCFS) { - spdlog::info(" {}", ks); - bcf_srs_t* srs = bcf_sr_init(); - if (!bcf_sr_add_reader(srs, ks.c_str())) - error("Failed to read from %s: %s\n", !strcmp("-", ks.c_str()) ? "standard input" : ks.c_str(), bcf_sr_strerror(srs->errnum)); - nsgv::gKnownSitesVcfSrs.push_back(srs); - - while (bcf_sr_next_line(srs)) { - bcf1_t* line = srs->readers[0].buffer[0]; - cout << line->pos << '\t' << line->rlen << '\t' << line->n_allele << '\t' << line->n_info << endl; - } + for (int i = 0; i < nsgv::gInBamHeader->hrecs->nrg; ++i) { + // spdlog::info("rg: {}", nsgv::gInBamHeader->hrecs->rg[i].name); + ReadGroupCovariate::RgToId[nsgv::gInBamHeader->hrecs->rg[i].name] = i; + ReadGroupCovariate::IdToRg[i] = nsgv::gInBamHeader->hrecs->rg[i].name; } +} - /* 先实现串行的bqsr-phase-1 */ - - - +// 全局资源释放 +static void globalDestroy() { + close_debug_files(); +} +// entrance of mark BQSR +int BaseRecalibrator() { + int ret = 0; + PROF_START(whole_process); + globalInit(); + ret = SerialBQSR(); // 串行处理数据,生成recal table + globalDestroy(); sam_close(nsgv::gInBamFp); - PROF_END(gprof[GP_whole_process], whole_process); - return 0; + return ret; } diff --git a/src/bqsr/bqsr_funcs.cpp b/src/bqsr/bqsr_funcs.cpp index 6b2db1f..e69de29 100644 --- a/src/bqsr/bqsr_funcs.cpp +++ b/src/bqsr/bqsr_funcs.cpp @@ -1,403 +0,0 @@ -#include "bqsr_funcs.h" - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "dup_metrics.h" -#include "bqsr_args.h" -#include "read_ends.h" -#include "read_name_parser.h" - -using std::cerr; -using std::endl; -using std::map; -using std::set; -using std::unordered_map; -using std::vector; - -namespace nsgv { -extern BQSRArg gBqsrArg; 
-extern DuplicationMetrics gMetrics; -}; - -/* - * read - */ -int16_t computeDuplicateScore(BamWrap &bw) { - - return 1; -} - -/* - * Builds a read ends object that represents a single read. - * read - */ -void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnds *pKey) { - auto &k = *pKey; - auto &bc = bw.b->core; - k.read1FirstOfPair = bw.GetFirstOfPairFlag(); - k.read1ReferenceIndex = bc.tid; - k.read1Coordinate = (bc.flag & BAM_FREVERSE) ? bw.GetUnclippedEnd() : bw.GetUnclippedStart(); - k.orientation = (bc.flag & BAM_FREVERSE) ? ReadEnds::R : ReadEnds::F; - - k.read1IndexInFile = index; - k.score = computeDuplicateScore(bw); - // Doing this lets the ends object know that it's part of a pair - if (bw.GetReadPairedFlag() && !bw.GetMateUnmappedFlag()) { - k.read2ReferenceIndex = bc.mtid; - } - // Fill in the location information for optical duplicates - if (!ReadNameParser::sWrongNameFormat) - rnParser.AddLocationInformation(bw.query_name(), pKey); - // cout << k.tile << ' ' << k.x << ' ' << k.y << endl; - // key - k.posKey = BamWrap::bam_global_pos(k.read1ReferenceIndex, k.read1Coordinate); // << 1 | k.orientation; -} - -/* pairend read end */ -void modifyPairedEnds(const ReadEnds &fragEnd, ReadEnds *pPairedEnds) { - auto &pairedEnds = *pPairedEnds; - - int64_t bamIdx = fragEnd.read1IndexInFile; - const int matesRefIndex = fragEnd.read1ReferenceIndex; - const int matesCoordinate = fragEnd.read1Coordinate; - // Set orientationForOpticalDuplicates, which always goes by the first then - // the second end for the strands. NB: must do this before updating the - // orientation later. 
- if (fragEnd.read1FirstOfPair) { - pairedEnds.orientationForOpticalDuplicates = - ReadEnds::GetOrientationByte(fragEnd.IsNegativeStrand(), pairedEnds.orientation == ReadEnds::R); - } else { - pairedEnds.orientationForOpticalDuplicates = - ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, fragEnd.IsNegativeStrand()); - } - // If the other read is actually later, simply add the other read's data as - // read2, else flip the reads - if (matesRefIndex > pairedEnds.read1ReferenceIndex || - (matesRefIndex == pairedEnds.read1ReferenceIndex && matesCoordinate >= pairedEnds.read1Coordinate)) { - pairedEnds.read2ReferenceIndex = matesRefIndex; - pairedEnds.read2Coordinate = matesCoordinate; - pairedEnds.read2IndexInFile = bamIdx; - pairedEnds.orientation = - ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, fragEnd.IsNegativeStrand()); - - // if the two read ends are in the same position, pointing in opposite - // directions, the orientation is undefined and the procedure above will - // depend on the order of the reads in the file. 
To avoid this, we set - // it explicitly (to FR): - if (pairedEnds.read2ReferenceIndex == pairedEnds.read1ReferenceIndex && - pairedEnds.read2Coordinate == pairedEnds.read1Coordinate && pairedEnds.orientation == ReadEnds::RF) { - pairedEnds.orientation = ReadEnds::FR; - } - } else { - pairedEnds.read2ReferenceIndex = pairedEnds.read1ReferenceIndex; - pairedEnds.read2Coordinate = pairedEnds.read1Coordinate; - pairedEnds.read2IndexInFile = pairedEnds.read1IndexInFile; - pairedEnds.read1ReferenceIndex = matesRefIndex; - pairedEnds.read1Coordinate = matesCoordinate; - pairedEnds.read1IndexInFile = bamIdx; - pairedEnds.orientation = - ReadEnds::GetOrientationByte(fragEnd.IsNegativeStrand(), pairedEnds.orientation == ReadEnds::R); - pairedEnds.posKey = fragEnd.posKey; - } - pairedEnds.score += fragEnd.score; -} - -static inline bool closeEnough(const ReadEnds *lhs, const ReadEnds *rhs, const int distance) { - return lhs != rhs && // no comparing an object to itself (checked using object identity)! - (lhs->tile != ReadEnds::NO_VALUE) && - (rhs->tile != ReadEnds::NO_VALUE) && // no comparing objects without locations - lhs->tile == rhs->tile && // and the same tile - abs(lhs->x - rhs->x) <= distance && abs(lhs->y - rhs->y) <= distance; -} - -static inline bool closeEnoughShort(const ReadEnds *lhs, const ReadEnds *rhs, const int distance) { - return lhs != rhs && abs(lhs->x - rhs->x) <= distance && abs(lhs->y - rhs->y) <= distance; -} - -/** - * Finds which reads within the list of duplicates that are likely to be optical/co-localized duplicates of - * one another. Within each cluster of optical duplicates that is found, one read remains un-flagged for - * optical duplication and the rest are flagged as optical duplicates. The set of reads that are considered - * optical duplicates are indicated by returning "true" at the same index in the resulting boolean[] as the - * read appeared in the input list of physical locations. 
- * - * @param list a list of reads that are determined to be duplicates of one another - * @param keeper a single PhysicalLocation that is the one being kept as non-duplicate, and thus should never be - * annotated as an optical duplicate. May in some cases be null, or a PhysicalLocation not - * contained within the list! (always not be null!) - * @return a boolean[] of the same length as the incoming list marking which reads are optical duplicates - */ -static void findOpticalDuplicates(vector &readEndsArr, const ReadEnds *pBestRe, - vector *pOpticalDuplicateFlags) { - const int DEFAULT_OPTICAL_DUPLICATE_DISTANCE = 100; - const int DEFAULT_MAX_DUPLICATE_SET_SIZE = 300000; - - vector &opticalDuplicateFlags = *pOpticalDuplicateFlags; - // opticalDuplicateFlags.push_back(true); // for test - int len = readEndsArr.size(); - // If there is only one or zero reads passed in (so there are obviously no optical duplicates), - // or if there are too many reads (so we don't want to try to run this expensive n^2 algorithm), - // then just return an array of all false - if (len < 2 || len > DEFAULT_MAX_DUPLICATE_SET_SIZE) { - return; - } - - if (len >= 4) { - /** - * Compute the optical duplicates correctly in the case where the duplicate group could end up with transitive - * optical duplicates - * getOpticalDuplicatesFlagWithGraph - */ - // Make a graph where the edges are reads that lie within the optical duplicate pixel distance from each other, - // we will then use the union-find algorithm to cluster the graph and find optical duplicate groups - Graph opticalDistanceRelationGraph; - unordered_map> tileRGmap; - int keeperIndex = -1; - for (int i = 0; i < readEndsArr.size(); ++i) { - const ReadEnds *currentLoc = readEndsArr[i]; - if (currentLoc == pBestRe) - keeperIndex = i; - if (currentLoc->tile != ReadEnds::NO_VALUE) { - int key = currentLoc->tile; // ,read group - tileRGmap[key].push_back(i); - } - opticalDistanceRelationGraph.addNode(i); - } - // Since because finding 
adjacent optical duplicates is an O(n^2) operation, we can subdivide the input into its - // readgroups in order to reduce the amount of redundant checks across readgroups between reads. - // fillGraphFromAGroup - for (auto &entry : tileRGmap) { - auto &groupList = entry.second; - if (groupList.size() > 1) { - for (int i = 0; i < groupList.size(); ++i) { - int iIndex = groupList[i]; - const ReadEnds *currentLoc = readEndsArr[iIndex]; - for (int j = i + 1; j < groupList.size(); ++j) { - int jIndex = groupList[j]; - const ReadEnds *other = readEndsArr[jIndex]; - if (closeEnoughShort(currentLoc, other, DEFAULT_OPTICAL_DUPLICATE_DISTANCE)) - opticalDistanceRelationGraph.addEdge(iIndex, jIndex); - } - } - } - } - // Keep a map of the reads and their cluster assignments - unordered_map opticalDuplicateClusterMap; - opticalDistanceRelationGraph.cluster(&opticalDuplicateClusterMap); - unordered_map clusterToRepresentativeRead; - int keeperCluster = -1; - // Specially mark the keeper as specifically not a duplicate if it exists - if (keeperIndex >= 0) { - keeperCluster = opticalDuplicateClusterMap[keeperIndex]; - clusterToRepresentativeRead[keeperCluster] = keeperIndex; - } - - for (auto &entry : opticalDuplicateClusterMap) { - int recordIndex = entry.first; - int recordAssignedCluster = entry.second; - // If its not the first read we've seen for this cluster, mark it as an optical duplicate - auto repReadItr = clusterToRepresentativeRead.find(recordAssignedCluster); - if (repReadItr != clusterToRepresentativeRead.end() && recordIndex != keeperIndex) { - const ReadEnds *representativeLoc = readEndsArr[repReadItr->second]; - const ReadEnds *currentRecordLoc = readEndsArr[recordIndex]; - // If not in the keeper cluster, then keep the minX -> minY valued duplicate (note the tile must be - // equal for reads to cluster together) - if (!(keeperIndex >= 0 && recordAssignedCluster == keeperCluster) && - (currentRecordLoc->x < representativeLoc->x || - (currentRecordLoc->x == 
representativeLoc->x && currentRecordLoc->y < representativeLoc->y))) { - // Mark the old min as an optical duplicate, and save the new min - opticalDuplicateFlags[repReadItr->second] = true; - clusterToRepresentativeRead[recordAssignedCluster] = recordIndex; - } else { - // If a smaller read has already been visited, mark the test read as an optical duplicate - opticalDuplicateFlags[recordIndex] = true; - } - } else { - clusterToRepresentativeRead[recordAssignedCluster] = recordIndex; - } - } - } else { - /** - * Compute optical duplicates quickly in the standard case where we know that there won't be any transitive - * distances to worry about. Note, this is guaranteed to be correct when there are at most 2x reads from a - * readgroup or 3x with the keeper present - * getOpticalDuplicatesFlagFast - */ - // First go through and compare all the reads to the keeper - for (int i = 0; i < len; ++i) { - const ReadEnds *other = readEndsArr[i]; - opticalDuplicateFlags[i] = closeEnough(pBestRe, other, DEFAULT_OPTICAL_DUPLICATE_DISTANCE); - } - // Now go through and do each pairwise comparison not involving the actualKeeper - for (int i = 0; i < len; ++i) { - const ReadEnds *lhs = readEndsArr[i]; - if (lhs == pBestRe) // no comparisons to actualKeeper since those are all handled above - continue; - for (int j = i + 1; j < len; ++j) { - const ReadEnds *rhs = readEndsArr[j]; - if (rhs == pBestRe) // no comparisons to actualKeeper since those are all handled above - continue; - if (opticalDuplicateFlags[i] && opticalDuplicateFlags[j]) - continue; // both already marked, no need to check - if (closeEnough(lhs, rhs, DEFAULT_OPTICAL_DUPLICATE_DISTANCE)) { - // At this point we want to mark either lhs or rhs as duplicate. Either could have been marked - // as a duplicate of the keeper (but not both - that's checked above), so be careful about which - // one to now mark as a duplicate. - int index = opticalDuplicateFlags[j] ? 
i : j; - opticalDuplicateFlags[index] = true; - } - } - } - } -} - -/** - * Looks through the set of reads and identifies how many of the duplicates are - * in fact optical duplicates, and stores the data in the instance level histogram. - * - * We expect only reads with FR or RF orientations, not a mixture of both. - * - * In PCR duplicate detection, a duplicates can be a have FR and RF when fixing the orientation order to the first end - * of the mate. In optical duplicate detection, we do not consider them duplicates if one read as FR and the other RF - * when we order orientation by the first mate sequenced (read #1 of the pair). - */ -static int checkOpticalDuplicates(vector &readEndsArr, const ReadEnds *pBestRe) { - vector opticalDuplicateFlags(readEndsArr.size(), false); - // find OpticalDuplicates - findOpticalDuplicates(readEndsArr, pBestRe, &opticalDuplicateFlags); - int opticalDuplicates = 0; - for (int i = 0; i < opticalDuplicateFlags.size(); ++i) { - ReadEnds *pRe = const_cast(readEndsArr[i]); - if (opticalDuplicateFlags[i]) { - ++opticalDuplicates; - pRe->isOpticalDuplicate = true; - } else { - pRe->isOpticalDuplicate = false; - } - } - return opticalDuplicates; -} - -/** - * - */ -void trackOpticalDuplicates(vector &readEndsArr, const ReadEnds *pBestRe) { - bool hasFR = false, hasRF = false; - // Check to see if we have a mixture of FR/RF - for (auto pRe : readEndsArr) { - if (ReadEnds::FR == pRe->orientationForOpticalDuplicates) - hasFR = true; - else if (ReadEnds::RF == pRe->orientationForOpticalDuplicates) - hasRF = true; - } - - // Check if we need to partition since the orientations could have changed - int nOpticalDup; - if (hasFR && hasRF) { // need to track them independently - vector trackOpticalDuplicatesF; - vector trackOpticalDuplicatesR; - // Split into two lists: first of pairs and second of pairs, - // since they must have orientation and same starting end - for (auto pRe : readEndsArr) { - if (ReadEnds::FR == 
pRe->orientationForOpticalDuplicates) - trackOpticalDuplicatesF.push_back(pRe); - else if (ReadEnds::RF == pRe->orientationForOpticalDuplicates) - trackOpticalDuplicatesR.push_back(pRe); - else - cerr << "Found an unexpected orientation: " << pRe->orientation << endl; - } - // track the duplicates - int nOpticalDupF = checkOpticalDuplicates(trackOpticalDuplicatesF, pBestRe); - int nOpticalDupR = checkOpticalDuplicates(trackOpticalDuplicatesR, pBestRe); - nOpticalDup = nOpticalDupF + nOpticalDupR; - } else { // No need to partition - nOpticalDup = checkOpticalDuplicates(readEndsArr, pBestRe); - } - - // ,trackDuplicateCounts - ++nsgv::gMetrics.DuplicateCountHist[readEndsArr.size()]; - if (readEndsArr.size() > nOpticalDup) - ++nsgv::gMetrics.NonOpticalDuplicateCountHist[readEndsArr.size() - nOpticalDup]; - if (nOpticalDup) - ++nsgv::gMetrics.OpticalDuplicatesCountHist[nOpticalDup + 1]; -} - -/** - * Estimates the size of a library based on the number of paired end molecules observed - * and the number of unique pairs observed. - *

- * Based on the Lander-Waterman equation that states: - * C/X = 1 - exp( -N/X ) - * where - * X = number of distinct molecules in library - * N = number of read pairs - * C = number of distinct fragments observed in read pairs - */ -int64_t estimateLibrarySize(int64_t readPairs, int64_t uniqueReadPairs) { - int64_t librarySize = 0; - int64_t readPairDuplicates = readPairs - uniqueReadPairs; - auto f = [](double x, double c, double n) { return c / x - 1 + exp(-n / x); }; - if (readPairs > 0 && readPairDuplicates > 0) { - double m = 1.0; - double M = 100.0; - if (uniqueReadPairs >= readPairs || f(m * uniqueReadPairs, uniqueReadPairs, readPairs) < 0) { - cerr << "Invalid values for pairs and unique pairs: " << readPairs << ", " << uniqueReadPairs << endl; - return 0; - } - // find value of M, large enough to act as other side for bisection method - while (f(M * uniqueReadPairs, uniqueReadPairs, readPairs) > 0) { - M *= 10.0; - } - // use bisection method (no more than 40 times) to find solution - for (int i = 0; i < 40; ++i) { - double r = (m + M) / 2.0; - double u = f(r * uniqueReadPairs, uniqueReadPairs, readPairs); - if (u == 0) - break; - else if (u > 0) - m = r; - else if (u < 0) - M = r; - } - return uniqueReadPairs * (m + M) / 2.0; - } - return librarySize; -} - -/** - * Estimates the ROI (return on investment) that one would see if a library was sequenced to - * x higher coverage than the observed coverage. - * - * @param estimatedLibrarySize the estimated number of molecules in the library - * @param x the multiple of sequencing to be simulated (i.e. how many X sequencing) - * @param pairs the number of pairs observed in the actual sequencing - * @param uniquePairs the number of unique pairs observed in the actual sequencing - * @return a number z <= x that estimates if you had pairs*x as your sequencing then you - * would observe uniquePairs*z unique pairs. 
- */ -double estimateRoi(long estimatedLibrarySize, double x, long pairs, long uniquePairs) { - return estimatedLibrarySize * (1 - exp(-(x * pairs) / estimatedLibrarySize)) / uniquePairs; -} - -/** - * Calculates a histogram using the estimateRoi method to estimate the effective yield - * doing x sequencing for x=1..10. - */ -void calculateRoiHistogram(DuplicationMetrics &metrics) { - int64_t uniquePairs = metrics.READ_PAIRS_EXAMINED - metrics.READ_PAIR_DUPLICATES; - metrics.CoverageMult.resize(101); - for (int x = 1; x <= 100; ++x) { - metrics.CoverageMult[x] = - estimateRoi(metrics.ESTIMATED_LIBRARY_SIZE, x, metrics.READ_PAIRS_EXAMINED, uniquePairs); - } -} \ No newline at end of file diff --git a/src/bqsr/bqsr_funcs.h b/src/bqsr/bqsr_funcs.h deleted file mode 100644 index b3ff9c5..0000000 --- a/src/bqsr/bqsr_funcs.h +++ /dev/null @@ -1,141 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -#include "dup_metrics.h" - -using std::priority_queue; -using std::unordered_map; -using std::unordered_set; -using std::vector; - -/* */ -class BamWrap; -class ReadEnds; -class ReadNameParser; - -/* - * optical duplicationgraph - */ -template -struct Graph { // set? 
- vector nodes; // - unordered_map nodeIdxMap; - unordered_map> neighbors; // - - int addNode(const Node &singleton) { - int idx = -1; - if (nodeIdxMap.find(singleton) == nodeIdxMap.end()) { - nodes.push_back(singleton); - idx = nodes.size() - 1; - nodeIdxMap[singleton] = idx; - neighbors[idx].clear(); - } else - idx = nodeIdxMap[singleton]; - - return idx; - } - - /* bidirectional and public */ - void addEdge(Node &left, Node &right) { - int leftIndex = addNode(left); - if (left == right) - return; - int rightIndex = addNode(right); - addNeighbor(leftIndex, rightIndex); - addNeighbor(rightIndex, leftIndex); - } - - void addNeighbor(int fromNode, int toNode) { - unordered_set &fromNodesNeighbors = neighbors[fromNode]; - if (fromNodesNeighbors.find(toNode) == fromNodesNeighbors.end()) - fromNodesNeighbors.insert(toNode); - } - - /** - * returns the cluster map of connected components - * - * @return Nodes that point to the same integer are in the same cluster. - */ - void cluster(unordered_map *pClusterMap) { - auto &clusterMap = *pClusterMap; - vector vCluster(nodes.size(), 0); - for (int i = 0; i < vCluster.size(); ++i) vCluster[i] = i; - for (int i = 0; i < neighbors.size(); ++i) { - for (auto &j : neighbors[i]) joinNodes(j, i, &vCluster); - } - for (auto &node : nodes) { - clusterMap[node] = findRepNode(vCluster, nodeIdxMap[node]); - } - } - - // Part of Union-Find with Path Compression that joins to nodes to be part of the same cluster. - static void joinNodes(int nodeId1, int nodeId2, vector *pGrouping) { - auto &grouping = *pGrouping; - int repNode1 = findRepNode(grouping, nodeId1); - int repNode2 = findRepNode(grouping, nodeId2); - if (repNode1 == repNode2) - return; - grouping[repNode1] = repNode2; - } - - // Part of Union-Find with Path Compression to determine the duplicate set a particular UMI belongs to. 
- static int findRepNode(vector &grouping, int nodeId) { - int representativeUmi = nodeId; // All UMIs of a duplicate set will have the same reprsentativeUmi. - while (representativeUmi != grouping[representativeUmi]) representativeUmi = grouping[representativeUmi]; - while (nodeId != representativeUmi) { - int newUmiId = grouping[nodeId]; - grouping[nodeId] = representativeUmi; - nodeId = newUmiId; - } - return representativeUmi; - } -}; - -/* - * read - */ -int16_t computeDuplicateScore(BamWrap &bw); - -/* - * Builds a read ends object that represents a single read. - * read - */ -void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnds *pKey); - -/* - * pairend read end - */ -void modifyPairedEnds(const ReadEnds &fragEnd, ReadEnds *pPairedEnds); - -/** - * Looks through the set of reads and identifies how many of the duplicates are - * in fact optical duplicates, and stores the data in the instance level histogram. - * Additionally sets the transient isOpticalDuplicate flag on each read end that is - * identified as an optical duplicate. - * - */ -void trackOpticalDuplicates(vector &readEndsArr, const ReadEnds *pBestRe); - -/* - * Estimates the size of a library based on the number of paired end molecules observed - * and the number of unique pairs observed. - */ -int64_t estimateLibrarySize(int64_t readPairs, int64_t uniqueReadPairs); - -/** - * Estimates the ROI (return on investment) that one would see if a library was sequenced to - * x higher coverage than the observed coverage. - **/ -double estimateRoi(long estimatedLibrarySize, double x, long pairs, long uniquePairs); - -/** - * Calculates a histogram using the estimateRoi method to estimate the effective yield - * doing x sequencing for x=1..10. 
- */ -void calculateRoiHistogram(DuplicationMetrics &metrics); \ No newline at end of file diff --git a/src/bqsr/bqsr_pipeline.cpp b/src/bqsr/bqsr_pipeline.cpp deleted file mode 100644 index 65ee25e..0000000 --- a/src/bqsr/bqsr_pipeline.cpp +++ /dev/null @@ -1,848 +0,0 @@ -#include "bqsr_pipeline.h" - -#include -#include -#include - -#include -#include -#include - -#include "dup_metrics.h" -#include "bqsr_args.h" -#include "bqsr_funcs.h" -#include "read_ends.h" -#include "read_name_parser.h" -#include "util/bam_buf.h" -#include "util/profiling.h" -#include "util/yarn.h" - -using std::vector; -using namespace std; - -namespace nsgv { - -extern BQSRArg gBqsrArg; // -extern samFile *gInBamFp; // bam -extern sam_hdr_t *gInBamHeader; // bam -extern DuplicationMetrics gMetrics; // -extern vector gNameParsers; -extern DupResult gDupRes; -extern PipelineArg gPipe; -}; // namespace nsgv - -/* pairendreadends,, ,*/ -static void markDupsForPairs(vector &vpRe, DPSet *dupIdx, MDSet *opticalDupIdx, - DPSet *repIdx, MDSet *notDupIdx = nullptr, - MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr) { -} - -/* pairedreadends, */ -static void markDupsForFrags(vector &vpRe, bool containsPairs, DPSet *dupIdx, - MDSet *notDupIdx = nullptr) { - if (containsPairs) { - for (auto pe : vpRe) { - if (!pe->IsPaired()) { - dupIdx->insert(pe->read1IndexInFile); - } - } - } else { - int maxScore = 0; - const ReadEnds *pBest = nullptr; - for (auto pe : vpRe) { - if (pe->score > maxScore || pBest == nullptr) { - maxScore = pe->score; - pBest = pe; - } - } - if (notDupIdx != nullptr) { - notDupIdx->insert(pBest->read1IndexInFile); - } - for (auto pe : vpRe) { - if (pe != pBest) { - dupIdx->insert(pe->read1IndexInFile); - } - } - } -} - -/* readend posreadend */ -static void getEqualRE(const ReadEnds &re, vector &src, vector *dst) { - auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::CorLittleThan); // - dst->insert(dst->end(), range.first, range.second); -} - -#define 
LOWER_BOUND(tid, nthread, ndata) ((tid) * (ndata) / (nthread)) -#define UPPER_BOUND(tid, nthread, ndata) ((tid + 1) * (ndata) / (nthread)) - -/* pairs */ -static void processPairs(vector &readEnds, DPSet *dupIdx, MDSet *opticalDupIdx, - DPSet *repIdx, MDSet *notDupIdx = nullptr, - MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr) { - // return; - vector vpCache; // reads - const ReadEnds *pReadEnd = nullptr; - for (auto &re : readEnds) { - if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // - vpCache.push_back(&re); // - else { - markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, - notRepIdx); // - vpCache.clear(); - vpCache.push_back(&re); - pReadEnd = &re; - } - } - markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx); -} - -/* frags */ -static void processFrags(vector &readEnds, DPSet *dupIdx, MDSet *notDupIdx = nullptr) { - bool containsPairs = false; - bool containsFrags = false; - vector vpCache; // reads - const ReadEnds *pReadEnd = nullptr; - for (auto &re : readEnds) { - if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) { - vpCache.push_back(&re); - containsPairs = containsPairs || re.IsPaired(); - containsFrags = containsFrags || !re.IsPaired(); - } else { - if (vpCache.size() > 1 && containsFrags) { - markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); - } - vpCache.clear(); - vpCache.push_back(&re); - pReadEnd = &re; - containsPairs = re.IsPaired(); - containsFrags = !re.IsPaired(); - } - } - if (vpCache.size() > 1 && containsFrags) { - markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); - } -} - - -/* */ -static inline void getIntersectData(vector &leftArr, vector &rightArr, vector *dst, - bool isPairCmp = false) { - if (leftArr.empty() || rightArr.empty()) { - return; - } - const size_t leftEndIdx = leftArr.size() - 1; - const size_t rightStartIdx = 0; - size_t leftSpan 
= 0; - size_t rightSpan = 0; - - while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp)) { - leftSpan += 1; - if (leftSpan > leftEndIdx) { // (bug,?) - leftSpan = leftArr.size() - 1; - break; - } - } - - while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp)) { - rightSpan += 1; - if (rightSpan == rightArr.size() - 1) // ,bug - break; - } - dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end()); - dst->insert(dst->end(), rightArr.begin(), rightArr.begin() + rightSpan); - if (isPairCmp) - std::sort(dst->begin(), dst->end(), ReadEnds::PairLittleThan); - else - std::sort(dst->begin(), dst->end(), ReadEnds::FragLittleThan); -} - -/* fragsdup idx */ -static inline void refreshFragDupIdx(DPSet &dupIdx, MDSet ¬DupIdx, MarkDupDataArg *lastArg, - MarkDupDataArg *curArg) { - auto &lp = *lastArg; - auto &p = *curArg; - for (auto idx : dupIdx) { - lp.fragDupIdx.insert(idx); - p.fragDupIdx.erase(idx); - } - for (auto idx : notDupIdx) { - lp.fragDupIdx.erase(idx); - p.fragDupIdx.erase(idx); - } -} - -// for step 2 generate read ends -// multi-thread generate read ends -static void mtGenerateReadEnds(void *data, long idx, int tid) { - auto &p = *(PipelineArg *)data; - auto &rnParser = nsgv::gNameParsers[idx]; - int nThread = p.numThread; - auto &bams = p.readData.bams; - int64_t bamStartIdx = p.readData.bamStartIdx; - int64_t taskSeq = p.readData.taskSeq; - GenREData &genREData = p.genREData[p.genREOrder % p.GENBUFNUM]; - auto &pairs = genREData.pairsArr[idx]; - auto &frags = genREData.fragsArr[idx]; - auto &unpairedDic = genREData.unpairedDicArr[idx]; - - pairs.clear(); - frags.clear(); - unpairedDic.clear(); - - PROF_START(gen); - size_t start_id = LOWER_BOUND(idx, nThread, bams.size()); - size_t end_id = UPPER_BOUND(idx, nThread, bams.size()); - for (size_t i = start_id; i < end_id; ++i) { // read - BamWrap *bw = bams[i]; - const int64_t bamIdx = bamStartIdx + i; - if 
(bw->GetReadUnmappedFlag()) { - if (bw->b->core.tid == -1) - // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). - break; - } else if (!bw->IsSecondaryOrSupplementary()) { // - ReadEnds fragEnd; - buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); - frags.push_back(fragEnd); // frag - if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // pairendread - string key = bw->query_name(); - if (unpairedDic.find(key) == unpairedDic.end()) { - unpairedDic[key] = {taskSeq, fragEnd}; - } else { // pairend - auto &pairedEnds = unpairedDic.at(key).unpairedRE; - modifyPairedEnds(fragEnd, &pairedEnds); - pairs.push_back(pairedEnds); - unpairedDic.erase(key); // pairend - } - } - } - } - PROF_END(tprof[TP_gen][tid], gen); - - PROF_START(sort_frag); - sort(frags.begin(), frags.end(), ReadEnds::FragLittleThan); - PROF_END(tprof[TP_sort_frag][tid], sort_frag); - - PROF_START(sort_pair); - sort(pairs.begin(), pairs.end(), ReadEnds::PairLittleThan); - PROF_END(tprof[TP_sort_pair][tid], sort_pair); -} - -static void doGenRE(PipelineArg &pipeArg) { - GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM]; - ReadData &readData = pipeArg.readData; - - // generate read ends - const int numThread = pipeArg.numThread; - - kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread); - // genRESort,step - // read, - genREData.unpairedDic.clear(); - vector &pairs = genREData.pairsArr[numThread]; - pairs.clear(); - - int testNum = 0; - for (int i = 0; i < numThread; ++i) { - testNum += genREData.unpairedDicArr[i].size(); - for (auto &val : genREData.unpairedDicArr[i]) { - const string &key = val.first; - const ReadEnds &fragEnd = val.second.unpairedRE; - if (genREData.unpairedDic.find(key) == genREData.unpairedDic.end()) { - genREData.unpairedDic[key] = {readData.taskSeq, fragEnd}; - } else { // pairend - auto &pairedEnds = genREData.unpairedDic.at(key).unpairedRE; - modifyPairedEnds(fragEnd, &pairedEnds); - 
pairs.push_back(pairedEnds); - genREData.unpairedDic.erase(key); // pairend - } - } - } - sort(pairs.begin(), pairs.end(), ReadEnds::PairLittleThan); -} -// end for step 2 generate read ends - -// for step-3 sort -static void doSort(PipelineArg &pipeArg) { - // return; - GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM]; - SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM]; - SortMarkData &smd = *(SortMarkData *)sortData.dataPtr; - - smd.unpairedDic = genREData.unpairedDic; - - smd.pairs.clear(); - smd.frags.clear(); - const ReadEnds *pRE; - ReadEndsHeap fragsHeap; - ReadEndsHeap pairsHeap; - PROF_START(sort_pair); - pairsHeap.Init(&genREData.pairsArr); - while ((pRE = pairsHeap.Pop()) != nullptr) { - smd.pairs.push_back(*pRE); - } - PROF_END(gprof[GP_sort_pair], sort_pair); - PROF_START(sort_frag); - fragsHeap.Init(&genREData.fragsArr); - while ((pRE = fragsHeap.Pop()) != nullptr) { - smd.frags.push_back(*pRE); - } - PROF_END(gprof[GP_sort_frag], sort_frag); -} -// for step-4 sort -static void doMarkDup(PipelineArg &pipeArg) { - MarkDupData &mdData = pipeArg.markDupData[pipeArg.markDupOrder % pipeArg.MARKBUFNUM]; - SortData &sortData = pipeArg.sortData[pipeArg.markDupOrder % pipeArg.SORTBUFNUM]; - mdData.taskSeq = pipeArg.markDupOrder; - mdData.clear(); - - auto tmpPtr = mdData.dataPtr; - mdData.dataPtr = sortData.dataPtr; - sortData.dataPtr = tmpPtr; - - SortMarkData &smd = *(SortMarkData *)mdData.dataPtr; - // pair - PROF_START(markdup_pair); - processPairs(smd.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx); - PROF_END(gprof[GP_markdup_pair], markdup_pair); - // frag - PROF_START(markdup_frag); - processFrags(smd.frags, &mdData.fragDupIdx); - PROF_END(gprof[GP_markdup_frag], markdup_frag); -} - -template -static void refreshDupIdx(T &srcArr, T &insArr) { - for (auto dup : srcArr) { - insArr.insert(dup); - } -} -template -static void refreshNotDupIdx(T1 &srcArr, T2 &delArr) { - 
for (auto dup : srcArr) { - delArr.erase(dup); - } -} - -static void refreshMarkDupData(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, - MDSet ¬DupIdx, MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, - MarkDupData &lp) { - refreshDupIdx(dupIdx, lp.pairDupIdx); - refreshDupIdx(opticalDupIdx, lp.pairOpticalDupIdx); - refreshDupIdx(repIdx, lp.pairRepIdx); - refreshNotDupIdx(notDupIdx, lp.pairDupIdx); - refreshNotDupIdx(notOpticalDupIdx, lp.pairOpticalDupIdx); - refreshNotDupIdx(notRepIdx, lp.pairRepIdx); -} - -static void refreshMarkDupData(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, - MDSet ¬DupIdx, MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, - MarkDupData &lp, MarkDupData &p) { - refreshDupIdx(dupIdx, lp.pairDupIdx); - refreshDupIdx(opticalDupIdx, lp.pairOpticalDupIdx); - refreshDupIdx(repIdx, lp.pairRepIdx); - refreshNotDupIdx(notDupIdx, lp.pairDupIdx); - refreshNotDupIdx(notOpticalDupIdx, lp.pairOpticalDupIdx); - refreshNotDupIdx(notRepIdx, lp.pairRepIdx); - refreshNotDupIdx(notDupIdx, p.pairDupIdx); - refreshNotDupIdx(notOpticalDupIdx, p.pairOpticalDupIdx); - refreshNotDupIdx(notRepIdx, p.pairRepIdx); - refreshNotDupIdx(dupIdx, p.pairDupIdx); - refreshNotDupIdx(opticalDupIdx, p.pairOpticalDupIdx); - refreshNotDupIdx(repIdx, p.pairRepIdx); -} - -// -static void processIntersectFragPairs(MarkDupData &lp, MarkDupData &cp) { - SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; - SortMarkData &csm = *(SortMarkData *)cp.dataPtr; - - vector reArr; - DPSet dupIdx; - MDSet opticalDupIdx; - DPSet repIdx; - MDSet notOpticalDupIdx; - MDSet notDupIdx; - MDSet notRepIdx; - // frags - getIntersectData(lsm.frags, csm.frags, &reArr); - processFrags(reArr, &dupIdx, ¬DupIdx); - refreshDupIdx(dupIdx, lp.fragDupIdx); - refreshNotDupIdx(dupIdx, cp.fragDupIdx); - refreshNotDupIdx(notDupIdx, lp.fragDupIdx); - refreshNotDupIdx(notDupIdx, cp.fragDupIdx); - - // pairs - reArr.clear(); - dupIdx.clear(); - notDupIdx.clear(); - - getIntersectData(lsm.pairs, csm.pairs, &reArr, true); - 
processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx); - refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, cp, lp); // cp,globaldup, -} - -// readends, lp -static void findUnpairedInDatas(MarkDupData &lp, MarkDupData &cp) { - auto &interPairedData = lp.ckeyReadEndsMap; - SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; - SortMarkData &csm = *(SortMarkData *)cp.dataPtr; - for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end(); ) { // read - auto &lastUnpair = *itr; - auto &readName = lastUnpair.first; - auto &lastUnpairInfo = lastUnpair.second; - auto lastRE = lastUnpairInfo.unpairedRE; // read end - if (csm.unpairedDic.find(readName) != csm.unpairedDic.end()) { // read - auto &curUnpairInfo = csm.unpairedDic[readName]; - auto &curRE = curUnpairInfo.unpairedRE; - modifyPairedEnds(curRE, &lastRE); // lastRE,ReadEnds - CalcKey ck(lastRE); - auto &pairArr = interPairedData[ck]; - pairArr.push_back(lastRE); - // dictreadends - csm.unpairedDic.erase(readName); - itr = lsm.unpairedDic.erase(itr); - } else { - ++itr; - } - } -} - -// globallpreadends, global -static void findUnpairedInGlobal(IntersectData &g, MarkDupData &lp) { - auto &interPairedData = g.ckeyReadEndsMap; - SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; - for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end();) { // read - auto &lastUnpair = *itr; - auto &readName = lastUnpair.first; - auto &lastUnpairInfo = lastUnpair.second; - auto lastRE = lastUnpairInfo.unpairedRE; // read end - if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // globalread - auto &gUnpairInfo = g.unpairedDic[readName]; - auto &gRE = gUnpairInfo.unpairedRE; - modifyPairedEnds(lastRE, &gRE); // gRE,ReadEnds - CalcKey ck(gRE); - auto &pairArr = interPairedData[ck]; - pairArr.push_back(gRE); - // dictreadends - g.unpairedDic.erase(readName); - itr = lsm.unpairedDic.erase(itr); - } else { - ++itr; - } - } -} - -static void 
putDupinfoToGlobal(IntersectData &g, MarkDupData &lp) { - g.dupIdxArr.push_back(vector()); - auto &vIdx = g.dupIdxArr.back(); - lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); - vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); - std::sort(vIdx.begin(), vIdx.end()); - - g.opticalDupIdxArr.push_back(vector()); - auto &vOpticalIdx = g.opticalDupIdxArr.back(); - vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); - std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); - - g.repIdxArr.push_back(vector()); - auto &vRepIdx = g.repIdxArr.back(); - vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); - std::sort(vRepIdx.begin(), vRepIdx.end()); -} - -// for step-5 handle intersect data -static void doIntersect(PipelineArg &pipeArg) { - // spdlog::info("intersect order: {}", pipeArg.intersectOrder); - const int kInitIntersectOrder = 1; - IntersectData &g = pipeArg.intersectData; - MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; - MarkDupData &cp = pipeArg.markDupData[(pipeArg.intersectOrder) % pipeArg.MARKBUFNUM]; - - SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; - SortMarkData &csm = *(SortMarkData *)cp.dataPtr; - - // - processIntersectFragPairs(lp, cp); - - // lpnp - int64_t lastLeft = INT64_MIN, lastRight = INT64_MAX, curLeft = INT64_MAX, curRight = INT64_MAX; - if (lsm.pairs.size() > 0) { - lastLeft = lsm.frags[0].Left(); - lastRight = lsm.frags.back().Left(); - } - if (csm.pairs.size() > 0) { - curLeft = csm.frags[0].Left(); - curRight = csm.frags.back().Left(); - } - - - if (g.rightPos >= curLeft) { - spdlog::error("previous data can not contain readends included by next data block! 
{} {} {} {} {} {}", - lp.taskSeq, cp.taskSeq, g.rightPos, curLeft, lsm.pairs.size(), csm.pairs.size()); - } - g.rightPos = lastRight; - - findUnpairedInDatas(lp, cp); // lp - findUnpairedInGlobal(g, cp); // cpglobal - - // lp - TaskSeqDupInfo t; - for (auto itr = lp.ckeyReadEndsMap.begin(); itr != lp.ckeyReadEndsMap.end();) { - auto &ckVal = *itr; - auto &ck = ckVal.first; - auto &pairArr = ckVal.second; - getEqualRE(pairArr[0], lsm.pairs, &pairArr); // ,global - if (ck.Right() <= lastRight) { // , - if (ck.Left() >= curLeft) { // cp - getEqualRE(pairArr[0], csm.pairs, &pairArr); - } - // globalck - auto gitr = g.ckeyReadEndsMap.find(ck); - if (gitr != g.ckeyReadEndsMap.end()) { - auto &gPairArr = gitr->second; - pairArr.insert(pairArr.end(), gPairArr.begin(), gPairArr.end()); - g.ckeyReadEndsMap.erase(gitr); - } - sort(pairArr.begin(), pairArr.end(), ReadEnds::PairLittleThan); - processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx); - itr = lp.ckeyReadEndsMap.erase(itr); - } else { - ++itr; - } - } - - // global - for (auto itr = g.ckeyReadEndsMap.begin(); itr != g.ckeyReadEndsMap.end();) { - auto &ckVal = *itr; - auto &ck = ckVal.first; - auto &pairArr = ckVal.second; - if (ck.Left() >= lastLeft) { - getEqualRE(pairArr[0], lsm.pairs, &pairArr); - } - if (ck.Right() <= lastRight) { // ,reads - sort(pairArr.begin(), pairArr.end(), ReadEnds::PairLittleThan); - processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx); - itr = g.ckeyReadEndsMap.erase(itr); - } else { - ++itr; - } - } - - // lpglobal - for (auto &ckVal : lp.ckeyReadEndsMap) { - auto &pairArr = g.ckeyReadEndsMap[ckVal.first]; - pairArr.insert(pairArr.end(), ckVal.second.begin(), ckVal.second.end()); - - } - lp.ckeyReadEndsMap.clear(); - // - refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp, cp); - // g - putDupinfoToGlobal(g, lp); - - for 
(auto &unPair : lsm.unpairedDic) { - g.unpairedDic.insert(unPair); - } -} - -static void *pipeRead(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO); - inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM); - int64_t readNumSum = 0; - while (1) { - PROF_START(read_wait); - yarn::POSSESS(pipeArg.readSig); - yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 1); // ,bambuf - PROF_END(gprof[GP_read_wait], read_wait); - size_t readNum = 0; - PROF_START(read); - if (inBamBuf.ReadStat() >= 0) - readNum = inBamBuf.ReadBam(); // - PROF_END(gprof[GP_read], read); - if (readNum < 1) { - pipeArg.readFinish = 1; - yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // - break; - } - spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder); - - pipeArg.readData.bams = inBamBuf.GetBamArr(); - - pipeArg.readData.bamStartIdx = readNumSum; - pipeArg.readData.taskSeq = pipeArg.readOrder; - - readNumSum += readNum; - inBamBuf.ClearAll(); // - pipeArg.readOrder += 1; // for next - yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // - } - spdlog::info("{} total reads processed", readNumSum); - return 0; -} -static void *pipeGenRE(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - auto &genREData = pipeArg.genREData; - // init generate read ends data by num_thread - int genREThread = pipeArg.numThread; - for (int i = 0; i < pipeArg.GENBUFNUM; ++i) { - genREData[i].Init(genREThread); - } - - while (1) { - PROF_START(gen_wait); - yarn::POSSESS(pipeArg.readSig); - yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 0); // - yarn::POSSESS(pipeArg.genRESig); - PROF_END(gprof[GP_gen_wait], gen_wait); - - yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, pipeArg.GENBUFNUM); // BUFNUM - yarn::RELEASE(pipeArg.genRESig); // , - - if (pipeArg.readFinish) { // , - yarn::POSSESS(pipeArg.genRESig); - pipeArg.genREFinish = 1; - yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); - yarn::TWIST(pipeArg.readSig, yarn::BY, 
-1); - break; - } - /* bam,readends */ - PROF_START(gen); - doGenRE(pipeArg); - PROF_END(gprof[GP_gen], gen); - - yarn::POSSESS(pipeArg.genRESig); - pipeArg.genREOrder += 1; - yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); - yarn::TWIST(pipeArg.readSig, yarn::BY, -1); // - } - return 0; -} -static void *pipeSort(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - - while (1) { - PROF_START(sort_wait); - yarn::POSSESS(pipeArg.genRESig); - yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, 0); // - yarn::RELEASE(pipeArg.genRESig); - PROF_END(gprof[GP_sort_wait], sort_wait); - - yarn::POSSESS(pipeArg.sortSig); - yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // BUFNUM - yarn::RELEASE(pipeArg.sortSig); - - if (pipeArg.genREFinish) { - // - while (pipeArg.sortOrder < pipeArg.genREOrder) { - yarn::POSSESS(pipeArg.sortSig); - yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // BUFNUM - yarn::RELEASE(pipeArg.sortSig); - - PROF_START(sort); - doSort(pipeArg); - PROF_END(gprof[GP_sort], sort); - - yarn::POSSESS(pipeArg.sortSig); - pipeArg.sortOrder += 1; - yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); - } - yarn::POSSESS(pipeArg.sortSig); - pipeArg.sortFinish = 1; - yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); - break; - } - /* readends */ - PROF_START(sort); - doSort(pipeArg); - PROF_END(gprof[GP_sort], sort); - - yarn::POSSESS(pipeArg.genRESig); - yarn::TWIST(pipeArg.genRESig, yarn::BY, -1); - - yarn::POSSESS(pipeArg.sortSig); - pipeArg.sortOrder += 1; - yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); - } - return 0; -} -static void *pipeMarkDup(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - - while (1) { - PROF_START(markdup_wait); - yarn::POSSESS(pipeArg.sortSig); - yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, 0); // - yarn::RELEASE(pipeArg.sortSig); - PROF_END(gprof[GP_markdup_wait], markdup_wait); - - yarn::POSSESS(pipeArg.markDupSig); - yarn::WAIT_FOR(pipeArg.markDupSig, yarn::NOT_TO_BE, pipeArg.MARKBUFNUM); 
- yarn::RELEASE(pipeArg.markDupSig); - - if (pipeArg.sortFinish) { - // - while (pipeArg.markDupOrder < pipeArg.sortOrder) { - yarn::POSSESS(pipeArg.markDupSig); - yarn::WAIT_FOR(pipeArg.markDupSig, yarn::NOT_TO_BE, pipeArg.MARKBUFNUM); - yarn::RELEASE(pipeArg.markDupSig); - - PROF_START(markdup); - doMarkDup(pipeArg); - PROF_END(gprof[GP_markdup], markdup); - - yarn::POSSESS(pipeArg.markDupSig); - pipeArg.markDupOrder += 1; - yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1); - } - yarn::POSSESS(pipeArg.markDupSig); - pipeArg.markDupFinish = 1; - yarn::TWIST(pipeArg.markDupSig, yarn::TO, 2 + pipeArg.MARKBUFNUM); - break; - } - /* readends */ - PROF_START(markdup); - doMarkDup(pipeArg); - PROF_END(gprof[GP_markdup], markdup); - yarn::POSSESS(pipeArg.sortSig); - yarn::TWIST(pipeArg.sortSig, yarn::BY, -1); - - yarn::POSSESS(pipeArg.markDupSig); - pipeArg.markDupOrder += 1; - yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1); - } - return 0; -} -static void *pipeIntersect(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - const int kInitIntersectOrder = 1; - pipeArg.intersectOrder = kInitIntersectOrder; - while (1) { - PROF_START(intersect_wait); - yarn::POSSESS(pipeArg.markDupSig); - yarn::WAIT_FOR(pipeArg.markDupSig, yarn::TO_BE_MORE_THAN, kInitIntersectOrder); // - yarn::RELEASE(pipeArg.markDupSig); - PROF_END(gprof[GP_intersect_wait], intersect_wait); - - if (pipeArg.markDupFinish) { - while (pipeArg.intersectOrder < pipeArg.markDupOrder) { - PROF_START(intersect); - doIntersect(pipeArg); - PROF_END(gprof[GP_intersect], intersect); - pipeArg.intersectOrder += 1; - } - - break; - } - /* readends */ - PROF_START(intersect); - doIntersect(pipeArg); - PROF_END(gprof[GP_intersect], intersect); - - yarn::POSSESS(pipeArg.markDupSig); - yarn::TWIST(pipeArg.markDupSig, yarn::BY, -1); - - pipeArg.intersectOrder += 1; - } - return 0; -} - -/* ,global data */ -static void processLastData(PipelineArg &pipeArg) { - IntersectData &g = pipeArg.intersectData; - MarkDupData &lp = 
pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; - SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; - int64_t lastLeft = INT64_MIN; - if (lsm.pairs.size() > 0) { - lastLeft = lsm.frags[0].Left(); - } - // global - TaskSeqDupInfo t; - for (auto itr = g.ckeyReadEndsMap.begin(); itr != g.ckeyReadEndsMap.end();) { - auto &ckVal = *itr; - auto &ck = ckVal.first; - auto &pairArr = ckVal.second; - if (ck.Left() >= lastLeft) { - getEqualRE(pairArr[0], lsm.pairs, &pairArr); - } - sort(pairArr.begin(), pairArr.end(), ReadEnds::PairLittleThan ); - processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx); - itr = g.ckeyReadEndsMap.erase(itr); - } - // - refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp); - // g - putDupinfoToGlobal(g, lp); -} - -static void parallelPipeline() { - PipelineArg pipeArg(&nsgv::gDupRes); - pipeArg.numThread = nsgv::gBqsrArg.NUM_THREADS; - - pthread_t tidArr[5]; - pthread_create(&tidArr[0], 0, pipeRead, &pipeArg); - pthread_create(&tidArr[1], 0, pipeGenRE, &pipeArg); - pthread_create(&tidArr[2], 0, pipeSort, &pipeArg); - pthread_create(&tidArr[3], 0, pipeMarkDup, &pipeArg); - pthread_create(&tidArr[4], 0, pipeIntersect, &pipeArg); - for (int i = 0; i < 5; ++i) pthread_join(tidArr[i], 0); - PROF_START(merge_result); - processLastData(pipeArg); - PROF_END(gprof[GP_merge_result], merge_result); - - // spdlog::info("pipeArg size : {} GB", pipeArg.byteSize() / 1024.0 / 1024 / 1024); - - // size_t repNum = 0; - // for (auto &v : pipeArg.intersectData.repIdxArr) repNum += v.size(); - // spdlog::info("rep num : {}", repNum); - - // spdlog::info("result size : {} GB", nsgv::gDupRes.byteSize() / 1024.0 / 1024 / 1024); -} - -/* , */ -void PipelineMarkDups() { -// if (nsgv::gBqsrArg.NUM_THREADS > 1) - return parallelPipeline(); - - PipelineArg pipeArg(&nsgv::gDupRes); - pipeArg.numThread = nsgv::gBqsrArg.NUM_THREADS; - BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO); - 
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM); - int64_t readNumSum = 0; - for (int i = 0; i < pipeArg.GENBUFNUM; ++i) { - pipeArg.genREData[i].Init(pipeArg.numThread); - } - pipeArg.intersectOrder = 1; // do intersect 1 - while (1) { - size_t readNum = 0; - if (inBamBuf.ReadStat() >= 0) - readNum = inBamBuf.ReadBam(); // - if (readNum < 1) { - break; - } - spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder); - - pipeArg.readData.bams = inBamBuf.GetBamArr(); - pipeArg.readData.bamStartIdx = readNumSum; - pipeArg.readData.taskSeq = pipeArg.readOrder; - // 1. do generate read ends - doGenRE(pipeArg); - pipeArg.genREOrder += 1; - // 2. do sort - doSort(pipeArg); - pipeArg.sortOrder += 1; - // 3. do markduplicate - doMarkDup(pipeArg); - pipeArg.markDupOrder += 1; - // 4. do intersect data - if (pipeArg.markDupOrder > 1) { - doIntersect(pipeArg); - pipeArg.intersectOrder += 1; - } - - readNumSum += readNum; - inBamBuf.ClearAll(); // - pipeArg.readOrder += 1; // for next - } - processLastData(pipeArg); -} diff --git a/src/bqsr/bqsr_pipeline.h b/src/bqsr/bqsr_pipeline.h deleted file mode 100644 index 9c1b6be..0000000 --- a/src/bqsr/bqsr_pipeline.h +++ /dev/null @@ -1,273 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "bqsr_types.h" - -struct ReadData { - vector bams; // read step output - int64_t bamStartIdx = 0; // bam,bam - int64_t taskSeq = 0; // -}; - -struct GenREData { - vector> pairsArr; // reads - vector> fragsArr; // reads - vector unpairedDicArr; // pair end - void Init(int nThread) { - for (int i = 0; i <= nThread; ++i) { // ,pairs, - pairsArr.push_back(vector()); - fragsArr.push_back(vector()); - unpairedDicArr.push_back(UnpairedNameMap()); - } - } - UnpairedNameMap unpairedDic; // sort step - size_t byteSize() { - size_t bytes = 0; - for (auto &v : pairsArr) - for (auto &r : v) bytes += sizeof(r); - for (auto &v : fragsArr) - for (auto &r : v) bytes += sizeof(r); - spdlog::info("genRE 
readends size : {} GB", bytes / 1024.0 / 1024 / 1024); - for (auto &m : unpairedDicArr) bytes += m.size() * sizeof(*m.begin()); - bytes += unpairedDic.size() * sizeof(*unpairedDic.begin()); - return bytes; - } -}; - -struct SortMarkData { - vector pairs; // reads - vector frags; // reads - UnpairedNameMap unpairedDic; // pair end - size_t byteSize() { - size_t bytes = 0; - for (auto &r : pairs) bytes += sizeof(r); - for (auto &r : frags) bytes += sizeof(r); - spdlog::info("sortmark readends size : {} GB", bytes / 1024.0 / 1024 / 1024); - bytes += unpairedDic.bucket_count() * sizeof(*unpairedDic.begin()); - return bytes; - } -}; - -struct SortData { - volatile void *dataPtr; // SortMarkData pointer -}; - -struct MarkDupData { - int64_t taskSeq = 0; // - DPSet pairDupIdx; // pairread - MDSet pairOpticalDupIdx; // opticalread - DPSet fragDupIdx; // fragread - DPSet pairRepIdx; // pairdupsetread - CkeyReadEndsMap ckeyReadEndsMap; - - volatile void *dataPtr; // SortMarkData pointer - - void clear() { - fragDupIdx.clear(); - pairDupIdx.clear(); - pairOpticalDupIdx.clear(); - pairRepIdx.clear(); - ckeyReadEndsMap.clear(); - } - - size_t byteSize() { - size_t bytes = 0; - bytes += pairDupIdx.size() * 100; - bytes += pairOpticalDupIdx.size() * 100; - bytes += fragDupIdx.size() * 100; - bytes += pairRepIdx.size() * 100; - return bytes; - } -}; - -struct DupResult { - vector> dupIdxArr; - vector> opticalDupIdxArr; - vector> repIdxArr; - size_t byteSize() { - size_t bytes = 0; - size_t tmp = 0; - for (auto &v : dupIdxArr) - for (auto &r : v) tmp += sizeof(r); - spdlog::info("dupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); - bytes += tmp; - tmp = 0; - for (auto &v : opticalDupIdxArr) - for (auto &r : v) tmp += sizeof(r); - spdlog::info("opticalDupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); - bytes += tmp; - tmp = 0; - for (auto &v : repIdxArr) - for (auto &r : v) tmp += sizeof(r); - spdlog::info("repIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); - bytes += tmp; 
- spdlog::info("result size : {} GB", bytes / 1024.0 / 1024 / 1024); - - return bytes; - } -}; - -struct IntersectData { - UnpairedNameMap unpairedDic; // pair end - CkeyReadEndsMap ckeyReadEndsMap; - int64_t rightPos = 0; - - // taskvector - vector> &dupIdxArr; - vector> &opticalDupIdxArr; - vector> &repIdxArr; - - IntersectData(DupResult *resPtr) - : dupIdxArr(resPtr->dupIdxArr), opticalDupIdxArr(resPtr->opticalDupIdxArr), repIdxArr(resPtr->repIdxArr) {} - - size_t byteSize() { - size_t bytes = 0; - bytes += unpairedDic.size() * sizeof(*unpairedDic.begin()); - for (auto &v : dupIdxArr) - for (auto &r : v) bytes += sizeof(r); - for (auto &v : opticalDupIdxArr) - for (auto &r : v) bytes += sizeof(r); - for (auto &v : repIdxArr) - for (auto &r : v) bytes += sizeof(r); - spdlog::info("result size : {} GB", bytes / 1024.0 / 1024 / 1024); - - return bytes; - } -}; - -// ,task, -struct PipelineArg { - static const int GENBUFNUM = 2; - static const int SORTBUFNUM = 2; - static const int MARKBUFNUM = 3; - uint64_t readOrder = 0; - uint64_t genREOrder = 0; - uint64_t sortOrder = 0; - uint64_t markDupOrder = 0; - uint64_t intersectOrder = 0; - int numThread = 0; - - volatile int readFinish = 0; - volatile int genREFinish = 0; - volatile int sortFinish = 0; - volatile int markDupFinish = 0; - - yarn::lock_t *readSig; - yarn::lock_t *genRESig; - yarn::lock_t *sortSig; - yarn::lock_t *markDupSig; - - PipelineArg(DupResult *resPtr) : intersectData(resPtr) { - readSig = yarn::NEW_LOCK(0); // 1, bufferbambuf, - genRESig = yarn::NEW_LOCK(0); // 2, buffer - sortSig = yarn::NEW_LOCK(0); - markDupSig = yarn::NEW_LOCK(0); - for (int i = 0; i < SORTBUFNUM; ++i) { - sortData[i].dataPtr = &sortMarkData[i]; - } - for (int i = 0; i < MARKBUFNUM; ++i) { - markDupData[i].dataPtr = &sortMarkData[i + SORTBUFNUM]; - } - } - - SortMarkData sortMarkData[SORTBUFNUM + MARKBUFNUM]; - - // for step-1 read - ReadData readData; - // for step-2 generate readends - GenREData genREData[GENBUFNUM]; - // 
for step-3 sort each thread frags and pairs - SortData sortData[SORTBUFNUM]; - // for step-4 mark duplicate - MarkDupData markDupData[MARKBUFNUM]; - // for step-5 deal with intersect data - IntersectData intersectData; - - size_t byteSize() { - size_t bytes = 0; - - size_t tmp = 0; - for (int i = 0; i < SORTBUFNUM + MARKBUFNUM; ++i) tmp += sortMarkData[i].byteSize(); - bytes += tmp; - spdlog::info("sortMarkData size : {} GB", tmp / 1024.0 / 1024 / 1024); - for (int i = 0; i < GENBUFNUM; ++i) tmp += genREData[i].byteSize(); - bytes += tmp; - spdlog::info("genREData size : {} GB", tmp / 1024.0 / 1024 / 1024); - for (int i = 0; i < MARKBUFNUM; ++i) tmp += markDupData[i].byteSize(); - bytes += tmp; - spdlog::info("markDupData size : {} GB", tmp / 1024.0 / 1024 / 1024); - tmp += intersectData.byteSize(); - bytes += tmp; - spdlog::info("intersectData size : {} GB", tmp / 1024.0 / 1024 / 1024); - - return bytes; - } -}; - -struct REArrIdIdx { - int arrId = 0; // - uint64_t arrIdx = 0; // - const ReadEnds *pE = nullptr; -}; - -struct REFragGreaterThan { - bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return ReadEnds::FragLittleThan(*b.pE, *a.pE); } -}; - -struct REPairGreaterThan { - bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return ReadEnds::PairLittleThan(*b.pE, *a.pE); - } -}; - -template -struct ReadEndsHeap { - // task vector - vector> *arr2d; - priority_queue, CMP> minHeap; - uint64_t popNum = 0; - - int Init(vector> *_arr2d) { - arr2d = _arr2d; - if (arr2d == nullptr) { - return -1; - } - for (int i = 0; i < arr2d->size(); ++i) { - auto &v = (*arr2d)[i]; - if (!v.empty()) { - minHeap.push({i, 1, &v[0]}); - } - } - return 0; - } - - const ReadEnds *Pop() { - const ReadEnds *ret = nullptr; - if (!minHeap.empty()) { - auto minVal = minHeap.top(); - minHeap.pop(); - ++popNum; - ret = minVal.pE; - auto &v = (*arr2d)[minVal.arrId]; - if (v.size() > minVal.arrIdx) { - minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v[minVal.arrIdx]}); - } - } 
- return ret; - } - - uint64_t Size() { - uint64_t len = 0; - if (arr2d != nullptr) { - for (auto &v : *arr2d) { - len += v.size(); - } - } - return len - popNum; - } -}; - -// mark duplicate -void PipelineMarkDups(); \ No newline at end of file diff --git a/src/bqsr/bqsr_types.h b/src/bqsr/bqsr_types.h deleted file mode 100644 index 47fdd73..0000000 --- a/src/bqsr/bqsr_types.h +++ /dev/null @@ -1,312 +0,0 @@ -#pragma once - -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "read_ends.h" -#include "util/bam_buf.h" - -using std::priority_queue; -using std::set; -using std::string; -using std::unordered_set; -using std::vector; - -/* readendreadend */ -struct UnpairedREInfo { - int64_t taskSeq; // - ReadEnds unpairedRE; -}; - -/* pair,,read1read2 */ -struct CalcKey { - int8_t orientation = -1; - int32_t read1ReferenceIndex = -1; - int32_t read1Coordinate = -1; - int32_t read2ReferenceIndex = -1; - int32_t read2Coordinate = -1; - int64_t left = -1; - int64_t right = -1; - - CalcKey() {} - static CalcKey MaxCK() { - CalcKey ck; - ck.left = ck.right = INT64_MAX; - return ck; - } - static CalcKey MinCK() { - CalcKey ck; - ck.left = ck.right = INT64_MIN; - return ck; - } - - CalcKey(const ReadEnds &re) { - orientation = re.orientation; - read1ReferenceIndex = re.read1ReferenceIndex; - read1Coordinate = re.read1Coordinate; - read2ReferenceIndex = re.read2ReferenceIndex; - read2Coordinate = re.read2Coordinate; - left = Read1Pos(); - right = Read2Pos(); - } - - int64_t Read1Pos() const { return BamWrap::bam_global_pos(read1ReferenceIndex, read1Coordinate); } - int64_t Read2Pos() const { return BamWrap::bam_global_pos(read2ReferenceIndex, read2Coordinate); } - int64_t Left() const { return left; } - int64_t Right() const { return right; } - - bool operator<(const CalcKey &o) const { - int comp = read1ReferenceIndex - o.read1ReferenceIndex; - if (comp == 0) - comp = read1Coordinate - o.read1Coordinate; - if (comp == 0) - comp = 
read2ReferenceIndex - o.read2ReferenceIndex; - if (comp == 0) - comp = read2Coordinate - o.read2Coordinate; - // orientation, - if (comp == 0) - comp = orientation - o.orientation; - return comp < 0; - } - bool operator <= (const CalcKey &o) const { - return *this < o || *this == o; - } - bool operator==(const CalcKey &o) const { - return read1ReferenceIndex == o.read1ReferenceIndex && read1Coordinate == o.read1Coordinate && - orientation == o.orientation && read2ReferenceIndex == o.read2ReferenceIndex && - read2Coordinate == o.read2Coordinate; - } - bool operator<(const ReadEnds &o) const { - int comp = read1ReferenceIndex - o.read1ReferenceIndex; - if (comp == 0) - comp = read1Coordinate - o.read1Coordinate; - if (comp == 0) - comp = read2ReferenceIndex - o.read2ReferenceIndex; - if (comp == 0) - comp = read2Coordinate - o.read2Coordinate; - // orientation, - if (comp == 0) - comp = orientation - o.orientation; - return comp < 0; - } - std::size_t operator()() const { - size_t h1 = read1ReferenceIndex; - h1 = (h1 << 40) | (read1Coordinate << 8) | orientation; - size_t h2 = read2ReferenceIndex; - h2 = (h2 << 32) | read2Coordinate; - return std::hash()(h1) ^ std::hash()(h2); - } -}; - -struct CalcKeyHash { - std::size_t operator()(const CalcKey &o) const { return o(); } -}; - -struct CalcKeyEqual { - bool operator()(const CalcKey &o1, const CalcKey &o2) const { return o1 == o2; } -}; - -/* */ -struct DupInfo { - int16_t dupSet = 0; // dup set size - uint16_t repIdxHigh = 0; // read - uint32_t repIdxLow = 0; - int64_t idx; - - DupInfo() : DupInfo(-1, 0, 0) {} - DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) {} - DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), dupSet(dupSet_) { - repIdxHigh = repIdx_ >> 32; - repIdxLow = (uint32_t)repIdx_; - } - int64_t GetRepIdx() { - int64_t repIdx = repIdxHigh; - repIdx = (repIdx << 32) | repIdxLow; - return repIdx; - } - bool operator<(const DupInfo &o) const { return idx < o.idx; } - bool operator>(const DupInfo 
&o) const { return idx > o.idx; } - operator int64_t() const { return idx; } -}; - -struct DupInfoHash { - std::size_t operator()(const DupInfo &o) const { return std::hash()(o.idx); } -}; - -struct DupInfoEqual { - bool operator()(const DupInfo &o1, const DupInfo &o2) const { return o1.idx == o2.idx; } - bool operator()(const DupInfo &o1, const int64_t &o2) const { return o1.idx == o2; } - bool operator()(const int64_t &o1, const DupInfo &o2) const { return o1 == o2.idx; } -}; - -using MDMap = tsl::robin_map; - -template -// using MDSet = set; -// using MDSet = unordered_set; -using MDSet = tsl::robin_set; - -template -// using DPSet = set; -// using DPSet = unordered_set; -using DPSet = tsl::robin_set; - -template -using CalcSet = set; -// using CalcSet = tsl::robin_set; - -using ReadEndsSet = tsl::robin_set; - -/* pair read, */ -struct TaskSeqDupInfo { - DPSet dupIdx; - MDSet opticalDupIdx; - DPSet repIdx; - MDSet notDupIdx; - MDSet notOpticalDupIdx; - MDSet notRepIdx; -}; - -/* pair,read endread end */ -struct UnpairedPosInfo { - int unpairedNum = 0; - int64_t taskSeq; - vector pairArr; - MDSet readNameSet; -}; -// typedef unordered_map UnpairedNameMap; -// typedef unordered_map UnpairedPositionMap; - -typedef tsl::robin_map UnpairedNameMap; // read name,pair read -//typedef map UnpairedNameMap; // read name,pair read -typedef tsl::robin_map UnpairedPositionMap; // ,readread -// typedef map> CkeyReadEndsMap; // calckey,readEnds -typedef unordered_map, CalcKeyHash, CalcKeyEqual> - CkeyReadEndsMap; // calckey,readEnds -// typedef tsl::robin_map, CalcKeyHash, CalcKeyEqual> CkeyReadEndsMap; // -// calckey,readEnds - -/* */ -struct MarkDupDataArg { - int64_t taskSeq; // - int64_t bamStartIdx; // vBambambam - vector bams; // bam read - vector pairs; // reads - vector frags; // reads - DPSet pairDupIdx; // pairread - MDSet pairOpticalDupIdx; // opticalread - DPSet fragDupIdx; // fragread - DPSet pairRepIdx; // pairdupsetread - MDSet pairSingletonIdx; // readread pair 
- UnpairedNameMap unpairedDic; // pair end - UnpairedPositionMap unpairedPosArr; // ReadEndReadEnd, -}; - -/* - * , - */ -template -struct PairArrIdIdx { - int arrId = 0; - uint64_t arrIdx = 0; - T dupIdx = 0; -}; - -template -struct IdxGreaterThan { - bool operator()(const PairArrIdIdx &a, const PairArrIdIdx &b) { return a.dupIdx > b.dupIdx; } -}; - -template -struct DupIdxQueue { - // task vector - - // task,indexvector,vector, - vector> *dupIdx2DArr; - priority_queue, vector>, IdxGreaterThan> minHeap; - uint64_t popNum = 0; - - int Init(vector> *_dupIdx2DArr) { - dupIdx2DArr = _dupIdx2DArr; - if (dupIdx2DArr == nullptr) { - return -1; - } - for (int i = 0; i < dupIdx2DArr->size(); ++i) { - auto &v = (*dupIdx2DArr)[i]; - if (!v.empty()) { - minHeap.push({i, 1, v[0]}); - } - } - return 0; - } - - T Pop() { - T ret = -1; - if (!minHeap.empty()) { - auto idx = minHeap.top(); - minHeap.pop(); - ++popNum; - ret = idx.dupIdx; - auto &v = (*dupIdx2DArr)[idx.arrId]; - if (v.size() > idx.arrIdx) { - minHeap.push({idx.arrId, idx.arrIdx + 1, v[idx.arrIdx]}); - } - } - return ret; - } - - uint64_t Size() { - uint64_t len = 0; - if (dupIdx2DArr != nullptr) { - for (auto &v : *dupIdx2DArr) { - len += v.size(); - } - } - return len - popNum; - } - - uint64_t RealSize(const string fileName) { - if (this->Size() == 0) { - return 0; - } - uint64_t len = 0; - auto preTop = minHeap.top(); - DupInfo dupIdx = this->Pop(); - DupInfo nextDup = dupIdx; - auto topIdx = minHeap.top(); - - ofstream ofs(fileName); // ofstream ofs1(filePrefix + ".odup"); - - while (dupIdx != -1) { - - ofs << dupIdx.idx << endl; // ofs1 << topIdx.arrId << '\t' << topIdx.arrIdx << '\t' << topIdx.dupIdx << endl; - - ++len; - while (true) { - topIdx = minHeap.top(); - nextDup = this->Pop(); - if (nextDup != dupIdx) { - dupIdx = nextDup; - break; - } else { - cout << "the same dup: " << dupIdx << '\t' << preTop.arrId << '\t' << preTop.arrIdx << '\t' - << preTop.dupIdx << '\t' << topIdx.arrId << '\t' << 
topIdx.arrIdx << '\t' << topIdx.dupIdx - << endl; - } - } - - dupIdx = nextDup; - preTop = topIdx; - } - ofs.close(); // ofs1.close(); - cout << "RealSize: " << len << endl; - return len; - } -}; \ No newline at end of file diff --git a/src/bqsr/class_static_var.cpp b/src/bqsr/class_static_var.cpp deleted file mode 100644 index fdb304d..0000000 --- a/src/bqsr/class_static_var.cpp +++ /dev/null @@ -1,2 +0,0 @@ -#include "read_name_parser.h" -bool ReadNameParser::sWrongNameFormat = false; \ No newline at end of file diff --git a/src/bqsr/covariate.cpp b/src/bqsr/covariate.cpp index a2c0ab9..ecd31cf 100644 --- a/src/bqsr/covariate.cpp +++ b/src/bqsr/covariate.cpp @@ -25,7 +25,7 @@ int CycleCovariate::MAXIMUM_CYCLE_VALUE; // 对一条read计算协变量(该协变量被上一个read用过) void CovariateUtils::ComputeCovariates(SamData& sd, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues) { - ReadGroupCovariate::RecordValues(sd, header, values, recordIndelValues); + // ReadGroupCovariate::RecordValues(sd, header, values, recordIndelValues); BaseQualityCovariate::RecordValues(sd, header, values, recordIndelValues); ContextCovariate::RecordValues(sd, header, values, recordIndelValues); CycleCovariate::RecordValues(sd, header, values, recordIndelValues); diff --git a/src/bqsr/dup_metrics.h b/src/bqsr/dup_metrics.h deleted file mode 100644 index cb3d0e7..0000000 --- a/src/bqsr/dup_metrics.h +++ /dev/null @@ -1,72 +0,0 @@ -#pragma once - -#include - -#include -#include - -#include "bqsr_types.h" - -using std::string; -using std::vector; - -/* - * - */ -struct DuplicationMetrics { - /** - * The library on which the duplicate marking was performed. - */ - string LIBRARY = ""; - /** - * The number of mapped reads examined which did not have a mapped mate pair, - * either because the read is unpaired, or the read is paired to an unmapped mate. - */ - uint64_t UNPAIRED_READS_EXAMINED = 0; - /** - * The number of mapped read pairs examined. 
(Primary, non-supplemental) - */ - uint64_t READ_PAIRS_EXAMINED = 0; - /** - * The number of reads that were either secondary or supplementary - */ - uint64_t SECONDARY_OR_SUPPLEMENTARY_RDS = 0; - /** - * The total number of unmapped reads examined. (Primary, non-supplemental) - */ - uint64_t UNMAPPED_READS = 0; - /** - * The number of fragments that were marked as duplicates. - */ - uint64_t UNPAIRED_READ_DUPLICATES = 0; - /** - * The number of read pairs that were marked as duplicates. - */ - uint64_t READ_PAIR_DUPLICATES = 0; - /** - * The number of read pairs duplicates that were caused by optical duplication. - * Value is always < READ_PAIR_DUPLICATES, which counts all duplicates regardless of source. - */ - uint64_t READ_PAIR_OPTICAL_DUPLICATES = 0; - /** - * The fraction of mapped sequence that is marked as duplicate. - */ - double PERCENT_DUPLICATION = 0.0; - /** - * The estimated number of unique molecules in the library based on PE duplication. - */ - uint64_t ESTIMATED_LIBRARY_SIZE = 0; - - // - vector CoverageMult; - - // ,4,4 - MDMap DuplicateCountHist; - MDMap NonOpticalDuplicateCountHist; - MDMap OpticalDuplicatesCountHist; - - // for test - MDSet singletonReads; - MDSet dupReads; // for test - MDSet bestReads; -}; \ No newline at end of file diff --git a/src/bqsr/read_ends.h b/src/bqsr/read_ends.h deleted file mode 100644 index fa5257a..0000000 --- a/src/bqsr/read_ends.h +++ /dev/null @@ -1,210 +0,0 @@ -/* -Description: read -ends, - -Copyright : All right reserved by ICT - -Author : Zhang Zhonghai -Date : 2023/11/3 -*/ -#pragma once - -#include -#include -#include - -#include - -/** - * Small interface that provides access to the physical location information - * about a cluster. All values should be defaulted to -1 if unavailable. - * ReadGroup and Tile should only allow non-zero positive integers, x and y - * coordinates may be negative. 
- */ -struct PhysicalLocation { - static const int NO_VALUE = -1; - /** - * Small class that provides access to the physical location information - * about a cluster. All values should be defaulted to -1 if unavailable. - * Tile should only allow non-zero positive integers, x and y coordinates - * must be non-negative. This is different from PhysicalLocationShort in - * that the x and y positions are ints, not shorts thus, they do not - * overflow within a HiSeqX tile. - */ - int16_t tile = -1; - // int32_t x = -1; - // int32_t y = -1; - // This is a bug in picard Markduplicates, because some tile coordinates exceede the range of int16_t - int16_t x = -1; - int16_t y = -1; -}; - -/* read ends,picard ReadEndsForMarkDuplicates*/ -struct ReadEnds : PhysicalLocation { - static const int8_t F = 0, R = 1, FF = 2, FR = 3, RR = 4, RF = 5; - /* bam */ - bool read1FirstOfPair = true; - /* ReadEnds */ - /** Little struct-like class to hold read pair (and fragment) end data for - * duplicate marking. */ - // int16_t libraryId; // , - int8_t orientation = -1; - int32_t read1ReferenceIndex = -1; - int32_t read1Coordinate = -1; - int32_t read2ReferenceIndex = -1; - // This field is overloaded for flow based processing as the end coordinate of read 1. (paired reads not supported) - int32_t read2Coordinate = -1; - /* Additional information used to detect optical dupes */ - // int16_t readGroup = -1; bamread - // group,normaltumor - /** For optical duplicate detection the orientation matters regard to 1st or - * 2nd end of a mate */ - int8_t orientationForOpticalDuplicates = -1; - /** A *transient* flag marking this read end as being an optical duplicate. 
- */ - bool isOpticalDuplicate = false; - - /* ReadEndsForMarkDuplicates */ - /** Little struct-like class to hold read pair (and fragment) end data for - * MarkDuplicatesWithMateCigar **/ - int16_t score = 0; - int64_t read1IndexInFile = -1; - int64_t read2IndexInFile = -1; - int64_t duplicateSetSize = -1; - - /* ReadEndsForMarkDuplicatesWithBarcodes () */ - // int32_t barcode = 0; // primary barcode for this read (and pair) - // int32_t readOneBarcode = 0; // read one barcode, 0 if not present - // int32_t readTwoBarcode = 0; // read two barcode, 0 if not present or not - // paired - - /* zzh */ - int64_t posKey = -1; // return (int64_t)tid << - // MAX_CONTIG_LEN_SHIFT | (int64_t)pos; (clip,map) - - /* ,readends,task */ - int oprateTime = 0; - - /* pairend read, */ - static int8_t GetOrientationByte(bool read1NegativeStrand, bool read2NegativeStrand) { - if (read1NegativeStrand) { - if (read2NegativeStrand) - return RR; - else - return RF; - } else { - if (read2NegativeStrand) - return FR; - else - return FF; - } - } - - /* readends() */ - static bool AreComparableForDuplicates(const ReadEnds &lhs, const ReadEnds &rhs, bool compareRead2) { - bool areComparable = true; - areComparable = lhs.read1ReferenceIndex == rhs.read1ReferenceIndex && - lhs.read1Coordinate == rhs.read1Coordinate && lhs.orientation == rhs.orientation; - if (areComparable && compareRead2) { - areComparable = - lhs.read2ReferenceIndex == rhs.read2ReferenceIndex && lhs.read2Coordinate == rhs.read2Coordinate; - } - return areComparable; - } - - /* */ - bool IsPositiveStrand() const { return orientation == F; } - - /* pairend */ - bool IsPaired() const { return read2ReferenceIndex != -1; } - - bool IsNegativeStrand() const { return orientation == R; } - - // ,ab,AreComparableForDuplicates - static inline bool ReadLittleThan(const ReadEnds &a, const ReadEnds &b, bool compareRead2 = false) { - int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; - if (comp == 0) - comp = a.read1Coordinate - 
b.read1Coordinate; - if (compareRead2) { - if (comp == 0) - comp = a.read2ReferenceIndex - b.read2ReferenceIndex; - if (comp == 0) - comp = a.read2Coordinate - b.read2Coordinate; - } - if (comp == 0) - comp = a.orientation - b.orientation; - - return comp < 0; - } - - static bool FragLittleThan(const ReadEnds &a, const ReadEnds &b) { - int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; - if (comp == 0) - comp = a.read1Coordinate - b.read1Coordinate; - if (comp == 0) // ,bam,read2ReferenceIndex, - comp = a.orientation - b.orientation; - if (comp == 0) - comp = a.read2ReferenceIndex - b.read2ReferenceIndex; - if (comp == 0) - comp = a.read2Coordinate - b.read2Coordinate; -// if (comp == 0) -// comp = a.tile - b.tile; -// if (comp == 0) -// comp = a.x - b.x; // picardbug,shortx,y, -// if (comp == 0) -// comp - a.y - b.y; - if (comp == 0) - comp = (int)(a.read1IndexInFile - b.read1IndexInFile); - if (comp == 0) - comp = (int)(a.read2IndexInFile - b.read2IndexInFile); - return comp < 0; - } - - static bool PairLittleThan(const ReadEnds &a, const ReadEnds &b) { - int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; - if (comp == 0) - comp = a.read1Coordinate - b.read1Coordinate; - if (comp == 0) - comp = a.read2ReferenceIndex - b.read2ReferenceIndex; - if (comp == 0) - comp = a.read2Coordinate - b.read2Coordinate; - if (comp == 0) // ,, - comp = a.orientation - b.orientation; -// if (comp == 0) -// comp = a.tile - b.tile; -// if (comp == 0) -// comp = a.x - b.x; // picardbug,shortx,y, -// if (comp == 0) -// comp - a.y - b.y; - if (comp == 0) - comp = (int)(a.read1IndexInFile - b.read1IndexInFile); - if (comp == 0) - comp = (int)(a.read2IndexInFile - b.read2IndexInFile); - return comp < 0; - } - - static bool CorLittleThan(const ReadEnds &a, const ReadEnds &b) { - int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; - if (comp == 0) - comp = a.read1Coordinate - b.read1Coordinate; - if (comp == 0) - comp = a.read2ReferenceIndex - b.read2ReferenceIndex; - 
if (comp == 0) - comp = a.read2Coordinate - b.read2Coordinate; - if (comp == 0) // ,, - comp = a.orientation - b.orientation; - return comp < 0; - } - - // for pairs only - int64_t Left() { return BamWrap::bam_global_pos(read1ReferenceIndex, read1Coordinate); } - int64_t Right() { return BamWrap::bam_global_pos(read2ReferenceIndex, read2Coordinate); } -}; - -struct ReadEndsHash { - std::size_t operator()(const ReadEnds &o) const { return std::hash()(o.read1IndexInFile); } -}; - -struct ReadEndsEqual { - bool operator()(const ReadEnds &o1, const ReadEnds &o2) const { return o1.read1IndexInFile == o2.read1IndexInFile; } -}; \ No newline at end of file diff --git a/src/bqsr/read_name_parser.h b/src/bqsr/read_name_parser.h deleted file mode 100644 index 817bb31..0000000 --- a/src/bqsr/read_name_parser.h +++ /dev/null @@ -1,224 +0,0 @@ -/* -Description: readname,tile, x, y - -Copyright : All right reserved by ICT - -Author : Zhang Zhonghai -Date : 2023/11/6 -*/ -#ifndef READ_NAME_PARSER_H_ -#define READ_NAME_PARSER_H_ - -#include -#include - -#include -#include - -#include "read_ends.h" - -// using std::regex; -using std::cmatch; -using std::regex; -using std::string; - - - -/** - * Provides access to the physical location information about a cluster. - * All values should be defaulted to -1 if unavailable. ReadGroup and Tile - * should only allow non-zero positive integers, x and y coordinates may be - * negative. - */ -struct ReadNameParser { - /** - * The read name regular expression (regex) is used to extract three pieces - * of information from the read name: tile, x location, and y location. Any - * read name regex should parse the read name to produce these and only - * these values. An example regex is: - * (?:.*:)?([0-9]+)[^:]*:([0-9]+)[^:]*:([0-9]+)[^:]*$ - * which assumes that fields in the read name are delimited by ':' and the - * last three fields correspond to the tile, x and y locations, ignoring any - * trailing non-digit characters. 
- * - * The default regex is optimized for fast parsing (see {@link - * #getLastThreeFields(String, char, int[])}) by searching for the last - * three fields, ignoring any trailing non-digit characters, assuming the - * delimiter ':'. This should consider correctly read names where we have 5 - * or 7 field with the last three fields being tile/x/y, as is the case for - * the majority of read names produced by Illumina technology. - */ - const string DEFAULT_READ_NAME_REGEX = "(?:.*:)?([0-9]+)[^:]*:([0-9]+)[^:]*:([0-9]+)[^:]*$"; - bool warnAboutRegexNotMatching = true; - - static bool sWrongNameFormat; - - string readNameStored = ""; - PhysicalLocation physicalLocationStored; - int tmpLocationFields[3]; // for optimization of addLocationInformation - bool useOptimizedDefaultParsing = true; // was the regex default? - string readNameRegex = DEFAULT_READ_NAME_REGEX; - regex readNamePattern; - - ReadNameParser() : ReadNameParser(DEFAULT_READ_NAME_REGEX) {} - ReadNameParser(const string &strReadNameRegex) : ReadNameParser(strReadNameRegex, true) {} - ReadNameParser(const string &strReadNameRegex, bool isWarn) { - readNameRegex = strReadNameRegex; - if (strReadNameRegex == DEFAULT_READ_NAME_REGEX) - useOptimizedDefaultParsing = true; - else - useOptimizedDefaultParsing = false; - readNamePattern = std::regex(strReadNameRegex, std::regex_constants::optimize); - warnAboutRegexNotMatching = isWarn; - } - - /* readNameRegex */ - void SetReadNameRegex(const string &strReadNameRegex) { - readNameRegex = strReadNameRegex; - if (strReadNameRegex == DEFAULT_READ_NAME_REGEX) - useOptimizedDefaultParsing = true; - else - useOptimizedDefaultParsing = false; - readNamePattern = std::regex(strReadNameRegex, std::regex_constants::optimize); - // readNamePattern = strReadNameRegex; - } - - /* tile x y */ - bool AddLocationInformation(const string &readName, PhysicalLocation *loc) { - if (!(readName == readNameStored)) { - if (ReadLocationInformation(readName, loc)) { - readNameStored = 
readName; - physicalLocationStored = *loc; - return true; - } - // return false if read name cannot be parsed - return false; - } else { - *loc = physicalLocationStored; - return true; - } - } - - /** - * Method used to extract tile/x/y from the read name and add it to the - * PhysicalLocationShort so that it can be used later to determine optical - * duplication - * - * @param readName the name of the read/cluster - * @param loc the object to add tile/x/y to - * @return true if the read name contained the information in parsable form, - * false otherwise - */ - bool ReadLocationInformation(const string &readName, PhysicalLocation *loc) { - try { - // Optimized version if using the default read name regex (== used on purpose): - if (useOptimizedDefaultParsing) { - const int fields = getLastThreeFields(readName, ':'); - if (!(fields == 5 || fields == 7)) { - if (warnAboutRegexNotMatching) { - spdlog::warn( - "Default READ_NAME_REGEX '{}' did not match read " - "name '{}'." - "You may need to specify a READ_NAME_REGEX in " - "order to correctly identify optical duplicates. " - "Note that this message will not be emitted again " - "even if other read names do not match the regex.", - readNameRegex.c_str(), readName.c_str()); - warnAboutRegexNotMatching = false; - sWrongNameFormat = true; - } - return false; - } - loc->tile = (int16_t)tmpLocationFields[0]; - loc->x = tmpLocationFields[1]; - loc->y = tmpLocationFields[2]; - return true; - } else if (readNameRegex.empty()) { - return false; - } else { - // Standard version that will use the regex - cmatch m; - if (std::regex_match(readName.c_str(), m, readNamePattern)) { - loc->tile = std::stoi(m[1].str()); - loc->x = std::stoi(m[2].str()); - loc->y = std::stoi(m[3].str()); - return true; - } else { - if (warnAboutRegexNotMatching) { - spdlog::warn( - "READ_NAME_REGEX '{}' did not match read name '{}'." - "Your regex may not be correct. 
" - "Note that this message will not be emitted again " - "even if other read names do not match the regex.", - readNameRegex.c_str(), readName.c_str()); - warnAboutRegexNotMatching = false; - sWrongNameFormat = true; - } - return false; - } - } - } catch (const std::runtime_error &e) { - if (warnAboutRegexNotMatching) { - spdlog::warn( - "A field parsed out of a read name was expected to contain " - "an integer and did not. READ_NAME_REGEX: {}; Read name: " - "{}; Error Msg: {}", - readNameRegex.c_str(), readName.c_str(), e.what()); - warnAboutRegexNotMatching = false; - sWrongNameFormat = true; - } - } catch (...) { - if (warnAboutRegexNotMatching) { - spdlog::warn( - "A field parsed out of a read name was expected to contain " - "an integer and did not. READ_NAME_REGEX: {}; Read name: " - "{}", - readNameRegex.c_str(), readName.c_str()); - warnAboutRegexNotMatching = false; - sWrongNameFormat = true; - } - } - - return true; - } - - /** - * Given a string, splits the string by the delimiter, and returns the the - * last three fields parsed as integers. Parsing a field considers only a - * sequence of digits up until the first non-digit character. The three - * values are stored in the passed-in array. - * - * @throws NumberFormatException if any of the tokens that should contain - * numbers do not start with parsable numbers - */ - int getLastThreeFields(const string &readName, char delim) { - int tokensIdx = 2; // start at the last token - int numFields = 0; - int i, endIdx; - endIdx = readName.size(); - // find the last three tokens only - for (i = (int)readName.size() - 1; 0 <= i && 0 <= tokensIdx; i--) { - if (readName.at(i) == delim || 0 == i) { - numFields++; - const int startIdx = (0 == i) ? 
0 : (i + 1); - tmpLocationFields[tokensIdx] = std::stoi(readName.substr(startIdx, endIdx - startIdx)); - tokensIdx--; - endIdx = i; - } - } - // continue to find the # of fields - while (0 <= i) { - if (readName.at(i) == delim || 0 == i) - numFields++; - i--; - } - if (numFields < 3) { - tmpLocationFields[0] = tmpLocationFields[1] = tmpLocationFields[2] = -1; - numFields = -1; - } - - return numFields; - } -}; - -#endif \ No newline at end of file diff --git a/src/bqsr/recal_funcs.h b/src/bqsr/recal_funcs.h new file mode 100644 index 0000000..1cea324 --- /dev/null +++ b/src/bqsr/recal_funcs.h @@ -0,0 +1,226 @@ +/* + Description: Recalibrate计算过程的工具函数 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2025/12/29 +*/ +#pragma once + +#include "util/stable_array.h" +#include "util/sam_data.h" +#include "bqsr/aux_arg.h" +#include "util/bam_wrap.h" +#include "util/interval.h" +#include "util/vcf_parser.h" +#include "util/profiling.h" + +struct RecalFuncs { + static constexpr int MAX_SITES_INTERVAL = 100000; + static constexpr uint8_t NO_BAQ_UNCERTAINTY = (uint8_t)'@'; + + // 设置某个位置是indel + static inline void updateIndel(StableArray& isIndel, int index) { + if (index >= 0 && index < isIndel.size()) { + isIndel[index] = 1; + } + } + + // 计算该read的每个碱基位置是否是SNP或Indel + static int calculateIsSNPOrIndel(AuxVar& aux, SamData& sd, StableArray& isSNP, StableArray& isIns, StableArray& isDel) { + isSNP.resize_fill(sd.read_len, 0); + isIns.resize_fill(sd.read_len, 0); + isDel.resize_fill(sd.read_len, 0); + // 1. 读取参考基因组,先看看串行运行性能,稍后可以将读入ref和vcf合并起来做成一个并行流水线步骤 + Interval interval{sd.start_pos, sd.end_pos}; // 闭区间 + PROF_START(read_ref); + read_ref_base(aux, interval.left, interval); + PROF_GP_END(read_ref); + string refBases(aux.ref_seq); + + // 2. 
遍历cigar,计算每个碱基是否是SNP或Indel + PROF_START(calc_snp); + int readPos = 0, refPos = 0, nEvents = 0; + for (int i = 0; i < sd.cigars.size(); ++i) { + const char c = sd.cigars[i].op; + int len = sd.cigars[i].len; + if (c == 'M' || c == '=' || c == 'X') { + for (int j = 0; j < len; ++j) { + // 按位置将read和ref碱基进行比较,不同则是snp,注意read起始位置要加上left_clip + int snpInt = sd.bases[readPos] == refBases[refPos] ? 0 : 1; + isSNP[readPos] = snpInt; + nEvents += snpInt; + readPos++; + refPos++; + } + } else if (c == 'D') { + // 应该是在上一个消耗碱基的cigar的最后一个位置,标记Del + int index = sd.bw->GetReadNegativeStrandFlag() ? readPos : readPos - 1; + updateIndel(isDel, index); + refPos += len; + } else if (c == 'N') { + refPos += len; + } else if (c == 'I') { + // 与Del不同,Ins应该是在下一个cigar开始的位置,标记Ins + bool forwardStrandRead = !sd.bw->GetReadNegativeStrandFlag(); + if (forwardStrandRead) { + updateIndel(isIns, readPos - 1); + } + readPos += len; + if (!forwardStrandRead) { + updateIndel(isIns, readPos); + } + } else if (c == 'S') { + readPos += len; + } + } + nEvents += std::accumulate(isIns.begin(), isIns.end(), 0) + std::accumulate(isDel.begin(), isDel.end(), 0); + PROF_GP_END(calc_snp); + return nEvents; + } + + // 读取给定区间的reference + static inline void read_ref_base(AuxVar& aux, int64_t cur_pos, Interval& interval) { + if (aux.ref_seq != NULL) + free(aux.ref_seq); + int tid = BamWrap::bam_tid(cur_pos); + const char* chr = sam_hdr_tid2name(aux.header, tid); + int seq_begin = BamWrap::bam_pos(interval.left); //- aux.REFERENCE_HALF_WINDOW_LENGTH; + int seq_end = BamWrap::bam_pos(interval.right); //+ aux.REFERENCE_HALF_WINDOW_LENGTH; + aux.ref_seq = faidx_fetch_seq(aux.faidx, chr, seq_begin, seq_end, &aux.ref_len); + // aux.offset = aux.REFERENCE_HALF_WINDOW_LENGTH; + } + + // 获取一行字符串 + static void get_line_from_buf(char* buf, int64_t total, int64_t* cur, string* line) { + line->clear(); + if (*cur >= total) + return; + char b; + while (*cur < total && (b = buf[(*cur)++]) != '\n') { + line->push_back(b); + } + } + 
+ // 计算与read有交叉的已知位点信息, 应该要判断一下,是按照read的范围去读取vcf,还是按照一个batch read的范围去读取 + static void calculateKnownSites(SamData& sd, vector& vcfs, sam_hdr_t* samHdr, StableArray& knownSites) { + BamWrap* bw = sd.bw; + int tid = bw->contig_id(); + int64_t startPos = bw->start_pos(); // 闭区间 + int64_t endPos = bw->end_pos(); // 闭区间 + knownSites.resize_fill(sd.read_len, 0); + + // update vcfs + for (auto& vcf : vcfs) { + // 清理旧的interval + while (!vcf.knownSites.empty()) { + auto& intv = vcf.knownSites.front(); + // spdlog::info("intv bam {}, {}", intv.right, startPos); + if (intv.right < startPos) + vcf.knownSites.pop_front(); + else + break; + } + if (!vcf.knownSites.empty() && vcf.knownSites.back().left > endPos) + continue; + + // 读取新的interval + int64_t fpos, flen; + endPos = std::max(startPos + MAX_SITES_INTERVAL, endPos); + Interval readIntv(startPos, endPos); + vcf.index.SearchInterval(startPos, endPos, &fpos, &flen); + if (flen > 0) { + vcf.inStm.seekg(fpos, ios::beg); + if (flen > vcf.bufLen) { + vcf.bufLen = flen; + vcf.buf = (char*)realloc(vcf.buf, flen); + } + char* buf = vcf.buf; + vcf.inStm.read(buf, flen); + string line; + int64_t cur = 0; + get_line_from_buf(buf, flen, &cur, &line); + while (line.size() > 0) { + stringstream ss_line(line); + string stid; + int tid, pos; + int64_t locus; + string id, ref; + ss_line >> stid >> pos >> id >> ref; + tid = sam_hdr_name2tid(samHdr, stid.c_str()); + int64_t varStart = BamWrap::bam_global_pos(tid, pos); + Interval varIntv(varStart, varStart + ref.size() - 1); + if (readIntv.overlaps(varIntv)) { + vcf.knownSites.push_back(Interval(tid, pos - 1, pos - 1 + ref.size() - 1)); // 闭区间 + } + get_line_from_buf(buf, flen, &cur, &line); + } + } + } + for (auto& vcf : vcfs) { + for (auto& intv : vcf.knownSites) { + // knownSite is outside clipping window for the read, ignore + if (intv.right < sd.softStart()) + continue; + if (intv.left > sd.softEnd()) + break; + // 计算对应的位点 + ReadIdxCigar rcStart = 
sd.getReadIndexForReferenceCoordinate(intv.left); + int featureStartOnRead = rcStart.readIdx == SamData::READ_INDEX_NOT_FOUND ? 0 : rcStart.readIdx; + if (rcStart.cigarOp == 'D') { + --featureStartOnRead; + } + ReadIdxCigar rcEnd = sd.getReadIndexForReferenceCoordinate(intv.right); + int featureEndOnRead = rcEnd.readIdx == SamData::READ_INDEX_NOT_FOUND ? sd.read_len : rcEnd.readIdx; + if (featureStartOnRead > sd.read_len) { + featureStartOnRead = featureEndOnRead = sd.read_len; + } + for (int i = max(0, featureStartOnRead); i < min(sd.read_len, featureEndOnRead + 1); ++i) { + knownSites[i] = true; + } + } + } + } + + // 应该是计算一段数据的平均值 + static void calculateAndStoreErrorsInBlock(int i, int blockStartIndex, StableArray& errorArr, StableArray& fracErrs) { + int totalErrors = 0; + for (int j = max(0, blockStartIndex - 1); j <= i; j++) { + totalErrors += errorArr[j]; + } + for (int j = max(0, blockStartIndex - 1); j <= i; j++) { + fracErrs[j] = ((double)totalErrors) / ((double)(i - max(0, blockStartIndex - 1) + 1)); + } + } + + // 应该是用来处理BAQ的,把不等于特定BAQ分数的碱基作为一段数据统一处理 + static void calculateFractionalErrorArray(StableArray& errorArr, StableArray& baqArr, StableArray& fracErrs) { + fracErrs.resize_fill(baqArr.size(), 0.0); + // errorArray和baqArray必须长度相同 + const int BLOCK_START_UNSET = -1; + bool inBlock = false; + int blockStartIndex = BLOCK_START_UNSET; + int i; + for (i = 0; i < fracErrs.size(); ++i) { + if (baqArr[i] == NO_BAQ_UNCERTAINTY) { // 不等于NO_BAQ_UNCERTAINTY的碱基当成一段数据来处理 + if (!inBlock) { + fracErrs[i] = errorArr[i]; + } else { + calculateAndStoreErrorsInBlock(i, blockStartIndex, errorArr, fracErrs); + inBlock = false; // reset state variables + blockStartIndex = BLOCK_START_UNSET; // reset state variables + } + } else { + inBlock = true; + if (blockStartIndex == BLOCK_START_UNSET) { + blockStartIndex = i; + } + } + } + // 处理最后一个block + if (inBlock) { + calculateAndStoreErrorsInBlock(i - 1, blockStartIndex, errorArr, fracErrs); + } + } +}; \ No newline at end of 
file diff --git a/src/bqsr/recal_utils.cpp b/src/bqsr/recal_utils.cpp index 2dfc1e0..1b4f468 100644 --- a/src/bqsr/recal_utils.cpp +++ b/src/bqsr/recal_utils.cpp @@ -9,48 +9,4 @@ #include "recal_utils.h" -/** - * Increments the RecalDatum at the specified position in the specified table, or put a new item there - * if there isn't already one. - * - * Note: we intentionally do not use varargs here to avoid the performance cost of allocating an array on every call. It showed on the profiler. - * - * @param table the table that holds/will hold our item - * @param qual qual for this event - * @param isError error value for this event - * @param key0, key1, key2 location in table of our item - */ -void RecalUtils::IncrementDatum3keys(Array3D table, uint8_t qual, double isError, int key0, int key1, int key2) { - RecalDatum &existingDatum = table.get(key0, key1, key2); - - if (existingDatum.numObservations == 0) { - // No existing item, put a new one - table.put(RecalDatum(1, qual, isError), key0, key1, key2); - } else { - // Easy case: already an item here, so increment it - existingDatum.increment(1, isError); - } -} - -/** - * Increments the RecalDatum at the specified position in the specified table, or put a new item there - * if there isn't already one. - * - * Note: we intentionally do not use varargs here to avoid the performance cost of allocating an array on every call. It showed on the profiler. 
- * - * @param table the table that holds/will hold our item - * @param qual qual for this event - * @param isError error value for this event - * @param key0, key1, key2, key3 location in table of our item - */ -void RecalUtils::IncrementDatum4keys(Array4D table, uint8_t qual, double isError, int key0, int key1, int key2, int key3) { - RecalDatum& existingDatum = table.get(key0, key1, key2, key3); - - if (existingDatum.numObservations == 0) { - // No existing item, put a new one - table.put(RecalDatum(1, qual, isError), key0, key1, key2, key3); - } else { - // Easy case: already an item here, so increment it - existingDatum.increment(1, isError); - } -} \ No newline at end of file +vector<EventTypeValue> RecalUtils::validEventTypes; \ No newline at end of file diff --git a/src/bqsr/recal_utils.h b/src/bqsr/recal_utils.h index 9c8d82a..c300985 100644 --- a/src/bqsr/recal_utils.h +++ b/src/bqsr/recal_utils.h @@ -20,6 +20,7 @@ #include "util/report_table.h" #include "util/utils.h" #include "covariate.h" +#include "read_recal_info.h" struct RecalUtils { @@ -27,9 +28,98 @@ struct RecalUtils { static constexpr int REPORTED_QUALITY_DECIMAL_PLACES = 4; static constexpr int NUMBER_ERRORS_DECIMAL_PLACES = 2; - // 根据每个read的key,在recalTable中添加对应的数据 - static void IncrementDatum3keys(Array3D table, uint8_t qual, double isError, int key0, int key1, int key2); - static void IncrementDatum4keys(Array4D table, uint8_t qual, double isError, int key0, int key1, int key2, int key3); + static vector<EventTypeValue> validEventTypes; // 真正需要计算的event types + + static void initEventTypes(bool computeIndelBQSRTables) { + validEventTypes.clear(); + validEventTypes.push_back(EventType::BASE_SUBSTITUTION); + if (computeIndelBQSRTables) { + validEventTypes.push_back(EventType::BASE_INSERTION); + validEventTypes.push_back(EventType::BASE_DELETION); + } + } + + /** + * Update the recalibration statistics using the information in recalInfo. 
+ * + * Implementation detail: we only populate the quality score table and the optional tables. + * The read group table will be populated later by collapsing the quality score table. + * + * @param recalInfo data structure holding information about the recalibration values for a single read + */ + static void updateRecalTablesForRead(ReadRecalInfo& info, RecalTables& recalTables) { + SamData& read = info.read; + PerReadCovariateMatrix& readCovars = info.covariates; + Array3D& qualityScoreTable = recalTables.qualityScoreTable; + Array4D& contextTable = recalTables.contextTable; + Array4D& cycleTable = recalTables.cycleTable; + + int readLength = read.read_len; + for (int offset = 0; offset < readLength; ++offset) { + // if (read.rid == 46114) { + // fprintf(gf[3], "%d %d\n", offset, info.skips[offset] ? 1 : 0); + // } + if (!info.skips[offset]) { + // if (true){ // 不跳过当前位置 + for (int idx = 0; idx < validEventTypes.size(); ++idx) { + // 获取四个值,readgroup / qualityscore / context / cycle + EventTypeValue& event = validEventTypes[idx]; + vector& covariatesAtOffset = readCovars[event.index][offset]; + uint8_t qual = info.getQual(event, offset); + double isError = info.getErrorFraction(event, offset); + + int readGroup = covariatesAtOffset[ReadGroupCovariate::index]; + int baseQuality = covariatesAtOffset[BaseQualityCovariate::index]; + + // 处理base quality score协变量 + // RecalUtils::IncrementDatum3keys(qualityScoreTable, qual, isError, readGroup, baseQuality, event.index); + //if (read.rid == 46114) { + // fprintf(gf[3], "%d %d %f\n", offset, baseQuality, isError); + //} + qualityScoreTable[readGroup][baseQuality][event.index].increment(1, isError, baseQuality); + + auto& d = qualityScoreTable[readGroup][baseQuality][event.index]; + // spdlog::info("isError {} : {}, mis {}, obs {}", isError, info.snp_errs[offset], d.numMismatches, d.numObservations); + + // 处理context covariate + int contextCovariate = covariatesAtOffset[ContextCovariate::index]; + if (contextCovariate >= 
0) + contextTable[readGroup][baseQuality][contextCovariate][event.index].increment(1, isError, baseQuality); + // RecalUtils::IncrementDatum4keys(nsgv::gRecalTables.contextTable, qual, isError, readGroup, baseQuality, contextCovariate, + // event.index); + // 处理cycle covariate + int cycleCovariate = covariatesAtOffset[CycleCovariate::index]; + if (cycleCovariate >= 0) + cycleTable[readGroup][baseQuality][cycleCovariate][event.index].increment(1, isError, baseQuality); + // RecalUtils::IncrementDatum4keys(nsgv::gRecalTables.cycleTable, qual, isError, readGroup, baseQuality, cycleCovariate, + // event.index); + } + } + } + // fprintf(gf[3], "%ld %d %ld\n", read.rid, read.read_len, read.start_pos+1); + // for (auto& arr1 : qualityScoreTable.data) { + // for (size_t si = 0; si < arr1.size(); ++si) { + // for (auto &val : arr1[si]) { + // if (val.numObservations > 0) + // fprintf(gf[3], "%ld %f %f ", val.numObservations, val.getNumMismatches(), val.reportedQuality); + // } + // } + // } + // fprintf(gf[3], "\n"); + + // fprintf(gf[3], "%ld %d %ld\n", read.rid, read.read_len, read.start_pos+1); + // for (auto& arr1 : contextTable.data) { + // for (size_t si = 0; si < arr1.size(); ++si) { + // for (auto &arr2 : arr1[si]) { + // for (auto& val : arr2) { + // if (val.numObservations > 0) + // fprintf(gf[3], "%ld %f %f ", val.numObservations, val.getNumMismatches(), val.reportedQuality); + // } + // } + // } + // } + // fprintf(gf[3], "\n"); + } // 输出bqsr报告 static void outputRecalibrationReport(const BQSRArg& RAC, const QuantizationInfo& quantInfo, const RecalTables& recalTables) { @@ -113,13 +203,8 @@ struct RecalUtils { table.addColumn({"Observations", "%d"}); table.addColumn({"Errors", "%.2f"}); - //spdlog::info("rg0: {}, {}, {}, {}", r.readGroupTable[0][0].numObservations, r.readGroupTable[0][0].numMismatches, - // r.readGroupTable[0][0].reportedQuality, r.readGroupTable[0][0].empiricalQuality); - _Foreach2DK(r.readGroupTable, datum, { RecalDatum &dat = 
const_cast(datum); - // spdlog::info("errors: {}, {}", datum.numMismatches, dat.getNumMismatches()); - // spdlog::info("obs: {}, {}", datum.numObservations, dat.getNumObservations()); if (dat.getNumObservations() > 0) { table.addRowData({ ReadGroupCovariate::IdToRg[k1], diff --git a/src/util/profiling.cpp b/src/util/profiling.cpp index 5d456da..6bfb7fc 100644 --- a/src/util/profiling.cpp +++ b/src/util/profiling.cpp @@ -56,8 +56,10 @@ int DisplayProfiling(int nthread) { // PRINT_GP(intersect_wait); PRINT_GP(clip_read); PRINT_GP(read_ref); + PRINT_GP(calc_snp); PRINT_GP(read_vcf); PRINT_GP(covariate); + PRINT_GP(frac_err); PRINT_GP(update_info); //PRINT_GP(markdup); //PRINT_GP(intersect); diff --git a/src/util/profiling.h b/src/util/profiling.h index 0ee68e8..d89657b 100644 --- a/src/util/profiling.h +++ b/src/util/profiling.h @@ -25,6 +25,7 @@ extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE]; #define PROF_START(tmp_time) uint64_t prof_tmp_##tmp_time = RealtimeMsec() #define PROF_START_AGAIN(tmp_time) prof_tmp_##tmp_time = RealtimeMsec() #define PROF_END(result, tmp_time) result += RealtimeMsec() - prof_tmp_##tmp_time +#define PROF_GP_END(tmp_time) gprof[GP_##tmp_time] += RealtimeMsec() - prof_tmp_##tmp_time #define PROF_PRINT_START(tmp_time) uint64_t tmp_time = RealtimeMsec() #define PROF_PRINT_END(tmp_time) \ tmp_time = RealtimeMsec() - tmp_time; \ @@ -32,6 +33,7 @@ extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE]; #else #define PROF_START(tmp_time) #define PROF_END(result, tmp_time) +#define PROF_GP_END(tmp_time) #define PROF_PRINT_START(tmp_time) #define PROF_PRINT_END(tmp_time) #endif @@ -41,9 +43,11 @@ enum { GP_0 = 0, GP_1, GP_2, GP_3, GP_4, GP_5, GP_6, GP_7, GP_8, GP_9, GP_10 }; enum { GP_read_wait = 11, GP_clip_read, + GP_calc_snp, GP_covariate, GP_read_ref, GP_read_vcf, + GP_frac_err, GP_update_info, GP_gen_wait, GP_sort_wait, diff --git a/src/util/stable_array.h b/src/util/stable_array.h index d10800a..0d0aeb1 100644 --- a/src/util/stable_array.h +++ 
b/src/util/stable_array.h @@ -32,7 +32,7 @@ struct StableArray { idx = _size; } - void resize(size_t _size, const T &val) { + void resize_fill(size_t _size, const T &val) { if (arr.size() < _size) { arr.resize(_size); } diff --git a/src/util/utils.h b/src/util/utils.h index 4bd592d..134771c 100644 --- a/src/util/utils.h +++ b/src/util/utils.h @@ -8,4 +8,14 @@ using std::string; void error(const char* format, ...); struct Utils { + /* + * 根据文件名,获取文件扩展名 + */ + static string getFileExtension(const string& filename) { + auto last_dot = filename.find_last_of('.'); + if (last_dot == string::npos) { + return ""; + } + return filename.substr(last_dot + 1); + } }; \ No newline at end of file diff --git a/src/util/vcf_parser.h b/src/util/vcf_parser.h new file mode 100644 index 0000000..9e28038 --- /dev/null +++ b/src/util/vcf_parser.h @@ -0,0 +1,53 @@ + +/* + Description: 解析knownsites vcf文件的类 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2025/12/24 +*/ +#pragma once + +#include + +#include +#include +#include + +#include "interval.h" +#include "linear_index.h" + +using std::deque; +using std::ifstream; +using std::string; + +// 需要支持vcf idx,tbi,csi三种索引方式 +// vcf和idx是一对 +// vcf.gz和tbi或csi是一对 + +// 解析knownSites +struct VCFParser { + deque knownSites; // 已知的变异位点 + char* buf = nullptr; // // 数据buffer + uint32_t bufLen = 4 * 1024; // 数据buffer长度 + LinearIndex index; // vcf文件索引 + ifstream inStm; // vcf文件流 + + VCFParser() { Init(); } + VCFParser(const string& vcfFileName) { Init(vcfFileName); } + VCFParser(const string& vcfFileName, sam_hdr_t* samHeader) { Init(vcfFileName, samHeader); } + + void Init() { buf = (char*)malloc(bufLen); } + void Init(const string& vcfFileName) { + Init(); + inStm.open(vcfFileName, ifstream::in); + string idxFileName = vcfFileName + ".idx"; + if (!index.ReadIndex(idxFileName)) + error("[%s] fail to load the %s index file\n", __func__, idxFileName.c_str()); + } + void Init(const string& vcfFileName, sam_hdr_t* samHeader) { 
+ index.SetHeader(samHeader); + Init(vcfFileName); + } +}; \ No newline at end of file