From 2d114058a1ca62fac6efffc89fbd975ef3a2a763 Mon Sep 17 00:00:00 2001 From: zzh Date: Mon, 4 Dec 2023 18:02:07 +0800 Subject: [PATCH] =?UTF-8?q?normal=E6=B2=A1=E9=97=AE=E9=A2=98=E4=BA=86?= =?UTF-8?q?=EF=BC=8Ctumor=E7=BB=93=E6=9E=9C=E8=BF=98=E4=B8=8D=E4=B8=80?= =?UTF-8?q?=E6=A0=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- run.sh | 2 +- src/sam/markdups/markdups.cpp | 11 + src/sam/markdups/md_funcs.h | 139 ++++-- src/sam/markdups/parallel_md.h | 540 +------------------- src/sam/markdups/serial_md.h | 866 +++++++++++++++++++++++---------- src/sam/utils/read_ends.h | 9 + 6 files changed, 717 insertions(+), 850 deletions(-) diff --git a/run.sh b/run.sh index a90cc86..3ee5811 100755 --- a/run.sh +++ b/run.sh @@ -1,6 +1,6 @@ time /home/zzh/work/GeneKit/picard_cpp/build/bin/picard_cpp \ MarkDuplicates \ - --INPUT /mnt/d/data/zy_normal.bam \ + --INPUT /mnt/d/data/zy_tumor.bam \ --OUTPUT /mnt/d/data/out.bam \ --num_threads 1 \ --max_mem 4G \ diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index 3d8ebbc..8ec880c 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -40,6 +40,17 @@ using std::cout; #define NO_SUCH_INDEX INT64_MAX static Timer tm_arr[10]; // 用来测试性能 +/* 全局本地变量 */ +static vector g_vRnParser; // 每个线程一个read name parser +static samFile *g_inBamFp; // 输入的bam文件 +static sam_hdr_t *g_inBamHeader; // 输入的bam文件头信息 +static samFile *g_outBamFp = nullptr; // 输出文件, sam或者bam格式 +static sam_hdr_t *g_outBamHeader; // 输出文件的header + +/* 参数对象作为全局对象,免得多次作为参数传入函数中 */ +static GlobalArg &g_gArg = GlobalArg::Instance(); +static MarkDupsArg g_mdArg; + #include "md_funcs.h" #include "serial_md.h" diff --git a/src/sam/markdups/md_funcs.h b/src/sam/markdups/md_funcs.h index b6c06e2..89655b8 100644 --- a/src/sam/markdups/md_funcs.h +++ b/src/sam/markdups/md_funcs.h @@ -1,29 +1,71 @@ +#include /* 前向声明 */ -class ThMarkDupArg; -/* 全局本地变量 */ -static queue g_qpThMarkDupArg; // 存放线程变量的队列 -static lock_t *g_queueFirstLock = NEW_LOCK(-1); // 队列的第一个任务是否完成 -static lock_t *g_readyToReadLock = NEW_LOCK(-1); // 通知主线程是否可以进行下一次读取 -static vector g_vRnParser; // 每个线程一个read name parser -static int g_numDuplicateIndices = 0; // 找到的冗余read总数 -static samFile *g_inBamFp; // 输入的bam文件 -static sam_hdr_t *g_inBamHeader; // 输入的bam文件头信息 -static samFile *g_outBamFp = nullptr; // 输出文件, sam或者bam格式 -static sam_hdr_t *g_outBamHeader; // 输出文件的header -static int g_maxJobNum = 0; // 每次读取新的数据后,新增的任务数量 -static int g_jobNumForRead = 0; // 任务数量降到当前值时开始下一轮读取 -static volatile int64_t g_bamLoadedNum = 0; // 已经读入的read总数 -static volatile int64_t g_bamUsedNum = 0; // 已经处理完,写入输出文件的read总数 -static vector g_vDupIdx; // 线程内部计算得出的 -static vector g_vOpticalDupIdx; -static set g_sDupIdxLatter; -static set g_sOpticalDupIdxLatter; -static bool g_serialProcess = false; // 是否串行运行 +/* 存放readend或者冗余idx,避免频繁的开辟和释放内存 */ +template +struct DupContainer +{ + vector> arr; // 类似map> 或 map> + vector pos; // arr中每个元素对应的position + // unordered_map idx; // 某个位点对应在vector中的坐标 + tsl::robin_map idx; // 某个位点对应在vector中的坐标 + int64_t size = 0; // 实际使用的空间 + int64_t capacity = 0; // 内存容量 + inline void Init() + { + idx.clear(); + size = 0; + } + inline void SortAtPos(int64_t p) // 这里的pos表示位点 + { + if (idx.find(p) != idx.end()) + { + const int64_t i = idx.at(p); + std::sort(arr[i].begin(), arr[i].end()); + } + } + inline void SortAtIdx(int64_t i) // 这里的i表示vector的索引 + { + std::sort(arr[i].begin(), arr[i].end()); + } + inline void RemoveAtPos(int64_t p) + { + if (idx.find(p) != idx.end()) + { + const int64_t i = idx.at(p); + arr[i].clear(); + } + } + inline void RemoveAtIdx(int64_t i) // 这里的i表示vector的索引 + { + arr[i].clear(); + } + inline void AddAtPos(int64_t p, T &val) + { + AtPos(p).push_back(val); + } + + inline vector &AtPos(int64_t p) + { + if (idx.find(p) != idx.end()) + { + const int64_t i = idx.at(p); + return arr[i]; + } -/* 参数对象作为全局对象,免得多次作为参数传入函数中 */ -static GlobalArg &g_gArg = GlobalArg::Instance(); -static MarkDupsArg g_mdArg; + if (size >= capacity) + { + capacity += 1; + arr.push_back(vector()); + pos.push_back(0); + } + const int64_t i = size++; + idx[p] = i; + pos[i] = p; + arr[i].clear(); + return arr[i]; + } +}; /* 清除key位置的数据 */ static inline void clearIdxAtPos(int64_t key, map> *pmsIdx) @@ -119,8 +161,8 @@ static void addRepresentativeReadIndex(vector &vpRe) /* 处理一组pairend的readends,标记冗余 */ static void markDuplicatePairs(int64_t posKey, vector &vpRe, - map> *pmsDupIdx, - map> *pmsOpticalDupIdx) + DupContainer *dupIdx, + DupContainer *opticalDupIdx) { if (vpRe.size() < 2) { @@ -131,8 +173,9 @@ static void markDuplicatePairs(int64_t posKey, return; } // cout << "pos:" << posKey + 1 << ";size:" << vpRe.size() << endl; - auto &sDupIdx = (*pmsDupIdx)[posKey]; - auto &sOpticalDupIdx = (*pmsOpticalDupIdx)[posKey]; + auto &vDupIdx = dupIdx->AtPos(posKey); + auto &vOpticalDupIdx = opticalDupIdx->AtPos(posKey); + int maxScore = 0; const ReadEnds *pBest = nullptr; /** All read ends should have orientation FF, FR, RF, or RR **/ @@ -152,9 +195,9 @@ static void markDuplicatePairs(int64_t posKey, { if (pe != pBest) // 非best { - sDupIdx.insert(pe->read1IndexInFile); // 添加read1 + vDupIdx.push_back(pe->read1IndexInFile); // 添加read1 if (pe->read2IndexInFile != pe->read1IndexInFile) - sDupIdx.insert(pe->read2IndexInFile); // 添加read2 + vDupIdx.push_back(pe->read2IndexInFile); // 添加read2 } } @@ -168,9 +211,9 @@ static void markDuplicatePairs(int64_t posKey, static void markDuplicateFragments(int64_t posKey, vector &vpRe, bool containsPairs, - map> *pmsDupIdx) + DupContainer *dupIdx) { - auto &sDupIdx = (*pmsDupIdx)[posKey]; + auto &vDupIdx = dupIdx->AtPos(posKey); if (containsPairs) { @@ -178,7 +221,7 @@ static void markDuplicateFragments(int64_t posKey, { if (!pe->IsPaired()) { - sDupIdx.insert(pe->read1IndexInFile); + vDupIdx.push_back(pe->read1IndexInFile); } } } @@ -199,7 +242,7 @@ static void markDuplicateFragments(int64_t posKey, { if (pe != pBest) { - sDupIdx.insert(pe->read1IndexInFile); + vDupIdx.push_back(pe->read1IndexInFile); } } } @@ -207,45 +250,47 @@ static void markDuplicateFragments(int64_t posKey, /* 处理位于某个坐标的pairend reads */ static inline void handlePairs(int64_t posKey, - set &sReadEnds, + vector &readEnds, vector &vpCache, - map> *pmsDupIdx, - map> *pmsOpticalDupIdx) + DupContainer *dupIdx, + DupContainer *opticalDupIdx) { - if (sReadEnds.size() > 1) // 有潜在的冗余 + if (readEnds.size() > 1) // 有潜在的冗余 { vpCache.clear(); +// std::sort(readEnds.begin(), readEnds.end()); const ReadEnds *pReadEnd = nullptr; - for (auto &re : sReadEnds) + for (auto &re : readEnds) { if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样 vpCache.push_back(&re); // 处理一个潜在的冗余组 else { - markDuplicatePairs(posKey, vpCache, pmsDupIdx, pmsOpticalDupIdx); // 不一样 + markDuplicatePairs(posKey, vpCache, dupIdx, opticalDupIdx); // 不一样 vpCache.clear(); vpCache.push_back(&re); pReadEnd = &re; } } - markDuplicatePairs(posKey, vpCache, pmsDupIdx, pmsOpticalDupIdx); + markDuplicatePairs(posKey, vpCache, dupIdx, opticalDupIdx); } } /* 处理位于某个坐标的 reads */ static inline void handleFrags( int64_t posKey, - set &sReadEnds, + vector &readEnds, vector &vpCache, - map> *pmsDupIdx) + DupContainer *dupIdx) { - if (sReadEnds.size() > 1) // 有潜在的冗余 + if (readEnds.size() > 1) // 有潜在的冗余 { vpCache.clear(); +// std::sort(readEnds.begin(), readEnds.end()); const ReadEnds *pReadEnd = nullptr; bool containsPairs = false; bool containsFrags = false; - for (auto &re : sReadEnds) + for (auto &re : readEnds) { if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) { @@ -257,7 +302,7 @@ static inline void handleFrags( { if (vpCache.size() > 1 && containsFrags) { - markDuplicateFragments(posKey, vpCache, containsPairs, pmsDupIdx); + markDuplicateFragments(posKey, vpCache, containsPairs, dupIdx); } vpCache.clear(); vpCache.push_back(&re); @@ -268,13 +313,13 @@ static inline void handleFrags( } if (vpCache.size() > 1 && containsFrags) { - markDuplicateFragments(posKey, vpCache, containsPairs, pmsDupIdx); + markDuplicateFragments(posKey, vpCache, containsPairs, dupIdx); } } } /* 对找到的pairend read end添加一些信息 */ -static inline void modifyPairedEnds(ReadEnds &fragEnd, ReadEnds *pPairedEnds) +static inline void modifyPairedEnds(const ReadEnds &fragEnd, ReadEnds *pPairedEnds) { auto &pairedEnds = *pPairedEnds; int64_t bamIdx = fragEnd.read1IndexInFile; diff --git a/src/sam/markdups/parallel_md.h b/src/sam/markdups/parallel_md.h index 3dd77ca..00fb0f5 100644 --- a/src/sam/markdups/parallel_md.h +++ b/src/sam/markdups/parallel_md.h @@ -1,540 +1,4 @@ - - -/* 多线程处理冗余参数结构体 */ -struct ThMarkDupArg +void parallelMarkDups() { - int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置 - long seq; // 当前任务在所有任务的排序 - bool more; // 后面还有任务 - volatile bool finish; // 当前任务有没有处理完 - vector vBam; // 存放待处理的bam read - map> mvPair; // 以冗余位置为索引,保存所有pairend reads - map> mvFrag; // 保存所有reads,包括pairend - map> msPairDupIdx; // pair的冗余read的索引 - map> msPairOpticalDupIdx; // optical冗余read的索引 - map> msFragDupIdx; // frag的冗余read的索引 - map> msFragOpticalDupIdx; // 这个好像没用 - unordered_map umReadEnds; // 用来寻找pair end -}; - -/* - * 多线程查找和标记冗余函数 - */ -void thread_markdups(void *arg, int tid) -{ - auto &p = *(ThMarkDupArg *)arg; - - /* 处理每个read,创建ReadEnd,并放入frag和pair中 */ - for (int i = 0; i < p.vBam.size(); ++i) // 循环处理每个read - { - BamWrap *bw = p.vBam[i]; - const int64_t bamIdx = p.bamStartIdx + i; - if (bw->GetReadUnmappedFlag()) - { - if (bw->b->core.tid == -1) - // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). - break; - } - else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对 - { - ReadEnds fragEnd; - buildReadEnds(*bw, bamIdx, g_vRnParser[tid], &fragEnd); - - // if (fragEnd.read1Coordinate == 69662) - // { - // cout << fragEnd.read1Coordinate << '\t' << bw->GetUnclippedEnd() << '\t' - // << bw->GetUnclippedStart() << '\t' << bw->query_name() << '\t' - // << bw->cigar_str() << '\t' << bw->b->core.pos << '\t' << - // bw->b->core.mpos << endl; - // } - - p.mvFrag[fragEnd.posKey].push_back(fragEnd); // 添加进frag集合 - if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了 - { - // if (bw->b->core.pos == 69813 || bw->b->core.pos == 69884) - // { - // cout << fragEnd.read1Coordinate << '\t' << bw->query_name() << endl; - // } - - string key = bw->query_name(); - if (p.umReadEnds.find(key) == p.umReadEnds.end()) - { - p.umReadEnds[key] = fragEnd; - } - else // 找到了pairend - { - auto &pairedEnds = p.umReadEnds.at(key); - modifyPairedEnds(fragEnd, &pairedEnds); - p.mvPair[pairedEnds.posKey].push_back(pairedEnds); - p.umReadEnds.erase(key); // 删除找到的pairend - // if (pairedEnds.read1Coordinate == 69662) - //{ - // cout << pairedEnds.posKey << endl; - // cout << pairedEnds.read1Coordinate << '\t' - // << pairedEnds.read2Coordinate << '\t' - // << (int)pairedEnds.orientation << endl; - // //<< fragEnd.read1Coordinate << '\t' - // //<< fragEnd.posKey << '\t' - // //<< bw->b->core.tid << '\t' << bw->b->core.pos << '\t' - // //<< bw->GetUnclippedEnd() << '\t' << bw->GetUnclippedStart() << endl; - // } - } - } - } - } - /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ - unordered_set usUnpairedPos; // 该位置有还未找到pair的read - for (auto &ele : p.umReadEnds) - { - usUnpairedPos.insert(ele.second.posKey); - } - // 先处理 pair - int dupNum = 0; - vector vRePotentialDup; // 有可能是冗余的reads - for (auto &e : p.mvPair) // 按比对的位置先后进行遍历 - { - // 如果这个位置里所有的read,没有缺少pair的,那么就处理,否则跳过 - // if (usUnpairedPos.find(e.first) == usUnpairedPos.end()) - // handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx); - } - // 再处理frag - for (auto &e : p.mvFrag) - { - // handleFrags(e.first, e.second, vRePotentialDup, &p.msFragDupIdx, &p.msFragOpticalDupIdx); - } - - // cout << tid << '\t' << "dup: " << dupNum << endl; - // cout << tid << " all: no: " << p.vBam.size() << '\t' << p.umReadEnds.size() << endl; - /* 本段数据处理完成,告诉输出线程 */ - if (!g_serialProcess) - { - POSSESS(g_queueFirstLock); - p.finish = true; - // cout << tid << ": process: " << p.seq << endl; - auto front = g_qpThMarkDupArg.front(); - if (g_qpThMarkDupArg.size() > 0) // 表明是并行处理 - { - if (front->finish) - { - TWIST(g_queueFirstLock, TO, front->seq); // 通知写线程,当前队列头部完成的任务 - } - else - { - RELEASE(g_queueFirstLock); - } - } - } -} - -/* - * 多线程将结果写入文件,写之前需要合并相邻线程的未处理的结果 - * 1. pre和cur,重叠的部分合并进pre,删除cur中保存的对应的数据和结果 - * 2. 保留全局没有匹配上的pairend数据,从pre结果中删除 - * 3. 每次循环,处理一次mvUnpaired数据,如果对应的位置没有未匹配的,则计算结果,然后删除掉该位置 - */ -void thread_merge(void *) -{ - bool more = false; // 是否还有下一个任务 - long seq = 0; // 任务序列号 - long unPairedNum = 0; // 没有找到pair的总数量 - long dupReadNum = 0; // 冗余read总数量 - POSSESS(g_queueFirstLock); - WAIT_FOR(g_queueFirstLock, TO_BE, seq++); // 等待首个任务完成 - auto lastP = g_qpThMarkDupArg.front(); // 取队首的数据 - - // unordered_map umUnpairedReadEnds; // 还未找到pair的read - map> mvUnPaired; // 这些坐标对应的reads里,有还没找到pair的 - unordered_set usUnpairedPos; // 上一轮中不需要计算的点(unpaired) - unordered_map> mvPair; // 上一轮中遗留的需要重新计算的pair(包括重叠和没paired) - unordered_map> mvFrag; // 上一轮中需要重新计算的frag(与这一轮位置有重叠的) - vector vRePotentialDup; // 有可能是冗余的reads - map> msPairDupIdx; // pair的冗余read的索引 - map> msPairOpticalDupIdx; // optical冗余read的索引 - auto umUnpairedReadEnds = lastP->umReadEnds; - auto p = lastP; - g_qpThMarkDupArg.pop(); // 删除队首 - TWIST(g_queueFirstLock, TO, seq); // 解锁 - more = lastP->more; // 是否还有下一个任务 - while (more) // 循环处理,将结果写入文件 - { - POSSESS(g_queueFirstLock); - if (g_qpThMarkDupArg.empty()) // 有可能新任务没来得及添加进队列 - { - RELEASE(g_queueFirstLock); - continue; - } - WAIT_FOR(g_queueFirstLock, TO_BE, seq); // 等待任务完成 - p = g_qpThMarkDupArg.front(); - if (!p->finish) // 有可能这个任务没有完成,是下边那个TWIST导致进到这里,因为这一段代码可能运行比较快 - { - TWIST(g_queueFirstLock, TO, -1); // 此时队首任务没完成,-1可以让锁无法进入到这里,避免无效获得锁 - continue; - } - g_qpThMarkDupArg.pop(); - TWIST(g_queueFirstLock, TO, seq + 1); - /* 处理结果数据 */ - // cout << "finish: " << seq - 1 << '\t' << "lastIdx: " << p->bamStartIdx+p->vBam.size() << endl; - // 找paired中重叠的部分 - mvPair.clear(); - mvFrag.clear(); - usUnpairedPos.clear(); - - int64_t lastPairPos = lastP->mvPair.rbegin()->first; // 上一轮read最后到达的坐标 - for (auto &pair : p->mvPair) - { - const int64_t pos = pair.first; - if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了 - break; - mvPair.insert(pair); // 保存此轮任务当前位点的数据 - clearIdxAtPos(pos, &p->msPairDupIdx); // 清除该位点的冗余结果 - clearIdxAtPos(pos, &p->msPairOpticalDupIdx); - if (lastP->mvPair.find(pos) != lastP->mvPair.end()) // 上一个任务同一个位点也有数据 - { - mvPair[pos].insert(mvPair[pos].end(), lastP->mvPair[pos].begin(), lastP->mvPair[pos].end()); - clearIdxAtPos(pos, &lastP->msPairDupIdx); // 清除该位点的冗余结果 - clearIdxAtPos(pos, &lastP->msPairOpticalDupIdx); - } - } - // 找上一轮中没配对的pair - for (auto itr = p->umReadEnds.begin(); itr != p->umReadEnds.end();) // 在当前任务中找有没有与上一个任务中没匹配的read,相匹配的pair - { - auto key = itr->second.posKey; - auto &fragEnd = itr->second; - if (lastP->umReadEnds.find(itr->first) != lastP->umReadEnds.end()) - { - auto &pairedEnds = lastP->umReadEnds.at(itr->first); - modifyPairedEnds(fragEnd, &pairedEnds); - mvPair[key].push_back(pairedEnds); - lastP->umReadEnds.erase(itr->first); // 删除 - if (umUnpairedReadEnds.find(itr->first) != umUnpairedReadEnds.end()) - umUnpairedReadEnds.erase(itr->first); - itr = p->umReadEnds.erase(itr); // 删除本轮中对应的unpaired read - } - else - { - if (umUnpairedReadEnds.find(itr->first) != umUnpairedReadEnds.end()) // 前边一直没找到paired的数据,在这一轮找到了 - { - auto &pairedEnds = umUnpairedReadEnds.at(itr->first); - modifyPairedEnds(fragEnd, &pairedEnds); - mvUnPaired[key].push_back(pairedEnds); - umUnpairedReadEnds.erase(itr->first); // 删除 - itr = p->umReadEnds.erase(itr); // 删除本轮中对应的unpaired read - } - else // 新的没配对的 - { - umUnpairedReadEnds.insert(*itr); // 没有pair,则添加 - auto &v = p->mvPair[key]; - mvUnPaired[key].insert(mvUnPaired[key].end(), v.begin(), v.end()); - ++itr; - } - } - } - // 计算上一轮还需要计算的pair - for (auto &e : lastP->umReadEnds) - usUnpairedPos.insert(e.second.posKey); - for (auto &e : mvPair) - { - //if (usUnpairedPos.find(e.first) == usUnpairedPos.end()) - // handlePairs(e.first, e.second, vRePotentialDup, &lastP->msPairDupIdx, &lastP->msPairOpticalDupIdx); - } - - // 计算多轮之后遗留的pair - usUnpairedPos.clear(); - for (auto &e : umUnpairedReadEnds) - usUnpairedPos.insert(e.second.posKey); - for (auto itr = mvUnPaired.begin(); itr != mvUnPaired.end();) - { - if (usUnpairedPos.find(itr->first) == usUnpairedPos.end()) - { - // handlePairs(itr->first, itr->second, vRePotentialDup, &msPairDupIdx, &msPairOpticalDupIdx); - // itr = mvUnPaired.erase(itr); - } - else - { - ++itr; - } - } - - // 找上一轮重叠的frag - int64_t lastFragPos = lastP->mvFrag.rbegin()->first; - for (auto &pair : p->mvFrag) - { - const int64_t pos = pair.first; - if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了 - break; - mvFrag.insert(pair); // 保存此轮任务当前位点的数据 - clearIdxAtPos(pos, &p->msFragDupIdx); // 清除该位点的冗余结果 - if (lastP->mvFrag.find(pos) != lastP->mvFrag.end()) // 上一个任务同一个位点也有数据 - { - mvFrag[pos].insert(mvFrag[pos].end(), lastP->mvFrag[pos].begin(), lastP->mvFrag[pos].end()); - clearIdxAtPos(pos, &lastP->msFragDupIdx); // 上一个任务该位点有冗余结果 - } - } - for (auto &e : mvFrag) - { - // handleFrags(e.first, e.second, vRePotentialDup, &lastP->msFragDupIdx, &lastP->msFragOpticalDupIdx); - } - - // 计算冗余数量 - for (auto &ele : lastP->msPairDupIdx) - dupReadNum += ele.second.size(); - for (auto &ele : lastP->msFragDupIdx) - dupReadNum += ele.second.size(); - - /* 更新处理完的read数量和状态 */ - POSSESS(g_readyToReadLock); - g_bamUsedNum += lastP->vBam.size(); - // cout << "write: " << g_qpThMarkDupArg.size() << endl; - if (g_qpThMarkDupArg.size() <= g_jobNumForRead) - { - TWIST(g_readyToReadLock, TO, 1); - } - else - { - RELEASE(g_readyToReadLock); - } - /* 准备下一轮循环 */ - delete lastP; - more = p->more; - lastP = p; - seq++; - // cout << "unpaired: " << lastP->umReadEnds.size() << endl; - } - - unPairedNum = umUnpairedReadEnds.size(); - // 计算冗余数量 - for (auto &ele : lastP->msPairDupIdx) - dupReadNum += ele.second.size(); - for (auto &ele : lastP->msFragDupIdx) - dupReadNum += ele.second.size(); - - // 多轮遗留的 - for (auto &ele : msPairDupIdx) - dupReadNum += ele.second.size(); - - // cout << "Finally unpaired read num: " << unPairedNum << endl; - // cout << "Finally dulicate read num: " << dupReadNum << endl; - // cout << "last unpaired num: " << lastP->umReadEnds.size() << endl; - int unpairedReads = 0; - for (auto &e : mvUnPaired) - unpairedReads += e.second.size(); - // cout << "mvUnpaired size: " << mvUnPaired.size() << endl; - // cout << "mvUnpaired vector size: " << unpairedReads << endl; - - // 最后一个数据处理完了 - POSSESS(g_readyToReadLock); - g_bamUsedNum += lastP->vBam.size(); - TWIST(g_readyToReadLock, TO, 1); - // cout << "last finish: " << seq - 1 << endl; - delete lastP; - pthread_exit(0); -} - -/* 串行运行 */ -void serialProcessData(BamBufType &inBamBuf) -{ - ThMarkDupArg p({0, - 0, - true, - true, - inBamBuf.Slice(0, inBamBuf.Size())}); - thread_markdups(&p, 0); - /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ - unordered_set usUnpairedPos; // 该位置有还未找到pair的read - for (auto &ele : p.umReadEnds) - { - usUnpairedPos.insert(ele.second.posKey); - } - // 先处理 pair - int dupNum = 0; - vector vRePotentialDup; // 有可能是冗余的reads - for (auto &e : p.mvPair) // 按比对的位置先后进行遍历 - { - // 如果这个位置里所有的read,没有缺少pair的,那么就处理,否则跳过 - // if (usUnpairedPos.find(e.first) != usUnpairedPos.end()) - // handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx); - } - - // int64_t estimateDupNum = 0; - // for (auto &e : p.mvPair) - // { - // int FF = 0; - // int FR = 0; - // int RF = 0; - // int RR = 0; - // for (auto &re : e.second) - // { - // if (re.orientation == ReadEnds::FF) - // FF++; - // else if (re.orientation == ReadEnds::FR) - // FR++; - // else if (re.orientation == ReadEnds::RF) - // RF++; - // else - // RR++; - // } - // if (FF > 1) { - // estimateDupNum += (FF - 1); - // } - // if (FR > 1) - // { - // estimateDupNum += (FR - 1); - // } - // if (RF > 1) - // { - // estimateDupNum += (RF - 1); - // } - // if (RR > 1) - // { - // estimateDupNum += (RR - 1); - // } - // } - - // cout << "Estimate dup num: " << estimateDupNum << endl; - - // cout << "Finally unpaired read num: " << p.umReadEnds.size() << endl; - int dupReadNum = 0; - for (auto &ele : p.msPairDupIdx) - dupReadNum += ele.second.size(); - cout << "pair dulicate read num: " << dupReadNum << endl; - for (auto &ele : p.msFragDupIdx) - dupReadNum += ele.second.size(); - cout << "Finally dulicate read num: " << dupReadNum << endl; - - // int pairNum = 0, fragNum = 0; - // for (auto &e : p.mvPair) - // pairNum += e.second.size(); - // for (auto &e : p.mvFrag) - // fragNum += e.second.size(); - // cout << "pair num: " << pairNum << endl; - // cout << "frag num: " << fragNum << endl; -} - -static void parallelMarkDups() -{ - - // /* 读取缓存初始化 */ - BamBufType inBamBuf(g_gArg.use_asyncio); - inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); - - /* 循环读入信息,并处理 */ - g_maxJobNum = g_gArg.num_threads * 10; - // g_maxJobNum = g_gArg.num_threads * 3; - g_jobNumForRead = g_gArg.num_threads * 2; - - int64_t x_all = 0; // for test - int64_t jobSeq = 0; - int64_t processedBamNum = 0; // 记录每个轮次累计处理的reads数量,用来计算每个read在整个文件中的索引位置 - threadpool thpool; - thread *mergeth; - if (g_gArg.num_threads == 1) - { - g_serialProcess = true; - } - else - { - thpool = thpool_init(g_gArg.num_threads); // 创建mark dup所需的线程池 - mergeth = LAUNCH(thread_merge, nullptr); // 启动处理结果的的线程,合并所有线程的结果 - } - const int minReadNum = 1000000; // 最小并行处理的read数量 - int bamRemainSize = 0; // 上一轮还剩下的bam数量,包含已经在任务里的和没有放进任务的 - int numReadsForEachJob = 0; // 每个线程处理的read数量,第一次读取的时候进行设置 - int lastRoundUnProcessed = 0; // 上一轮没有放进任务里的read数量 - int curRoundProcessed = 0; // 这一轮放进任务的read数量 - while (inBamBuf.ReadStat() >= 0) - { - /* 读取bam文件中的read */ - size_t readNum = inBamBuf.ReadBam(); - cout << readNum << endl; // 这一轮读取的bam数量 - - // if (readNum <= minReadNum) - // { - // serialProcess = true; - // // 调用串行运行代码 - // serialProcessData(inBamBuf); - // break; - // } - if (g_serialProcess) - { - tm_arr[0].acc_start(); - serialProcessData(inBamBuf); - tm_arr[0].acc_end(); - inBamBuf.ClearAll(); - continue; - } - - if (numReadsForEachJob == 0) - numReadsForEachJob = readNum / g_maxJobNum; // 第一次读取bam的时候进行设置 - g_bamLoadedNum += readNum; - - /* 多线程处理 任务数是线程数的10倍 */ - - curRoundProcessed = 0; // 当前轮次已经处理的reads数量 - int numNeedToProcess = inBamBuf.Size() - bamRemainSize + lastRoundUnProcessed; // 当前需要处理的bam数量 - for (int i = 0; numNeedToProcess >= numReadsForEachJob; ++i) // 只有待处理的reads数量大于一次任务的数量时,新建任务 - { - int startIdx = i * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed; - int endIdx = (i + 1) * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed; - - ThMarkDupArg *thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, - jobSeq++, - true, - false, - inBamBuf.Slice(startIdx, endIdx)}); - POSSESS(g_queueFirstLock); // 加锁 - g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 - RELEASE(g_queueFirstLock); // 解锁 - thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 - curRoundProcessed += endIdx - startIdx; - numNeedToProcess -= numReadsForEachJob; - } - processedBamNum += curRoundProcessed; - lastRoundUnProcessed = numNeedToProcess; - - /* 等待可以继续读取的信号 */ - POSSESS(g_readyToReadLock); - WAIT_FOR(g_readyToReadLock, TO_BE, 1); - bamRemainSize = g_bamLoadedNum - g_bamUsedNum; - - while (bamRemainSize >= inBamBuf.Size() / 2) - { // 要保留的多于现在有的bam数量的一半,那就等待write线程继续处理 - TWIST(g_readyToReadLock, TO, 0); - POSSESS(g_readyToReadLock); - WAIT_FOR(g_readyToReadLock, TO_BE, 1); - bamRemainSize = g_bamLoadedNum - g_bamUsedNum; - } - inBamBuf.ClearBeforeIdx(inBamBuf.Size() - bamRemainSize); // 清理掉已经处理完的reads - // cout << g_bamLoadedNum << '\t' << g_bamUsedNum << '\t' << bamRemainSize << '\t' << inBamBuf.Size() << endl; - TWIST(g_readyToReadLock, TO, 0); - } - if (!g_serialProcess) - { - /* 数据读完了,放一个空的任务,好让write thread停下来 */ - ThMarkDupArg *thArg = nullptr; - if (lastRoundUnProcessed > 0) // 最后一轮还有没有添加进任务的read数据 - { - thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, jobSeq++, false, false, - inBamBuf.Slice(inBamBuf.Size() - lastRoundUnProcessed, inBamBuf.Size())}); - processedBamNum += lastRoundUnProcessed; - } - else - { - thArg = new ThMarkDupArg({0, jobSeq++, false, false}); - } - POSSESS(g_queueFirstLock); // 加锁 - g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 - RELEASE(g_queueFirstLock); // 解锁 - thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 - - /* 同步所有线程 */ - thpool_wait(thpool); - thpool_destroy(thpool); - JOIN(mergeth); - } - inBamBuf.FreeMemory(); - - cout << "x_all: " << x_all << endl; - cout << "loaded: " << g_bamLoadedNum << endl; - cout << " used: " << g_bamUsedNum << endl; - cout << "processedBamNum: " << processedBamNum << endl; + } \ No newline at end of file diff --git a/src/sam/markdups/serial_md.h b/src/sam/markdups/serial_md.h index 65efc5e..d5ddd6d 100644 --- a/src/sam/markdups/serial_md.h +++ b/src/sam/markdups/serial_md.h @@ -1,58 +1,210 @@ +#include +#include + +/* 存放未匹配readend相同位点的所有readend */ +struct UnpairedREInfo +{ + int64_t taskSeq; + ReadEnds unpairedRE; +}; + +struct GlobalUnpairedInfo +{ + int64_t taskSeq; + vector reArr; +}; + +struct TaskSeqDupInfo +{ + set dupIdx; + set opticalDupIdx; + set notDupIdx; +}; + +// typedef unordered_map UnpairedNameMap; +// typedef unordered_map> UnpairedPositionMap; + +typedef tsl::robin_map UnpairedNameMap; +typedef tsl::robin_map> UnpairedPositionMap; + /* 单线程处理冗余参数结构体 */ struct SerailMarkDupArg { - int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置 - vector vBam; // 存放待处理的bam read - map> msPair; // 以冗余位置为索引,保存所有pairend reads - map> msFrag; // 保存所有reads,包括pairend - map> msPairDupIdx; // pair的冗余read的索引 - map> msPairOpticalDupIdx; // optical冗余read的索引 - map> msFragDupIdx; // frag的冗余read的索引 - unordered_map unpairedDic; // 用来寻找pair end + int64_t taskSeq; + int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置 + vector bams; // 存放待处理的bam read + vector pairs; + vector frags; + set pairDupIdx; // pair的冗余read的索引 + set pairOpticalDupIdx; // optical冗余read的索引 + set fragDupIdx; // frag的冗余read的索引 + UnpairedNameMap unpairedDic; // 用来寻找pair end + UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储 }; /* 全局保留的数据,因为有些paired数据比对到了不同的染色体,相距甚远 */ struct GlobalDataArg { - map> msPair; // 以冗余位置为索引,保存所有pairend reads - map> msPairDupIdx; // pair的冗余read的索引 - map> msPairOpticalDupIdx; // optical冗余read的索引 - unordered_map unpairedDic; // 用来寻找pair end - set dupIdx; - unordered_set opticalDupIdx; - map> test; + set pairDupIdx; // pair的冗余read的索引 + set pairOpticalDupIdx; // optical冗余read的索引 + set notDupIdx; // 不是冗余 + //unordered_map unpairedDic; // 用来寻找pair end + //unordered_map> unpairedPosArr; + UnpairedNameMap unpairedDic; // 用来寻找pair end + UnpairedPositionMap unpairedPosArr; + + // 每个task对应一个vector + vector> dupIdxArr; + vector> opticalDupIdxArr; }; static GlobalDataArg gData; -/* 删除某个位点的pairend的冗余idx */ -static void rmPairIdxAtPos(const int64_t pos, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) + +/* 查找 */ +template +static inline Itr binaryFind(Itr first, Itr last, const T &val) { - delIdxAtPos(pos, &curArg->msPairDupIdx); // 删除该位点的冗余结果 - delIdxAtPos(pos, &curArg->msPairOpticalDupIdx); - clearIdxAtPos(pos, &lastArg->msPairDupIdx); // 清除该位点的冗余结果 - clearIdxAtPos(pos, &lastArg->msPairOpticalDupIdx); + first = std::lower_bound(first, last, val); + return (first != last && *first == val) ? first : last; } -static void clearPairIdxAtPos(const int64_t pos, SerailMarkDupArg *task) +/* 排序 */ +static inline void sortReadEndsArr(vector &arr) { - clearIdxAtPos(pos, &task->msPairDupIdx); // 删除该位点的冗余结果 - clearIdxAtPos(pos, &task->msPairOpticalDupIdx); + size_t blockSize = 64 * 1024; + blockSize = min(blockSize, arr.size()); + size_t blockNum = (arr.size() + blockSize - 1) / blockSize; + size_t crossNum = 1024; + size_t start, end, i, left, right; + std::sort(arr.begin(), arr.begin() + blockSize); + for (i = 1; i < blockNum; ++i) + { + start = i * blockSize; + end = min(start + blockSize, arr.size()); + std::sort(arr.begin() + start, arr.begin() + end); + left = crossNum; + while (!(arr[start - left] < arr[start])) + { + left = left << 1; + if (left >= blockSize) + { + std::sort(arr.begin(), arr.end()); // 退化到普通排序 + return; + } + } + right = min(crossNum, end - start - 1); + + while (!(arr[start - 1] < arr[start + right])) + { + right = min(right << 1, end - start - 1); + if (right == end - start - 1) + break; + } + // cout << "sort: " << left << ' ' << right << ' ' + // << arr[start - left].posKey << ' ' << arr[start - 1].posKey << ' ' + // << arr[start].posKey << ' ' << arr[start + right].posKey << endl; + std::sort(arr.begin() + start - left, arr.begin() + start + right); + } } -static void clearPairIdxAtPos(const int64_t pos, - map> *dupIdx, - map> *opticalDupIdx) +/* 处理一组pairend的readends,标记冗余 */ +static void markDupsForPairs(vector &vpRe, + set *dupIdx, + set *opticalDupIdx, + set *notDupIdx = nullptr) { - clearIdxAtPos(pos, dupIdx); // 删除该位点的冗余结果 - clearIdxAtPos(pos, opticalDupIdx); + if (vpRe.size() < 2) + { + if (vpRe.size() == 1) + { + // addSingletonToCount(libraryIdGenerator); + } + return; + } + int maxScore = 0; + const ReadEnds *pBest = nullptr; + /** All read ends should have orientation FF, FR, RF, or RR **/ + for (auto pe : vpRe) // 找分数最高的readend + { + if (pe->score > maxScore || pBest == nullptr) + { + maxScore = pe->score; + pBest = pe; + } + } + if (notDupIdx != nullptr) + { + notDupIdx->insert(pBest->read1IndexInFile); + notDupIdx->insert(pBest->read2IndexInFile); + } + if (!g_mdArg.READ_NAME_REGEX.empty()) // 检查光学冗余 + { + // trackOpticalDuplicates + } + for (auto pe : vpRe) // 对非best read标记冗余 + { + if (pe != pBest) // 非best + { + dupIdx->insert(pe->read1IndexInFile); // 添加read1 + if (pe->read2IndexInFile != pe->read1IndexInFile) + dupIdx->insert(pe->read2IndexInFile); // 添加read2 + } + } + + // if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS) + // { + // addRepresentativeReadIndex(vpRe); + // } } -/* 删除某个位点的frag的冗余idx */ -static void rmFragIdxAtPos(const int64_t pos, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) +/* 处理一组非paired的readends,标记冗余 */ +static void markDupsForFrags(vector &vpRe, + bool containsPairs, + set *dupIdx, + set *notDupIdx = nullptr) { - delIdxAtPos(pos, &curArg->msFragDupIdx); // 删除该位点的冗余结果 - clearIdxAtPos(pos, &lastArg->msFragDupIdx); // 清除该位点的冗余结果 + if (containsPairs) + { + for (auto pe : vpRe) + { + if (!pe->IsPaired()) + { + dupIdx->insert(pe->read1IndexInFile); + } + } + } + else + { + int maxScore = 0; + const ReadEnds *pBest = nullptr; + for (auto pe : vpRe) + { + if (pe->score > maxScore || pBest == nullptr) + { + maxScore = pe->score; + pBest = pe; + } + } + if (notDupIdx != nullptr) + { + notDupIdx->insert(pBest->read1IndexInFile); + } + for (auto pe : vpRe) + { + if (pe != pBest) + { + dupIdx->insert(pe->read1IndexInFile); + } + } + } +} + +/* 找到与readend pos相等的所有readend */ +static void getEqualRE(const ReadEnds &re, vector &src, vector *dst) +{ + auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::pairsCmp); + dst->insert(dst->end(), range.first, range.second); } /* 单线程生成readends (第一步)*/ @@ -60,10 +212,20 @@ static void generateReadEnds(SerailMarkDupArg *arg) { auto &p = *arg; auto &rnParser = g_vRnParser[0]; + + p.pairs.clear(); + p.frags.clear(); + p.unpairedDic.clear(); + p.unpairedPosArr.clear(); + /* 处理每个read,创建ReadEnd,并放入frag和pair中 */ - for (int i = 0; i < p.vBam.size(); ++i) // 循环处理每个read + set reSet; + + ReadEnds lastRe; + + for (int i = 0; i < p.bams.size(); ++i) // 循环处理每个read { - BamWrap *bw = p.vBam[i]; + BamWrap *bw = p.bams[i]; const int64_t bamIdx = p.bamStartIdx + i; if (bw->GetReadUnmappedFlag()) { @@ -74,65 +236,304 @@ static void generateReadEnds(SerailMarkDupArg *arg) else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对 { ReadEnds fragEnd; + tm_arr[8].acc_start(); buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); - // if (fragEnd.posKey == 3547574 || fragEnd.posKey == 3547930) - // { - // cout << fragEnd.posKey << '\t' << bw->query_name() << endl; - // } - p.msFrag[fragEnd.posKey].insert(fragEnd); // 添加进frag集合 + tm_arr[8].acc_end(); + p.frags.push_back(fragEnd); // 添加进frag集合 if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了 { string key = bw->query_name(); if (p.unpairedDic.find(key) == p.unpairedDic.end()) { - p.unpairedDic[key] = fragEnd; + p.unpairedDic[key] = {p.taskSeq, fragEnd}; } else // 找到了pairend { - auto &pairedEnds = p.unpairedDic.at(key); + auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; modifyPairedEnds(fragEnd, &pairedEnds); - p.msPair[pairedEnds.posKey].insert(pairedEnds); + // if (pairedEnds.read1IndexInFile == 94 || pairedEnds.read1IndexInFile == 95) + // { + // cout << "pair score: " << pairedEnds.read1IndexInFile << ' ' << pairedEnds.score << endl; + // } + // if (pairedEnds.read1IndexInFile == 94) + // { + // lastRe = pairedEnds; + // } + // if (pairedEnds.read1IndexInFile == 95) + // { + // cout << "compare: " << (lastRe < pairedEnds) << ' ' << (pairedEnds < lastRe) << endl; + // + // } + p.pairs.push_back(pairedEnds); p.unpairedDic.erase(key); // 删除找到的pairend } } } } + // cout << "sort frags" << endl; + sortReadEndsArr(p.frags); + // sort(p.frags.begin(), p.frags.end()); + // cout << "sort pairs" << endl; + // sortReadEndsArr(p.pairs); + sort(p.pairs.begin(), p.pairs.end()); + // cout << "unpaired num: " << p.unpairedDic.size() << endl; + + // 把未匹配的pair对应的每个位点的pairs记录下来 + for (auto &e : p.unpairedDic) { + auto &unpair = e.second; + auto posKey = unpair.unpairedRE.posKey; + if (p.unpairedPosArr.find(posKey) == p.unpairedPosArr.end()) + getEqualRE(unpair.unpairedRE, p.pairs, &p.unpairedPosArr[posKey]); + } +} + +/* 处理pairs */ +static void processPairs(vector &readEnds, + set *dupIdx, + set *opticalDupIdx, + set *notDupIdx = nullptr) +{ + vector vpCache; // 有可能是冗余的reads + const ReadEnds *pReadEnd = nullptr; + for (auto &re : readEnds) + { + if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样 + vpCache.push_back(&re); // 处理一个潜在的冗余组 + else + { + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx); // 不一样 + vpCache.clear(); + vpCache.push_back(&re); + pReadEnd = &re; + } + } + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx); +} + +/* 处理frags */ +static void processFrags(vector &readEnds, + set *dupIdx, + set *notDupIdx = nullptr) +{ + bool containsPairs = false; + bool containsFrags = false; + vector vpCache; // 有可能是冗余的reads + const ReadEnds *pReadEnd = nullptr; + for (auto &re : readEnds) + { + if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) + { + vpCache.push_back(&re); + containsPairs = containsPairs || re.IsPaired(); + containsFrags = containsFrags || !re.IsPaired(); + } + else + { + if (vpCache.size() > 1 && containsFrags) + { + markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); + } + vpCache.clear(); + vpCache.push_back(&re); + pReadEnd = &re; + containsPairs = re.IsPaired(); + containsFrags = !re.IsPaired(); + } + } + if (vpCache.size() > 1 && containsFrags) + { + markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); + } } /* 单线程markdup (第二步)*/ static void markdups(SerailMarkDupArg *arg) { auto &p = *arg; + p.pairDupIdx.clear(); + p.pairOpticalDupIdx.clear(); + p.fragDupIdx.clear(); /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ - unordered_set usUnpairedPos; // 该位置有还未找到pair的read - for (auto &ele : p.unpairedDic) - { - usUnpairedPos.insert(ele.second.posKey); - } // 先处理 pair - vector vRePotentialDup; // 有可能是冗余的reads - for (auto &e : p.msPair) // 按比对的位置先后进行遍历 - { - handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx); - } + processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx); // 再处理frag - for (auto &e : p.msFrag) + processFrags(p.frags, &p.fragDupIdx); +} + +/* 获取交叉部分的数据 */ +static inline void getIntersectData(vector &leftArr, vector &rightArr, vector *dst) +{ + const size_t leftEndIdx = leftArr.size() - 1; + const size_t rightStartIdx = 0; + size_t leftSpan = 0; + size_t rightSpan = 0; + + while (!(leftArr[leftEndIdx - leftSpan] < rightArr[rightStartIdx])) { - handleFrags(e.first, e.second, vRePotentialDup, &p.msFragDupIdx); + leftSpan += 1; + if (leftSpan > leftEndIdx) + { + leftSpan = leftArr.size() - 1; + break; + } + } + + while (!(leftArr[leftEndIdx] < rightArr[rightSpan])) + { + rightSpan += 1; + if (rightSpan == rightArr.size() - 1) + break; + } + dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end()); + dst->insert(dst->end(), rightArr.begin(), rightArr.begin() + rightSpan); + std::sort(dst->begin(), dst->end()); +} + +/* 将重叠部分的dup idx放进数据中 */ +static inline void refreshFragDupIdx(set &dupIdx, + set ¬DupIdx, + SerailMarkDupArg * lastArg, + SerailMarkDupArg *curArg) +{ + auto &lp = *lastArg; + auto &p = *curArg; + for (auto idx : dupIdx) + { + lp.fragDupIdx.insert(idx); + p.fragDupIdx.erase(idx); + } + for (auto idx : notDupIdx) + { + lp.fragDupIdx.erase(idx); + p.fragDupIdx.erase(idx); } } -// 将pair set中的pair添加进另一个pair set -static void mergeToPairSet(int64_t pos, map> &src, set *dst) +static inline void refreshPairDupIdx(set &dupIdx, + set &opticalDupIdx, + set ¬DupIdx, + SerailMarkDupArg *lastArg, + SerailMarkDupArg *curArg) { - if (src.find(pos) != src.end()) + auto &lp = *lastArg; + auto &p = *curArg; + for (auto idx : dupIdx) { - for (auto &pe : src[pos]) + lp.pairDupIdx.insert(idx); + p.pairDupIdx.erase(idx); + } + for (auto idx : opticalDupIdx) + { + lp.pairOpticalDupIdx.insert(idx); + p.pairOpticalDupIdx.erase(idx); + } + for (auto idx : notDupIdx) + { + lp.pairDupIdx.erase(idx); + lp.pairOpticalDupIdx.erase(idx); + p.pairDupIdx.erase(idx); + p.pairOpticalDupIdx.erase(idx); + } +} + +/* 处理未匹配的部分 */ +static inline void processUnpairedPosForCalc(UnpairedNameMap &lpUnpairedDic, + UnpairedPositionMap &lpUnpairedPosArr, + UnpairedNameMap &pUnpairedDic, + UnpairedPositionMap &pUnpairedPosArr, + vector &pairs, + map &recalcPos, + bool addToLast = false) +{ + recalcPos.clear(); + for (auto itr = pUnpairedDic.begin(); itr != pUnpairedDic.end();) + { + auto &readName = itr->first; + + if (lpUnpairedDic.find(readName) != lpUnpairedDic.end()) { - dst->insert(pe); + auto &posInfo = lpUnpairedDic[readName]; + auto posKey = posInfo.unpairedRE.posKey; + auto &posReArr = lpUnpairedPosArr[posKey]; + + modifyPairedEnds(itr->second.unpairedRE, &posInfo.unpairedRE); + posKey = posInfo.unpairedRE.posKey; + if (recalcPos.find(posKey) == recalcPos.end()) // 如果之前没有这个位点 + getEqualRE(posInfo.unpairedRE, pairs, &posReArr); + recalcPos[posKey] = posInfo.taskSeq; + posReArr.push_back(posInfo.unpairedRE); + std::sort(posReArr.begin(), posReArr.end()); + + lpUnpairedDic.erase(readName); + itr = pUnpairedDic.erase(itr); } - // src.erase(pos); + else + { + if (addToLast) // 将数据添加进遗留数据中 + { + auto posKey = itr->second.unpairedRE.posKey; + lpUnpairedDic[readName] = itr->second; + lpUnpairedPosArr[posKey] = pUnpairedPosArr[posKey]; + } + ++itr; + } + } +} + +// 用来分别处理dup和optical dup +static void refeshTaskDupInfo(vector &addDup, + map &ndPosVal, + set &dupIdx, + set ¬DupIdx, + vector &dupArr) +{ + addDup.clear(); + ndPosVal.clear(); + // 去除之前有的,重复的 + for (auto i = dupIdx.begin(); i != dupIdx.end();) + { + auto itr = binaryFind(dupArr.begin(), dupArr.end(), *i); + if (itr != dupArr.end()) + { + i = dupIdx.erase(i); + } + else + { + ++i; + } + } + // 添加现有的 + auto di = dupIdx.begin(); + for (auto nidx : notDupIdx) + { + auto itr = binaryFind(dupArr.begin(), dupArr.end(), nidx); + if (itr != dupArr.end()) + { + ndPosVal[itr - dupArr.begin()] = *di++; + } + } + + while (di != dupIdx.end()) + addDup.push_back(*di++); + + for (auto pos : ndPosVal) + dupArr[pos.first] = pos.second; + dupArr.insert(dupArr.end(), addDup.begin(), addDup.end()); + std::sort(dupArr.begin(), dupArr.end()); +} + +/* 将遗留的冗余信息添加进对应的任务数据中 */ +static void addDupInfoToTask(map &seqTaskChanged, GlobalDataArg *gDataArg) +{ + auto &g = *gDataArg; + // 更新遗留的结果 + vector addDup; + map ndPosVal; + for (auto &e : seqTaskChanged) + { + refeshTaskDupInfo(addDup, ndPosVal, e.second.dupIdx, e.second.notDupIdx, g.dupIdxArr[e.first]); + refeshTaskDupInfo(addDup, ndPosVal, e.second.opticalDupIdx, e.second.notDupIdx, g.opticalDupIdxArr[e.first]); } } @@ -143,205 +544,113 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur auto &p = *curArg; auto &g = *gDataArg; - vector vRePotentialDup; // 有可能是冗余的reads + vector reArr; + set dupIdx; + set notDupIdx; + // 先处理重叠的frags + getIntersectData(lp.frags, p.frags, &reArr); + processFrags(reArr, &dupIdx, ¬DupIdx); + refreshFragDupIdx(dupIdx, notDupIdx, &lp, &p); - int64_t lastPairPos = 0; - if (lp.msPair.size() > 0) - lastPairPos = lp.msPair.rbegin()->first; // 上一轮read最后到达的坐标 - for (auto &pair : p.msPair) // 重叠的pair + // 再处理重叠的pairs + reArr.clear(); + dupIdx.clear(); + notDupIdx.clear(); + set opticalDupIdx; + getIntersectData(lp.pairs, p.pairs, &reArr); + processPairs(reArr, &dupIdx, &opticalDupIdx, ¬DupIdx); + refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, &lp, &p); + + // 处理之前未匹配的部分 + map recalcPos; + processUnpairedPosForCalc(lp.unpairedDic, + lp.unpairedPosArr, + p.unpairedDic, + p.unpairedPosArr, + p.pairs, + recalcPos); + for (auto &e : recalcPos) { - int64_t pos = pair.first; - if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了 - break; - if (lp.msPair.find(pos) != lp.msPair.end()) // 上一个任务里也有这个位点,两个任务在相同的点位上都有数据,则需要重新计算该点位 - { - auto &pairedSet = lp.msPair[pos]; - rmPairIdxAtPos(pos, &lp, &p); - for (auto &curPair : pair.second) // 改变了lp当前位点的paired set - pairedSet.insert(curPair); - handlePairs(pos, pairedSet, vRePotentialDup, &lp.msPairDupIdx, &lp.msPairOpticalDupIdx); // 把结果放在上个任务里 - } - } - for (auto &unpair : lp.unpairedDic) // 上一个任务中没有找到匹配的pair - { - auto &readName = unpair.first; - auto &unpairEnd = unpair.second; - - if (p.unpairedDic.find(readName) != p.unpairedDic.end()) // 在当前任务中找到了匹配的pairend - { - auto &fe = p.unpairedDic.at(readName); - modifyPairedEnds(fe, &unpairEnd); - int64_t pos = unpairEnd.posKey; - auto &pairedSet = lp.msPair[pos]; - rmPairIdxAtPos(pos, &lp, &p); - pairedSet.insert(unpairEnd); // 改变了lp当前位点的paired set - mergeToPairSet(pos, p.msPair, &pairedSet); // 将当前任务在该位点的数据合并进pairset - handlePairs(pos, pairedSet, vRePotentialDup, &lp.msPairDupIdx, &lp.msPairOpticalDupIdx); - p.unpairedDic.erase(readName); - } - else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) // 在全局中找 - { - auto &prePe = g.unpairedDic.at(readName); - modifyPairedEnds(unpairEnd, &prePe); - int64_t pos = prePe.posKey; - rmPairIdxAtPos(pos, &lp, &p); - clearPairIdxAtPos(pos, &g.msPairDupIdx, &g.msPairOpticalDupIdx); - auto &prePairSet = g.msPair[pos]; - prePairSet.insert(prePe); - mergeToPairSet(pos, lp.msPair, &prePairSet); - mergeToPairSet(pos, p.msPair, &prePairSet); - - // if (pos == 3547574) - // { - // cout <<"here-1: " << pos << '\t' << prePairSet.size() << '\t' << readName << '\t' - // << (p.msPair.find(pos) != p.msPair.end()) << '\t' - // << (p.unpairedDic.find(readName) != p.unpairedDic.end()) << '\t' - // << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl; - // } - - handlePairs(pos, prePairSet, vRePotentialDup, &g.msPairDupIdx, &g.msPairOpticalDupIdx); - g.unpairedDic.erase(readName); - // g.msPair.erase(pos); - } - else // 插入全局数据中 - { - int64_t pos = unpairEnd.posKey; - rmPairIdxAtPos(pos, &lp, &p); - g.unpairedDic.insert(unpair); - mergeToPairSet(pos, lp.msPair, &g.msPair[pos]); - mergeToPairSet(pos, p.msPair, &g.msPair[pos]); - // if (pos == 3547574) - // { - // cout << "here-3: " << pos << '\t' << g.msPair[pos].size() << '\t' << readName << '\t' - // << lp.msPair[pos].size() << '\t' - // << p.msPair[pos].size() << '\t' - // << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl; - // } - } + auto posKey = e.first; + dupIdx.clear(); + notDupIdx.clear(); + opticalDupIdx.clear(); + processPairs(lp.unpairedPosArr[posKey], &dupIdx, &opticalDupIdx, ¬DupIdx); + refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, &lp, &p); } - int64_t lastFragPos = 0; - if (lp.msFrag.size() > 0) - lastFragPos = lp.msFrag.rbegin()->first; // 上一轮read最后到达的坐标, frag - for (auto &frag : p.msFrag) // 重叠的frag + // 遗留的未匹配的pair + processUnpairedPosForCalc(g.unpairedDic, + g.unpairedPosArr, + lp.unpairedDic, + lp.unpairedPosArr, + lp.pairs, + recalcPos, + true); + map seqTaskChanged; + for (auto &e : recalcPos) { - const int64_t pos = frag.first; - if (pos > lastFragPos) - break; - if (lp.msFrag.find(pos) != lp.msFrag.end()) // 上一个任务里也有这个位点,两个任务在相同的点位上都有数据,则需要重新计算该点位 - { - auto &fragSet = lp.msFrag[pos]; - rmFragIdxAtPos(pos, &lp, &p); - for (auto &curFrag : frag.second) // 改变了lp当前位点的paired set - fragSet.insert(curFrag); - handleFrags(pos, fragSet, vRePotentialDup, &lp.msFragDupIdx); // 把结果放在上个任务里 - } + auto posKey = e.first; + auto seqNum = e.second; + auto &t = seqTaskChanged[seqNum]; + // 在对应的任务包含的dup idx里修改结果数据 + processPairs(g.unpairedPosArr[posKey], &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx); + g.unpairedPosArr.erase(posKey); } + addDupInfoToTask(seqTaskChanged, &g); + + cout << "remain unpaired: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl; + + // 将dupidx放进全局数据 + g.dupIdxArr.push_back(vector()); + auto &vIdx = g.dupIdxArr.back(); + lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); + vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); + + g.opticalDupIdxArr.push_back(vector()); + auto &vOpticalIdx = g.opticalDupIdxArr.back(); + vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); } /* 当所有任务结束后,global data里还有未处理的数据 */ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) { - auto &p = *task; + auto &lp = *task; auto &g = *gDataArg; - vector vRePotentialDup; - for (auto &unpair : p.unpairedDic) // 最后一个任务中没有找到匹配的pair + map recalcPos; + // 遗留的未匹配的pair + processUnpairedPosForCalc(g.unpairedDic, + g.unpairedPosArr, + lp.unpairedDic, + lp.unpairedPosArr, + lp.pairs, + recalcPos, + true); + map seqTaskChanged; + for (auto &e : recalcPos) { - auto &readName = unpair.first; - auto &unpairEnd = unpair.second; - if (g.unpairedDic.find(readName) != g.unpairedDic.end()) // 在全局中找 - { - - auto &prePe = g.unpairedDic.at(readName); - modifyPairedEnds(unpairEnd, &prePe); - int64_t pos = prePe.posKey; - clearPairIdxAtPos(pos, &p); - auto &prePairSet = g.msPair[pos]; - prePairSet.insert(prePe); - mergeToPairSet(pos, p.msPair, &prePairSet); - handlePairs(pos, prePairSet, vRePotentialDup, &g.msPairDupIdx, &g.msPairOpticalDupIdx); - g.unpairedDic.erase(readName); - - // if (pos == 3547574) - // { - // cout << "here-2: " << pos << '\t' << prePairSet.size() << '\t' << readName << '\t' - // << (p.msPair.find(pos) != p.msPair.end()) << '\t' - // << (p.unpairedDic.find(readName) != p.unpairedDic.end()) << '\t' - // << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl; - // } - } - else // 插入全局数据中 - { - int64_t pos = unpairEnd.posKey; - g.unpairedDic.insert(unpair); - mergeToPairSet(pos, p.msPair, &g.msPair[pos]); - // if (pos == 3547574) - // { - // cout << "here-4: " << pos << '\t' << g.msPair[pos].size() << '\t' << readName << '\t' - // << (p.msPair.find(pos) != p.msPair.end()) << '\t' - // << (p.unpairedDic.find(readName) != p.unpairedDic.end()) << '\t' - // << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl; - // } - } + auto posKey = e.first; + auto seqNum = e.second; + auto &t = seqTaskChanged[seqNum]; + // 在对应的任务包含的dup idx里修改结果数据 + processPairs(g.unpairedPosArr[posKey], &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx); + g.unpairedPosArr.erase(posKey); } + // 更新遗留的结果 + addDupInfoToTask(seqTaskChanged, &g); - // 处理剩余的 - for (auto &pe : g.msPair) - { - // if (pe.first == 3547574) - // { - // cout << "here-4: " << pe.first << '\t' << g.msPair[pe.first].size() << endl; - // } - handlePairs(pe.first, pe.second, vRePotentialDup, &g.msPairDupIdx, &g.msPairOpticalDupIdx); - } -} + cout << "last unpair info: " << g.unpairedPosArr.size() << '\t' << g.unpairedDic.size() << endl; -/* 功能函数,将冗余索引添加进结果中 */ -static void addIdxToSet(map> &taskDupIdx, set *resultSet) -{ - for (auto &idxSet : taskDupIdx) - { - // cout << idxSet.first << '\t' << idxSet.second.size() << endl; - for (auto idx : idxSet.second) - { - resultSet->insert(idx); - gData.test[idxSet.first].insert(idx); - } - } -} + // 将dupidx放进全局数据 + g.dupIdxArr.push_back(vector()); + auto &vIdx = g.dupIdxArr.back(); + lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); + vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); -/* 功能函数,将冗余(包括光学冗余)索引添加进结果中 */ -static void addOpticalIdxToSet(map> &taskDupIdx, set *resultSet, unordered_set *opticalResult) -{ - for (auto &idxSet : taskDupIdx) - { - // cout << idxSet.first << '\t' << idxSet.second.size() << endl; - for (auto idx : idxSet.second) - { - resultSet->insert(idx); - opticalResult->insert(idx); - gData.test[idxSet.first].insert(idx); - } - } -} - -/* 将每轮任务得到的冗余index添加到全局结果里 */ -static void addTaskIdxToSet(SerailMarkDupArg *task, GlobalDataArg *gDataArg) -{ - auto &p = *task; - auto &g = *gDataArg; - addIdxToSet(p.msPairDupIdx, &g.dupIdx); - addOpticalIdxToSet(p.msPairOpticalDupIdx, &g.dupIdx, &g.opticalDupIdx); - addIdxToSet(p.msFragDupIdx, &g.dupIdx); -} - -/* 将所有任务结束后,剩余的冗余index添加到全局结果里 */ -static void addGlobalIdxToSet(GlobalDataArg *gDataArg) -{ - auto &g = *gDataArg; - addIdxToSet(g.msPairDupIdx, &g.dupIdx); - addOpticalIdxToSet(g.msPairOpticalDupIdx, &g.dupIdx, &g.opticalDupIdx); + g.opticalDupIdxArr.push_back(vector()); + auto &vOpticalIdx = g.opticalDupIdxArr.back(); + vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); } /* 串行处理数据,标记冗余 */ @@ -353,57 +662,82 @@ static void serialMarkDups() BamBufType inBamBuf(g_gArg.use_asyncio); inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); // BamBufType inBamBuf(false); - //inBamBuf.Init(g_inBamFp, g_inBamHeader, 10 * 1024 * 1024); + // inBamBuf.Init(g_inBamFp, g_inBamHeader, 20 * 1024 * 1024); int64_t processedBamNum = 0; - SerailMarkDupArg *lastArgP = nullptr; - SerailMarkDupArg *sMdArg = nullptr; + + SerailMarkDupArg smdArg1, smdArg2; + SerailMarkDupArg *lastArgP = &smdArg1; + SerailMarkDupArg *curArgP = &smdArg2; + + bool isFirstRound = true; + int roundNum = 0; while (inBamBuf.ReadStat() >= 0) { + Timer t_round; // 读取bam文件中的read tm_arr[4].acc_start(); size_t readNum = inBamBuf.ReadBam(); tm_arr[4].acc_end(); cout << "read num: " << readNum << endl; - lastArgP = sMdArg; + // lastArgP = curArgP; tm_arr[6].acc_start(); - sMdArg = new SerailMarkDupArg({processedBamNum, - inBamBuf.GetBamArr()}); + curArgP->taskSeq = roundNum; + curArgP->bamStartIdx = processedBamNum; + curArgP->bams = inBamBuf.GetBamArr(); tm_arr[6].acc_end(); + tm_arr[0].acc_start(); - generateReadEnds(sMdArg); + Timer t1; + generateReadEnds(curArgP); + cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl; tm_arr[0].acc_end(); tm_arr[1].acc_start(); - markdups(sMdArg); + t1.reinit(); + markdups(curArgP); + cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; tm_arr[1].acc_end(); - if (lastArgP != nullptr) + if (!isFirstRound) { tm_arr[2].acc_start(); - handleIntersectData(lastArgP, sMdArg, &gData); - addTaskIdxToSet(lastArgP, &gData); + t1.reinit(); + handleIntersectData(lastArgP, curArgP, &gData); + cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl; + // addTaskIdxToSet(lastArgP, &gData); tm_arr[2].acc_end(); - tm_arr[7].acc_start(); - delete lastArgP; // 清除用过的数据 - tm_arr[7].acc_end(); + } + else + { + isFirstRound = false; } inBamBuf.ClearAll(); // 清理上一轮读入的数据 processedBamNum += readNum; + + // 交换 + auto tmp = lastArgP; + lastArgP = curArgP; + curArgP = tmp; + cout << "round time: " << t_round.seconds_elapsed() << endl; + roundNum++; + if (roundNum > 9){ +// break; + } } tm_arr[3].acc_start(); // 处理剩下的全局数据 - handleLastTask(sMdArg, &gData); - addTaskIdxToSet(sMdArg, &gData); - addGlobalIdxToSet(&gData); + handleLastTask(lastArgP, &gData); tm_arr[3].acc_end(); - // for (auto &e : gData.test) - // { - // cout << e.first << '\t' << e.second.size() << endl; - // } tm_arr[5].acc_end(); // 统计所有冗余index数量 - cout << "dup num : " << gData.dupIdx.size() << endl; + int64_t dupNum = 0; + unordered_set dup; + for (auto &arr : gData.dupIdxArr) + for (auto idx : arr) + dup.insert(idx); + dupNum += dup.size(); + cout << "dup num : " << dupNum << endl; cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl; cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl; @@ -412,7 +746,11 @@ static void serialMarkDups() cout << "read bam : " << tm_arr[4].acc_seconds_elapsed() << endl; cout << "new arg : " << tm_arr[6].acc_seconds_elapsed() << endl; cout << "del arg : " << tm_arr[7].acc_seconds_elapsed() << endl; + cout << "build ends : " << tm_arr[8].acc_seconds_elapsed() << endl; cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; Timer::log_time("serial end "); + + //for (auto i : gData.dupArr) + // cout << i << endl; } \ No newline at end of file diff --git a/src/sam/utils/read_ends.h b/src/sam/utils/read_ends.h index da5b829..d9d76ce 100644 --- a/src/sam/utils/read_ends.h +++ b/src/sam/utils/read_ends.h @@ -101,6 +101,15 @@ struct ReadEnds : PhysicalLocation return areComparable; } + // 找某一个位置的所有readend时需要 + static bool pairsCmp(const ReadEnds &lhs, const ReadEnds &rhs) + { + int comp = lhs.read1ReferenceIndex - rhs.read1ReferenceIndex; + if (comp == 0) + comp = lhs.read1Coordinate - rhs.read1Coordinate; + return comp < 0; + } + /* 比对方向是否正向 */ bool IsPositiveStrand() const {