/* Description: 标记bam文件中的冗余信息,只处理按照坐标排序后的bam,且bam为单一样本数据 Copyright : All right reserved by ICT Author : Zhang Zhonghai Date : 2023/10/23 */ #include "markdups_arg.h" // 有太多define冲突,放到最后include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std; using std::cout; #define SMA_TAG_PG "PG" #define BAM_BLOCK_SIZE 2 * 1024 * 1024 #define NO_SUCH_INDEX INT64_MAX static Timer tm_arr[10]; // 用来测试性能 /* 前向声明 */ class ThMarkDupArg; /* 全局本地变量 */ static queue g_qpThMarkDupArg; // 存放线程变量的队列 static lock_t *g_queueFirstLock = NEW_LOCK(-1); // 队列的第一个任务是否完成 static lock_t *g_readyToReadLock = NEW_LOCK(-1); // 通知主线程是否可以进行下一次读取 static vector g_vRnParser; // 每个线程一个read name parser static int g_numDuplicateIndices = 0; // 找到的冗余read总数 static samFile *g_outBamFp = nullptr; // 输出文件, sam或者bam格式 static sam_hdr_t *g_outBamHeader; // 输出文件的header static int g_maxJobNum = 0; // 每次读取新的数据后,新增的任务数量 static int g_jobNumForRead = 0; // 任务数量降到当前值时开始下一轮读取 static volatile int64_t g_bamLoadedNum = 0; // 已经读入的read总数 static volatile int64_t g_bamWritenNum = 0; // 已经处理完,写入输出文件的read总数 static vector g_vDupIdx; // 线程内部计算得出的 static vector g_vOpticalDupIdx; static set g_sDupIdxLatter; static set g_sOpticalDupIdxLatter; /* 参数对象作为全局对象,免得多次作为参数传入函数中 */ static GlobalArg &g_gArg = GlobalArg::Instance(); static MarkDupsArg g_mdArg; /* * 计算read的分数 */ static int16_t computeDuplicateScore(BamWrap &bw) { int16_t score = 0; switch (g_mdArg.DUPLICATE_SCORING_STRATEGY) { case ns_md::SUM_OF_BASE_QUALITIES: // two (very) long reads worth of high-quality bases can go over Short.MAX_VALUE/2 // and risk overflow. score += (int16_t)min(bw.GetSumOfBaseQualities(), INT16_MAX / 2); break; case ns_md::TOTAL_MAPPED_REFERENCE_LENGTH: if (!bw.GetReadUnmappedFlag()) // no need to remember the score since this scoring mechanism is symmetric score = (int16_t)min(bw.GetReferenceLength(), INT16_MAX / 2); break; case ns_md::RANDOM: // The RANDOM score gives the same score to both reads so that they get filtered together. // it's not critical do use the readName since the scores from both ends get added, but it seem // to be clearer this way. score += (short)(Murmur3::Instance().HashUnencodedChars(bw.query_name()) & 0b11111111111111); // subtract Short.MIN_VALUE/4 from it to end up with a number between // 0 and Short.MAX_VALUE/2. This number can be then discounted in case the read is // not passing filters. We need to stay far from overflow so that when we add the two // scores from the two read mates we do not overflow since that could cause us to chose a // failing read-pair instead of a passing one. score -= INT16_MIN / 4; default: break; } // make sure that filter-failing records are heavily discounted. (the discount can happen twice, once // for each mate, so need to make sure we do not subtract more than Short.MIN_VALUE overall.) score += bw.GetReadFailsVendorQualityCheckFlag() ? (int16_t)(INT16_MIN / 2) : 0; return score; } /* * Builds a read ends object that represents a single read. 用来表示一个read的特征结构 */ static void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnds *pKey) { auto &k = *pKey; auto &bc = bw.b->core; k.read1ReferenceIndex = bc.tid; k.read1Coordinate = (bc.flag & BAM_FREVERSE) ? bw.GetUnclippedEnd() : bw.GetUnclippedStart(); k.orientation = (bc.flag & BAM_FREVERSE) ? ReadEnds::R : ReadEnds::F; k.read1IndexInFile = index; k.score = computeDuplicateScore(bw); // Doing this lets the ends object know that it's part of a pair if (bw.GetReadPairedFlag() && !bw.GetMateUnmappedFlag()) { k.read2ReferenceIndex = bc.mtid; } // Fill in the location information for optical duplicates rnParser.AddLocationInformation(bw.query_name(), pKey); // cout << k.tile << ' ' << k.x << ' ' << k.y << endl; // 计算位置key k.posKey = BamWrap::bam_global_pos(k.read1ReferenceIndex, k.read1Coordinate); // << 1 | k.orientation; } /** * Takes a list of ReadEndsForMarkDuplicates objects and identify the representative read based on * quality score. For all members of the duplicate set, add the read1 index-in-file of the representative * read to the records of the first and second in a pair. This value becomes is used for * the 'DI' tag. */ static void addRepresentativeReadIndex(vector &vpRe) { } /* 处理一组pairend的readends,标记冗余 */ static void markDuplicatePairs(vector &vpRe, set *psDupIdx, set *psOpticalDupIdx) { if (vpRe.size() < 2) { if (vpRe.size() == 1) { // addSingletonToCount(libraryIdGenerator); } return; } int maxScore = 0; ReadEnds *pBestRe = nullptr; /** All read ends should have orientation FF, FR, RF, or RR **/ for (auto pe: vpRe) // 找分数最高的readend { if (pe->score > maxScore || pBestRe == nullptr) { maxScore = pe->score; pBestRe = pe; } } if (!g_mdArg.READ_NAME_REGEX.empty()) // 检查光学冗余 { // trackOpticalDuplicates } for (auto pe: vpRe) // 对非best read标记冗余 { if (pe != pBestRe) // 非best { psDupIdx->insert(pe->read1IndexInFile); // 添加read1 if (pe->read2IndexInFile != pe->read1IndexInFile) psDupIdx->insert(pe->read2IndexInFile); // 添加read2 } } if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS) { addRepresentativeReadIndex(vpRe); } } /* 处理一组非paired的readends,标记冗余 */ static void markDuplicateFragments(vector &vpRe, bool containsPairs, set *psDupIdx, set *psOpticalDupIdx) { if (containsPairs) { for (auto pe: vpRe) { if (!pe->IsPaired()) { psDupIdx->insert(pe->read1IndexInFile); } } } else { int maxScore = 0; ReadEnds *pBest = nullptr; for (auto pe : vpRe) { if (pe->score > maxScore || pBest == nullptr) { maxScore = pe->score; pBest = pe; } } for (auto pe : vpRe) { if (pe != pBest) { psDupIdx->insert(pe->read1IndexInFile); } } } } /* 多线程处理冗余参数结构体 */ struct ThMarkDupArg { int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置 long seq; // 当前任务在所有任务的排序 bool more; // 后面还有任务 volatile bool finish; // 当前任务有没有处理完 vector vBam; // 存放待处理的bam read map> mvPair; // 以冗余位置为索引,保存所有pairend reads map> mvFrag; // 保存所有reads,包括pairend map> msDupIdx; // 冗余read的索引 map> msOpticalDupIdx; // optical冗余read的索引 unordered_map umReadEnds; // 用来寻找pair end }; /* * 多线程查找和标记冗余函数 */ void thread_markdups(void *arg, int tid) { auto &p = *(ThMarkDupArg *)arg; /* 处理每个read,创建ReadEnd,并放入frag和pair中 */ for (int i = 0; i < p.vBam.size(); ++i) // 循环处理每个read { BamWrap *bw = p.vBam[i]; const int64_t bamIdx = p.bamStartIdx + i; if (bw->GetReadUnmappedFlag()) { if (bw->b->core.tid == -1) // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). break; } else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对 { ReadEnds fragEnd; buildReadEnds(*bw, bamIdx, g_vRnParser[tid], &fragEnd); p.mvFrag[fragEnd.posKey].push_back(fragEnd); // 添加进frag集合 if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了 { string key = bw->query_name(); if (p.umReadEnds.find(key) == p.umReadEnds.end()) { p.umReadEnds[key] = fragEnd; } else // 找到了pairend { auto pairedEnds = p.umReadEnds.at(key); p.umReadEnds.erase(key); // 删除找到的pairend const int matesRefIndex = fragEnd.read1ReferenceIndex; const int matesCoordinate = fragEnd.read1Coordinate; // Set orientationForOpticalDuplicates, which always goes by the first then the second end for the strands. NB: must do this // before updating the orientation later. if (bw->GetFirstOfPairFlag()) { pairedEnds.orientationForOpticalDuplicates = ReadEnds::GetOrientationByte(bw->GetReadNegativeStrandFlag(), pairedEnds.orientation == ReadEnds::R); } else { pairedEnds.orientationForOpticalDuplicates = ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, bw->GetReadNegativeStrandFlag()); } // If the other read is actually later, simply add the other read's data as read2, else flip the reads if (matesRefIndex > pairedEnds.read1ReferenceIndex || (matesRefIndex == pairedEnds.read1ReferenceIndex && matesCoordinate >= pairedEnds.read1Coordinate)) { pairedEnds.read2ReferenceIndex = matesRefIndex; pairedEnds.read2Coordinate = matesCoordinate; pairedEnds.read2IndexInFile = bamIdx; pairedEnds.orientation = ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, bw->GetReadNegativeStrandFlag()); // if the two read ends are in the same position, pointing in opposite directions, // the orientation is undefined and the procedure above // will depend on the order of the reads in the file. // To avoid this, we set it explicitly (to FR): if (pairedEnds.read2ReferenceIndex == pairedEnds.read1ReferenceIndex && pairedEnds.read2Coordinate == pairedEnds.read1Coordinate && pairedEnds.orientation == ReadEnds::RF) { pairedEnds.orientation = ReadEnds::FR; } } else { pairedEnds.read2ReferenceIndex = pairedEnds.read1ReferenceIndex; pairedEnds.read2Coordinate = pairedEnds.read1Coordinate; pairedEnds.read2IndexInFile = pairedEnds.read1IndexInFile; pairedEnds.read1ReferenceIndex = matesRefIndex; pairedEnds.read1Coordinate = matesCoordinate; pairedEnds.read1IndexInFile = bamIdx; pairedEnds.orientation = ReadEnds::GetOrientationByte(bw->GetReadNegativeStrandFlag(), pairedEnds.orientation == ReadEnds::R); } pairedEnds.score += computeDuplicateScore(*bw); p.mvPair[pairedEnds.posKey].push_back(pairedEnds); } } } } /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ // 先处理 pair int dupNum = 0; vector vRePotentialDup; // 有可能是冗余的reads for (auto &e : p.mvPair) // 按比对的位置先后进行遍历 { if (e.second.size() > 1) // 有潜在的冗余 { vRePotentialDup.clear(); ReadEnds *pReadEnd = nullptr; for (auto &re : e.second) { if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) vRePotentialDup.push_back(&re); else { markDuplicatePairs(vRePotentialDup, &p.msDupIdx[e.first], &p.msOpticalDupIdx[e.first]); vRePotentialDup.clear(); vRePotentialDup.push_back(&re); pReadEnd = &re; } } markDuplicatePairs(vRePotentialDup, &p.msDupIdx[e.first], &p.msOpticalDupIdx[e.first]); } } // 再处理frag bool containsPairs = false; bool containsFrags = false; for (auto &e : p.mvFrag) { if (e.second.size() > 1) // 有潜在的冗余 { vRePotentialDup.clear(); ReadEnds *pReadEnd = nullptr; for (auto &re : e.second) { if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) { vRePotentialDup.push_back(&re); containsPairs = containsPairs || re.IsPaired(); containsFrags = containsFrags || !re.IsPaired(); } else { if (vRePotentialDup.size() > 1 && containsFrags) { markDuplicateFragments(vRePotentialDup, containsPairs, &p.msDupIdx[e.first], &p.msOpticalDupIdx[e.first]); } vRePotentialDup.clear(); vRePotentialDup.push_back(&re); pReadEnd = &re; containsPairs = re.IsPaired(); containsFrags = !re.IsPaired(); } } if (vRePotentialDup.size() > 1 && containsFrags) { markDuplicateFragments(vRePotentialDup, containsPairs, &p.msDupIdx[e.first], &p.msOpticalDupIdx[e.first]); } } } // cout << tid << '\t' << "dup: " << dupNum << endl; // cout << tid << " all: no: " << p.vBam.size() << '\t' << p.umReadEnds.size() << endl; /* 本段数据处理完成,告诉输出线程 */ POSSESS(g_queueFirstLock); p.finish = true; // cout << tid << ": process: " << p.seq << endl; auto front = g_qpThMarkDupArg.front(); if (front->finish) { TWIST(g_queueFirstLock, TO, front->seq); // 通知写线程,当前队列头部完成的任务 } else { RELEASE(g_queueFirstLock); } } /* * 多线程将结果写入文件,写之前需要合并相邻线程的未处理的结果 */ void thread_write(void *) { bool more = false; long seq = 0; long unPairedNum = 0; POSSESS(g_queueFirstLock); WAIT_FOR(g_queueFirstLock, TO_BE, seq++); // 等待首个任务完成 auto lastP = g_qpThMarkDupArg.front(); // 取队首的数据 auto umUnpairedReadEnds = lastP->umReadEnds; // 还未找到pair的read auto p = lastP; g_qpThMarkDupArg.pop(); // 删除队首 TWIST(g_queueFirstLock, TO, seq); // 解锁 more = lastP->more; // 是否还有下一个任务 while (more) // 循环处理,将结果写入文件 { POSSESS(g_queueFirstLock); if (g_qpThMarkDupArg.empty()) // 有可能新任务没来得及添加进队列 { RELEASE(g_queueFirstLock); continue; } WAIT_FOR(g_queueFirstLock, TO_BE, seq); // 等待任务完成 p = g_qpThMarkDupArg.front(); if (!p->finish) // 有可能这个任务没有完成,是下边那个TWIST导致进到这里,因为这一段代码可能运行比较快 { TWIST(g_queueFirstLock, TO, -1); // 此时队首任务没完成,-1可以让锁无法进入到这里,避免无效获得锁 continue; } g_qpThMarkDupArg.pop(); TWIST(g_queueFirstLock, TO, seq + 1); /* 处理结果数据 */ // cout << "finish: " << seq - 1 << '\t' << "lastIdx: " << p->bamStartIdx+p->vBam.size() << endl; for (auto &e : p->umReadEnds) // 在当前任务中找有没有与上一个任务中没匹配的read,相匹配的pair { if (umUnpairedReadEnds.find(e.first) != umUnpairedReadEnds.end()) umUnpairedReadEnds.erase(e.first); // 找到了pair else umUnpairedReadEnds.insert(e); // 没有pair,则添加 } /* 更新写入read数量和状态 */ POSSESS(g_readyToReadLock); g_bamWritenNum += lastP->vBam.size(); // cout << "write: " << g_qpThMarkDupArg.size() << endl; if (g_qpThMarkDupArg.size() <= g_jobNumForRead) { TWIST(g_readyToReadLock, TO, 1); } else { RELEASE(g_readyToReadLock); } /* 准备下一轮循环 */ delete lastP; more = p->more; lastP = p; seq++; } unPairedNum = umUnpairedReadEnds.size(); cout << "Finally unpaired read num: " << unPairedNum << endl; // 处理最后一个数据 POSSESS(g_readyToReadLock); g_bamWritenNum += lastP->vBam.size(); TWIST(g_readyToReadLock, TO, 1); // cout << "last finish: " << seq - 1 << endl; pthread_exit(0); } /* * mark duplicate 入口,假定bam是按照比对后的坐标排序的,同一个样本的话不需要考虑barcode的问题 */ int MarkDuplicates(int argc, char *argv[]) { Timer::log_time("程序开始"); Timer time_all; /* 读取命令行参数 */ g_mdArg.parseArgument(argc, argv, &g_gArg); // 解析命令行参数 if (g_gArg.num_threads < 1) // 线程数不能小于1 g_gArg.num_threads = 1; /* 初始化一些参数和变量*/ g_vRnParser.resize(g_gArg.num_threads); for (auto &parser : g_vRnParser) parser.SetReadNameRegex(g_mdArg.READ_NAME_REGEX); // 用来解析read name中的tile,x,y信息 /* 打开输入bam文件 */ sam_hdr_t *inBamHeader; samFile *inBamFp; inBamFp = sam_open_format(g_gArg.in_fn.c_str(), "r", nullptr); if (!inBamFp) { Error("[%s] load sam/bam file failed.\n", __func__); return -1; } hts_set_opt(inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); inBamHeader = sam_hdr_read(inBamFp); // 读取header /* 利用线程池对输入输出文件进行读写 */ htsThreadPool htsPoolRead = {NULL, 0}; // 多线程读取,创建线程池 htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池 htsPoolRead.pool = hts_tpool_init(g_gArg.num_threads); htsPoolWrite.pool = hts_tpool_init(g_gArg.num_threads); if (!htsPoolRead.pool || !htsPoolWrite.pool) { Error("[%d] failed to set up thread pool", __LINE__); return -1; } hts_set_opt(inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); /* 初始化输出文件 */ char modeout[12] = "wb"; sam_open_mode(modeout + 1, g_gArg.out_fn.c_str(), NULL); g_outBamFp = sam_open(g_gArg.out_fn.c_str(), modeout); g_outBamHeader = sam_hdr_dup(inBamHeader); if (sam_hdr_write(g_outBamFp, g_outBamHeader) != 0) { Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); sam_close(g_outBamFp); return -1; } hts_set_opt(g_outBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); hts_set_opt(g_outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); // 用同样的线程池处理输出文件 // /* 读取缓存初始化 */ BamBufType inBamBuf(g_gArg.use_asyncio); inBamBuf.Init(inBamFp, inBamHeader, g_gArg.max_mem); /* 循环读入信息,并处理 */ g_maxJobNum = g_gArg.num_threads * 10; // g_maxJobNum = g_gArg.num_threads * 3; g_jobNumForRead = g_gArg.num_threads * 2; int64_t x_all = 0; // for test int64_t jobSeq = 0; int64_t processedBamNum = 0; // 记录每个轮次累计处理的reads数量,用来计算每个read在整个文件中的索引位置 threadpool thpool = thpool_init(g_gArg.num_threads); // 创建mark dup所需的线程池 thread *writeth = LAUNCH(thread_write, nullptr); // 启动处理结果的的线程 int bamRemainSize = 0; // 上一轮还剩下的bam数量,包含已经在任务里的和没有放进任务的 int numReadsForEachJob = 0; // 每个线程处理的read数量,第一次读取的时候进行设置 int lastRoundUnProcessed = 0; // 上一轮没有放进任务里的read数量 int curRoundProcessed = 0; // 这一轮放进任务的read数量 while (inBamBuf.ReadStat() >= 0) { /* 读取bam文件中的read */ int readNum = inBamBuf.ReadBam(); if (numReadsForEachJob == 0) numReadsForEachJob = readNum / g_maxJobNum; // 第一次读取bam的时候进行设置 g_bamLoadedNum += readNum; cout << readNum << endl; // 这一轮读取的bam数量 /* 多线程处理 任务数是线程数的10倍 */ tm_arr[0].acc_start(); curRoundProcessed = 0; // 当前轮次已经处理的reads数量 int numNeedToProcess = inBamBuf.Size() - bamRemainSize + lastRoundUnProcessed; // 当前需要处理的bam数量 for (int i = 0; numNeedToProcess >= numReadsForEachJob; ++i) // 只有待处理的reads数量大于一次任务的数量时,新建任务 { int startIdx = i * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed; int endIdx = (i + 1) * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed; ThMarkDupArg *thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, jobSeq++, true, false, inBamBuf.Slice(startIdx, endIdx)}); POSSESS(g_queueFirstLock); // 加锁 g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 RELEASE(g_queueFirstLock); // 解锁 thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 curRoundProcessed += endIdx - startIdx; numNeedToProcess -= numReadsForEachJob; } processedBamNum += curRoundProcessed; lastRoundUnProcessed = numNeedToProcess; /* 等待可以继续读取的信号 */ POSSESS(g_readyToReadLock); WAIT_FOR(g_readyToReadLock, TO_BE, 1); bamRemainSize = g_bamLoadedNum - g_bamWritenNum; while (bamRemainSize >= inBamBuf.Size() / 2) { // 要保留的多于现在有的bam数量的一半,那就等待write线程继续处理 TWIST(g_readyToReadLock, TO, 0); POSSESS(g_readyToReadLock); WAIT_FOR(g_readyToReadLock, TO_BE, 1); bamRemainSize = g_bamLoadedNum - g_bamWritenNum; } inBamBuf.ClearBeforeIdx(inBamBuf.Size() - bamRemainSize); // 清理掉已经处理完的reads // cout << g_bamLoadedNum << '\t' << g_bamWritenNum << '\t' << bamRemainSize << '\t' << inBamBuf.Size() << endl; TWIST(g_readyToReadLock, TO, 0); } /* 数据读完了,放一个空的任务,好让write thread停下来 */ ThMarkDupArg *thArg = nullptr; if (lastRoundUnProcessed > 0) // 最后一轮还有没有添加进任务的read数据 { thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, jobSeq++, false, false, inBamBuf.Slice(inBamBuf.Size() - lastRoundUnProcessed, inBamBuf.Size())}); processedBamNum += lastRoundUnProcessed; } else { thArg = new ThMarkDupArg({0, jobSeq++, false, false}); } POSSESS(g_queueFirstLock); // 加锁 g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 RELEASE(g_queueFirstLock); // 解锁 thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 /* 同步所有线程 */ thpool_wait(thpool); thpool_destroy(thpool); JOIN(writeth); cout <<"x_all: " << x_all << endl; cout << "loaded: " << g_bamLoadedNum << endl; cout << "writen: " << g_bamWritenNum << endl; cout << "processedBamNum: " << processedBamNum << endl; /* 标记冗余, 将处理后的结果写入文件 */ /* 关闭文件,收尾清理 */ sam_close(g_outBamFp); sam_close(inBamFp); cout << "read ends size: " << sizeof(ReadEnds) << endl; cout << " 总时间: " << time_all.seconds_elapsed() << endl; cout << "计算read end: " << tm_arr[0].acc_seconds_elapsed() << endl; Timer::log_time("程序结束"); return 0; }