/*
 Description: Mark duplicate (redundant) reads in a BAM file

 Copyright : All right reserved by ICT

 Author : Zhang Zhonghai
 Date : 2023/10/23
*/
#include "markdups_arg.h"

// NOTE(review): the targets of the next eight #include directives were lost
// (their <...> part was stripped) in the copy of the file this was reformatted
// from. Restore them from version control. The code below needs, at least,
// the declarations of: GlobalArg, MarkDupsArg, Timer, Error, BamWrap,
// ReadEnds, BamBufType, the thread pool API (threadpool, thpool_*), the lock
// API (lock, new_lock, possess, release, twist, wait_for, launch, join) and
// htslib's sam.h.
#include
#include
#include
#include
#include
#include
#include
#include

#include "htslib/thread_pool.h"

// NOTE(review): these four targets were stripped as well; they are
// reconstructed from the standard names this file uses (cout, queue, set,
// vector). The original order is unknown.
#include <iostream>
#include <queue>
#include <set>
#include <vector>

using namespace std;

// Parenthesized so the macro expands safely inside any larger expression.
#define BAM_BLOCK_SIZE (2 * 1024 * 1024)

/* Forward declaration (same class-key as the definition below) */
struct ThMarkDupArg;

/* File-local globals */
// Queue of per-task argument objects, in submission order.
// NOTE(review): template argument reconstructed from usage (elements are
// pushed as ThMarkDupArg* and dereferenced with ->).
static queue<ThMarkDupArg *> qpThMarkDupArg;
// Lock whose value signals whether the task at the front of the queue is done.
static lock *queueFirstLock = new_lock(-1);

/* Per-task argument block for multi-threaded duplicate marking */
struct ThMarkDupArg
{
    // NOTE(review): element type was stripped; BamWrap* is an assumption,
    // confirm against the original source.
    vector<BamWrap *> *pvBam;
    int startIdx;         // inclusive bound
    int endIdx;           // exclusive bound
    long seq;             // position of this task among all tasks
    bool more;            // true when further tasks follow this one
    volatile bool finish; // set once this task has been processed; written and read while holding queueFirstLock
    // NOTE(review): element type was stripped; int is inferred from insert(1).
    set<int> sDupIdx;     // indexes of duplicate reads
};

/*
 * Worker: find and mark duplicates for one task.
 *
 * arg points to the task's ThMarkDupArg. The actual processing is still a
 * placeholder; the function marks the task as finished and, when the task at
 * the front of the queue is finished, sets the lock value to that task's seq
 * so the writer thread can proceed.
 */
void thread_markdups(void *arg)
{
    auto &p = *static_cast<ThMarkDupArg *>(arg);
    p.sDupIdx.insert(1);
    /* process the data (not implemented yet) */

    /* this segment is done: tell the output thread */
    possess(queueFirstLock);
    p.finish = true;
    cout << "process: " << p.seq << endl;
    auto front = qpThMarkDupArg.front();
    if (front->finish)
    {
        // twist() is presumably "set value, wake waiters, release" (yarn-style lock).
        twist(queueFirstLock, TO, front->seq);
    }
    else
    {
        release(queueFirstLock);
    }
}

/*
 * Writer: consumes finished tasks in queue order and writes the results.
 * Before writing, the not-yet-handled results of adjacent tasks must be
 * merged, which is why the previous task (lastP) is kept alive until the next
 * one has been taken from the queue.
 *
 * Every task object taken from the queue is deleted here, including the last
 * one.
 */
void thread_write(void *)
{
    bool more = false;
    long seq = 0;

    possess(queueFirstLock);
    wait_for(queueFirstLock, TO_BE, seq++); // wait until the first task is finished
    auto lastP = qpThMarkDupArg.front();    // take the front element
    qpThMarkDupArg.pop();                   // and remove it from the queue
    twist(queueFirstLock, TO, seq);
    more = lastP->more;

    while (more) // keep consuming results and writing them out
    {
        possess(queueFirstLock);
        if (qpThMarkDupArg.empty()) // the next task may not have been queued yet
        {
            // NOTE(review): this retries immediately, i.e. it spins until the
            // producer pushes the next task.
            release(queueFirstLock);
            continue;
        }
        wait_for(queueFirstLock, TO_BE, seq); // wait for the task to finish
        auto p = qpThMarkDupArg.front();
        if (!p->finish)
        {
            // The front task may still be running: the twist further down
            // sets the lock to the next seq in advance, and this loop can be
            // faster than the worker. Reset the value to -1 so the wait above
            // blocks until a worker signals again.
            twist(queueFirstLock, TO, -1);
            continue;
        }
        qpThMarkDupArg.pop();
        twist(queueFirstLock, TO, seq + 1);

        /* handle the result data (not implemented yet) */
        cout << "finish: " << seq - 1 << endl;

        /* prepare the next iteration */
        delete lastP;
        more = p->more;
        lastP = p;
        seq++;
    }

    // handle the last task
    cout << "finish: " << seq - 1 << endl;
    delete lastP; // previously leaked: the final task was never freed

    pthread_exit(0);
}

/*
 * Builds a read ends object that
represents a single read. */ static void buildReadEnds(BamWrap &bw, int64_t index, ReadEnds *pKey) { auto &k = *pKey; auto &bc = bw.b->core; k.read1ReferenceIndex = bc.tid; k.read1Coordinate = (bc.flag & BAM_FREVERSE) ? bw.GetUnclippedEnd() : bw.GetUnclippedStart(); k.orientation = (bc.flag & BAM_FREVERSE) ? ReadEnds::R : ReadEnds::F; k.read1IndexInFile = index; } /* * mark duplicate 入口,假定bam是按照比对后的坐标排序的,同一个样本的话不需要考虑barcode的问题 */ int MarkDuplicates(int argc, char *argv[]) { Timer::log_time("程序开始"); Timer time_all; /* 初始化参数 */ GlobalArg &gArg = GlobalArg::Instance(); MarkDupsArg mdArg; vector vAuxVar; mdArg.parseArgument(argc, argv, &gArg); // 解析命令行参数 // if (gArg.num_threads > 1) // 多线程处理 if (false) { threadpool thpool = thpool_init(gArg.num_threads); // 创建mark dup所需的线程池 thread *writeth = launch(thread_write, nullptr); // 启动处理结果的的线程 for (int i = 0; i < 40; ++i) { ThMarkDupArg *thArg = new ThMarkDupArg({nullptr, i, i * 10, i, true, false}); if (i == 39) thArg->more = false; possess(queueFirstLock); // 加锁 qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 release(queueFirstLock); // 解锁 thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 } /* 同步所有线程 */ thpool_wait(thpool); thpool_destroy(thpool); join(writeth); } else { // 单线程串行处理 /* 打开输入bam文件 */ sam_hdr_t *inBamHeader; samFile *inBamFp; inBamFp = sam_open_format(gArg.in_fn.c_str(), "r", nullptr); if (! 
inBamFp) { Error("[%s] load sam/bam file failed.\n", __func__); return -1; } hts_set_opt(inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); inBamHeader = sam_hdr_read(inBamFp); htsThreadPool htsPoolRead = {NULL, 0}; // 多线程读取,创建线程池 htsThreadPool htsPoolWrite = {NULL, 0}; htsPoolRead.pool = hts_tpool_init(gArg.num_threads); htsPoolWrite.pool = hts_tpool_init(gArg.num_threads); if (!htsPoolRead.pool || !htsPoolWrite.pool) { Error("[%d] failed to set up thread pool", __LINE__); return -1; } hts_set_opt(inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); /* 创建输出文件 */ samFile *outBamFp; htsFormat outFormat = {}; hts_parse_format(&outFormat, "bam"); outBamFp = sam_open_format(gArg.out_fn.c_str(), "wb", &outFormat); hts_set_opt(outBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); hts_set_opt(outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); // 用同样的线程池处理输出文件 // /* 读取缓存初始化 */ BamBufType inBamBuf(gArg.use_asyncio); inBamBuf.Init(inBamFp, inBamHeader, gArg.max_mem); /* 循环读入信息,并处理 */ while (inBamBuf.ReadStat() >= 0) { int readNum = inBamBuf.ReadBam(); cout << readNum << endl; // inBamBuf.ClearAll(); // cout << inBamBuf.Size() << endl; inBamBuf.ClearBeforeIdx(inBamBuf.Size()); // break; for (int i = 0; i < inBamBuf.Size(); ++i) { if (sam_write1(outBamFp, inBamHeader, inBamBuf[i]->b) < 0) { Error("failed writing to \"%s\"", gArg.out_fn.c_str()); sam_close(outBamFp); return -1; } } if (readNum == 0) break; } // int res = -1; // bam1_t *b = bam_init1(); // size_t num = 0; // while ((res = sam_read1(inBamFp, inBamHeader, b)) >= 0) // { // ++num; // } // cout << num << endl; /* 为每个read创建ReadEnd信息 */ /* 标记冗余, 将处理后的结果写入文件 */ /* 关闭文件,收尾清理 */ sam_close(outBamFp); sam_close(inBamFp); } // cout << "read ends size: " << sizeof(ReadEnds) << endl; cout << "总时间: " << time_all.seconds_elapsed() << endl; Timer::log_time("程序结束"); return 0; }