/* Description: 标记bam文件中的冗余信息,只处理按照坐标排序后的bam,且bam为单一样本数据 Copyright : All right reserved by ICT Author : Zhang Zhonghai Date : 2023/10/23 */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "dup_metrics.h" #include "markdups_arg.h" #include "md_funcs.h" #include "parallel_md.h" #include "pipeline_md.h" #include "serial_md.h" #include "shared_args.h" using namespace std; using std::cout; using std::string; #define SMA_TAG_PG "PG" #define BAM_BLOCK_SIZE 32L * 1024 * 1024 #define NO_SUCH_INDEX INT64_MAX Timer tm_arr[50]; // 用来测试性能 /* 全局本地变量 */ vector g_vRnParser; // 每个线程一个read name parser samFile *g_inBamFp; // 输入的bam文件 sam_hdr_t *g_inBamHeader; // 输入的bam文件头信息 samFile *g_outBamFp = nullptr; // 输出文件, sam或者bam格式 sam_hdr_t *g_outBamHeader; // 输出文件的header /* 参数对象作为全局对象,免得多次作为参数传入函数中 */ GlobalArg &g_gArg = GlobalArg::Instance(); static MarkDupsArg g_mdArg_; MarkDupsArg &g_mdArg = g_mdArg_; static GlobalDataArg gData_; GlobalDataArg &gData = gData_; DuplicationMetrics gMetrics_; DuplicationMetrics &gMetrics = gMetrics_; // 记录统计信息 /* 程序版本等信息 */ const char *PROGRAM_GROUP_VERSION = "3.2.0"; // 调试信息 int zzhtestnum = 0; int64_t bam_num1=0, bam_num2=0; /** * Estimates the size of a library based on the number of paired end molecules observed * and the number of unique pairs observed. *

* Based on the Lander-Waterman equation that states: * C/X = 1 - exp( -N/X ) * where * X = number of distinct molecules in library * N = number of read pairs * C = number of distinct fragments observed in read pairs */ static int64_t estimateLibrarySize(int64_t readPairs, int64_t uniqueReadPairs) { int64_t librarySize = 0; int64_t readPairDuplicates = readPairs - uniqueReadPairs; auto f = [](double x, double c, double n) { return c / x - 1 + exp(-n / x); }; if (readPairs > 0 && readPairDuplicates > 0) { double m = 1.0; double M = 100.0; if (uniqueReadPairs >= readPairs || f(m * uniqueReadPairs, uniqueReadPairs, readPairs) < 0) { cerr << "Invalid values for pairs and unique pairs: " << readPairs << ", " << uniqueReadPairs << endl; return 0; } // find value of M, large enough to act as other side for bisection method while (f(M * uniqueReadPairs, uniqueReadPairs, readPairs) > 0) { M *= 10.0; } // use bisection method (no more than 40 times) to find solution for (int i = 0; i < 40; ++i) { double r = (m + M) / 2.0; double u = f(r * uniqueReadPairs, uniqueReadPairs, readPairs); if (u == 0) break; else if (u > 0) m = r; else if (u < 0) M = r; } return uniqueReadPairs * (m + M) / 2.0; } return librarySize; } /* * 获取文件名后缀 */ string getFileExtension(const string &filename) { auto last_dot = filename.find_last_of('.'); if (last_dot == string::npos) { return ""; } return filename.substr(last_dot + 1); } /* * mark duplicate * 入口,假定bam是按照比对后的坐标排序的,同一个样本的话不需要考虑barcode的问题 */ int MarkDuplicates(int argc, char *argv[]) { Timer::log_time("程序开始"); Timer time_all; /* 读取命令行参数 */ g_mdArg.parseArgument(argc, argv, &g_gArg); // 解析命令行参数 if (g_gArg.num_threads < 1) g_gArg.num_threads = 1; // 线程数不能小于1 /* 初始化一些参数和变量*/ g_vRnParser.resize(g_gArg.num_threads); for (auto &parser : g_vRnParser) parser.SetReadNameRegex(g_mdArg.READ_NAME_REGEX); // 用来解析read name中的tile,x,y信息 /* 打开输入bam文件 */ g_inBamFp = sam_open_format(g_gArg.in_fn.c_str(), "r", nullptr); if (!g_inBamFp) { Error("[%s] load sam/bam file failed.\n", __func__); return -1; } hts_set_opt(g_inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); g_inBamHeader = sam_hdr_read(g_inBamFp); // 读取header // 获取样本名称(libraryId) gMetrics.LIBRARY = sam_hdr_line_name(g_inBamHeader, "RG", 0); /* 利用线程池对输入输出文件进行读写 */ htsThreadPool htsPoolRead = {NULL, 0}; // 多线程读取,创建线程池 htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池 // htsPoolRead.pool = hts_tpool_init(g_gArg.num_threads); // htsPoolWrite.pool = hts_tpool_init(g_gArg.num_threads); htsPoolRead.pool = hts_tpool_init(16); htsPoolWrite.pool = hts_tpool_init(32); if (!htsPoolRead.pool || !htsPoolWrite.pool) { Error("[%d] failed to set up thread pool", __LINE__); sam_close(g_inBamFp); return -1; } hts_set_opt(g_inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); /* 初始化输出文件 */ char modeout[12] = "wb"; sam_open_mode(modeout + 1, g_gArg.out_fn.c_str(), NULL); g_outBamFp = sam_open(g_gArg.out_fn.c_str(), modeout); g_outBamHeader = sam_hdr_dup(g_inBamHeader); // 修改输出文件的header sam_hdr_add_line(g_outBamHeader, "PG", "ID", g_mdArg.PROGRAM_RECORD_ID.c_str(), "VN", PROGRAM_GROUP_VERSION, "CL", (g_mdArg.PROGRAM_RECORD_ID + " " + g_gArg.GetArgValueString() + " " + g_mdArg.GetArgValueString()).c_str(), NULL); // 用同样的线程池处理输出文件 hts_set_opt(g_outBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); hts_set_opt(g_outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); /* 冗余检查和标记 */ // if (g_gArg.num_threads == 1) { // // serialMarkDups(); // 串行运行 // parallelMarkDups(); // 并行运行 // } else { // parallelMarkDups(); // 并行运行 // } pipelineMarkDups(); //parallelMarkDups(); /* 标记冗余, 将处理后的结果写入文件 */ sam_close(g_inBamFp); // 重新打开bam文件 g_inBamFp = sam_open_format(g_gArg.in_fn.c_str(), "r", nullptr); if (!g_inBamFp) { Error("[%s] load sam/bam file failed.\n", __func__); return -1; } hts_set_opt(g_inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); hts_set_opt(g_inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); g_inBamHeader = sam_hdr_read(g_inBamFp); // 读取header if (sam_hdr_write(g_outBamFp, g_outBamHeader) != 0) { Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); sam_close(g_outBamFp); sam_close(g_inBamFp); return -1; } // 输出index文件 // string indexFn = g_gArg.out_fn + ".csi"; // 现在索引都是csi格式的 string indexFn = g_gArg.out_fn + ".bai"; // min_shift = 0 是bai格式 if ("sam" == getFileExtension(g_gArg.out_fn)) { indexFn = ""; } if (!indexFn.empty()) { int index_min_shift = 0; if (g_mdArg.INDEX_FORMAT == ns_md::IndexFormat::CSI) { indexFn = g_gArg.out_fn + ".csi"; index_min_shift = 14; } if (sam_idx_init(g_outBamFp, g_outBamHeader, 0 /*csi 14*/, indexFn.c_str()) < 0) { Error("failed to open index \"%s\" for writing", indexFn.c_str()); sam_close(g_outBamFp); sam_close(g_inBamFp); return -1; } } // 读取输入文件 // BamBufType inBuf(false); BamBufType inBuf(g_gArg.use_asyncio); inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); DupIdxQueue dupIdxQue, repIdxQue; DupIdxQueue opticalIdxQue; dupIdxQue.Init(&gData.dupIdxArr); repIdxQue.Init(&gData.repIdxArr); opticalIdxQue.Init(&gData.opticalDupIdxArr); Timer tw; cout << "dupsize: " << dupIdxQue.Size() << endl; uint64_t bamIdx = 0; DupInfo dupIdx = dupIdxQue.Pop(); DupInfo repIdx = repIdxQue.Pop(); uint64_t opticalIdx = opticalIdxQue.Pop(); ByteBuf bytes; bam1_t *bp = bam_init1(); bool isDup = false; bool isOpticalDup = false; bool isInDuplicateSet = false; uint32_t representativeReadIndexInFile = 0; uint32_t duplicateSetSize = 0; int64_t realDupSize = 0; exit(0); while (inBuf.ReadStat() >= 0) { Timer tw1; size_t readNum = inBuf.ReadBam(); for (size_t i = 0; i < inBuf.Size(); ++i) { BamWrap *bw = inBuf[i]; if (bam_copy1(bp, bw->b) == nullptr) { Error("Can not copy sam record!"); return -1; } bw->b = bp; isDup = false; isOpticalDup = false; isInDuplicateSet = false; // 删除原来的duplicate tag if (g_mdArg.CLEAR_DT) { uint8_t *oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_TYPE_TAG.c_str()); if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); } // 统计信息 if (bw->GetReadUnmappedFlag()) { ++gMetrics.UNMAPPED_READS; } else if (bw->IsSecondaryOrSupplementary()) { ++gMetrics.SECONDARY_OR_SUPPLEMENTARY_RDS; } else if (!bw->GetReadPairedFlag() || bw->GetMateUnmappedFlag()) { ++gMetrics.UNPAIRED_READS_EXAMINED; } else { ++gMetrics.READ_PAIRS_EXAMINED; // will need to be divided by 2 at the end } /* 判断是否冗余 */ if (bamIdx == dupIdx) { // cerr << bamIdx << endl; ++realDupSize; // for test isDup = true; if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) { isInDuplicateSet = true; representativeReadIndexInFile = dupIdx.repIdx; duplicateSetSize = dupIdx.dupSet; } // 为了防止小内存运行的时候,有重复的dupidx,这时候dup的repIdx和dupSetSize可能会有不同 while ((dupIdx = dupIdxQue.Pop()) == bamIdx); if (opticalIdx == bamIdx) isOpticalDup = true; else if (opticalIdx < bamIdx) { while ((opticalIdx = opticalIdxQue.Pop()) < bamIdx); if (opticalIdx == bamIdx) isOpticalDup = true; } // 添加冗余标识 bw->SetDuplicateReadFlag(true); uint8_t *oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_TYPE_TAG.c_str()); if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); if (isOpticalDup) bam_aux_append(bw->b, g_mdArg.DUPLICATE_TYPE_TAG.c_str(), 'Z', g_mdArg.DUPLICATE_TYPE_SEQUENCING.size() + 1, (const uint8_t *)g_mdArg.DUPLICATE_TYPE_SEQUENCING.c_str()); else bam_aux_append(bw->b, g_mdArg.DUPLICATE_TYPE_TAG.c_str(), 'Z', g_mdArg.DUPLICATE_TYPE_LIBRARY.size() + 1, (const uint8_t *)g_mdArg.DUPLICATE_TYPE_LIBRARY.c_str()); // 计算统计信息 if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) { // Update the duplication metrics if (!bw->GetReadPairedFlag() || bw->GetMateUnmappedFlag()) { ++gMetrics.UNPAIRED_READ_DUPLICATES; } else { ++gMetrics.READ_PAIR_DUPLICATES; // will need to be divided by 2 at the end } } } else { bw->SetDuplicateReadFlag(false); } if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && bamIdx == repIdx) { // repressent isInDuplicateSet = true; representativeReadIndexInFile = repIdx.repIdx; duplicateSetSize = repIdx.dupSet; while (repIdx == bamIdx || repIdx.dupSet == 0) { if (repIdxQue.Size() > 0) repIdx = repIdxQue.Pop(); else { repIdx = -1; break; } } } if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) { // cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl; uint8_t *oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_SET_INDEX_TAG.c_str()); if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); bam_aux_append(bw->b, g_mdArg.DUPLICATE_SET_INDEX_TAG.c_str(), 'i', sizeof(representativeReadIndexInFile), (uint8_t *)&representativeReadIndexInFile); oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_SET_SIZE_TAG.c_str()); if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); bam_aux_append(bw->b, g_mdArg.DUPLICATE_SET_SIZE_TAG.c_str(), 'i', sizeof(duplicateSetSize), (const uint8_t *)&duplicateSetSize); } // 每个read都要写到output,只是添加标识,或者删掉这些冗余record ++bamIdx; if (isDup && g_mdArg.REMOVE_DUPLICATES) { continue; } if (isOpticalDup && g_mdArg.REMOVE_SEQUENCING_DUPLICATES) { continue; } if (!g_mdArg.PROGRAM_RECORD_ID.empty() && g_mdArg.ADD_PG_TAG_TO_READS) { uint8_t *oldTagVal = bam_aux_get(bw->b, "PG"); if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); bam_aux_append(bw->b, "PG", 'Z', g_mdArg.PROGRAM_RECORD_ID.size() + 1, (const uint8_t *)g_mdArg.PROGRAM_RECORD_ID.c_str()); } if (sam_write1(g_outBamFp, g_outBamHeader, bw->b) < 0) { Error("failed writing sam record to \"%s\"", g_gArg.out_fn.c_str()); sam_close(g_outBamFp); sam_close(g_inBamFp); return -1; } } inBuf.ClearAll(); cout << "write round time: " << tw1.seconds_elapsed() << " s" << endl; } bam_destroy1(bp); cout << "Final dup size: " << realDupSize << "\t" << dupIdxQue.Size() << endl; // 计算统计信息 gMetrics.READ_PAIRS_EXAMINED /= 2; gMetrics.READ_PAIR_DUPLICATES /= 2; for (auto &arr : gData.opticalDupIdxArr) gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size(); gMetrics.READ_PAIR_OPTICAL_DUPLICATES = gMetrics.READ_PAIR_OPTICAL_DUPLICATES / 2; gMetrics.ESTIMATED_LIBRARY_SIZE = estimateLibrarySize(gMetrics.READ_PAIRS_EXAMINED - gMetrics.READ_PAIR_OPTICAL_DUPLICATES, gMetrics.READ_PAIRS_EXAMINED - gMetrics.READ_PAIR_DUPLICATES); if (gMetrics.UNPAIRED_READS_EXAMINED + gMetrics.READ_PAIRS_EXAMINED != 0) { gMetrics.PERCENT_DUPLICATION = (gMetrics.UNPAIRED_READ_DUPLICATES + gMetrics.READ_PAIR_DUPLICATES * 2) / (double)(gMetrics.UNPAIRED_READS_EXAMINED + gMetrics.READ_PAIRS_EXAMINED * 2); } else { gMetrics.PERCENT_DUPLICATION = 0; } cout << "metrics: \n" << "LIBRARY: " << gMetrics.LIBRARY << "\n" << "UNPAIRED_READS_EXAMINED: " << gMetrics.UNPAIRED_READS_EXAMINED << "\n" << "READ_PAIRS_EXAMINED: " << gMetrics.READ_PAIRS_EXAMINED << "\n" << "SECONDARY_OR_SUPPLEMENTARY_RDS: " << gMetrics.SECONDARY_OR_SUPPLEMENTARY_RDS << "\n" << "UNMAPPED_READS: " << gMetrics.UNMAPPED_READS << "\n" << "UNPAIRED_READ_DUPLICATES: " << gMetrics.UNPAIRED_READ_DUPLICATES << "\n" << "READ_PAIR_DUPLICATES: " << gMetrics.READ_PAIR_DUPLICATES << "\n" << "READ_PAIR_OPTICAL_DUPLICATES: " << gMetrics.READ_PAIR_OPTICAL_DUPLICATES << "\n" << "PERCENT_DUPLICATION: " << gMetrics.PERCENT_DUPLICATION << "\n" << "ESTIMATED_LIBRARY_SIZE: " << gMetrics.ESTIMATED_LIBRARY_SIZE << endl; cout << "singleton pos: " << gMetrics.singletonReads.size() << endl; for (auto &itr : gMetrics.DuplicateCountHist) cout << "dup counts: " << itr.first << ": " << itr.second << endl; for (auto &itr : gMetrics.NonOpticalDuplicateCountHist) cout << "non optical dup counts: " << itr.first << ": " << itr.second << endl; for (auto &itr : gMetrics.OpticalDuplicatesCountHist) cout << "optical dup counts: " << itr.first << ": " << itr.second << endl; cout << "dupsize: " << dupIdxQue.Size() << "\t" << opticalIdxQue.Size() << "\t" << repIdxQue.Size() << endl; if (!indexFn.empty() && sam_idx_save(g_outBamFp) < 0) { Error("writing index failed"); sam_close(g_outBamFp); sam_close(g_inBamFp); return -1; } cout << "write time: " << tw.seconds_elapsed() << " s" << endl; /* 关闭文件,收尾清理 */ sam_close(g_outBamFp); sam_close(g_inBamFp); cout << " 总时间: " << time_all.seconds_elapsed() << endl; // cout << "计算read end: " << tm_arr[0].acc_seconds_elapsed() << endl; Timer::log_time("程序结束"); return 0; }