重构完毕，正确运行，加了一些性能统计数据。

This commit is contained in:
zzh 2024-12-15 17:18:07 +08:00
parent e8b5b4405a
commit 5d9a52e90b
8 changed files with 71 additions and 10 deletions

View File

@ -5,5 +5,6 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
# set(CMAKE_BUILD_TYPE Debug) # set(CMAKE_BUILD_TYPE Debug)
# set(CMAKE_BUILD_TYPE Release) # set(CMAKE_BUILD_TYPE Release)
add_definitions(-DSHOW_PERF) # add_definitions(-DSHOW_PERF)
add_definitions(-DSHOW_PERF=1)
add_subdirectory(src) add_subdirectory(src)

View File

@ -9,9 +9,9 @@ aux_source_directory(${PROJECT_SOURCE_DIR}/src/markdup MARKDUP_SRC)
set(KTHREAD_FILE ${PROJECT_SOURCE_DIR}/ext/klib/kthread.c) set(KTHREAD_FILE ${PROJECT_SOURCE_DIR}/ext/klib/kthread.c)
# including path # including path
include_directories("${PROJECT_SOURCE_DIR}/src")
include_directories("${PROJECT_SOURCE_DIR}/ext") include_directories("${PROJECT_SOURCE_DIR}/ext")
include_directories("${PROJECT_SOURCE_DIR}/ext/htslib") include_directories("${PROJECT_SOURCE_DIR}/ext/htslib")
include_directories("${PROJECT_SOURCE_DIR}/src")
# linking path # linking path
link_directories("${PROJECT_SOURCE_DIR}/ext/htslib") link_directories("${PROJECT_SOURCE_DIR}/ext/htslib")

View File

@ -10,7 +10,7 @@
#include "markdup/md_args.h" #include "markdup/md_args.h"
#include "util/profiling.h" #include "util/profiling.h"
#include "version.h" #include "fastdup_version.h"
namespace nsgv { namespace nsgv {
extern MarkDupsArg gMdArg; extern MarkDupsArg gMdArg;

View File

@ -14,12 +14,12 @@ Date : 2023/10/23
#include <vector> #include <vector>
#include "dup_metrics.h" #include "dup_metrics.h"
#include "fastdup_version.h"
#include "md_args.h" #include "md_args.h"
#include "md_funcs.h" #include "md_funcs.h"
#include "pipeline_md.h" #include "pipeline_md.h"
#include "read_name_parser.h" #include "read_name_parser.h"
#include "util/profiling.h" #include "util/profiling.h"
#include "version.h"
#define BAM_BLOCK_SIZE 16L * 1024 * 1024 #define BAM_BLOCK_SIZE 16L * 1024 * 1024
@ -81,13 +81,32 @@ int MarkDuplicates() {
hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead);
// 测试读写速度 // 测试读写速度
#if 1 #if 0
bam1_t *bamp = bam_init1(); bam1_t *bamp = bam_init1();
while (sam_read1(nsgv::gInBamFp, nsgv::gInBamHeader, bamp) >= 0); while (sam_read1(nsgv::gInBamFp, nsgv::gInBamHeader, bamp) >= 0);
DisplayProfiling(nsgv::gMdArg.NUM_THREADS); DisplayProfiling(nsgv::gMdArg.NUM_THREADS);
exit(0); exit(0);
#endif #endif
/* 冗余检查和标记 */
pipelineMarkDups();
/* 初始化输出文件 */
char modeout[12] = "wb";
sam_open_mode(modeout + 1, nsgv::gMdArg.OUTPUT_FILE.c_str(), NULL);
nsgv::gOutBamFp = sam_open(nsgv::gMdArg.OUTPUT_FILE.c_str(), modeout);
nsgv::gOutBamHeader = sam_hdr_dup(nsgv::gInBamHeader);
// 修改输出文件的header
// sam_hdr_add_line(nsgv::gOutBamHeader, "PG", "ID", nsgv::gMdArg.PROGRAM_RECORD_ID.c_str(), "VN", FASTDUP_VERSION,
// "CL",
// (nsgv::gMdArg.PROGRAM_RECORD_ID + " " + nsgv::gMdArg.GetArgValueString() + " " +
// nsgv::gMdArg.GetArgValueString())
// .c_str(),
// NULL);
// 用同样的线程池处理输出文件
hts_set_opt(nsgv::gOutBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
hts_set_opt(nsgv::gOutBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite);
return 0; return 0;

View File

@ -364,6 +364,7 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) {
frags.clear(); frags.clear();
unpairedDic.clear(); unpairedDic.clear();
PROF_START(gen);
size_t start_id = LOWER_BOUND(idx, nThread, bams.size()); size_t start_id = LOWER_BOUND(idx, nThread, bams.size());
size_t end_id = UPPER_BOUND(idx, nThread, bams.size()); size_t end_id = UPPER_BOUND(idx, nThread, bams.size());
for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read
@ -390,9 +391,16 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) {
} }
} }
} }
PROF_END(tprof[TP_gen][tid], gen);
PROF_START(sort_frag);
// sortReadEndsArr(frags); // sortReadEndsArr(frags);
sort(frags.begin(), frags.end()); sort(frags.begin(), frags.end());
PROF_END(tprof[TP_sort_frag][tid], sort_frag);
PROF_START(sort_pair);
sort(pairs.begin(), pairs.end()); sort(pairs.begin(), pairs.end());
PROF_END(tprof[TP_sort_pair][tid], sort_pair);
} }
static void doGenRE(PipelineArg &pipeArg) { static void doGenRE(PipelineArg &pipeArg) {
@ -451,14 +459,18 @@ static void doSort(PipelineArg &pipeArg) {
smd.frags.clear(); smd.frags.clear();
const ReadEnds *pRE; const ReadEnds *pRE;
ReadEndsHeap pairsHeap, fragsHeap; ReadEndsHeap pairsHeap, fragsHeap;
PROF_START(sort_pair);
pairsHeap.Init(&genREData.pairsArr); pairsHeap.Init(&genREData.pairsArr);
while ((pRE = pairsHeap.Pop()) != nullptr) { while ((pRE = pairsHeap.Pop()) != nullptr) {
smd.pairs.push_back(*pRE); smd.pairs.push_back(*pRE);
} }
PROF_END(gprof[GP_sort_pair], sort_pair);
PROF_START(sort_frag);
fragsHeap.Init(&genREData.fragsArr); fragsHeap.Init(&genREData.fragsArr);
while ((pRE = fragsHeap.Pop()) != nullptr) { while ((pRE = fragsHeap.Pop()) != nullptr) {
smd.frags.push_back(*pRE); smd.frags.push_back(*pRE);
} }
PROF_END(gprof[GP_sort_frag], sort_frag);
} }
// for step-4 sort // for step-4 sort
static void doMarkDup(PipelineArg &pipeArg) { static void doMarkDup(PipelineArg &pipeArg) {
@ -477,9 +489,13 @@ static void doMarkDup(PipelineArg &pipeArg) {
SortMarkData &smd = *(SortMarkData *)mdData.dataPtr; SortMarkData &smd = *(SortMarkData *)mdData.dataPtr;
// 先处理 pair // 先处理 pair
PROF_START(markdup_pair);
processPairs(smd.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx); processPairs(smd.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx);
PROF_END(gprof[GP_markdup_pair], markdup_pair);
// 再处理frag // 再处理frag
PROF_START(markdup_frag);
processFrags(smd.frags, &mdData.fragDupIdx); processFrags(smd.frags, &mdData.fragDupIdx);
PROF_END(gprof[GP_markdup_frag], markdup_frag);
} }
template <typename T> template <typename T>
@ -947,6 +963,7 @@ static void mergeAllTask(PipelineArg &pipeArg) {
IntersectData &g = pipeArg.intersectData; IntersectData &g = pipeArg.intersectData;
SortMarkData &lpSM = *(SortMarkData *)lp.dataPtr; SortMarkData &lpSM = *(SortMarkData *)lp.dataPtr;
// 遗留的未匹配的pair // 遗留的未匹配的pair
PROF_START(merge_match);
for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
auto &readName = prevUnpair.first; auto &readName = prevUnpair.first;
auto &prevPosInfo = prevUnpair.second; auto &prevPosInfo = prevUnpair.second;
@ -965,7 +982,9 @@ static void mergeAllTask(PipelineArg &pipeArg) {
g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数 g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数
} }
} }
PROF_END(gprof[GP_merge_match], merge_match);
PROF_START(merge_markdup);
map<int64_t, TaskSeqDupInfo> taskChanged; map<int64_t, TaskSeqDupInfo> taskChanged;
for (auto &e : g.unpairedPosArr) { for (auto &e : g.unpairedPosArr) {
auto posKey = e.first; auto posKey = e.first;
@ -990,8 +1009,10 @@ static void mergeAllTask(PipelineArg &pipeArg) {
g.latterNotRepIdxArr[taskSeq]); g.latterNotRepIdxArr[taskSeq]);
} }
g.unpairedPosArr.clear(); g.unpairedPosArr.clear();
PROF_END(gprof[GP_merge_markdup], merge_markdup);
// 将dupidx放进全局数据 // 将dupidx放进全局数据
PROF_START(merge_update);
vector<DupInfo> cacheDupIdx; vector<DupInfo> cacheDupIdx;
vector<DupInfo> midArr; vector<DupInfo> midArr;
vector<int64_t> intCacheDupIdx; vector<int64_t> intCacheDupIdx;
@ -1004,7 +1025,9 @@ static void mergeAllTask(PipelineArg &pipeArg) {
intCacheDupIdx, intMidArr); intCacheDupIdx, intMidArr);
for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr);
PROF_END(gprof[GP_merge_update], merge_update);
PROF_START(merge_add);
g.dupIdxArr.push_back(vector<DupInfo>()); g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back(); auto &vIdx = g.dupIdxArr.back();
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
@ -1020,6 +1043,7 @@ static void mergeAllTask(PipelineArg &pipeArg) {
auto &vRepIdx = g.repIdxArr.back(); auto &vRepIdx = g.repIdxArr.back();
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
std::sort(vRepIdx.begin(), vRepIdx.end()); std::sort(vRepIdx.begin(), vRepIdx.end());
PROF_END(gprof[GP_merge_add], merge_add);
} }
static void parallelPipeline() { static void parallelPipeline() {
@ -1083,8 +1107,8 @@ static void parallelPipeline() {
// cout << "copy time: " << tm_arr[5].acc_seconds_elapsed() << endl; // cout << "copy time: " << tm_arr[5].acc_seconds_elapsed() << endl;
// cout << "merge al6 time: " << tm_arr[6].acc_seconds_elapsed() << endl; // cout << "merge al6 time: " << tm_arr[6].acc_seconds_elapsed() << endl;
// //
// cout << "dup num : " << dupNum << "\t" << dup.size() << endl; cout << "dup num : " << dupNum << "\t" << dup.size() << endl;
// cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl; cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl;
} }

View File

@ -60,9 +60,18 @@ int DisplayProfiling(int nthread) {
PRINT_GP(markdup); PRINT_GP(markdup);
PRINT_GP(intersect); PRINT_GP(intersect);
PRINT_GP(merge_result); PRINT_GP(merge_result);
PRINT_GP(sort_pair);
PRINT_GP(sort_frag);
PRINT_GP(markdup_pair);
PRINT_GP(markdup_frag);
PRINT_GP(merge_match);
PRINT_GP(merge_markdup);
PRINT_GP(merge_update);
PRINT_GP(merge_add);
PRINT_TP(gen, nthread); PRINT_TP(gen, nthread);
PRINT_TP(sort, nthread); PRINT_TP(sort_frag, nthread);
PRINT_TP(sort_pair, nthread);
fprintf(stderr, "\n"); fprintf(stderr, "\n");
#endif #endif

View File

@ -43,11 +43,19 @@ enum {
GP_sort, GP_sort,
GP_markdup, GP_markdup,
GP_intersect, GP_intersect,
GP_merge_result GP_merge_result,
GP_markdup_pair,
GP_markdup_frag,
GP_sort_pair,
GP_sort_frag,
GP_merge_match,
GP_merge_markdup,
GP_merge_update,
GP_merge_add
}; };
// THREAD // THREAD
enum { TP_0 = 0, TP_1, TP_2, TP_3, TP_4, TP_5, TP_6, TP_7, TP_8, TP_9, TP_10 }; enum { TP_0 = 0, TP_1, TP_2, TP_3, TP_4, TP_5, TP_6, TP_7, TP_8, TP_9, TP_10 };
enum { TP_gen = 11, TP_sort }; enum { TP_gen = 11, TP_sort, TP_sort_frag, TP_sort_pair};
uint64_t RealtimeMsec(void); uint64_t RealtimeMsec(void);