From 5d9a52e90b3298d567b682ab05279fe1962e9dbd Mon Sep 17 00:00:00 2001 From: zzh Date: Sun, 15 Dec 2024 17:18:07 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E5=AE=8C=E6=AF=95=EF=BC=8C?= =?UTF-8?q?=E6=AD=A3=E7=A1=AE=E8=BF=90=E8=A1=8C=EF=BC=8C=E5=8A=A0=E4=BA=86?= =?UTF-8?q?=E4=B8=80=E4=BA=9B=E6=80=A7=E8=83=BD=E7=BB=9F=E8=AE=A1=E6=95=B0?= =?UTF-8?q?=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 3 ++- src/CMakeLists.txt | 2 +- src/{version.h => fastdup_version.h} | 0 src/main.cpp | 2 +- src/markdup/markdup.cpp | 23 +++++++++++++++++++++-- src/markdup/pipeline_md.cpp | 28 ++++++++++++++++++++++++++-- src/util/profiling.cpp | 11 ++++++++++- src/util/profiling.h | 12 ++++++++++-- 8 files changed, 71 insertions(+), 10 deletions(-) rename src/{version.h => fastdup_version.h} (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index cf8e1f8..1c5e3ab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,5 +5,6 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") # set(CMAKE_BUILD_TYPE Debug) # set(CMAKE_BUILD_TYPE Release) -add_definitions(-DSHOW_PERF) +# add_definitions(-DSHOW_PERF) +add_definitions(-DSHOW_PERF=1) add_subdirectory(src) \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 56f0d07..a80c2de 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -9,9 +9,9 @@ aux_source_directory(${PROJECT_SOURCE_DIR}/src/markdup MARKDUP_SRC) set(KTHREAD_FILE ${PROJECT_SOURCE_DIR}/ext/klib/kthread.c) # including path -include_directories("${PROJECT_SOURCE_DIR}/src") include_directories("${PROJECT_SOURCE_DIR}/ext") include_directories("${PROJECT_SOURCE_DIR}/ext/htslib") +include_directories("${PROJECT_SOURCE_DIR}/src") # linking path link_directories("${PROJECT_SOURCE_DIR}/ext/htslib") diff --git a/src/version.h b/src/fastdup_version.h similarity index 100% rename from src/version.h rename to src/fastdup_version.h diff --git a/src/main.cpp b/src/main.cpp index 88d801f..964bb6a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -10,7 +10,7 @@ #include "markdup/md_args.h" #include "util/profiling.h" -#include "version.h" +#include "fastdup_version.h" namespace nsgv { extern MarkDupsArg gMdArg; diff --git a/src/markdup/markdup.cpp b/src/markdup/markdup.cpp index e729237..3e7be67 100644 --- a/src/markdup/markdup.cpp +++ b/src/markdup/markdup.cpp @@ -14,12 +14,12 @@ Date : 2023/10/23 #include #include "dup_metrics.h" +#include "fastdup_version.h" #include "md_args.h" #include "md_funcs.h" #include "pipeline_md.h" #include "read_name_parser.h" #include "util/profiling.h" -#include "version.h" #define BAM_BLOCK_SIZE 16L * 1024 * 1024 @@ -81,13 +81,32 @@ int MarkDuplicates() { hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); // 测试读写速度 -#if 1 +#if 0 bam1_t *bamp = bam_init1(); while (sam_read1(nsgv::gInBamFp, nsgv::gInBamHeader, bamp) >= 0); DisplayProfiling(nsgv::gMdArg.NUM_THREADS); exit(0); #endif + /* 冗余检查和标记 */ + pipelineMarkDups(); + + /* 初始化输出文件 */ + char modeout[12] = "wb"; + sam_open_mode(modeout + 1, nsgv::gMdArg.OUTPUT_FILE.c_str(), NULL); + nsgv::gOutBamFp = sam_open(nsgv::gMdArg.OUTPUT_FILE.c_str(), modeout); + nsgv::gOutBamHeader = sam_hdr_dup(nsgv::gInBamHeader); + // 修改输出文件的header + // sam_hdr_add_line(nsgv::gOutBamHeader, "PG", "ID", nsgv::gMdArg.PROGRAM_RECORD_ID.c_str(), "VN", FASTDUP_VERSION, + // "CL", + // (nsgv::gMdArg.PROGRAM_RECORD_ID + " " + nsgv::gMdArg.GetArgValueString() + " " + + // nsgv::gMdArg.GetArgValueString()) + // .c_str(), + // NULL); + + // 用同样的线程池处理输出文件 + hts_set_opt(nsgv::gOutBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); + hts_set_opt(nsgv::gOutBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); return 0; diff --git a/src/markdup/pipeline_md.cpp b/src/markdup/pipeline_md.cpp index 8b50905..38b705a 100644 --- a/src/markdup/pipeline_md.cpp +++ b/src/markdup/pipeline_md.cpp @@ -364,6 +364,7 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { frags.clear(); unpairedDic.clear(); + PROF_START(gen); size_t start_id = LOWER_BOUND(idx, nThread, bams.size()); size_t end_id = UPPER_BOUND(idx, nThread, bams.size()); for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read @@ -390,9 +391,16 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { } } } + PROF_END(tprof[TP_gen][tid], gen); + + PROF_START(sort_frag); // sortReadEndsArr(frags); sort(frags.begin(), frags.end()); + PROF_END(tprof[TP_sort_frag][tid], sort_frag); + + PROF_START(sort_pair); sort(pairs.begin(), pairs.end()); + PROF_END(tprof[TP_sort_pair][tid], sort_pair); } static void doGenRE(PipelineArg &pipeArg) { @@ -451,14 +459,18 @@ static void doSort(PipelineArg &pipeArg) { smd.frags.clear(); const ReadEnds *pRE; ReadEndsHeap pairsHeap, fragsHeap; + PROF_START(sort_pair); pairsHeap.Init(&genREData.pairsArr); while ((pRE = pairsHeap.Pop()) != nullptr) { smd.pairs.push_back(*pRE); } + PROF_END(gprof[GP_sort_pair], sort_pair); + PROF_START(sort_frag); fragsHeap.Init(&genREData.fragsArr); while ((pRE = fragsHeap.Pop()) != nullptr) { smd.frags.push_back(*pRE); } + PROF_END(gprof[GP_sort_frag], sort_frag); } // for step-4 sort static void doMarkDup(PipelineArg &pipeArg) { @@ -477,9 +489,13 @@ static void doMarkDup(PipelineArg &pipeArg) { SortMarkData &smd = *(SortMarkData *)mdData.dataPtr; // 先处理 pair + PROF_START(markdup_pair); processPairs(smd.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx); + PROF_END(gprof[GP_markdup_pair], markdup_pair); // 再处理frag + PROF_START(markdup_frag); processFrags(smd.frags, &mdData.fragDupIdx); + PROF_END(gprof[GP_markdup_frag], markdup_frag); } template @@ -947,6 +963,7 @@ static void mergeAllTask(PipelineArg &pipeArg) { IntersectData &g = pipeArg.intersectData; SortMarkData &lpSM = *(SortMarkData *)lp.dataPtr; // 遗留的未匹配的pair + PROF_START(merge_match); for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read auto &readName = prevUnpair.first; auto &prevPosInfo = prevUnpair.second; @@ -965,7 +982,9 @@ static void mergeAllTask(PipelineArg &pipeArg) { g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数 } } + PROF_END(gprof[GP_merge_match], merge_match); + PROF_START(merge_markdup); map taskChanged; for (auto &e : g.unpairedPosArr) { auto posKey = e.first; @@ -990,8 +1009,10 @@ static void mergeAllTask(PipelineArg &pipeArg) { g.latterNotRepIdxArr[taskSeq]); } g.unpairedPosArr.clear(); + PROF_END(gprof[GP_merge_markdup], merge_markdup); // 将dupidx放进全局数据 + PROF_START(merge_update); vector cacheDupIdx; vector midArr; vector intCacheDupIdx; @@ -1004,7 +1025,9 @@ static void mergeAllTask(PipelineArg &pipeArg) { intCacheDupIdx, intMidArr); for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); + PROF_END(gprof[GP_merge_update], merge_update); + PROF_START(merge_add); g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); @@ -1020,6 +1043,7 @@ static void mergeAllTask(PipelineArg &pipeArg) { auto &vRepIdx = g.repIdxArr.back(); vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); std::sort(vRepIdx.begin(), vRepIdx.end()); + PROF_END(gprof[GP_merge_add], merge_add); } static void parallelPipeline() { @@ -1083,8 +1107,8 @@ static void parallelPipeline() { // cout << "copy time: " << tm_arr[5].acc_seconds_elapsed() << endl; // cout << "merge al6 time: " << tm_arr[6].acc_seconds_elapsed() << endl; // -// cout << "dup num : " << dupNum << "\t" << dup.size() << endl; -// cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl; + cout << "dup num : " << dupNum << "\t" << dup.size() << endl; + cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl; } diff --git a/src/util/profiling.cpp b/src/util/profiling.cpp index d3b6800..eee577f 100644 --- a/src/util/profiling.cpp +++ b/src/util/profiling.cpp @@ -60,9 +60,18 @@ int DisplayProfiling(int nthread) { PRINT_GP(markdup); PRINT_GP(intersect); PRINT_GP(merge_result); + PRINT_GP(sort_pair); + PRINT_GP(sort_frag); + PRINT_GP(markdup_pair); + PRINT_GP(markdup_frag); + PRINT_GP(merge_match); + PRINT_GP(merge_markdup); + PRINT_GP(merge_update); + PRINT_GP(merge_add); PRINT_TP(gen, nthread); - PRINT_TP(sort, nthread); + PRINT_TP(sort_frag, nthread); + PRINT_TP(sort_pair, nthread); fprintf(stderr, "\n"); #endif diff --git a/src/util/profiling.h b/src/util/profiling.h index ff5c417..fc844e9 100644 --- a/src/util/profiling.h +++ b/src/util/profiling.h @@ -43,11 +43,19 @@ enum { GP_sort, GP_markdup, GP_intersect, - GP_merge_result + GP_merge_result, + GP_markdup_pair, + GP_markdup_frag, + GP_sort_pair, + GP_sort_frag, + GP_merge_match, + GP_merge_markdup, + GP_merge_update, + GP_merge_add }; // THREAD enum { TP_0 = 0, TP_1, TP_2, TP_3, TP_4, TP_5, TP_6, TP_7, TP_8, TP_9, TP_10 }; -enum { TP_gen = 11, TP_sort }; +enum { TP_gen = 11, TP_sort, TP_sort_frag, TP_sort_pair}; uint64_t RealtimeMsec(void);