/* Description: 标记bam文件中的冗余信息,只处理按照坐标排序后的bam,且bam为单一样本数据 Copyright : All right reserved by ICT Author : Zhang Zhonghai Date : 2023/10/23 */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "markdups_arg.h" #include "md_funcs.h" #include "parallel_md.h" #include "serial_md.h" #include "shared_args.h" #include "dup_metrics.h" using namespace std; using std::cout; #define SMA_TAG_PG "PG" #define BAM_BLOCK_SIZE 16L * 1024 * 1024 #define NO_SUCH_INDEX INT64_MAX Timer tm_arr[20]; // 用来测试性能 /* 全局本地变量 */ vector g_vRnParser; // 每个线程一个read name parser samFile *g_inBamFp; // 输入的bam文件 sam_hdr_t *g_inBamHeader; // 输入的bam文件头信息 samFile *g_outBamFp = nullptr; // 输出文件, sam或者bam格式 sam_hdr_t *g_outBamHeader; // 输出文件的header /* 参数对象作为全局对象,免得多次作为参数传入函数中 */ GlobalArg &g_gArg = GlobalArg::Instance(); static MarkDupsArg g_mdArg_; MarkDupsArg &g_mdArg = g_mdArg_; static GlobalDataArg gData_; GlobalDataArg &gData = gData_; DuplicationMetrics gMetrics_; DuplicationMetrics &gMetrics = gMetrics_; int zzhtestnum = 0; set zzhopticalSet; vector zzhopticalArr; /* * mark duplicate * 入口,假定bam是按照比对后的坐标排序的,同一个样本的话不需要考虑barcode的问题 */ int MarkDuplicates(int argc, char *argv[]) { Timer::log_time("程序开始"); Timer time_all; /* 读取命令行参数 */ g_mdArg.parseArgument(argc, argv, &g_gArg); // 解析命令行参数 if (g_gArg.num_threads < 1) g_gArg.num_threads = 1; // 线程数不能小于1 /* 初始化一些参数和变量*/ g_vRnParser.resize(g_gArg.num_threads); for (auto &parser : g_vRnParser) parser.SetReadNameRegex(g_mdArg.READ_NAME_REGEX); // 用来解析read name中的tile,x,y信息 /* 打开输入bam文件 */ g_inBamFp = sam_open_format(g_gArg.in_fn.c_str(), "r", nullptr); if (!g_inBamFp) { Error("[%s] load sam/bam file failed.\n", __func__); return -1; } hts_set_opt(g_inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); g_inBamHeader = sam_hdr_read(g_inBamFp); // 读取header // 获取样本名称(libraryId) gMetrics.LIBRARY = sam_hdr_line_name(g_inBamHeader, "RG", 0); /* 利用线程池对输入输出文件进行读写 */ htsThreadPool htsPoolRead = {NULL, 0}; // 多线程读取,创建线程池 htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池 //htsPoolRead.pool = hts_tpool_init(g_gArg.num_threads); //htsPoolWrite.pool = hts_tpool_init(g_gArg.num_threads); htsPoolRead.pool = hts_tpool_init(32); htsPoolWrite.pool = hts_tpool_init(32); if (!htsPoolRead.pool || !htsPoolWrite.pool) { Error("[%d] failed to set up thread pool", __LINE__); sam_close(g_inBamFp); return -1; } hts_set_opt(g_inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); /* 初始化输出文件 */ char modeout[12] = "wb"; sam_open_mode(modeout + 1, g_gArg.out_fn.c_str(), NULL); g_outBamFp = sam_open(g_gArg.out_fn.c_str(), modeout); g_outBamHeader = sam_hdr_dup(g_inBamHeader); // 用同样的线程池处理输出文件 hts_set_opt(g_outBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); hts_set_opt(g_outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); /* 冗余检查和标记 */ if (g_gArg.num_threads == 1) { serialMarkDups(); // 串行运行 } else { parallelMarkDups(); // 并行运行 } /* 标记冗余, 将处理后的结果写入文件 */ sam_close(g_inBamFp); // 重新打开bam文件 g_inBamFp = sam_open_format(g_gArg.in_fn.c_str(), "r", nullptr); if (!g_inBamFp) { Error("[%s] load sam/bam file failed.\n", __func__); return -1; } hts_set_opt(g_inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); hts_set_opt(g_inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); g_inBamHeader = sam_hdr_read(g_inBamFp); // 读取header if (sam_hdr_write(g_outBamFp, g_outBamHeader) != 0) { Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); sam_close(g_outBamFp); sam_close(g_inBamFp); return -1; } // 输出index文件 // string indexFn = g_gArg.out_fn + ".csi"; // 现在索引都是csi格式的 string indexFn = g_gArg.out_fn + ".bai"; // min_shift = 0 是bai格式 int index_min_shift = 0; if (g_mdArg.INDEX_FORMAT == ns_md::IndexFormat::CSI) { indexFn = g_gArg.out_fn + ".csi"; index_min_shift = 14; } if (sam_idx_init(g_outBamFp, g_outBamHeader, 0 /*csi 14*/, indexFn.c_str()) < 0) { Error("failed to open index \"%s\" for writing", indexFn.c_str()); sam_close(g_outBamFp); sam_close(g_inBamFp); return -1; } // 读取输入文件 // BamBufType inBuf(false); BamBufType inBuf(g_gArg.use_asyncio); inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); DupIdxQueue idxQue; idxQue.Init(&gData.dupIdxArr); Timer tw; cout << "dupsize: " << idxQue.Size() << endl; uint64_t bamIdx = 0; uint64_t dupIdx = idxQue.Pop(); cout << "dup arr size: " << gData.dupIdxArr.size() << endl; cout << "first dup: " << dupIdx << endl; while (inBuf.ReadStat() >= 0) { Timer tw1; size_t readNum = inBuf.ReadBam(); // cout << "read: " << readNum << endl; for (size_t i = 0; i < inBuf.Size(); ++i) { /* 判断是否冗余 */ if (bamIdx == dupIdx) { // cerr << bamIdx << endl; dupIdx = idxQue.Pop(); } if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) { Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); sam_close(g_outBamFp); sam_close(g_inBamFp); return -1; } ++bamIdx; } inBuf.ClearAll(); cout << "write round time: " << tw1.seconds_elapsed() << " s" << endl; } cout << "dupsize: " << idxQue.Size() << endl; if (sam_idx_save(g_outBamFp) < 0) { Error("writing index failed"); sam_close(g_outBamFp); sam_close(g_inBamFp); return -1; } cout << "write time: " << tw.seconds_elapsed() << " s" << endl; /* 关闭文件,收尾清理 */ sam_close(g_outBamFp); sam_close(g_inBamFp); cout << " 总时间: " << time_all.seconds_elapsed() << endl; // cout << "计算read end: " << tm_arr[0].acc_seconds_elapsed() << endl; Timer::log_time("程序结束"); return 0; }