From f84d7bb0dc6c5efd1e2b5589785439e4ee9c94e2 Mon Sep 17 00:00:00 2001 From: zzh Date: Wed, 4 Sep 2024 11:08:03 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=BA=86=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E6=9F=90=E4=BA=9B=E6=83=85=E5=86=B5=E4=B8=8B=E5=86=97=E4=BD=99?= =?UTF-8?q?=E4=B8=8D=E5=AE=8C=E5=85=A8=E7=9A=84bug=EF=BC=8C=E5=9C=A8?= =?UTF-8?q?=E6=83=85=E5=86=B5=E4=B8=89=E4=B8=AD=EF=BC=8C=E6=8A=8A=E5=85=A8?= =?UTF-8?q?=E9=83=A8=E6=95=B0=E6=8D=AE=E9=83=BD=E6=94=BE=E5=88=B0=E5=90=8E?= =?UTF-8?q?=E6=9D=A5=E7=9A=84task=E9=87=8C=EF=BC=8C=E5=B0=B1=E5=8F=AF?= =?UTF-8?q?=E4=BB=A5=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 7 +- .vscode/settings.json | 8 +- run.sh | 11 +- src/sam/markdups/markdups.cpp | 21 +- src/sam/markdups/markdups_arg.h | 1 + src/sam/markdups/serial_md.cpp | 358 ++++++++++++++++++++++++-------- src/sam/markdups/serial_md.h | 68 ++++-- 7 files changed, 360 insertions(+), 114 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 2aabd92..a229eb9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -13,13 +13,14 @@ "program": "${workspaceRoot}/build/bin/picard_cpp", "args": [ "MarkDuplicates", - "--INPUT", "~/data/bam/100w.bam", - "--OUTPUT", "out.bam", + "--INPUT", "~/data/bam/zy_tumor.bam", + "--OUTPUT", "/dev/null", "--METRICS_FILE", "metrics.txt", "--num_threads", "1", - "--max_mem", "1G", + "--max_mem", "2G", "--verbosity", "DEBUG", "--asyncio", "true", + "--READ_NAME_REGEX", "" ], "cwd": "${workspaceFolder}", // 当前工作路径:当前文件所在的工作空间 } diff --git a/.vscode/settings.json b/.vscode/settings.json index 90de365..9b14081 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -95,6 +95,12 @@ "netfwd": "cpp", "timer": "cpp", "__nullptr": "cpp", - "__node_handle": "c" + "__node_handle": "c", + "__hash_table": "cpp", + "__tree": "cpp", + "queue": "cpp", + "stack": "cpp", + "__bit_reference": "cpp", + "__string": "cpp" } } \ No newline at end of file diff --git a/run.sh b/run.sh index c60a5f5..14a8e05 100755 --- a/run.sh +++ b/run.sh @@ -1,20 +1,21 @@ #input=~/data/bam/zy_normal.bam -#input=~/data/bam/zy_tumor.bam -input=~/data/bam/100w.bam +input=~/data/bam/zy_tumor.bam +#input=~/data/bam/100w.bam #input=~/data/bam/1kw.sam #input=~/data/bam/n1kw.sam time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \ MarkDuplicates \ --INPUT $input \ - --OUTPUT ~/data/bam/out.bam \ + --OUTPUT /dev/null \ --INDEX_FORMAT BAI \ --num_threads 1 \ --max_mem 2G \ --verbosity DEBUG \ - --asyncio true #\ + --asyncio true -TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$" + #--TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX "" #--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$" - + #--READ_NAME_REGEX "" # --INPUT /mnt/d/data/100w.bam \ # --INPUT /mnt/d/data/zy_normal.bam \ diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index a93051e..2d0e2d6 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -154,13 +154,15 @@ int MarkDuplicates(int argc, char *argv[]) { // BamBufType inBuf(false); BamBufType inBuf(g_gArg.use_asyncio); inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); - DupIdxQueue dupIdxQue; + DupIdxQueue dupIdxQue, repIdxQue; dupIdxQue.Init(&gData.dupIdxArr); + repIdxQue.Init(&gData.repIdxArr); Timer tw; cout << "dupsize: " << dupIdxQue.Size() << endl; uint64_t bamIdx = 0; DupInfo dupIdx = dupIdxQue.Pop(); - + DupInfo repIdx = repIdxQue.Pop(); + exit(0); while (inBuf.ReadStat() >= 0) { Timer tw1; size_t readNum = inBuf.ReadBam(); @@ -168,12 +170,21 @@ int MarkDuplicates(int argc, char *argv[]) { for (size_t i = 0; i < inBuf.Size(); ++i) { /* 判断是否冗余 */ if (bamIdx == dupIdx) { - cerr << bamIdx << " " << dupIdx.repIdx << " " << dupIdx.dupSet << endl; + //if (dupIdx.dupSet != 0) + // cerr << bamIdx << " " << dupIdx.repIdx << " " << dupIdx.dupSet << endl; // 为了防止小内存运行的时候,有重复的dupidx,这时候dup的repIdx和dupSetSize可能会有不同 while((dupIdx = dupIdxQue.Pop()) == bamIdx); } - if (bamIdx == -1) { // repressent - + if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && bamIdx == repIdx) { // repressent + // cerr << bamIdx << " " << repIdx.repIdx << " " << repIdx.dupSet << endl; + while (repIdx == bamIdx || repIdx.dupSet == 0) { + if (repIdxQue.Size() > 0) + repIdx = repIdxQue.Pop(); + else { + repIdx = -1; + break; + } + } } if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) { Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); diff --git a/src/sam/markdups/markdups_arg.h b/src/sam/markdups/markdups_arg.h index a308d9e..1d167ad 100644 --- a/src/sam/markdups/markdups_arg.h +++ b/src/sam/markdups/markdups_arg.h @@ -214,6 +214,7 @@ struct MarkDupsArg /* "The scoring strategy for choosing the non-duplicate among candidates." */ ns_md::ScoringStrategy DUPLICATE_SCORING_STRATEGY = ns_md::ScoringStrategy::SUM_OF_BASE_QUALITIES; + //ns_md::ScoringStrategy DUPLICATE_SCORING_STRATEGY = ns_md::ScoringStrategy::TOTAL_MAPPED_REFERENCE_LENGTH; /* "The program record ID for the @PG record(s) created by this program. Set to null to disable " + "PG record creation. This string may have a suffix appended to avoid collision with other " + diff --git a/src/sam/markdups/serial_md.cpp b/src/sam/markdups/serial_md.cpp index 62359b7..a9283bb 100644 --- a/src/sam/markdups/serial_md.cpp +++ b/src/sam/markdups/serial_md.cpp @@ -10,7 +10,6 @@ #include #include -#include #include #include "dup_metrics.h" @@ -19,7 +18,6 @@ #include "shared_args.h" using std::cout; -using std::set; using std::vector; /* 查找 */ @@ -65,8 +63,9 @@ static inline void sortReadEndsArr(vector &arr) { } /* 处理一组pairend的readends,标记冗余, 这个函数需要串行运行,因为需要做一些统计*/ -static void markDupsForPairs(vector &vpRe, set *dupIdx, set *opticalDupIdx = nullptr, - set *notDupIdx = nullptr, set *notOpticalDupIdx = nullptr) { +static void markDupsForPairs(vector &vpRe, DPSet *dupIdx, MDSet *opticalDupIdx, + DPSet *repIdx, MDSet *notDupIdx = nullptr, + MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr) { if (vpRe.size() < 2) { if (vpRe.size() == 1) { // addSingletonToCount(libraryIdGenerator); @@ -78,7 +77,8 @@ static void markDupsForPairs(vector &vpRe, set *dupId } int maxScore = 0; const ReadEnds *pBest = nullptr; -// int maxOperateTime = 0; + // bool print = false; + // int maxOperateTime = 0; /** All read ends should have orientation FF, FR, RF, or RR **/ for (auto pe : vpRe) { // 找分数最高的readend // maxOperateTime = max(maxOperateTime, pe->oprateTime); @@ -87,7 +87,21 @@ static void markDupsForPairs(vector &vpRe, set *dupId maxScore = pe->score; pBest = pe; } + /* + if (pe->read1IndexInFile == 255830545) { + print = true; + } + */ } + /* + if (print) { + cout << "mark pair find: " << endl; + for (auto pe : vpRe) { + cout << pe->read1IndexInFile << " " << pe->read2IndexInFile << " " << pe->score << endl; + } + cout << "mark pair end: " << endl; + } + */ // cerr << zzhtestnum << " best: " << vpRe.size() << " " << pBest->read1IndexInFile << "-" << pBest->read2IndexInFile << endl; // if (maxOperateTime == 0) ++zzhtestnum; if (notDupIdx != nullptr) { @@ -130,15 +144,25 @@ static void markDupsForPairs(vector &vpRe, set *dupId } } } - // 在输出的bam文件中添加tag + // 在输出的bam文件中添加tag, best作为dupset的代表 if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS) { - // addRepresentativeReadIndex(vpRe); // 每次都更新就行,用最新的覆盖之前的(如果之前有) + repIdx->insert(DupInfo(pBest->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); + repIdx->insert(DupInfo(pBest->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); + if (notRepIdx != nullptr) { + for (auto pe : vpRe) { + if (pe != pBest) { + notRepIdx->insert(pe->read1IndexInFile); + if (pe->read2IndexInFile != pe->read1IndexInFile) + notRepIdx->insert(pe->read2IndexInFile); + } + } + } } } /* 处理一组非paired的readends,标记冗余 */ -static void markDupsForFrags(vector &vpRe, bool containsPairs, set *dupIdx, - set *notDupIdx = nullptr) { +static void markDupsForFrags(vector &vpRe, bool containsPairs, DPSet *dupIdx, + MDSet *notDupIdx = nullptr) { if (containsPairs) { for (auto pe : vpRe) { if (!pe->IsPaired()) { @@ -182,7 +206,7 @@ static void generateReadEnds(SerailMarkDupArg *arg) { p.unpairedPosArr.clear(); /* 处理每个read,创建ReadEnd,并放入frag和pair中 */ - // set reSet; + // MDSet reSet; // ReadEnds lastRe; for (int i = 0; i < p.bams.size(); ++i) { // 循环处理每个read @@ -227,30 +251,37 @@ static void generateReadEnds(SerailMarkDupArg *arg) { unpairArrInfo.taskSeq = p.taskSeq; unpairArrInfo.readNameSet.insert(e.first); } + if (p.taskSeq == 98) { + for (auto &re : p.unpairedPosArr[14293757783047].pairArr) { + cout << "task 99 unpair reads: " << re.read1IndexInFile << "\t" << re.read2IndexInFile << endl; + } + } // cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() << // endl; } /* 处理pairs */ -static void processPairs(vector &readEnds, set *dupIdx, set *opticalDupIdx, - set *notDupIdx = nullptr, set *notOpticalDupIdx = nullptr) { +static void processPairs(vector &readEnds, DPSet *dupIdx, MDSet *opticalDupIdx, + DPSet *repIdx, MDSet *notDupIdx = nullptr, + MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr) { + // return; vector vpCache; // 有可能是冗余的reads const ReadEnds *pReadEnd = nullptr; for (auto &re : readEnds) { if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样 vpCache.push_back(&re); // 处理一个潜在的冗余组 else { - markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx); // 不一样 + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx); // 不一样 vpCache.clear(); vpCache.push_back(&re); pReadEnd = &re; } } - markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx); + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx); } /* 处理frags */ -static void processFrags(vector &readEnds, set *dupIdx, set *notDupIdx = nullptr) { +static void processFrags(vector &readEnds, DPSet *dupIdx, MDSet *notDupIdx = nullptr) { bool containsPairs = false; bool containsFrags = false; vector vpCache; // 有可能是冗余的reads @@ -282,9 +313,11 @@ static void markdups(SerailMarkDupArg *arg) { p.pairDupIdx.clear(); p.pairOpticalDupIdx.clear(); p.fragDupIdx.clear(); + p.pairRepIdx.clear(); /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ // 先处理 pair - processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx); + processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx, &p.pairRepIdx); + // cout << p.pairDupIdx.size() << "\t" << p.pairRepIdx.size() << endl; // 再处理frag processFrags(p.frags, &p.fragDupIdx); @@ -294,7 +327,7 @@ static void markdups(SerailMarkDupArg *arg) { static inline void getIntersectData(vector &leftArr, vector &rightArr, vector *dst, bool isPairCmp = false) { if (leftArr.empty() || rightArr.empty()) { - cout << "bad size: " << leftArr.size() << '\t' << rightArr.size() << endl; + // cout << "bad size: " << leftArr.size() << '\t' << rightArr.size() << endl; return; } const size_t leftEndIdx = leftArr.size() - 1; @@ -321,7 +354,7 @@ static inline void getIntersectData(vector &leftArr, vector } /* 将frags重叠部分的dup idx放进数据中 */ -static inline void refreshFragDupIdx(set &dupIdx, set ¬DupIdx, SerailMarkDupArg *lastArg, +static inline void refreshFragDupIdx(DPSet &dupIdx, MDSet ¬DupIdx, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) { auto &lp = *lastArg; auto &p = *curArg; @@ -336,50 +369,56 @@ static inline void refreshFragDupIdx(set &dupIdx, set ¬DupI } /* 将pairs重叠部分的dup idx放进数据中 */ -static inline void refreshPairDupIdx(set &dupIdx, set &opticalDupIdx, set ¬DupIdx, - set ¬OpticalDupIdx, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) { +static inline void refreshPairDupIdx(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, + MDSet ¬DupIdx, MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, + SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) { auto &lp = *lastArg; auto &p = *curArg; - for (auto idx : dupIdx) { - lp.pairDupIdx.insert(idx); - p.pairDupIdx.erase(idx); - } - for (auto idx : opticalDupIdx) { - lp.pairOpticalDupIdx.insert(idx); - p.pairOpticalDupIdx.erase(idx); - } - for (auto idx : notDupIdx) { - lp.pairDupIdx.erase(idx); - //lp.pairOpticalDupIdx.erase(idx); - p.pairDupIdx.erase(idx); - //p.pairOpticalDupIdx.erase(idx); - } - for (auto idx : notOpticalDupIdx) { - lp.pairOpticalDupIdx.erase(idx); - p.pairOpticalDupIdx.erase(idx); - } + for (auto idx : dupIdx) { lp.pairDupIdx.insert(idx); p.pairDupIdx.erase(idx); } + for (auto idx : opticalDupIdx) { lp.pairOpticalDupIdx.insert(idx); p.pairOpticalDupIdx.erase(idx); } + for (auto idx : repIdx) { lp.pairRepIdx.insert(idx); p.pairRepIdx.erase(idx); } + // for (auto idx : notDupIdx) { + // if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl; + // if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl; + // if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl; + // if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl; + // lp.pairDupIdx.erase(idx); p.pairDupIdx.erase(idx); + // } + for (auto idx : notOpticalDupIdx) { lp.pairOpticalDupIdx.erase(idx); p.pairOpticalDupIdx.erase(idx); } + for (auto idx : notRepIdx) { lp.pairRepIdx.erase(idx); p.pairRepIdx.erase(idx); } } // 用来分别处理dup和optical dup -static void refeshTaskDupInfo(set &dupIdx, set &opticalDupIdx, set ¬DupIdx, - set ¬OpticalDupIdx, set &latterDupIdx, - set &latterOpticalDupIdx, set &latterNotDupIdx, - set &latterNotOpticalDupIdx) { - for (auto idx : dupIdx) latterDupIdx.insert(idx); +static void refeshTaskDupInfo(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, + MDSet ¬DupIdx, MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, + DPSet &latterDupIdx, MDSet &latterOpticalDupIdx, DPSet &latterRepIdx, + MDSet &latterNotDupIdx, MDSet &latterNotOpticalDupIdx, MDSet &latterNotRepIdx) { + for (auto idx : dupIdx) { + latterDupIdx.insert(idx); + // latterNotDupIdx.erase(idx); // 后来的更新为准 + } for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx); + for (auto idx : repIdx) latterRepIdx.insert(idx); for (auto idx : notDupIdx) latterNotDupIdx.insert(idx); for (auto idx : notOpticalDupIdx) latterNotOpticalDupIdx.insert(idx); + for (auto idx : notRepIdx) latterNotRepIdx.insert(idx); } /* 最后合并数据并排序 */ -template -static void refeshFinalTaskDupInfo(set &dupIdx, set ¬DupIdx, vector &dupArr) { - vector midArr; +template +static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupIdx, vector &dupArr, + vector &cacheDupIdx, vector &midArr) { + midArr.resize(0); + cacheDupIdx.resize(0); + cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end()); + std::sort(cacheDupIdx.begin(), cacheDupIdx.end()); auto ai = dupArr.begin(); - auto bi = dupIdx.begin(); auto ae = dupArr.end(); - auto be = dupIdx.end(); + auto bi = cacheDupIdx.begin(); + auto be = cacheDupIdx.end(); + //auto bi = dupIdx.begin(); + //auto be = dupIdx.end(); T val = 0; while (ai != ae && bi != be) { @@ -388,8 +427,8 @@ static void refeshFinalTaskDupInfo(set &dupIdx, set ¬DupIdx, vect } else if (*bi < *ai) { val = *bi++; } else { - val = *ai++; - bi++; + val = *bi++; // 相等的时候取后面的作为结果 + ai++; } if (notDupIdx.find(val) == notDupIdx.end()) { midArr.push_back(val); @@ -417,9 +456,12 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur auto &g = *gDataArg; vector reArr; - set dupIdx; - set notOpticalDupIdx; - set notDupIdx; + DPSet dupIdx; + MDSet opticalDupIdx; + DPSet repIdx; + MDSet notOpticalDupIdx; + MDSet notDupIdx; + MDSet notRepIdx; // 先处理重叠的frags getIntersectData(lp.frags, p.frags, &reArr); processFrags(reArr, &dupIdx, ¬DupIdx); @@ -429,15 +471,16 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur reArr.clear(); dupIdx.clear(); notDupIdx.clear(); - set opticalDupIdx; getIntersectData(lp.pairs, p.pairs, &reArr, true); - processPairs(reArr, &dupIdx, &opticalDupIdx, ¬DupIdx, ¬OpticalDupIdx); - refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx, &lp, &p); + processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx); + refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, &lp, &p); + //cout << (g.unpairedDic.find("A01415:368:HL7NTDSX3:3:1104:5195:34757") != g.unpairedDic.end()) << endl; + //cout << (g.unpairedPosArr.find(14293757783047) != g.unpairedPosArr.end()) << endl; // 处理之前未匹配的部分 map recalcPos; - set alreadyAdd; // 与该位点相同的pair都添加到数组里了 - set addToGlobal; + CalcSet alreadyAdd; // 与该位点相同的pair都添加到数组里了 + MDSet addToGlobal; int64_t prevLastPos = 0, nextFirstPos = 0; if (lp.frags.size() > 0) prevLastPos = lp.frags.back().posKey; @@ -446,8 +489,16 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur // cout << "range: " << nextFirstPos << '\t' << prevLastPos << endl; for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read auto &readName = prevUnpair.first; + // if (readName == "A01415:368:HL7NTDSX3:3:1104:5195:34757" || + // readName == "A01415:368:HL7NTDSX3:3:1104:4887:32095") { + // cout << "read name found: " << lp.taskSeq << endl; + // } auto &prevPosInfo = prevUnpair.second; auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end + // if (prevFragEnd.read1IndexInFile == 255830545 || prevFragEnd.read1IndexInFile == 255830546 + // || prevFragEnd.read1IndexInFile == 255832599 || prevFragEnd.read1IndexInFile == 255832601) { + // cout << prevFragEnd.read1IndexInFile << "\t" << readName << endl; + // } if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read auto &nextPosInfo = p.unpairedDic[readName]; @@ -463,6 +514,16 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end()) nextUnpairInfoP = &p.unpairedPosArr[prevPosKey]; + // if (prevFragEnd.read1IndexInFile == 255830545 || prevFragEnd.read1IndexInFile == 255830546 || + // prevFragEnd.read1IndexInFile == 255832599 || prevFragEnd.read1IndexInFile == 255832601) { + // cout << "find in p: " << lp.taskSeq << "\t" << prevFragEnd.read1IndexInFile << "\t" << readName << endl; + // if (nextUnpairInfoP != nullptr) + // cout << "next p: " << nextUnpairInfoP->unpairedNum << endl; + // if (prevUnpairInfoP != nullptr) + // cout << "prev p: " << prevUnpairInfoP->unpairedNum << endl; + // cout << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl; + // } + // pos分为两种情况,根据poskey(pair中两个read分别的pos)的位置确定 // 1. // prevpos在交叉部分之前,nextpos在交叉部分之后,这种情况不需要获取pairarr中的数据; @@ -519,11 +580,13 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur prevPairArr.push_back(prevFragEnd); if (addDataToPos) { getEqualRE(prevFragEnd, p.pairs, &prevPairArr); + getEqualRE(prevFragEnd, p.pairs, &nextPairArr); } // 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除) recalcPos[ck] = nextPosInfo.taskSeq; std::sort(prevPairArr.begin(), prevPairArr.end()); + std::sort(nextPairArr.begin(), nextPairArr.end()); } else { // next task在该位点没有unpair,那就把数据放到prev task里 auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr prevPairArr.push_back(prevFragEnd); @@ -549,6 +612,10 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur } p.unpairedDic.erase(readName); // 在next task里删除该read } else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read + // if (readName == "A01415:368:HL7NTDSX3:3:1104:5195:34757" || + // readName == "A01415:368:HL7NTDSX3:3:1104:4887:32095") { + // cout << "find in g: " << prevFragEnd.read1IndexInFile << "\t" << readName << endl; + // } auto &remainPosInfo = g.unpairedDic[readName]; auto remainFragEnd = remainPosInfo.unpairedRE; int64_t remainPosKey = remainFragEnd.posKey; @@ -565,11 +632,14 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur int64_t prevPosKey = prevFragEnd.posKey; g.unpairedDic.insert(prevUnpair); addToGlobal.insert(prevPosKey); + // if (prevFragEnd.read1IndexInFile == 255830545 || prevFragEnd.read1IndexInFile == 255830546) { + // cout << "here find" << endl; + // } } } map taskChanged; - set posProcessed; + MDSet posProcessed; for (auto &e : recalcPos) { auto posKey = e.first.read1Pos; if (posProcessed.find(posKey) != posProcessed.end()) @@ -583,49 +653,79 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur pairArrP = &g.unpairedPosArr[posKey].pairArr; else pairArrP = &lp.unpairedPosArr[posKey].pairArr; - processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx, &t.notOpticalDupIdx); + // if (taskSeq == 98) cout << "handle recalc pairs: " << taskSeq << "\t" << posKey << endl; + // if (p.taskSeq == 163) { + // cout << "final" << endl; + // } + processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx); if (taskSeq < lp.taskSeq) g.unpairedPosArr.erase(posKey); } + // if (lp.taskSeq > 98) { + // // exit(0); + // } + // 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据 // 放在这里,因为lp中的unpairedPosArr中的readends可能会被修改(比如optical duplicate) - for (auto posKey : addToGlobal) g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey]; + for (auto posKey : addToGlobal) { + // if (posKey == 14293757783047) { + // for (auto &re: lp.unpairedPosArr[posKey].pairArr) { + // cout << "lp reads: " << re.read1IndexInFile << "\t" << re.read2IndexInFile << endl; + // } + // cout << "found in g: " << lp.taskSeq << "\t" << lp.unpairedPosArr[posKey].unpairedNum << "\t" << lp.unpairedPosArr[posKey].pairArr.size() << endl; + // } + g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey]; + } // 更新结果 for (auto &e : taskChanged) { auto taskSeq = e.first; auto &t = e.second; + // cout << t.dupIdx.size() << "\t" << t.notDupIdx.size() << endl; if (taskSeq < lp.taskSeq) { - refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, g.latterDupIdxArr[taskSeq], - g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq]); + refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, + g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq], + g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq]); } else if (taskSeq == lp.taskSeq) { - refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, &lp, &p); + refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, &lp, &p); } else { - refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, &p, &lp); // 把结果放到p中 + refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, &p, &lp); // 把结果放到p中 } } // cout << "remain unpaired: " << g.unpairedDic.size() << '\t' << // g.unpairedPosArr.size() << endl; cout << "calc g time: " << // t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据 - g.latterDupIdxArr.push_back(set()); - g.latterOpticalDupIdxArr.push_back(set()); - g.latterNotDupIdxArr.push_back(set()); - g.latterNotOpticalDupIdxArr.push_back(set()); + g.latterDupIdxArr.push_back(DPSet()); + g.latterOpticalDupIdxArr.push_back(MDSet()); + g.latterRepIdxArr.push_back(DPSet()); + g.latterNotDupIdxArr.push_back(MDSet()); + g.latterNotOpticalDupIdxArr.push_back(MDSet()); + g.latterNotRepIdxArr.push_back(MDSet()); g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); + std::sort(vIdx.begin(), vIdx.end()); + // cout << vIdx.size() << endl; + // zzhtestnum += vIdx.size(); g.opticalDupIdxArr.push_back(vector()); auto &vOpticalIdx = g.opticalDupIdxArr.back(); vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); + std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); + + g.repIdxArr.push_back(vector()); + auto &vRepIdx = g.repIdxArr.back(); + vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); + std::sort(vRepIdx.begin(), vRepIdx.end()); } /* 当所有任务结束后,global data里还有未处理的数据 */ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) { + // cout << "last task start" << endl; auto &lp = *task; auto &g = *gDataArg; // 遗留的未匹配的pair @@ -643,6 +743,8 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) { remainUnpairInfo.pairArr.push_back(remainFragEnd); g.unpairedDic.erase(readName); + } else { + g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数 } } map taskChanged; @@ -654,7 +756,8 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) { if (arr.size() > 1) { std::sort(arr.begin(), arr.end()); - processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx); + // cout << "last task before mark pair" << endl; + processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notRepIdx); } } // 更新结果 @@ -663,28 +766,104 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) { for (auto &e : taskChanged) { auto taskSeq = e.first; auto &t = e.second; - refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, g.latterDupIdxArr[taskSeq], - g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq]); + // cout << t.dupIdx.size() << "\t" << t.notDupIdx.size() << endl; + refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, + g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq], + g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq]); } - cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl; + // cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl; g.unpairedPosArr.clear(); g.unpairedDic.clear(); +/* + int taskSeq = 0; + for (auto &arr : g.dupIdxArr) { + cout << taskSeq << "\t" << arr.size(); + if (taskSeq < (int)g.dupIdxArr.size() - 1) + cout << "\t" << g.latterDupIdxArr[taskSeq].size() << "\t" << g.latterNotDupIdxArr[taskSeq].size() << endl; + else + cout << endl; + // if (taskSeq == 98) { + // vector v; + // v.insert(v.end(), g.latterDupIdxArr[taskSeq].begin(), g.latterDupIdxArr[taskSeq].end()); + // std::sort(v.begin(), v.end()); + // for (auto &idx : v) + // cout << idx.idx << " " << idx.repIdx << " " << idx.dupSet << endl; + // v.clear(); + // v.insert(v.end(), g.latterNotDupIdxArr[taskSeq].begin(), g.latterNotDupIdxArr[taskSeq].end()); + // std::sort(v.begin(), v.end()); + // for (auto &idx : v) cout << idx << endl; + // } + taskSeq++; + } +*/ // 将dupidx放进全局数据 - for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) - refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i]); + vector cacheDupIdx; + vector midArr; + vector intCacheDupIdx; + vector intMidArr; + for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) { + /* + if (i == 98) { + cout << "98: " << g.latterDupIdxArr[i].size() << "\t" << g.dupIdxArr[i].size() << endl; + int inlatter = 0; + int innotdup = 0; + int latterinnotdup = 0; + for (auto idx : g.dupIdxArr[i]) { + if (g.latterDupIdxArr[i].find(idx) != g.latterDupIdxArr[i].end()) { + ++inlatter; + } + } + for (auto idx : g.dupIdxArr[i]) { + if (g.latterNotDupIdxArr[i].find(idx) != g.latterNotDupIdxArr[i].end()) { + cout << idx.idx << endl; + ++innotdup; + } + } + for (auto idx : g.latterDupIdxArr[i]) { + if (g.latterNotDupIdxArr[i].find(idx) != g.latterNotDupIdxArr[i].end()) { + ++latterinnotdup; + } + } + cout << inlatter << "\t" << innotdup << "\t" << latterinnotdup << endl; + } + */ + refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i], cacheDupIdx, midArr); + /* + if (i == 98) { + cout << "98: " << g.latterDupIdxArr[i].size() << "\t" << g.dupIdxArr[i].size() << endl; + } + */ + } for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i) - refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i]); + refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i], intCacheDupIdx, intMidArr); + for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); + std::sort(vIdx.begin(), vIdx.end()); + // cout << "last " << vIdx.size() << endl; + // zzhtestnum += vIdx.size(); + /* + for (auto &arr : g.dupIdxArr) { + cout << taskSeq << "\t" << arr.size() << endl; + taskSeq++; + } + */ g.opticalDupIdxArr.push_back(vector()); auto &vOpticalIdx = g.opticalDupIdxArr.back(); vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); + std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); + + g.repIdxArr.push_back(vector()); + auto &vRepIdx = g.repIdxArr.back(); + vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); + std::sort(vRepIdx.begin(), vRepIdx.end()); } /* 串行处理数据,标记冗余 */ @@ -771,17 +950,20 @@ void serialMarkDups() { int64_t opticalDupNum = 0; map dup; -// int taskSeq = 0; -// for (auto &arr : gData.dupIdxArr) { -// for (auto idx : arr) { -// if (dup.find(idx.idx) != dup.end()) { -// // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' -// // << idx << endl; -// } -// dup[idx.idx] = taskSeq; -// } -// taskSeq++; -// } + + int taskSeq = 0; + for (auto &arr : gData.dupIdxArr) { + for (auto idx : arr) { + if (dup.find(idx.idx) != dup.end()) { + // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' + // << idx << endl; + } + dup[idx.idx] = taskSeq; + } + // cout << taskSeq << "\t" << arr.size() << endl; + taskSeq++; + } + // #include // ofstream out("tumor_dup.txt"); @@ -794,7 +976,7 @@ void serialMarkDups() { for (auto &arr : gData.dupIdxArr) dupNum += arr.size(); for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size(); - cout << "dup num : " << dupNum << '\t' << dup.size() << endl; + cout << "dup num : " << dupNum << '\t' << dup.size() << "\t" << zzhtestnum << endl; cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl; cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl; diff --git a/src/sam/markdups/serial_md.h b/src/sam/markdups/serial_md.h index 78fc550..f4782bb 100644 --- a/src/sam/markdups/serial_md.h +++ b/src/sam/markdups/serial_md.h @@ -2,13 +2,16 @@ #include #include +#include #include #include #include #include +#include using std::set; +using std::unordered_set; using std::string; using std::vector; @@ -28,6 +31,16 @@ struct CalcKey { comp = (int)(read2Pos - o.read2Pos); return comp < 0; } + bool operator==(const CalcKey &o) const { return read1Pos == o.read1Pos && read2Pos == o.read2Pos; } + std::size_t operator()(const CalcKey &o) const { + return std::hash()(read1Pos) ^ std::hash()(read2Pos); + } +}; + +struct CalcKeyHash { + std::size_t operator()(const CalcKey &o) const { + return std::hash()(o.read1Pos) ^ std::hash()(o.read2Pos); + } }; /* 用来记录冗余索引相关的信息 */ @@ -36,6 +49,7 @@ struct DupInfo { int64_t repIdx = 0; // 这一批冗余中的非冗余read 代表的索引 int16_t dupSet = 0; // dup set size + DupInfo() : DupInfo(-1, 0, 0) { } DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) { } DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), repIdx(repIdx_), dupSet(dupSet_) {} bool operator<(const DupInfo &o) const { @@ -49,12 +63,38 @@ struct DupInfo { } }; +struct DupInfoHash { + std::size_t operator()(const DupInfo &o) const { return std::hash()(o.idx); } +}; + +struct DupInfoEqual { + bool operator()(const DupInfo &o1, const DupInfo &o2) const { return o1.idx == o2.idx; } + bool operator()(const DupInfo &o1, const int64_t &o2) const { return o1.idx == o2; } + bool operator()(const int64_t &o1, const DupInfo &o2) const { return o1 == o2.idx; } +}; + +template +// using MDSet = set; +// using MDSet = unordered_set; +using MDSet = tsl::robin_set; + +template +// using DPSet = set; +// using DPSet = unordered_set; +using DPSet = tsl::robin_set; + +template +//using CalcSet = set; +using CalcSet = tsl::robin_set; + /* 当遗留数据在当前任务找到了pair read后,进行冗余计算时候存放结果的数据结构 */ struct TaskSeqDupInfo { - set dupIdx; - set opticalDupIdx; - set notDupIdx; - set notOpticalDupIdx; + DPSet dupIdx; + MDSet opticalDupIdx; + DPSet repIdx; + MDSet notDupIdx; + MDSet notOpticalDupIdx; + MDSet notRepIdx; }; /* 保存有未匹配pair位点的信息,包括read end数组和有几个未匹配的read end */ @@ -62,7 +102,7 @@ struct UnpairedPosInfo { int unpairedNum = 0; int64_t taskSeq; vector pairArr; - set readNameSet; + MDSet readNameSet; }; // typedef unordered_map UnpairedNameMap; // typedef unordered_map UnpairedPositionMap; @@ -77,9 +117,10 @@ struct SerailMarkDupArg { vector bams; // 存放待处理的bam read vector pairs; // 成对的reads vector frags; // 暂未找到配对的reads - set pairDupIdx; // pair的冗余read的索引 - set pairOpticalDupIdx; // optical冗余read的索引 - set fragDupIdx; // frag的冗余read的索引 + DPSet pairDupIdx; // pair的冗余read的索引 + MDSet pairOpticalDupIdx; // optical冗余read的索引 + DPSet fragDupIdx; // frag的冗余read的索引 + DPSet pairRepIdx; // pair的dupset代表read的索引 UnpairedNameMap unpairedDic; // 用来寻找pair end UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储 }; @@ -92,12 +133,15 @@ struct GlobalDataArg { // 每个task对应一个vector vector> dupIdxArr; vector> opticalDupIdxArr; + vector> repIdxArr; // 用来存放后续计算的数据 - vector> latterDupIdxArr; - vector> latterOpticalDupIdxArr; - vector> latterNotDupIdxArr; - vector> latterNotOpticalDupIdxArr; + vector> latterDupIdxArr; + vector> latterOpticalDupIdxArr; + vector> latterRepIdxArr; + vector> latterNotDupIdxArr; + vector> latterNotOpticalDupIdxArr; + vector> latterNotRepIdxArr; }; // 串行运行mark duplicate