From 01d14d539f04952652f84d5da1c8afb5e94e81f1 Mon Sep 17 00:00:00 2001 From: zzh Date: Mon, 16 Dec 2024 02:55:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E6=8E=89=E4=BA=86=E4=B8=80=E4=BA=9B?= =?UTF-8?q?=E8=B0=83=E8=AF=95=E4=BF=A1=E6=81=AF=EF=BC=8C=E6=8A=8Acout?= =?UTF-8?q?=E7=AD=89=E8=BE=93=E5=87=BA=E9=83=BD=E6=8D=A2=E6=88=90=E4=BA=86?= =?UTF-8?q?spdlog=EF=BC=8C=E5=8F=AF=E4=BB=A5=E4=BD=9C=E4=B8=BA=E5=BC=80?= =?UTF-8?q?=E6=BA=90=E4=BB=A3=E7=A0=81=E5=8F=91=E5=B8=83=E4=BA=86=EF=BC=8C?= =?UTF-8?q?README=E8=BF=98=E9=9C=80=E8=A6=81=E8=AF=A6=E7=BB=86=E5=86=99?= =?UTF-8?q?=E4=B8=80=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 1 - README.md | 11 +- metrics.txt | 111 ++++++++++++++ src/main.cpp | 19 ++- src/markdup/markdup.cpp | 282 +++++++++++++++++++++++++++++++++++- src/markdup/md_args.h | 13 +- src/markdup/pipeline_md.cpp | 94 +----------- src/markdup/read_ends.h | 1 + src/util/profiling.cpp | 3 + src/util/profiling.h | 11 +- 10 files changed, 434 insertions(+), 112 deletions(-) create mode 100644 metrics.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index 1c5e3ab..25ae2fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,6 +5,5 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") # set(CMAKE_BUILD_TYPE Debug) # set(CMAKE_BUILD_TYPE Release) -# add_definitions(-DSHOW_PERF) add_definitions(-DSHOW_PERF=1) add_subdirectory(src) \ No newline at end of file diff --git a/README.md b/README.md index 2d60ace..0095767 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,10 @@ -# markdups +# FastDup -mark duplicate,去除冗余 \ No newline at end of file +Identifies duplicate reads. This tool locates and tags duplicate reads in a coordinate ordered SAM or BAM file. + +Use the same algorithm as picard MarkDuplicates and output identical results. +Use spdlog as log tool and the default level is 'info'. + +### Features + +* Fast - \ No newline at end of file diff --git a/metrics.txt b/metrics.txt new file mode 100644 index 0000000..bb5ff03 --- /dev/null +++ b/metrics.txt @@ -0,0 +1,111 @@ +## StringHeader +# /home/zzh/work/ngs/FastDup/build/bin/fastdup --input /home/zzh/data/bam/normal_all.sam --output /home/zzh/data1/out.sam --metrics ./metrics.txt --num-threads 1 --create-index --index-format CSI --tag-duplicate-set-members +## StringHeader +# Started on: December 16, 2024 at 02:43:41 AM CST + +## METRICS +LIBRARY UNPAIRED_READS_EXAMINED READ_PAIRS_EXAMINED SECONDARY_OR_SUPPLEMENTARY_RDS UNMAPPED_READS UNPAIRED_READ_DUPLICATES READ_PAIR_DUPLICATES READ_PAIR_OPTICAL_DUPLICATES PERCENT_DUPLICATION ESTIMATED_LIBRARY_SIZE +normal 108919 66154888 117508 161395 67414 15058359 876584 0.227945 127582703 + +## HISTOGRAM Double +BIN CoverageMult +1 1.010249 +2 1.611750 +3 1.969882 +4 2.183113 +5 2.310070 +6 2.385660 +7 2.430666 +8 2.457463 +9 2.473417 +10 2.482917 +11 2.488573 +12 2.491940 +13 2.493945 +14 2.495139 +15 2.495850 +16 2.496273 +17 2.496525 +18 2.496675 +19 2.496764 +20 2.496817 +21 2.496849 +22 2.496868 +23 2.496879 +24 2.496886 +25 2.496890 +26 2.496892 +27 2.496894 +28 2.496894 +29 2.496895 +30 2.496895 +31 2.496895 +32 2.496896 +33 2.496896 +34 2.496896 +35 2.496896 +36 2.496896 +37 2.496896 +38 2.496896 +39 2.496896 +40 2.496896 +41 2.496896 +42 2.496896 +43 2.496896 +44 2.496896 +45 2.496896 +46 2.496896 +47 2.496896 +48 2.496896 +49 2.496896 +50 2.496896 +51 2.496896 +52 2.496896 +53 2.496896 +54 2.496896 +55 2.496896 +56 2.496896 +57 2.496896 +58 2.496896 +59 2.496896 +60 2.496896 +61 2.496896 +62 2.496896 +63 2.496896 +64 2.496896 +65 2.496896 +66 2.496896 +67 2.496896 +68 2.496896 +69 2.496896 +70 2.496896 +71 2.496896 +72 2.496896 +73 2.496896 +74 2.496896 +75 2.496896 +76 2.496896 +77 2.496896 +78 2.496896 +79 2.496896 +80 2.496896 +81 2.496896 +82 2.496896 +83 2.496896 +84 2.496896 +85 2.496896 +86 2.496896 +87 2.496896 +88 2.496896 +89 2.496896 +90 2.496896 +91 2.496896 +92 2.496896 +93 2.496896 +94 2.496896 +95 2.496896 +96 2.496896 +97 2.496896 +98 2.496896 +99 2.496896 +100 2.496896 diff --git a/src/main.cpp b/src/main.cpp index 964bb6a..38bbda4 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -8,9 +9,9 @@ #include #include +#include "fastdup_version.h" #include "markdup/md_args.h" #include "util/profiling.h" -#include "fastdup_version.h" namespace nsgv { extern MarkDupsArg gMdArg; @@ -18,6 +19,16 @@ extern MarkDupsArg gMdArg; int MarkDuplicates(); +string get_current_time_str() { + time_t time_val; + struct tm *at; + char now[80]; + time(&time_val); + at = localtime(&time_val); + strftime(now, 79, "%B %d, %Y at %I:%M:%S %p %Z", at); + return string(now); +} + int main(int argc, char *argv[]) { // init log spdlog::set_default_logger(spdlog::stderr_color_st("fastdup")); @@ -159,6 +170,12 @@ int main(int argc, char *argv[]) { // std::cout << program << std::endl; + nsgv::gMdArg.START_TIME = get_current_time_str(); + nsgv::gMdArg.CLI_STR = argv[0]; + for (int i = 1; i < argc; ++i) { + nsgv::gMdArg.CLI_STR += " " + std::string(argv[i]); + } + try { program.parse_args(argc, argv); nsgv::gMdArg.INPUT_FILE = program.get("--input"); diff --git a/src/markdup/markdup.cpp b/src/markdup/markdup.cpp index 3e7be67..37067a6 100644 --- a/src/markdup/markdup.cpp +++ b/src/markdup/markdup.cpp @@ -11,6 +11,7 @@ Date : 2023/10/23 #include #include +#include #include #include "dup_metrics.h" @@ -36,6 +37,13 @@ PipelineArg gPipe; }; +// 字节缓冲区 +struct ByteBuf { + uint8_t *buf = nullptr; + int size = 0; // 当前使用量 + int capacity = 0; // 最大容量 +}; + /* * 获取文件名后缀 */ @@ -89,25 +97,283 @@ int MarkDuplicates() { #endif /* 冗余检查和标记 */ + PROF_START(markdup_all); pipelineMarkDups(); + PROF_END(gprof[GP_markdup_all], markdup_all); - /* 初始化输出文件 */ + /* 标记冗余, 将处理后的结果写入文件 */ char modeout[12] = "wb"; sam_open_mode(modeout + 1, nsgv::gMdArg.OUTPUT_FILE.c_str(), NULL); nsgv::gOutBamFp = sam_open(nsgv::gMdArg.OUTPUT_FILE.c_str(), modeout); + if (!nsgv::gOutBamFp) { + spdlog::error("[{}] create output sam/bam file failed.\n", __func__); + return -1; + } nsgv::gOutBamHeader = sam_hdr_dup(nsgv::gInBamHeader); // 修改输出文件的header - // sam_hdr_add_line(nsgv::gOutBamHeader, "PG", "ID", nsgv::gMdArg.PROGRAM_RECORD_ID.c_str(), "VN", FASTDUP_VERSION, - // "CL", - // (nsgv::gMdArg.PROGRAM_RECORD_ID + " " + nsgv::gMdArg.GetArgValueString() + " " + - // nsgv::gMdArg.GetArgValueString()) - // .c_str(), - // NULL); + sam_hdr_add_line(nsgv::gOutBamHeader, "PG", "ID", nsgv::gMdArg.PROGRAM_RECORD_ID.c_str(), "VN", FASTDUP_VERSION, + "CL", nsgv::gMdArg.CLI_STR.c_str(), NULL); - // 用同样的线程池处理输出文件 + // 用同样的线程数量处理输出文件 hts_set_opt(nsgv::gOutBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); hts_set_opt(nsgv::gOutBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); + sam_close(nsgv::gInBamFp); // 重新打开bam文件 + nsgv::gInBamFp = sam_open_format(nsgv::gMdArg.INPUT_FILE.c_str(), "r", nullptr); + if (!nsgv::gInBamFp) { + spdlog::error("[{}] load sam/bam file failed.\n", __func__); + return -1; + } + hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); + hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); + nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); + if (sam_hdr_write(nsgv::gOutBamFp, nsgv::gOutBamHeader) != 0) { + spdlog::error("failed writing header to \"{}\"", nsgv::gMdArg.OUTPUT_FILE); + sam_close(nsgv::gOutBamFp); + sam_close(nsgv::gInBamFp); + return -1; + } + // 输出index文件 + string indexFn = nsgv::gMdArg.OUTPUT_FILE + ".bai"; // min_shift = 0 是bai格式 + if ("sam" == getFileExtension(nsgv::gMdArg.OUTPUT_FILE) || !nsgv::gMdArg.CREATE_INDEX) { + indexFn = ""; + } + if (!indexFn.empty()) { + int index_min_shift = 0; + if (nsgv::gMdArg.INDEX_FORMAT == nsmd::IndexFormat::CSI) { + indexFn = nsgv::gMdArg.OUTPUT_FILE + ".csi"; + index_min_shift = 14; + } + if (sam_idx_init(nsgv::gOutBamFp, nsgv::gOutBamHeader, 0 /*csi 14*/, indexFn.c_str()) < 0) { + spdlog::error("failed to open index \"{}\" for writing", indexFn); + sam_close(nsgv::gOutBamFp); + sam_close(nsgv::gInBamFp); + return -1; + } + } + // 读取输入文件,标记冗余并输出 + BamBufType inBuf(nsgv::gMdArg.DUPLEX_IO); + inBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM); + + DupIdxQueue dupIdxQue, repIdxQue; + DupIdxQueue opticalIdxQue; + dupIdxQue.Init(&nsgv::gPipe.intersectData.dupIdxArr); + repIdxQue.Init(&nsgv::gPipe.intersectData.repIdxArr); + opticalIdxQue.Init(&nsgv::gPipe.intersectData.opticalDupIdxArr); + spdlog::info("{} duplicate reads found", dupIdxQue.Size()); + + uint64_t bamIdx = 0; + DupInfo dupIdx = dupIdxQue.Pop(); + DupInfo repIdx = repIdxQue.Pop(); + uint64_t opticalIdx = opticalIdxQue.Pop(); + + ByteBuf bytes; + bam1_t *bp = bam_init1(); + bool isDup = false; + bool isOpticalDup = false; + bool isInDuplicateSet = false; + uint32_t representativeReadIndexInFile = 0; + uint32_t duplicateSetSize = 0; + + int64_t realDupSize = 0; + + // exit(0); + PROF_START(write); + while (inBuf.ReadStat() >= 0) { + PROF_START(final_read); + size_t readNum = inBuf.ReadBam(); + PROF_END(gprof[GP_final_read], final_read); + PROF_PRINT_START(read_write); + for (size_t i = 0; i < readNum; ++i) { + BamWrap *bw = inBuf[i]; + if (bam_copy1(bp, bw->b) == nullptr) { + spdlog::error("Can not copy sam record!"); + return -1; + } + bw->b = bp; + isDup = false; + isOpticalDup = false; + isInDuplicateSet = false; + + // 删除原来的duplicate tag + if (nsgv::gMdArg.CLEAR_DT) { + uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str()); + if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); + } + + // 统计信息 + if (bw->GetReadUnmappedFlag()) { + ++nsgv::gMetrics.UNMAPPED_READS; + } else if (bw->IsSecondaryOrSupplementary()) { + ++nsgv::gMetrics.SECONDARY_OR_SUPPLEMENTARY_RDS; + } else if (!bw->GetReadPairedFlag() || bw->GetMateUnmappedFlag()) { + ++nsgv::gMetrics.UNPAIRED_READS_EXAMINED; + } else { + ++nsgv::gMetrics.READ_PAIRS_EXAMINED; // will need to be divided by 2 at the end + } + + /* 判断是否冗余 */ + if (bamIdx == dupIdx) { + ++realDupSize; // for test + isDup = true; + if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) { + isInDuplicateSet = true; + representativeReadIndexInFile = dupIdx.repIdx; + duplicateSetSize = dupIdx.dupSet; + } + + // 为了防止小内存运行的时候,有重复的dupidx,这时候dup的repIdx和dupSetSize可能会有不同 + while ((dupIdx = dupIdxQue.Pop()) == bamIdx); + if (opticalIdx == bamIdx) + isOpticalDup = true; + else if (opticalIdx < bamIdx) { + while ((opticalIdx = opticalIdxQue.Pop()) < bamIdx); + if (opticalIdx == bamIdx) + isOpticalDup = true; + } + + // 添加冗余标识 + bw->SetDuplicateReadFlag(true); + + uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str()); + if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); + + if (isOpticalDup) + bam_aux_append(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str(), 'Z', + nsgv::gMdArg.DUPLICATE_TYPE_SEQUENCING.size() + 1, + (const uint8_t *)nsgv::gMdArg.DUPLICATE_TYPE_SEQUENCING.c_str()); + else + bam_aux_append(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str(), 'Z', + nsgv::gMdArg.DUPLICATE_TYPE_LIBRARY.size() + 1, + (const uint8_t *)nsgv::gMdArg.DUPLICATE_TYPE_LIBRARY.c_str()); + + // 计算统计信息 + if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) { + // Update the duplication metrics + if (!bw->GetReadPairedFlag() || bw->GetMateUnmappedFlag()) { + ++nsgv::gMetrics.UNPAIRED_READ_DUPLICATES; + } else { + ++nsgv::gMetrics.READ_PAIR_DUPLICATES; // will need to be divided by 2 at the end + } + } + } else { + bw->SetDuplicateReadFlag(false); + } + if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && bamIdx == repIdx) { // repressent + isInDuplicateSet = true; + representativeReadIndexInFile = repIdx.repIdx; + duplicateSetSize = repIdx.dupSet; + while (repIdx == bamIdx || repIdx.dupSet == 0) { + if (repIdxQue.Size() > 0) + repIdx = repIdxQue.Pop(); + else { + repIdx = -1; + break; + } + } + } + + if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) { + if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) { + // cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl; + uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_SET_INDEX_TAG.c_str()); + if (oldTagVal != NULL) + bam_aux_del(bw->b, oldTagVal); + bam_aux_append(bw->b, nsgv::gMdArg.DUPLICATE_SET_INDEX_TAG.c_str(), 'i', + sizeof(representativeReadIndexInFile), (uint8_t *)&representativeReadIndexInFile); + oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_SET_SIZE_TAG.c_str()); + if (oldTagVal != NULL) + bam_aux_del(bw->b, oldTagVal); + bam_aux_append(bw->b, nsgv::gMdArg.DUPLICATE_SET_SIZE_TAG.c_str(), 'i', sizeof(duplicateSetSize), + (const uint8_t *)&duplicateSetSize); + } + } + // 每个read都要写到output,只是添加标识,或者删掉这些冗余record + ++bamIdx; + if (isDup && nsgv::gMdArg.REMOVE_DUPLICATES) { + continue; + } + if (isOpticalDup && nsgv::gMdArg.REMOVE_SEQUENCING_DUPLICATES) { + continue; + } + if (!nsgv::gMdArg.PROGRAM_RECORD_ID.empty() && nsgv::gMdArg.ADD_PG_TAG_TO_READS) { + uint8_t *oldTagVal = bam_aux_get(bw->b, "PG"); + if (oldTagVal != NULL) + bam_aux_del(bw->b, oldTagVal); + bam_aux_append(bw->b, "PG", 'Z', nsgv::gMdArg.PROGRAM_RECORD_ID.size() + 1, + (const uint8_t *)nsgv::gMdArg.PROGRAM_RECORD_ID.c_str()); + } +#if 0 + if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) { + spdlog::error("failed writing sam record to \"{}\"", nsgv::gMdArg.OUTPUT_FILE.c_str()); + sam_close(nsgv::gOutBamFp); + sam_close(nsgv::gInBamFp); + return -1; + } +#endif + } + inBuf.ClearAll(); + PROF_PRINT_END(read_write); + spdlog::info("write {} reads to output", readNum); + } + bam_destroy1(bp); + + // 计算统计信息 + nsgv::gMetrics.READ_PAIRS_EXAMINED /= 2; + nsgv::gMetrics.READ_PAIR_DUPLICATES /= 2; + for (auto &arr : nsgv::gPipe.intersectData.opticalDupIdxArr) + nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size(); + nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES = nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES / 2; + nsgv::gMetrics.ESTIMATED_LIBRARY_SIZE = + estimateLibrarySize(nsgv::gMetrics.READ_PAIRS_EXAMINED - nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES, + nsgv::gMetrics.READ_PAIRS_EXAMINED - nsgv::gMetrics.READ_PAIR_DUPLICATES); + if (nsgv::gMetrics.UNPAIRED_READS_EXAMINED + nsgv::gMetrics.READ_PAIRS_EXAMINED != 0) { + nsgv::gMetrics.PERCENT_DUPLICATION = + (nsgv::gMetrics.UNPAIRED_READ_DUPLICATES + nsgv::gMetrics.READ_PAIR_DUPLICATES * 2) / + (double)(nsgv::gMetrics.UNPAIRED_READS_EXAMINED + nsgv::gMetrics.READ_PAIRS_EXAMINED * 2); + } else { + nsgv::gMetrics.PERCENT_DUPLICATION = 0; + } + calculateRoiHistogram(nsgv::gMetrics); + + // 将统计信息写到文件里 + if (!nsgv::gMdArg.METRICS_FILE.empty()) { + ofstream ofsM(nsgv::gMdArg.METRICS_FILE); + ofsM << "## StringHeader" << endl; + ofsM << "# " << nsgv::gMdArg.CLI_STR << endl; + ofsM << "## StringHeader" << endl; + ofsM << "# Started on: " << nsgv::gMdArg.START_TIME << endl << endl; + ofsM << "## METRICS" + << endl; + ofsM << "LIBRARY\tUNPAIRED_READS_EXAMINED\tREAD_PAIRS_EXAMINED\tSECONDARY_OR_SUPPLEMENTARY_RDS\tUNMAPPED_" + "READS\tUNPAIRED_READ_DUPLICATES\tREAD_PAIR_DUPLICATES\tREAD_PAIR_OPTICAL_DUPLICATES\tPERCENT_" + "DUPLICATION\tESTIMATED_LIBRARY_SIZE" + << endl; + ofsM << nsgv::gMetrics.LIBRARY << "\t" << nsgv::gMetrics.UNPAIRED_READS_EXAMINED << "\t" << nsgv::gMetrics.READ_PAIRS_EXAMINED + << "\t" << nsgv::gMetrics.SECONDARY_OR_SUPPLEMENTARY_RDS << "\t" << nsgv::gMetrics.UNMAPPED_READS << "\t" + << nsgv::gMetrics.UNPAIRED_READ_DUPLICATES << "\t" << nsgv::gMetrics.READ_PAIR_DUPLICATES << "\t" + << nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES << "\t" << nsgv::gMetrics.PERCENT_DUPLICATION << "\t" + << nsgv::gMetrics.ESTIMATED_LIBRARY_SIZE << endl + << endl; + ofsM << "## HISTOGRAM\tDouble" << endl; + ofsM << "BIN CoverageMult" << endl; + for (int i = 1; i <= 100; ++i) { + ofsM << i << "\t" << std::fixed << std::setprecision(6) << nsgv::gMetrics.CoverageMult[i] << endl; + } + ofsM.close(); + } + PROF_END(gprof[GP_write], write); + + if (!indexFn.empty() && sam_idx_save(nsgv::gOutBamFp) < 0) { + spdlog::error("writing index failed"); + sam_close(nsgv::gOutBamFp); + sam_close(nsgv::gInBamFp); + return -1; + } + + /* 关闭文件,收尾清理 */ + sam_close(nsgv::gOutBamFp); + sam_close(nsgv::gInBamFp); return 0; } \ No newline at end of file diff --git a/src/markdup/md_args.h b/src/markdup/md_args.h index 007dbc1..d629eed 100644 --- a/src/markdup/md_args.h +++ b/src/markdup/md_args.h @@ -256,14 +256,9 @@ struct MarkDupsArg { /* Add PG tag to each read in a SAM or BAM (PGTagArgumentCollection)*/ bool ADD_PG_TAG_TO_READS = true; - // 解析参数 - void ParseArgument(int argc, char **argv); - - void PrintArgValue(); - string GetArgValueString(); - - static void PrintHelp(); - - static void PrintVersion(); + // 命令行字符串 + string CLI_STR; + // 开始运行时间 + string START_TIME; }; \ No newline at end of file diff --git a/src/markdup/pipeline_md.cpp b/src/markdup/pipeline_md.cpp index 38b705a..4c2424c 100644 --- a/src/markdup/pipeline_md.cpp +++ b/src/markdup/pipeline_md.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -16,7 +17,6 @@ #include "util/profiling.h" #include "util/yarn.h" -using std::cout; using std::vector; namespace nsgv { @@ -247,7 +247,6 @@ static void markdups(MarkDupDataArg *arg) { static inline void getIntersectData(vector &leftArr, vector &rightArr, vector *dst, bool isPairCmp = false) { if (leftArr.empty() || rightArr.empty()) { - // cout << "bad size: " << leftArr.size() << '\t' << rightArr.size() << endl; return; } const size_t leftEndIdx = leftArr.size() - 1; @@ -782,7 +781,7 @@ static void *pipeRead(void *data) { yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据 break; } - cout << "read num: " << readNum << "\t" << readNumSum << '\t' << pipeArg.readOrder << endl; + spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder); pipeArg.readData.bams = inBamBuf.GetBamArr(); @@ -794,7 +793,7 @@ static void *pipeRead(void *data) { pipeArg.readOrder += 1; // for next yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据 } - cout << "total reads: " << readNumSum << endl; + spdlog::info("total reads processed {}", readNumSum); return 0; } static void *pipeGenRE(void *data) { @@ -825,7 +824,6 @@ static void *pipeGenRE(void *data) { break; } /* 处理bam,生成readends */ - // cout << "genRE order: " << pipeArg.genREOrder << "\t" << pipeArg.readData.bamStartIdx << endl; PROF_START(gen); doGenRE(pipeArg); // usleep(200000); @@ -836,7 +834,6 @@ static void *pipeGenRE(void *data) { yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); yarn::TWIST(pipeArg.readSig, yarn::BY, -1); // 使用了一次读入的数据 } - cout << "pipe gen reads" << endl; return 0; } static void *pipeSort(void *data) { @@ -867,7 +864,6 @@ static void *pipeSort(void *data) { break; } /* 排序 readends */ - // cout << "sort order: " << pipeArg.sortOrder << endl; PROF_START(sort); doSort(pipeArg); PROF_END(gprof[GP_sort], sort); @@ -879,7 +875,6 @@ static void *pipeSort(void *data) { pipeArg.sortOrder += 1; yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); } - cout << "end pipe sort" << endl; return 0; } static void *pipeMarkDup(void *data) { @@ -910,7 +905,6 @@ static void *pipeMarkDup(void *data) { break; } /* 冗余检测 readends */ - // cout << "markdup order: " << pipeArg.markDupOrder << endl; PROF_START(markdup); doMarkDup(pipeArg); PROF_END(gprof[GP_markdup], markdup); @@ -921,7 +915,6 @@ static void *pipeMarkDup(void *data) { pipeArg.markDupOrder += 1; yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1); } - cout << "end pipe markdup" << endl; return 0; } static void *pipeIntersect(void *data) { @@ -953,7 +946,6 @@ static void *pipeIntersect(void *data) { pipeArg.intersectOrder += 1; } - cout << "end pipe intersect" << endl; return 0; } @@ -1060,56 +1052,6 @@ static void parallelPipeline() { PROF_START(merge_result); mergeAllTask(pipeArg); PROF_END(gprof[GP_merge_result], merge_result); - - int64_t dupNum = 0; - int64_t opticalDupNum = 0; - for (auto &arr : pipeArg.intersectData.dupIdxArr) dupNum += arr.size(); - for (auto &arr : pipeArg.intersectData.opticalDupIdxArr) opticalDupNum += arr.size(); - - map dup; -#if 0 - int taskSeq = 0; - for (auto &arr : pipeArg.intersectData.dupIdxArr) { - for (auto idx : arr) { - if (dup.find(idx.idx) != dup.end()) { - //if (taskSeq - 1 > dup[idx]) - cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' << idx << endl; - } - dup[idx.idx] = taskSeq; - } - // cout << taskSeq << "\t" << arr.size() << endl; - taskSeq++; - } -#endif - - cout << "Final read order: " << pipeArg.readOrder << endl; - cout << "Final gen order: " << pipeArg.genREOrder << endl; - cout << "Final sort order: " << pipeArg.sortOrder << endl; - cout << "Final mark order: " << pipeArg.markDupOrder << endl; - cout << "Final inter order: " << pipeArg.intersectOrder << endl; - -// cout << "w read time : " << tm_arr[10].acc_seconds_elapsed() << endl; -// cout << "w gen time : " << tm_arr[11].acc_seconds_elapsed() << endl; -// cout << "w sort time : " << tm_arr[12].acc_seconds_elapsed() << endl; -// cout << "w markdup time : " << tm_arr[13].acc_seconds_elapsed() << endl; -// cout << "w intersect time: " << tm_arr[14].acc_seconds_elapsed() << endl; -// -// cout << "w1 gen time : " << tm_arr[21].acc_seconds_elapsed() << endl; -// cout << "w1 sort time : " << tm_arr[22].acc_seconds_elapsed() << endl; -// cout << "w1 markdup time : " << tm_arr[23].acc_seconds_elapsed() << endl; -// -// cout << "read time : " << tm_arr[0].acc_seconds_elapsed() << endl; -// cout << "gen time : " << tm_arr[1].acc_seconds_elapsed() << endl; -// cout << "sort time : " << tm_arr[2].acc_seconds_elapsed() << endl; -// cout << "markdup time : " << tm_arr[3].acc_seconds_elapsed() << endl; -// cout << "intersect time: " << tm_arr[4].acc_seconds_elapsed() << endl; -// -// cout << "copy time: " << tm_arr[5].acc_seconds_elapsed() << endl; -// cout << "merge al6 time: " << tm_arr[6].acc_seconds_elapsed() << endl; -// - cout << "dup num : " << dupNum << "\t" << dup.size() << endl; - cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl; - } /* 并行流水线方式处理数据,标记冗余 */ @@ -1133,7 +1075,7 @@ void pipelineMarkDups() { if (readNum < 1) { break; } - cout << "read num: " << readNum << "\t" << readNumSum << '\t' << pipeArg.readOrder << endl; + spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder); pipeArg.readData.bams = inBamBuf.GetBamArr(); pipeArg.readData.bamStartIdx = readNumSum; @@ -1158,32 +1100,4 @@ void pipelineMarkDups() { pipeArg.readOrder += 1; // for next } mergeAllTask(pipeArg); - - int64_t dupNum = 0; - int64_t opticalDupNum = 0; - for (auto &arr : pipeArg.intersectData.dupIdxArr) dupNum += arr.size(); - for (auto &arr : pipeArg.intersectData.opticalDupIdxArr) opticalDupNum += arr.size(); - - map dup; -#if 0 - int taskSeq = 0; - for (auto &arr : pipeArg.intersectData.dupIdxArr) { - for (auto idx : arr) { - if (dup.find(idx.idx) != dup.end()) { - cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' << idx << endl; - } - dup[idx.idx] = taskSeq; - } - taskSeq++; - } -#endif - - cout << "total reads: " << readNumSum << endl; - cout << "Final read order: " << pipeArg.readOrder << endl; - cout << "Final gen order: " << pipeArg.genREOrder << endl; - cout << "Final sort order: " << pipeArg.sortOrder << endl; - cout << "Final mark order: " << pipeArg.markDupOrder << endl; - cout << "Final inter order: " << pipeArg.intersectOrder << endl; - cout << "dup num : " << dupNum << "\t" << dup.size() << endl; - cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl; } \ No newline at end of file diff --git a/src/markdup/read_ends.h b/src/markdup/read_ends.h index 4bcf89c..a1f68d4 100644 --- a/src/markdup/read_ends.h +++ b/src/markdup/read_ends.h @@ -32,6 +32,7 @@ struct PhysicalLocation { int16_t tile = -1; // int32_t x = -1; // int32_t y = -1; + // This is a bug in picard Markduplicates, because some tile coordinates exceede the range of int16_t int16_t x = -1; int16_t y = -1; }; diff --git a/src/util/profiling.cpp b/src/util/profiling.cpp index eee577f..7173483 100644 --- a/src/util/profiling.cpp +++ b/src/util/profiling.cpp @@ -68,6 +68,9 @@ int DisplayProfiling(int nthread) { PRINT_GP(merge_markdup); PRINT_GP(merge_update); PRINT_GP(merge_add); + PRINT_GP(markdup_all); + PRINT_GP(final_read); + PRINT_GP(write); PRINT_TP(gen, nthread); PRINT_TP(sort_frag, nthread); diff --git a/src/util/profiling.h b/src/util/profiling.h index fc844e9..d4bddd6 100644 --- a/src/util/profiling.h +++ b/src/util/profiling.h @@ -25,9 +25,15 @@ extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE]; #define PROF_START(tmp_time) uint64_t prof_tmp_##tmp_time = RealtimeMsec() #define PROF_START_AGAIN(tmp_time) prof_tmp_##tmp_time = RealtimeMsec() #define PROF_END(result, tmp_time) result += RealtimeMsec() - prof_tmp_##tmp_time +#define PROF_PRINT_START(tmp_time) uint64_t tmp_time = RealtimeMsec() +#define PROF_PRINT_END(tmp_time) \ + tmp_time = RealtimeMsec() - tmp_time; \ + fprintf(stderr, "time %-15s: %0.2lfs\n", #tmp_time, tmp_time * 1.0 / proc_freq) #else #define PROF_START(tmp_time) #define PROF_END(result, tmp_time) +#define PROF_PRINT_START(tmp_time) +#define PROF_PRINT_END(tmp_time) #endif // GLOBAL @@ -51,7 +57,10 @@ enum { GP_merge_match, GP_merge_markdup, GP_merge_update, - GP_merge_add + GP_merge_add, + GP_markdup_all, + GP_final_read, + GP_write }; // THREAD enum { TP_0 = 0, TP_1, TP_2, TP_3, TP_4, TP_5, TP_6, TP_7, TP_8, TP_9, TP_10 };