初步并行实现,还没完全

This commit is contained in:
zzh 2025-12-30 03:14:05 +08:00
parent 3815a67618
commit 84463ede19
7 changed files with 272 additions and 34 deletions

View File

@ -0,0 +1,3 @@
#include "aux_arg.h"
int64_t AuxVar::processedReads = 0;

View File

@ -15,13 +15,21 @@
#include "util/vcf_parser.h" #include "util/vcf_parser.h"
#include "recal_tables.h" #include "recal_tables.h"
#include "covariate.h"
#include "util/bam_wrap.h"
#include "util/sam_data.h"
#include "util/stable_array.h"
#include "util/bam_buf.h"
using std::vector; using std::vector;
// 解析后的一些参数,文件,数据等 // 解析后的一些参数,文件,数据等
struct AuxVar { struct AuxVar {
const static int REF_CONTEXT_PAD = 3; // 需要做一些填充 //const static int REF_CONTEXT_PAD = 3; // 需要做一些填充
const static int REFERENCE_HALF_WINDOW_LENGTH = 150; // 需要额外多取出一些ref序列防止边界效应 //const static int REFERENCE_HALF_WINDOW_LENGTH = 150; // 需要额外多取出一些ref序列防止边界效应
static constexpr int BAM_BLOCK_NUM = 1000; // 每个线程每次处理1k个bam记录
static int64_t processedReads;
sam_hdr_t* header = nullptr; // bam header sam_hdr_t* header = nullptr; // bam header
faidx_t* faidx = nullptr; // reference index faidx_t* faidx = nullptr; // reference index
@ -29,7 +37,16 @@ struct AuxVar {
int ref_len = 0; // reference sequence length int ref_len = 0; // reference sequence length
int offset = 0; // 在要求的ref序列两边多余取出的碱基数量 int offset = 0; // 在要求的ref序列两边多余取出的碱基数量
BamArray *bamArr = nullptr; // bam数据数组
vector<VCFParser> vcfArr; // 从vcf中获取已知位点 vector<VCFParser> vcfArr; // 从vcf中获取已知位点
PerReadCovariateMatrix readCovariates; // 每个read对应的协变量矩阵
RecalTables recalTables; // 每个线程对应一个recal tables RecalTables recalTables; // 每个线程对应一个recal tables
SamData sd;
StableArray<int> isSNP, isIns, isDel; // 该位置是否是SNP, indel位置0不是1是
StableArray<uint8_t> baqArray;
StableArray<double> snpErrors, insErrors, delErrors;
StableArray<uint8_t> skips; // 该位置是否是已知位点
}; };

View File

@ -13,6 +13,7 @@ Date : 2023/10/23
#include <htslib/sam.h> #include <htslib/sam.h>
#include <htslib/synced_bcf_reader.h> #include <htslib/synced_bcf_reader.h>
#include <htslib/thread_pool.h> #include <htslib/thread_pool.h>
#include <klib/kthread.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <iomanip> #include <iomanip>
@ -110,14 +111,21 @@ void roundTableValues(RecalTables& rt) {
} }
// 串行bqsr // 串行bqsr
int SerialBQSR() { int SerialBQSR(AuxVar &aux) {
BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO); BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO);
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut); inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut);
int64_t readNumSum = 0; int64_t readNumSum = 0;
int round = 0; int round = 0;
PerReadCovariateMatrix readCovariates; PerReadCovariateMatrix readCovariates;
CovariateUtils::InitPerReadCovMat(readCovariates); CovariateUtils::InitPerReadCovMat(readCovariates);
RecalTables& recalTables = nsgv::gAuxVars[0].recalTables; RecalTables& recalTables = aux.recalTables;
SamData& sd = aux.sd;
StableArray<int>&isSNP = aux.isSNP, &isIns = aux.isIns, &isDel = aux.isDel; // 该位置是否是SNP, indel位置0不是1是
StableArray<uint8_t> &baqArray = aux.baqArray;
StableArray<double> &snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors;
StableArray<uint8_t> &skips = aux.skips; // 该位置是否是已知位点
while (true) { while (true) {
++round; ++round;
// 一. 读取bam数据 // 一. 读取bam数据
@ -131,12 +139,6 @@ int SerialBQSR() {
spdlog::info("{} reads processed in {} round", readNum, round); spdlog::info("{} reads processed in {} round", readNum, round);
// 二. 遍历每个bamread记录进行处理 // 二. 遍历每个bamread记录进行处理
SamData sd;
StableArray<int> isSNP, isIns, isDel; // 该位置是否是SNP, indel位置0不是1是
StableArray<uint8_t> baqArray;
StableArray<double> snpErrors, insErrors, delErrors;
StableArray<uint8_t> skips; // 该位置是否是已知位点
for (int i = 0; i < bams.size(); ++i) { for (int i = 0; i < bams.size(); ++i) {
// 1. 对每个read需要检查cigar是否合法即没有两个连续的相同的cigar而且需要将首尾的deletion处理掉目前看好像没啥影响我们忽略这一步 // 1. 对每个read需要检查cigar是否合法即没有两个连续的相同的cigar而且需要将首尾的deletion处理掉目前看好像没啥影响我们忽略这一步
// 2. 对质量分数长度跟碱基长度不匹配的read缺少的质量分数用默认值补齐先忽略后边有需要再处理 // 2. 对质量分数长度跟碱基长度不匹配的read缺少的质量分数用默认值补齐先忽略后边有需要再处理
@ -164,7 +166,7 @@ int SerialBQSR() {
PROF_END(gprof[GP_clip_read], clip_read); PROF_END(gprof[GP_clip_read], clip_read);
// 6. 更新每个read的platform信息好像没啥用暂时忽略 // 6. 更新每个read的platform信息好像没啥用暂时忽略
const int nErrors = RecalFuncs::calculateIsSNPOrIndel(nsgv::gAuxVars[0], sd, isSNP, isIns, isDel); const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel);
/*fprintf(gf[0], "%d\t", sd.read_len); /*fprintf(gf[0], "%d\t", sd.read_len);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", isSNP[ii]); for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", isSNP[ii]);
@ -194,7 +196,7 @@ int SerialBQSR() {
// 8. 计算这条read对应的协变量 // 8. 计算这条read对应的协变量
PROF_START(covariate); PROF_START(covariate);
CovariateUtils::ComputeCovariates(sd, nsgv::gInBamHeader, readCovariates, true); CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true);
PROF_END(gprof[GP_covariate], covariate); PROF_END(gprof[GP_covariate], covariate);
// fprintf(gf[3], "%ld %ld %d %ld\n", sd.rid, readCovariates.size(), sd.read_len, readCovariates[0][0].size()); // fprintf(gf[3], "%ld %ld %d %ld\n", sd.rid, readCovariates.size(), sd.read_len, readCovariates[0][0].size());
@ -220,7 +222,7 @@ int SerialBQSR() {
// 9. 计算这条read需要跳过的位置 // 9. 计算这条read需要跳过的位置
PROF_START(read_vcf); PROF_START(read_vcf);
RecalFuncs::calculateKnownSites(sd, nsgv::gAuxVars[0].vcfArr, nsgv::gInBamHeader, skips); RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips);
for (int ii = 0; ii < sd.read_len; ++ii) { for (int ii = 0; ii < sd.read_len; ++ii) {
skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) ||
sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN;
@ -264,6 +266,109 @@ int SerialBQSR() {
return 0; return 0;
} }
static void thread_worker(void* data, long idx, int tid) {
AuxVar& aux = ((AuxVar*)data)[tid];
auto& readCovariates = aux.readCovariates;
RecalTables& recalTables = aux.recalTables;
SamData& sd = aux.sd;
StableArray<int>&isSNP = aux.isSNP, &isIns = aux.isIns, &isDel = aux.isDel; // 该位置是否是SNP, indel位置0不是1是
StableArray<uint8_t>& baqArray = aux.baqArray;
StableArray<double>&snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors;
StableArray<uint8_t>& skips = aux.skips; // 该位置是否是已知位点
auto &bams = *aux.bamArr;
int startIdx = idx * aux.BAM_BLOCK_NUM;
int stopIdx = std::min((size_t)(idx + 1) * aux.BAM_BLOCK_NUM, bams.size());
for (int i = startIdx; i < stopIdx; ++i) {
BamWrap* bw = bams[i];
sd.init();
sd.parseBasic(bw);
sd.rid = i + aux.processedReads;
if (sd.read_len <= 0) continue;
//PROF_START(clip_read);
ReadTransformer::hardClipAdaptorSequence(bw, sd);
if (sd.read_len <= 0) continue;
ReadTransformer::hardClipSoftClippedBases(bw, sd);
if (sd.read_len <= 0) continue;
sd.applyTransformations();
// PROF_END(gprof[GP_clip_read], clip_read);
const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel);
bool baqCalculated = false;
if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) {
baqCalculated = BAQ::flatBAQArray(sd, baqArray);
} else {
// baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, sd, baqArray);
}
if (!baqCalculated) continue;
// PROF_START(covariate);
CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true);
// PROF_END(gprof[GP_covariate], covariate);
// PROF_START(read_vcf);
RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips);
for (int ii = 0; ii < sd.read_len; ++ii) {
skips[ii] =
skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN;
}
// PROF_GP_END(read_vcf);
// PROF_START(frac_err);
RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors);
RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors);
RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors);
// PROF_GP_END(frac_err);
ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors);
//PROF_START(update_info);
RecalUtils::updateRecalTablesForRead(info, recalTables);
//PROF_END(gprof[GP_update_info], update_info);
}
}
// 并行bqsr
int ParallelBQSR(vector<AuxVar>& auxArr) {
BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO);
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut);
int64_t readNumSum = 0;
int round = 0;
while (true) {
++round;
// 一. 读取bam数据
size_t readNum = 0;
if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam();
if (readNum < 1) { break; }
auto bams = inBamBuf.GetBamArr();
for_each(auxArr.begin(), auxArr.end(), [&](AuxVar& aux) {
aux.bamArr = &bams;
});
spdlog::info("{} reads processed in {} round", readNum, round);
kt_for(nsgv::gBqsrArg.NUM_THREADS, thread_worker, auxArr.data(), (readNum + AuxVar::BAM_BLOCK_NUM - 1) / AuxVar::BAM_BLOCK_NUM);
readNumSum += readNum;
AuxVar::processedReads += readNum;
inBamBuf.ClearAll(); //
}
spdlog::info("read count: {}", readNumSum);
// // 12. 创建总结数据
// collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable);
// roundTableValues(recalTables);
//
// // 13. 量化质量分数
// QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS);
//
// // 14. 输出结果
// RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, recalTables);
//
return 0;
}
// 在进行数据处理之前,初始化一些全局数据 // 在进行数据处理之前,初始化一些全局数据
static void globalInit() { static void globalInit() {
open_debug_files(); open_debug_files();
@ -277,7 +382,8 @@ static void globalInit() {
nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header
/* 并行读取bam数据 */ /* 并行读取bam数据 */
htsThreadPool htsPoolRead = {NULL, 0}; // htsThreadPool htsPoolRead = {NULL, 0}; //
int readThreadNum = min(nsgv::gBqsrArg.NUM_THREADS, 8);
htsPoolRead.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS); htsPoolRead.pool = hts_tpool_init(nsgv::gBqsrArg.NUM_THREADS);
if (!htsPoolRead.pool ) { if (!htsPoolRead.pool ) {
spdlog::error("[{}] failed to set up thread pool", __LINE__); spdlog::error("[{}] failed to set up thread pool", __LINE__);
@ -313,6 +419,7 @@ static void globalInit() {
} }
// 注意初始化顺序这个必须在协变量初始化之后因为需要用到MaximumKeyValue // 注意初始化顺序这个必须在协变量初始化之后因为需要用到MaximumKeyValue
nsgv::gAuxVars[i].recalTables.init(nsgv::gInBamHeader->hrecs->nrg); nsgv::gAuxVars[i].recalTables.init(nsgv::gInBamHeader->hrecs->nrg);
CovariateUtils::InitPerReadCovMat(nsgv::gAuxVars[i].readCovariates);
} }
// 0. 初始化一些全局数据 // 0. 初始化一些全局数据
@ -348,7 +455,8 @@ int BaseRecalibrator() {
PROF_START(whole_process); PROF_START(whole_process);
globalInit(); globalInit();
ret = SerialBQSR(); // 串行处理数据生成recal table // ret = SerialBQSR(nsgv::gAuxVars[0]); // 串行处理数据生成recal table
ret = ParallelBQSR(nsgv::gAuxVars); // 串行处理数据生成recal table
globalDestroy(); globalDestroy();
sam_close(nsgv::gInBamFp); sam_close(nsgv::gInBamFp);
PROF_END(gprof[GP_whole_process], whole_process); PROF_END(gprof[GP_whole_process], whole_process);

View File

@ -31,6 +31,85 @@ void CovariateUtils::ComputeCovariates(SamData& sd, sam_hdr_t* header, PerReadCo
CycleCovariate::RecordValues(sd, header, values, recordIndelValues); CycleCovariate::RecordValues(sd, header, values, recordIndelValues);
} }
/*
// kernel fusion
void CovariateUtils::ComputeCovariates(SamData& sd, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues) {
#define __bq_set_all_cov_indel(ins, del) \
do { \
for (int i = 0; i < sd.read_len; ++i) { \
const int vi = negativeStrand ? sd.read_len - 1 - i : i; \
const int cycleM = CycleCovariate::CycleKey(sd, i, false, CycleCovariate::MAXIMUM_CYCLE_VALUE); \
const int cycleIndel = CycleCovariate::CycleKey(sd, i, true, CycleCovariate::MAXIMUM_CYCLE_VALUE); \
CovariateUtils::SetAllCovs(crg, quals[i], nBasePairContextAtEachCycle[vi], cycleM, crg, (ins), indelKeys[vi], cycleIndel, crg, (del), \
indelKeys[vi], cycleIndel, i, values); \
} \
} while (0)
#define __bq_set_all_cov(ins, del) \
do { \
for (int i = 0; i < sd.read_len; ++i) { \
const int vi = negativeStrand ? sd.read_len - 1 - i : i; \
const int cycleM = CycleCovariate::CycleKey(sd, i, false, CycleCovariate::MAXIMUM_CYCLE_VALUE); \
CovariateUtils::SetAllCovs(crg, quals[i], nBasePairContextAtEachCycle[vi], cycleM, crg, (ins), 0, 0, crg, (del), 0, 0, i, values); \
} \
} while (0)
// 1. read group
uint8_t* rgStr = bam_aux_get(sd.bw->b, "RG");
char* rgVal = nullptr;
if (rgStr) rgVal = bam_aux2Z(rgStr);
int crg = 0;
if (rgVal == nullptr || ReadGroupCovariate::RgToId.find(rgVal) == ReadGroupCovariate::RgToId.end()) {
spdlog::error("The RG tag value for read can not be found in header!");
} else {
crg = ReadGroupCovariate::RgToId[rgVal];
}
// 2. base quality
// 在前面的处理过后quals应该和base长度一致了
#define __bq_set_cov(ins, del) \
do { \
for (int i = 0; i < sd.read_len; ++i) { \
CovariateUtils::SetBaseQual(quals[i], (ins), (del), i, values); \
} \
} while (0)
const int readLength = sd.read_len;
const bool negativeStrand = sd.bw->GetReadNegativeStrandFlag();
const int INDEL_QUAL = 45;
auto& quals = sd.base_quals;
auto& strandedClippedBases = sd.strandedClippedBases;
strandedClippedBases.copy(sd.bases);
ContextCovariate::GetStrandedClippedBytes(sd, strandedClippedBases, ContextCovariate::lowQualTail);
auto& nBasePairContextAtEachCycle = sd.nBasePairContextAtEachCycle;
ContextCovariate::GetReadContextAtEachPosition(strandedClippedBases, ContextCovariate::mismatchesContextSize, ContextCovariate::mismatchesKeyMask,
nBasePairContextAtEachCycle);
if (recordIndelValues) {
// context
auto& indelKeys = sd.indelKeys;
ContextCovariate::GetReadContextAtEachPosition(strandedClippedBases, ContextCovariate::indelsContextSize, ContextCovariate::indelsKeyMask,
indelKeys);
// cycle cov is in the loop
// quality
uint8_t* insQualPtr = bam_aux_get(sd.bw->b, "BI"); // base qualities for insertions
uint8_t* delQualPtr = bam_aux_get(sd.bw->b, "BD"); // base qualities for deletions
if (insQualPtr == nullptr && delQualPtr == nullptr) {
__bq_set_all_cov_indel(INDEL_QUAL, INDEL_QUAL);
} else if (insQualPtr == nullptr) {
uint8_t* delQuals = (uint8_t*)bam_aux2Z(delQualPtr) + sd.left_clip;
__bq_set_all_cov_indel(INDEL_QUAL, delQuals[i]);
} else {
uint8_t* insQuals = (uint8_t*)bam_aux2Z(insQualPtr) + sd.left_clip;
__bq_set_all_cov_indel(insQuals[i], INDEL_QUAL);
}
} else {
__bq_set_all_cov(0, 0);
}
}
*/
// ReadGroupCovariate 协变量的方法 // ReadGroupCovariate 协变量的方法
void ReadGroupCovariate::RecordValues(SamData& sd, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues) { void ReadGroupCovariate::RecordValues(SamData& sd, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues) {
uint8_t *rgStr = bam_aux_get(sd.bw->b, "RG"); uint8_t *rgStr = bam_aux_get(sd.bw->b, "RG");
@ -99,7 +178,7 @@ static char SimpleComplement(const char base) {
} }
} }
static void ClipLowQualEndsWithN(string& bases, const StableArray<uint8_t> &quals, uint8_t lowQTail, bool isNegativeStrand) { static void ClipLowQualEndsWithN(StableArray<char>& bases, const StableArray<uint8_t>& quals, uint8_t lowQTail, bool isNegativeStrand) {
// 处理左边 // 处理左边
int left = 0; int left = 0;
int readLen = bases.size(); int readLen = bases.size();
@ -118,10 +197,10 @@ static void ClipLowQualEndsWithN(string& bases, const StableArray<uint8_t> &qual
break; break;
} }
} }
if (left == readLen) { //if (left == readLen) {
bases.clear(); // bases.clear();
return; // return;
} //}
// 处理右边 // 处理右边
int right = (int)bases.size() - 1; int right = (int)bases.size() - 1;
if (isNegativeStrand) { if (isNegativeStrand) {
@ -139,13 +218,12 @@ static void ClipLowQualEndsWithN(string& bases, const StableArray<uint8_t> &qual
break; break;
} }
} }
if (right < left) // if (right < left)
bases.clear(); // bases.clear();
} }
// 获取去除低质量分数碱基之后的read碱基序列将低质量分数的碱基变成N // 获取去除低质量分数碱基之后的read碱基序列将低质量分数的碱基变成N
void ContextCovariate::GetStrandedClippedBytes(SamData& sd, string& clippedBases, uint8_t lowQTail) { void ContextCovariate::GetStrandedClippedBytes(SamData& sd, StableArray<char>& clippedBases, uint8_t lowQTail) {
if (sd.bw->GetReadNegativeStrandFlag()) { // 反向互补 if (sd.bw->GetReadNegativeStrandFlag()) { // 反向互补
for (int i = 0; i < sd.read_len; ++i) clippedBases[i] = SimpleComplement(sd.bases[sd.read_len - 1 - i]); for (int i = 0; i < sd.read_len; ++i) clippedBases[i] = SimpleComplement(sd.bases[sd.read_len - 1 - i]);
} }
@ -160,7 +238,7 @@ void ContextCovariate::GetStrandedClippedBytes(SamData& sd, string& clippedBases
* @param end the end position in the array (exclusive) * @param end the end position in the array (exclusive)
* @return the key representing the dna sequence * @return the key representing the dna sequence
*/ */
int ContextCovariate::KeyFromContext(const string& dna, const int start, const int end) { int ContextCovariate::KeyFromContext(const StableArray<char>& dna, const int start, const int end) {
int key = end - start; int key = end - start;
int bitOffset = LENGTH_BITS; int bitOffset = LENGTH_BITS;
for (int i = start; i < end; i++) { for (int i = start; i < end; i++) {
@ -188,7 +266,7 @@ int ContextCovariate::KeyFromContext(const string& dna, const int start, const i
* @return a list that has the same length as the read and contains the (preceding) n-base context at each position. * @return a list that has the same length as the read and contains the (preceding) n-base context at each position.
* *
*/ */
void ContextCovariate::GetReadContextAtEachPosition(const string& bases, const int contextSize, const int mask, vector<int>& keys) { void ContextCovariate::GetReadContextAtEachPosition(const StableArray<char>& bases, const int contextSize, const int mask, StableArray<int>& keys) {
int readLength = bases.size(); int readLength = bases.size();
keys.resize(readLength); keys.resize(readLength);
int keyIdx = 0; int keyIdx = 0;
@ -245,11 +323,13 @@ void ContextCovariate::RecordValues(SamData& sd, sam_hdr_t* header, PerReadCovar
const int originalReadLength = sd.read_len; const int originalReadLength = sd.read_len;
// store the original bases and then write Ns over low quality ones // store the original bases and then write Ns over low quality ones
string strandedClippedBases(sd.bases.arr.data(), sd.read_len); // string strandedClippedBases(sd.bases.arr.data(), sd.read_len);
auto &strandedClippedBases = sd.strandedClippedBases;
strandedClippedBases.copy(sd.bases);
// GetStrandedClippedBytes(bw, sd, strandedClippedBases, 30); // 注意这里的lowQualTail数值 // GetStrandedClippedBytes(bw, sd, strandedClippedBases, 30); // 注意这里的lowQualTail数值
GetStrandedClippedBytes(sd, strandedClippedBases, lowQualTail); // 命名我之前看到过这个30的 GetStrandedClippedBytes(sd, strandedClippedBases, lowQualTail); // 命名我之前看到过这个30的
// spdlog::info("bases: {}", strandedClippedBases); // spdlog::info("bases: {}", strandedClippedBases);
vector<int> nBasePairContextAtEachCycle; auto& nBasePairContextAtEachCycle = sd.nBasePairContextAtEachCycle;
GetReadContextAtEachPosition(strandedClippedBases, mismatchesContextSize, mismatchesKeyMask, nBasePairContextAtEachCycle); GetReadContextAtEachPosition(strandedClippedBases, mismatchesContextSize, mismatchesKeyMask, nBasePairContextAtEachCycle);
const int readLengthAfterClipping = strandedClippedBases.size(); const int readLengthAfterClipping = strandedClippedBases.size();
@ -268,7 +348,7 @@ void ContextCovariate::RecordValues(SamData& sd, sam_hdr_t* header, PerReadCovar
const bool negativeStrand = sd.bw->GetReadNegativeStrandFlag(); const bool negativeStrand = sd.bw->GetReadNegativeStrandFlag();
// Note: duplicated the loop to avoid checking recordIndelValues on each iteration // Note: duplicated the loop to avoid checking recordIndelValues on each iteration
if (recordIndelValues) { if (recordIndelValues) {
vector<int> indelKeys; auto& indelKeys = sd.indelKeys;
GetReadContextAtEachPosition(strandedClippedBases, indelsContextSize, indelsKeyMask, indelKeys); GetReadContextAtEachPosition(strandedClippedBases, indelsContextSize, indelsKeyMask, indelKeys);
for (int i = 0; i < readLengthAfterClipping; i++) { for (int i = 0; i < readLengthAfterClipping; i++) {
const int readOffset = GetStrandedOffset(negativeStrand, i, readLengthAfterClipping); const int readOffset = GetStrandedOffset(negativeStrand, i, readLengthAfterClipping);

View File

@ -199,11 +199,11 @@ struct ContextCovariate {
} }
// 获取去除低质量分数碱基之后的read碱基序列将低质量分数的碱基变成N // 获取去除低质量分数碱基之后的read碱基序列将低质量分数的碱基变成N
static void GetStrandedClippedBytes(SamData& ad, string& clippedBases, uint8_t lowQTail); static void GetStrandedClippedBytes(SamData& ad, StableArray<char>& clippedBases, uint8_t lowQTail);
// Creates a int representation of a given dna string. // Creates a int representation of a given dna string.
static int KeyFromContext(const string& dna, const int start, const int end); static int KeyFromContext(const StableArray<char>& dna, const int start, const int end);
// For each position of the read, calculate the n-base-pair *read* base context (as opposed to the reference context). // For each position of the read, calculate the n-base-pair *read* base context (as opposed to the reference context).
static void GetReadContextAtEachPosition(const string& bases, const int contextSize, const int mask, vector<int>& keys); static void GetReadContextAtEachPosition(const StableArray<char>& bases, const int contextSize, const int mask, StableArray<int>& keys);
// 设置协变量的值 // 设置协变量的值
static void RecordValues(SamData& ad, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues); static void RecordValues(SamData& ad, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues);
@ -296,6 +296,25 @@ struct CovariateUtils {
matrix[EventType::BASE_DELETION.index][readOffset].cycle = deletion; matrix[EventType::BASE_DELETION.index][readOffset].cycle = deletion;
} }
static inline void SetAllCovs(int m1, int i1, int d1, int m2, int i2, int d2, int m3, int i3, int d3, int m4, int i4, int d4, int readOffset,
PerReadCovariateMatrix& matrix) {
CovariateValues *cvp = &matrix[EventType::BASE_SUBSTITUTION.index][readOffset];
cvp->readGroup = m1;
cvp->baseQuality = m2;
cvp->context = m3;
cvp->cycle = m4;
cvp = &matrix[EventType::BASE_INSERTION.index][readOffset];
cvp->readGroup = i1;
cvp->baseQuality = i2;
cvp->context = i3;
cvp->cycle = i4;
cvp = &matrix[EventType::BASE_DELETION.index][readOffset];
cvp->readGroup = d1;
cvp->baseQuality = d2;
cvp->context = d3;
cvp->cycle = d4;
}
// 对一条read计算协变量该协变量被上一个read用过 // 对一条read计算协变量该协变量被上一个read用过
static void ComputeCovariates(SamData& ad, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues); static void ComputeCovariates(SamData& ad, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues);
}; };

View File

@ -56,6 +56,11 @@ struct SamData {
StableArray<uint8_t> del_quals; // delete质量分数, BD (大部分应该都没有) StableArray<uint8_t> del_quals; // delete质量分数, BD (大部分应该都没有)
StableArray<Cigar> cigars; StableArray<Cigar> cigars;
// 用作临时buffer
StableArray<char> strandedClippedBases; // for context covariate
StableArray<int> nBasePairContextAtEachCycle; // for context covariate
StableArray<int> indelKeys; // for context covariate
int64_t& softStart() { return start_pos; } int64_t& softStart() { return start_pos; }
int64_t& softEnd() { return end_pos; } int64_t& softEnd() { return end_pos; }

View File

@ -19,7 +19,7 @@ struct StableArray {
vector<T> arr; vector<T> arr;
size_t idx = 0; size_t idx = 0;
void clear() { idx = 0; } void clear() { idx = 0; }
size_t size() { return idx; } size_t size() const { return idx; }
bool empty() { return idx == 0; } bool empty() { return idx == 0; }
void reserve(size_t _size) { void reserve(size_t _size) {
if (arr.size() < _size) if (arr.size() < _size)
@ -49,6 +49,12 @@ struct StableArray {
idx++; idx++;
} }
} }
void copy(const StableArray<T>& other) {
resize(other.size());
for (size_t i = 0; i < other.size(); ++i) {
arr[i] = other.arr[i];
}
}
inline T& operator[](size_t pos) { return arr[pos]; } inline T& operator[](size_t pos) { return arr[pos]; }
//inline const T& operator[](size_t pos) { return arr[pos]; } //inline const T& operator[](size_t pos) { return arr[pos]; }
inline const T& operator[](size_t pos) const { return arr[pos]; } inline const T& operator[](size_t pos) const { return arr[pos]; }