完善了统计时间

This commit is contained in:
zzh 2026-01-01 00:41:55 +08:00
parent 95c4a16151
commit 745963831d
10 changed files with 132 additions and 126 deletions

View File

@ -5,5 +5,5 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
# set(CMAKE_BUILD_TYPE Debug) # set(CMAKE_BUILD_TYPE Debug)
# set(CMAKE_BUILD_TYPE Release) # set(CMAKE_BUILD_TYPE Release)
add_definitions(-DSHOW_PERF=1) # add_definitions(-DSHOW_PERF=1)
add_subdirectory(src) add_subdirectory(src)

View File

@ -153,8 +153,10 @@ int SerialBQSR(AuxVar &aux) {
++round; ++round;
// 一. 读取bam数据 // 一. 读取bam数据
size_t readNum = 0; size_t readNum = 0;
PROF_START(GP_read);
if (inBamBuf.ReadStat() >= 0) if (inBamBuf.ReadStat() >= 0)
readNum = inBamBuf.ReadBam(); readNum = inBamBuf.ReadBam();
PROF_GP_END(GP_read);
if (readNum < 1) { if (readNum < 1) {
break; break;
} }
@ -176,7 +178,7 @@ int SerialBQSR(AuxVar &aux) {
if (sd.read_len <= 0) if (sd.read_len <= 0)
continue; continue;
PROF_START(clip_read); PROF_START(GP_clip_read);
// 4. 对read的两端进行检测去除hardclipadapter // 4. 对read的两端进行检测去除hardclipadapter
ReadTransformer::hardClipAdaptorSequence(bw, sd); ReadTransformer::hardClipAdaptorSequence(bw, sd);
if (sd.read_len <= 0) if (sd.read_len <= 0)
@ -187,7 +189,7 @@ int SerialBQSR(AuxVar &aux) {
continue; continue;
// 应用所有的变换计算samdata的相关信息 // 应用所有的变换计算samdata的相关信息
sd.applyTransformations(); sd.applyTransformations();
PROF_END(gprof[GP_clip_read], clip_read); PROF_GP_END(GP_clip_read);
//const char* qname = bam_get_qname(sd.bw->b); //const char* qname = bam_get_qname(sd.bw->b);
// fprintf(gf[4], "%ld %d %d %d\n", sd.rid, sd.read_len, 1 + BamWrap::bam_pos(sd.start_pos), 1 + BamWrap::bam_pos(sd.end_pos)); // fprintf(gf[4], "%ld %d %d %d\n", sd.rid, sd.read_len, 1 + BamWrap::bam_pos(sd.start_pos), 1 + BamWrap::bam_pos(sd.end_pos));
@ -195,7 +197,7 @@ int SerialBQSR(AuxVar &aux) {
// 6. 更新每个read的platform信息好像没啥用暂时忽略 // 6. 更新每个read的platform信息好像没啥用暂时忽略
// 这里计算snp和indel有点问题snp和del结果不对白天调试一下 // 这里计算snp和indel有点问题snp和del结果不对白天调试一下
const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel); const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel, 0);
// fprintf(gf[4], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid); // fprintf(gf[4], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
// for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[4], "%d ", skips[ii] ? 1 : 0); // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[4], "%d ", skips[ii] ? 1 : 0);
@ -224,9 +226,9 @@ int SerialBQSR(AuxVar &aux) {
// 到这里基本的数据都准备好了后续就是进行bqsr的统计了 // 到这里基本的数据都准备好了后续就是进行bqsr的统计了
// 8. 计算这条read对应的协变量 // 8. 计算这条read对应的协变量
PROF_START(covariate); PROF_START(GP_covariate);
CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true); CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true, 0);
PROF_END(gprof[GP_covariate], covariate); PROF_GP_END(GP_covariate);
// fprintf(gf[4], "%ld %d\n", sd.rid, sd.read_len); // fprintf(gf[4], "%ld %d\n", sd.rid, sd.read_len);
// for (auto &arr1 : readCovariates) { // for (auto &arr1 : readCovariates) {
@ -248,7 +250,7 @@ int SerialBQSR(AuxVar &aux) {
// fprintf(gf[3], "\n"); // fprintf(gf[3], "\n");
// 9. 计算这条read需要跳过的位置 // 9. 计算这条read需要跳过的位置
PROF_START(read_vcf); PROF_START(GP_read_vcf);
RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips, 0); RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips, 0);
for (int ii = 0; ii < sd.read_len; ++ii) { for (int ii = 0; ii < sd.read_len; ++ii) {
skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) ||
@ -257,7 +259,7 @@ int SerialBQSR(AuxVar &aux) {
//stringstream ss; //stringstream ss;
//for (auto s : skips) ss << (int)s << ' '; //for (auto s : skips) ss << (int)s << ' ';
//spdlog::info("{}", ss.str()); //spdlog::info("{}", ss.str());
PROF_GP_END(read_vcf); PROF_GP_END(GP_read_vcf);
// fprintf(gf[3], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid); // fprintf(gf[3], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
// for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[3], "%d ", skips[ii] ? 1 : 0); // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[3], "%d ", skips[ii] ? 1 : 0);
@ -268,19 +270,19 @@ int SerialBQSR(AuxVar &aux) {
// fprintf(gf[0], "\n"); // fprintf(gf[0], "\n");
// 10. 根据BAQ进一步处理snpindel得到处理后的数据 // 10. 根据BAQ进一步处理snpindel得到处理后的数据
PROF_START(frac_err); PROF_START(GP_frac_err);
RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors); RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors);
RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors); RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors);
RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors); RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors);
PROF_GP_END(frac_err); PROF_GP_END(GP_frac_err);
// aggregate all of the info into our info object, and update the data // aggregate all of the info into our info object, and update the data
// 11. 合并之前计算的数据得到info并更新bqsr table数据 // 11. 合并之前计算的数据得到info并更新bqsr table数据
ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors); ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors);
PROF_START(update_info); PROF_START(GP_update_info);
RecalUtils::updateRecalTablesForRead(info, recalTables); RecalUtils::updateRecalTablesForRead(info, recalTables);
PROF_END(gprof[GP_update_info], update_info); PROF_GP_END(GP_update_info);
} }
readNumSum += readNum; readNumSum += readNum;
inBamBuf.ClearAll(); // inBamBuf.ClearAll(); //
@ -289,26 +291,32 @@ int SerialBQSR(AuxVar &aux) {
spdlog::info("read count: {}", readNumSum); spdlog::info("read count: {}", readNumSum);
// 12. 创建总结数据 // 12. 创建总结数据
PROF_START(GP_collapse_round);
collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable); collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable);
roundTableValues(recalTables); roundTableValues(recalTables);
PROF_GP_END(GP_collapse_round);
#if 0 #if 0
printRecalTables(recalTables); printRecalTables(recalTables);
#endif #endif
// 13. 量化质量分数 // 13. 量化质量分数
PROF_START(GP_quantize);
QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS);
PROF_GP_END(GP_quantize);
// 14. 输出结果 // 14. 输出结果
PROF_START(GP_print_report);
RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, recalTables); RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, recalTables);
PROF_GP_END(GP_print_report);
return 0; return 0;
} }
// 多线程处理bam数据, tmd是乱序的 // 多线程处理bam数据, tmd是乱序的
static void thread_worker(void* data, long idx, int tid, int steal) { static void thread_worker(void* data, long idx, int thid, int steal) {
// static void thread_worker(void* data, long idx, int tid) { // static void thread_worker(void* data, long idx, int tid) {
AuxVar& aux = (*(vector<AuxVar>*)data)[tid]; AuxVar& aux = (*(vector<AuxVar>*)data)[thid];
auto& readCovariates = aux.readCovariates; auto& readCovariates = aux.readCovariates;
RecalTables& recalTables = aux.recalTables; RecalTables& recalTables = aux.recalTables;
SamData& sd = aux.sd; SamData& sd = aux.sd;
@ -319,7 +327,6 @@ static void thread_worker(void* data, long idx, int tid, int steal) {
auto &bams = *aux.bamArr; auto &bams = *aux.bamArr;
if (steal) if (steal)
for (auto& vcf : aux.vcfArr) vcf.knownSites.clear(); for (auto& vcf : aux.vcfArr) vcf.knownSites.clear();
int f = tid * 4;
#if 1 #if 1
int startIdx = idx * aux.BAM_BLOCK_NUM; int startIdx = idx * aux.BAM_BLOCK_NUM;
int stopIdx = std::min((size_t)(idx + 1) * aux.BAM_BLOCK_NUM, bams.size()); int stopIdx = std::min((size_t)(idx + 1) * aux.BAM_BLOCK_NUM, bams.size());
@ -338,26 +345,16 @@ static void thread_worker(void* data, long idx, int tid, int steal) {
sd.rid = i + aux.processedReads; sd.rid = i + aux.processedReads;
if (sd.read_len <= 0) continue; if (sd.read_len <= 0) continue;
//PROF_START(clip_read); PROF_START(TP_clip_read);
ReadTransformer::hardClipAdaptorSequence(bw, sd); ReadTransformer::hardClipAdaptorSequence(bw, sd);
if (sd.read_len <= 0) continue; if (sd.read_len <= 0) continue;
ReadTransformer::hardClipSoftClippedBases(bw, sd); ReadTransformer::hardClipSoftClippedBases(bw, sd);
if (sd.read_len <= 0) continue; if (sd.read_len <= 0) continue;
sd.applyTransformations(); sd.applyTransformations();
// PROF_END(gprof[GP_clip_read], clip_read); PROF_TP_END(TP_clip_read);
const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel, thid);
const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel);
#if 0
fprintf(gf[f + 0], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[f + 0], "%d ", isSNP[ii]);
fprintf(gf[f + 0], "\n");
fprintf(gf[f + 1], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[f + 1], "%d ", isIns[ii]);
fprintf(gf[f + 1], "\n");
fprintf(gf[f + 2], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[f + 2], "%d ", isDel[ii]);
fprintf(gf[f + 2], "\n");
#endif
bool baqCalculated = false; bool baqCalculated = false;
if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) { if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) {
baqCalculated = BAQ::flatBAQArray(sd, baqArray); baqCalculated = BAQ::flatBAQArray(sd, baqArray);
@ -366,44 +363,27 @@ static void thread_worker(void* data, long idx, int tid, int steal) {
} }
if (!baqCalculated) continue; if (!baqCalculated) continue;
// PROF_START(covariate); PROF_START(TP_covariate);
CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true); CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true, thid);
// PROF_END(gprof[GP_covariate], covariate); PROF_TP_END(TP_covariate);
// PROF_START(read_vcf); RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips, thid);
RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips, tid);
for (int ii = 0; ii < sd.read_len; ++ii) { for (int ii = 0; ii < sd.read_len; ++ii) {
skips[ii] = skips[ii] =
skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN;
} }
// PROF_GP_END(read_vcf);
#if 0 PROF_START(TP_frac_err);
fprintf(gf[f + 3], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[f + 3], "%d ", skips[ii] ? 1 : 0);
fprintf(gf[f + 3], "\n");
#endif
#if 0
int fidx = 0 + 2 * tid;
//if (sd.rid % 2 == 0) fidx = 0 + 2 * tid;
//else fidx = 1 + 2 * tid;
fprintf(gf[fidx], "%ld %d\t", sd.rid, sd.read_len);
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[fidx], "%d ", skips[ii] ? 1 : 0);
fprintf(gf[fidx], "\n");
#endif
// PROF_START(frac_err);
RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors); RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors);
RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors); RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors);
RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors); RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors);
// PROF_GP_END(frac_err); PROF_TP_END(TP_frac_err);
ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors); ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors);
//PROF_START(update_info); PROF_START(TP_update_info);
RecalUtils::updateRecalTablesForRead(info, recalTables); RecalUtils::updateRecalTablesForRead(info, recalTables);
//PROF_END(gprof[GP_update_info], update_info); PROF_TP_END(TP_update_info);
} }
} }
@ -418,7 +398,9 @@ int ParallelBQSR(vector<AuxVar>& auxArr) {
++round; ++round;
// 一. 读取bam数据 // 一. 读取bam数据
size_t readNum = 0; size_t readNum = 0;
PROF_START(GP_read);
if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam(); if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam();
PROF_GP_END(GP_read);
if (readNum < 1) { break; } if (readNum < 1) { break; }
auto bams = inBamBuf.GetBamArr(); auto bams = inBamBuf.GetBamArr();
for_each(auxArr.begin(), auxArr.end(), [&](AuxVar& aux) { for_each(auxArr.begin(), auxArr.end(), [&](AuxVar& aux) {
@ -426,11 +408,13 @@ int ParallelBQSR(vector<AuxVar>& auxArr) {
}); });
spdlog::info("{} reads processed in {} round", readNum, round); spdlog::info("{} reads processed in {} round", readNum, round);
PROF_START(GP_thread_worker);
#if 1 #if 1
kt_for_steal(auxArr.size(), thread_worker, &auxArr, (readNum + AuxVar::BAM_BLOCK_NUM - 1) / AuxVar::BAM_BLOCK_NUM); kt_for_steal(auxArr.size(), thread_worker, &auxArr, (readNum + AuxVar::BAM_BLOCK_NUM - 1) / AuxVar::BAM_BLOCK_NUM);
#else #else
kt_for_steal(auxArr.size(), thread_worker, &auxArr, auxArr.size()); kt_for_steal(auxArr.size(), thread_worker, &auxArr, auxArr.size());
#endif #endif
PROF_GP_END(GP_thread_worker);
readNumSum += readNum; readNumSum += readNum;
AuxVar::processedReads += readNum; AuxVar::processedReads += readNum;
inBamBuf.ClearAll(); // inBamBuf.ClearAll(); //
@ -442,6 +426,8 @@ int ParallelBQSR(vector<AuxVar>& auxArr) {
// printRecalTables(recalTables); // printRecalTables(recalTables);
for (int i = 0; i < auxArr.size(); ++i) for (int i = 0; i < auxArr.size(); ++i)
spdlog::info("thread {} processed reads {}.", i, auxArr[i].threadProcessedReads); spdlog::info("thread {} processed reads {}.", i, auxArr[i].threadProcessedReads);
PROF_START(GP_merge_covs);
for (int i = 1; i < auxArr.size(); ++i) { for (int i = 1; i < auxArr.size(); ++i) {
auxArr[0].threadProcessedReads += auxArr[i].threadProcessedReads; auxArr[0].threadProcessedReads += auxArr[i].threadProcessedReads;
_Foreach3DK(auxArr[i].recalTables.qualityScoreTable, qualDatum, { _Foreach3DK(auxArr[i].recalTables.qualityScoreTable, qualDatum, {
@ -460,19 +446,26 @@ int ParallelBQSR(vector<AuxVar>& auxArr) {
} }
}); });
} }
PROF_GP_END(GP_merge_covs);
spdlog::info("All processed reads {}.", auxArr[0].threadProcessedReads); spdlog::info("All processed reads {}.", auxArr[0].threadProcessedReads);
// 创建总结数据 // 创建总结数据
PROF_START(GP_collapse_round);
collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable); collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable);
roundTableValues(recalTables); roundTableValues(recalTables);
PROF_GP_END(GP_collapse_round);
// printRecalTables(recalTables); // printRecalTables(recalTables);
// 量化质量分数 // 量化质量分数
PROF_START(GP_quantize);
QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS);
PROF_GP_END(GP_quantize);
// 输出结果 // 输出结果
PROF_START(GP_print_report);
RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, recalTables); RecalUtils::outputRecalibrationReport(nsgv::gBqsrArg, quantInfo, recalTables);
PROF_GP_END(GP_print_report);
return 0; return 0;
} }
@ -562,7 +555,7 @@ static void globalDestroy() {
int BaseRecalibrator() { int BaseRecalibrator() {
int ret = 0; int ret = 0;
PROF_START(whole_process); PROF_START(GP_whole_process);
globalInit(); globalInit();
if (nsgv::gBqsrArg.NUM_THREADS == 1) if (nsgv::gBqsrArg.NUM_THREADS == 1)
ret = SerialBQSR(nsgv::gAuxVars[0]); // 串行处理数据生成recal table ret = SerialBQSR(nsgv::gAuxVars[0]); // 串行处理数据生成recal table
@ -570,7 +563,7 @@ int BaseRecalibrator() {
ret = ParallelBQSR(nsgv::gAuxVars); // 并行处理数据生成recal table ret = ParallelBQSR(nsgv::gAuxVars); // 并行处理数据生成recal table
globalDestroy(); globalDestroy();
sam_close(nsgv::gInBamFp); sam_close(nsgv::gInBamFp);
PROF_END(gprof[GP_whole_process], whole_process); PROF_GP_END(GP_whole_process);
return ret; return ret;
} }

View File

@ -1,4 +1,5 @@
#include "covariate.h" #include "covariate.h"
#include "util/profiling.h"
// for EventType // for EventType
EventTypeValue EventType::BASE_SUBSTITUTION = {0, 'M', "Base Substitution"}; EventTypeValue EventType::BASE_SUBSTITUTION = {0, 'M', "Base Substitution"};
@ -24,11 +25,19 @@ int CycleCovariate::MAXIMUM_CYCLE_VALUE;
// for CovariateUtils // for CovariateUtils
// 对一条read计算协变量该协变量被上一个read用过 // 对一条read计算协变量该协变量被上一个read用过
void CovariateUtils::ComputeCovariates(SamData& sd, sam_hdr_t* header, PerReadCovariateMatrix& values, void CovariateUtils::ComputeCovariates(SamData& sd, sam_hdr_t* header, PerReadCovariateMatrix& values,
bool recordIndelValues) { bool recordIndelValues, int thid) {
PROF_START(TP_readgroup);
ReadGroupCovariate::RecordValues(sd, header, values, recordIndelValues); ReadGroupCovariate::RecordValues(sd, header, values, recordIndelValues);
PROF_TP_END(TP_readgroup);
PROF_START(TP_qualityscore);
BaseQualityCovariate::RecordValues(sd, header, values, recordIndelValues); BaseQualityCovariate::RecordValues(sd, header, values, recordIndelValues);
PROF_TP_END(TP_qualityscore);
PROF_START(TP_context);
ContextCovariate::RecordValues(sd, header, values, recordIndelValues); ContextCovariate::RecordValues(sd, header, values, recordIndelValues);
PROF_TP_END(TP_context);
PROF_START(TP_cycle);
CycleCovariate::RecordValues(sd, header, values, recordIndelValues); CycleCovariate::RecordValues(sd, header, values, recordIndelValues);
PROF_TP_END(TP_cycle);
} }
/* /*

View File

@ -316,5 +316,5 @@ struct CovariateUtils {
} }
// 对一条read计算协变量该协变量被上一个read用过 // 对一条read计算协变量该协变量被上一个read用过
static void ComputeCovariates(SamData& ad, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues); static void ComputeCovariates(SamData& ad, sam_hdr_t* header, PerReadCovariateMatrix& values, bool recordIndelValues, int thid);
}; };

View File

@ -29,19 +29,19 @@ struct RecalFuncs {
} }
// 计算该read的每个碱基位置是否是SNP或Indel // 计算该read的每个碱基位置是否是SNP或Indel
static int calculateIsSNPOrIndel(AuxVar& aux, SamData& sd, StableArray<int>& isSNP, StableArray<int>& isIns, StableArray<int>& isDel) { static int calculateIsSNPOrIndel(AuxVar& aux, SamData& sd, StableArray<int>& isSNP, StableArray<int>& isIns, StableArray<int>& isDel, int thid) {
isSNP.resize_fill(sd.read_len, 0); isSNP.resize_fill(sd.read_len, 0);
isIns.resize_fill(sd.read_len, 0); isIns.resize_fill(sd.read_len, 0);
isDel.resize_fill(sd.read_len, 0); isDel.resize_fill(sd.read_len, 0);
// 1. 读取参考基因组先看看串行运行性能稍后可以将读入ref和vcf合并起来做成一个并行流水线步骤 // 1. 读取参考基因组先看看串行运行性能稍后可以将读入ref和vcf合并起来做成一个并行流水线步骤
Interval interval{sd.start_pos, sd.end_pos}; // 闭区间 Interval interval{sd.start_pos, sd.end_pos}; // 闭区间
PROF_START(read_ref); PROF_START(TP_read_ref);
read_ref_base(aux, interval.left, interval); read_ref_base(aux, interval.left, interval);
PROF_GP_END(read_ref); PROF_TP_END(TP_read_ref);
const char *refBases = aux.ref_seq; const char *refBases = aux.ref_seq;
// 2. 遍历cigar计算每个碱基是否是SNP或Indel // 2. 遍历cigar计算每个碱基是否是SNP或Indel
PROF_START(calc_snp); PROF_START(TP_calc_snp);
int readPos = 0, refPos = 0, nEvents = 0; int readPos = 0, refPos = 0, nEvents = 0;
for (int i = 0; i < sd.cigars.size(); ++i) { for (int i = 0; i < sd.cigars.size(); ++i) {
const char c = sd.cigars[i].op; const char c = sd.cigars[i].op;
@ -77,7 +77,7 @@ struct RecalFuncs {
} }
} }
nEvents += std::accumulate(isIns.begin(), isIns.end(), 0) + std::accumulate(isDel.begin(), isDel.end(), 0); nEvents += std::accumulate(isIns.begin(), isIns.end(), 0) + std::accumulate(isDel.begin(), isDel.end(), 0);
PROF_GP_END(calc_snp); PROF_TP_END(TP_calc_snp);
return nEvents; return nEvents;
} }
@ -114,6 +114,7 @@ struct RecalFuncs {
// update vcfs // update vcfs
// int idx = 0; // int idx = 0;
PROF_START(TP_read_vcf);
for (auto& vcf : vcfs) { for (auto& vcf : vcfs) {
// 为啥多线程环境会出现deque的front和[0]不一样?好像是调试的时候的问题,实际运行时没再出现 // 为啥多线程环境会出现deque的front和[0]不一样?好像是调试的时候的问题,实际运行时没再出现
// if (vcf.knownSites.front().left != vcf.knownSites[0].left || vcf.knownSites.front().right != vcf.knownSites[0].right) // if (vcf.knownSites.front().left != vcf.knownSites[0].left || vcf.knownSites.front().right != vcf.knownSites[0].right)
@ -166,7 +167,9 @@ struct RecalFuncs {
} }
} }
} }
PROF_TP_END(TP_read_vcf);
// fprintf(gf[0], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid); // fprintf(gf[0], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
PROF_START(TP_calc_skips);
for (auto& vcf : vcfs) { for (auto& vcf : vcfs) {
for (auto& intv : vcf.knownSites) { for (auto& intv : vcf.knownSites) {
// knownSite is outside clipping window for the read, ignore // knownSite is outside clipping window for the read, ignore
@ -198,6 +201,7 @@ struct RecalFuncs {
} }
//idx += 1; //idx += 1;
} }
PROF_TP_END(TP_calc_skips);
//fprintf(gf[0], "\n"); //fprintf(gf[0], "\n");
} }

View File

@ -143,7 +143,7 @@ int main_BaseRecalibrator(int argc, char *argv[]) {
BaseRecalibrator(); BaseRecalibrator();
spdlog::info("fast base recalibration phase-1 end"); spdlog::info("fast base recalibration phase-1 end");
DisplayProfiling(1); DisplayProfiling(nsgv::gBqsrArg.NUM_THREADS);
return 0; return 0;
} }

View File

@ -36,50 +36,49 @@ static int CalcThreadTime(uint64_t *a, int len, double *max, double *min, double
} }
#define PRINT_GP(gpname) \ #define PRINT_GP(gpname) \
fprintf(stderr, "time G %-15s: %0.2lfs\n", #gpname, gprof[GP_##gpname] * 1.0 / proc_freq); fprintf(stderr, "time %-15s: %0.2lfs\n", #gpname, gprof[gpname] * 1.0 / proc_freq);
#define PRINT_TP(tpname, nthread) \ #define PRINT_TP(tpname) \
{ \ { \
double maxTime, minTime, avgTime; \ double maxTime, minTime, avgTime; \
CalcThreadTime(tprof[TP_##tpname], nthread, &maxTime, &minTime, &avgTime); \ CalcThreadTime(tprof[tpname], nthread, &maxTime, &minTime, &avgTime); \
fprintf(stderr, "time T %-15s: avg %0.2lfs min %0.2lfs max %0.2lfs\n", #tpname, avgTime, minTime, maxTime); \ fprintf(stderr, "time %-15s: avg %0.2lfs min %0.2lfs max %0.2lfs\n", #tpname, avgTime, minTime, maxTime); \
} }
int DisplayProfiling(int nthread) { int DisplayProfiling(int nthread) {
#ifdef SHOW_PERF #ifdef SHOW_PERF
fprintf(stderr, "\n"); fprintf(stderr, "\n");
// PRINT_GP(read_wait); PRINT_GP(GP_read);
// PRINT_GP(gen_wait); if (nthread == 1) {
// PRINT_GP(sort_wait); PRINT_GP(GP_clip_read);
// PRINT_GP(markdup_wait); PRINT_GP(GP_read_ref);
// PRINT_GP(intersect_wait); PRINT_GP(GP_calc_snp);
PRINT_GP(clip_read); PRINT_GP(GP_read_vcf);
PRINT_GP(read_ref); PRINT_TP(TP_read_vcf);
PRINT_GP(calc_snp); PRINT_TP(TP_calc_skips);
PRINT_GP(read_vcf); PRINT_GP(GP_covariate);
PRINT_GP(covariate); PRINT_TP(TP_readgroup);
PRINT_GP(frac_err); PRINT_TP(TP_qualityscore);
PRINT_GP(update_info); PRINT_TP(TP_context);
//PRINT_GP(markdup); PRINT_TP(TP_cycle);
//PRINT_GP(intersect); PRINT_GP(GP_frac_err);
// PRINT_GP(merge_result); PRINT_GP(GP_update_info);
// PRINT_GP(sort_pair); } else {
// PRINT_GP(sort_frag); PRINT_TP(TP_clip_read);
// PRINT_GP(markdup_pair); PRINT_TP(TP_read_ref);
// PRINT_GP(markdup_frag); PRINT_TP(TP_calc_snp);
// PRINT_GP(merge_match); PRINT_TP(TP_read_vcf);
// PRINT_GP(merge_markdup); PRINT_TP(TP_calc_skips);
// PRINT_GP(merge_update); PRINT_TP(TP_covariate);
// PRINT_GP(merge_add); PRINT_TP(TP_readgroup);
//PRINT_GP(markdup_all); PRINT_TP(TP_qualityscore);
// PRINT_GP(final_read); PRINT_TP(TP_context);
//PRINT_GP(write); PRINT_TP(TP_cycle);
// PRINT_GP(whole_process); PRINT_TP(TP_frac_err);
PRINT_TP(TP_update_info);
// PRINT_TP(gen, nthread); }
// PRINT_TP(sort_frag, nthread); PRINT_GP(GP_whole_process);
// PRINT_TP(sort_pair, nthread);
fprintf(stderr, "\n"); fprintf(stderr, "\n");
#endif #endif

View File

@ -25,7 +25,8 @@ extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE];
#define PROF_START(tmp_time) uint64_t prof_tmp_##tmp_time = RealtimeMsec() #define PROF_START(tmp_time) uint64_t prof_tmp_##tmp_time = RealtimeMsec()
#define PROF_START_AGAIN(tmp_time) prof_tmp_##tmp_time = RealtimeMsec() #define PROF_START_AGAIN(tmp_time) prof_tmp_##tmp_time = RealtimeMsec()
#define PROF_END(result, tmp_time) result += RealtimeMsec() - prof_tmp_##tmp_time #define PROF_END(result, tmp_time) result += RealtimeMsec() - prof_tmp_##tmp_time
#define PROF_GP_END(tmp_time) gprof[GP_##tmp_time] += RealtimeMsec() - prof_tmp_##tmp_time #define PROF_GP_END(tmp_time) gprof[tmp_time] += RealtimeMsec() - prof_tmp_##tmp_time
#define PROF_TP_END(tmp_time) tprof[tmp_time][thid] += RealtimeMsec() - prof_tmp_##tmp_time
#define PROF_PRINT_START(tmp_time) uint64_t tmp_time = RealtimeMsec() #define PROF_PRINT_START(tmp_time) uint64_t tmp_time = RealtimeMsec()
#define PROF_PRINT_END(tmp_time) \ #define PROF_PRINT_END(tmp_time) \
tmp_time = RealtimeMsec() - tmp_time; \ tmp_time = RealtimeMsec() - tmp_time; \
@ -34,6 +35,7 @@ extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE];
#define PROF_START(tmp_time) #define PROF_START(tmp_time)
#define PROF_END(result, tmp_time) #define PROF_END(result, tmp_time)
#define PROF_GP_END(tmp_time) #define PROF_GP_END(tmp_time)
#define PROF_TP_END(tmp_time)
#define PROF_PRINT_START(tmp_time) #define PROF_PRINT_START(tmp_time)
#define PROF_PRINT_END(tmp_time) #define PROF_PRINT_END(tmp_time)
#endif #endif
@ -41,40 +43,39 @@ extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE];
// GLOBAL // GLOBAL
enum { GP_0 = 0, GP_1, GP_2, GP_3, GP_4, GP_5, GP_6, GP_7, GP_8, GP_9, GP_10 }; enum { GP_0 = 0, GP_1, GP_2, GP_3, GP_4, GP_5, GP_6, GP_7, GP_8, GP_9, GP_10 };
enum { enum {
GP_read_wait = 11, GP_clip_read = 11,
GP_clip_read,
GP_calc_snp, GP_calc_snp,
GP_covariate, GP_covariate,
GP_read_ref, GP_read_ref,
GP_read_vcf, GP_read_vcf,
GP_frac_err, GP_frac_err,
GP_update_info, GP_update_info,
GP_gen_wait, GP_merge_covs,
GP_sort_wait, GP_collapse_round,
GP_markdup_wait, GP_quantize,
GP_intersect_wait, GP_print_report,
GP_read, GP_read,
GP_gen,
GP_sort,
GP_markdup,
GP_intersect,
GP_merge_result,
GP_markdup_pair,
GP_markdup_frag,
GP_sort_pair,
GP_sort_frag,
GP_merge_match,
GP_merge_markdup,
GP_merge_update,
GP_merge_add,
GP_markdup_all,
GP_final_read,
GP_write, GP_write,
GP_thread_worker,
GP_whole_process GP_whole_process
}; };
// THREAD // THREAD
enum { TP_0 = 0, TP_1, TP_2, TP_3, TP_4, TP_5, TP_6, TP_7, TP_8, TP_9, TP_10 }; enum { TP_0 = 0, TP_1, TP_2, TP_3, TP_4, TP_5, TP_6, TP_7, TP_8, TP_9, TP_10 };
enum { TP_gen = 11, TP_sort, TP_sort_frag, TP_sort_pair}; enum {
TP_clip_read = 11,
TP_calc_snp,
TP_covariate,
TP_read_ref,
TP_read_vcf,
TP_calc_skips,
TP_frac_err,
TP_readgroup,
TP_qualityscore,
TP_context,
TP_cycle,
TP_update_info,
TP_whole_process
};
uint64_t RealtimeMsec(void); uint64_t RealtimeMsec(void);