diff --git a/src/markdup/markdup.cpp b/src/markdup/markdup.cpp index 3916c19..9177e3e 100644 --- a/src/markdup/markdup.cpp +++ b/src/markdup/markdup.cpp @@ -164,6 +164,7 @@ int MarkDuplicates() { spdlog::info("{} optical reads found", opticalIdxQue.Size()); spdlog::info("{} represent reads found", repIdxQue.Size()); // spdlog::info("real dup size: {}", dupIdxQue.RealSize()); + // spdlog::info("real optical dup size: {}", opticalIdxQue.RealSize()); return 0; diff --git a/src/markdup/md_args.h b/src/markdup/md_args.h index 675e001..5ce449b 100644 --- a/src/markdup/md_args.h +++ b/src/markdup/md_args.h @@ -64,7 +64,7 @@ struct MarkDupsArg { int NUM_THREADS = 1; - size_t MAX_MEM = ((size_t)1) << 28; // << 31 // 最小2G + size_t MAX_MEM = ((size_t)1) << 30; // // 最小1G bool DUPLEX_IO = true; // 同时读写 diff --git a/src/markdup/md_pipeline.cpp b/src/markdup/md_pipeline.cpp index 6d9dd71..88dc822 100644 --- a/src/markdup/md_pipeline.cpp +++ b/src/markdup/md_pipeline.cpp @@ -18,6 +18,7 @@ #include "util/yarn.h" using std::vector; +using namespace std; namespace nsgv { @@ -30,40 +31,6 @@ extern DupResult gDupRes; extern PipelineArg gPipe; }; // namespace nsgv -/* 排序 */ -static inline void sortReadEndsArr(vector &arr) { - size_t blockSize = 64 * 1024; - if (arr.size() < blockSize) { - std::sort(arr.begin(), arr.end()); - return; - } - size_t blockNum = (arr.size() + blockSize - 1) / blockSize; - size_t crossNum = 1024; - size_t start, end, i, left, right; - std::sort(arr.begin(), arr.begin() + blockSize); - for (i = 1; i < blockNum; ++i) { - start = i * blockSize; - end = min(start + blockSize, arr.size()); - std::sort(arr.begin() + start, arr.begin() + end); - left = crossNum; - while (!(arr[start - left] < arr[start])) { - left = left << 1; - if (left >= blockSize) { - std::sort(arr.begin(), arr.end()); // 退化到普通排序 - return; - } - } - right = min(crossNum, end - start - 1); - - while (!(arr[start - 1] < arr[start + right])) { - right = min(right << 1, end - start - 1); - if (right == end - start - 1) - break; - } - std::sort(arr.begin() + start - left, arr.begin() + start + right); - } -} - /* 处理一组pairend的readends,标记冗余, 这个函数需要串行运行,因为需要做一些统计*/ static void markDupsForPairs(vector &vpRe, DPSet *dupIdx, MDSet *opticalDupIdx, DPSet *repIdx, MDSet *notDupIdx = nullptr, @@ -71,7 +38,6 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup if (vpRe.size() < 2) { return; } - int maxScore = 0; const ReadEnds *pBest = nullptr; /** All read ends should have orientation FF, FR, RF, or RR **/ @@ -110,14 +76,6 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup } for (auto pe : vpRe) { // 对非best read标记冗余 if (pe != pBest) { // 非best - - // if (1555341360 == pe->read1IndexInFile) { - // for (auto p : vpRe) { - // cout << "zzh find: " << p->read1IndexInFile << '\t' << p->read2IndexInFile << endl; - // } - // cout << "split" << endl; - // } - dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 if (pe->read2IndexInFile != pe->read1IndexInFile) dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, @@ -177,7 +135,7 @@ static void markDupsForFrags(vector &vpRe, bool containsPairs, /* 找到与readend pos相等的所有readend */ static void getEqualRE(const ReadEnds &re, vector &src, vector *dst) { - auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::PairsLittleThan); // 只比对位点 + auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::CorLittleThan); // 只比对位点 dst->insert(dst->end(), range.first, range.second); } @@ -232,19 +190,6 @@ static void processFrags(vector &readEnds, DPSet *dupIdx, MDS } } -/* 单线程markdup (第二步)*/ -static void markdups(MarkDupDataArg *arg) { - auto &p = *arg; - p.fragDupIdx.clear(); - p.pairDupIdx.clear(); - p.pairOpticalDupIdx.clear(); - p.pairRepIdx.clear(); - /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ - // 先处理 pair - processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx, &p.pairRepIdx); - // 再处理frag - processFrags(p.frags, &p.fragDupIdx); -} /* 获取交叉部分的数据 */ static inline void getIntersectData(vector &leftArr, vector &rightArr, vector *dst, @@ -272,7 +217,10 @@ static inline void getIntersectData(vector &leftArr, vector } dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end()); dst->insert(dst->end(), rightArr.begin(), rightArr.begin() + rightSpan); - std::sort(dst->begin(), dst->end()); + if (isPairCmp) + std::sort(dst->begin(), dst->end(), ReadEnds::PairLittleThan); + else + std::sort(dst->begin(), dst->end(), ReadEnds::FragLittleThan); } /* 将frags重叠部分的dup idx放进数据中 */ @@ -290,59 +238,7 @@ static inline void refreshFragDupIdx(DPSet &dupIdx, MDSet ¬ } } -/* 最后合并数据并排序 */ -template -static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupIdx, vector &dupArr, - vector &cacheDupIdx1, vector &midArr1) { - // midArr.resize(0); - // cacheDupIdx.resize(0); - - vector cacheDupIdx; - vector midArr; - - cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end()); - std::sort(cacheDupIdx.begin(), cacheDupIdx.end()); - - auto ai = dupArr.begin(); - auto ae = dupArr.end(); - auto bi = cacheDupIdx.begin(); - auto be = cacheDupIdx.end(); - - T val = 0; - while (ai != ae && bi != be) { - if (*ai < *bi) { - val = *ai++; - } else if (*bi < *ai) { - val = *bi++; - } else { - val = *bi++; // 相等的时候取后面的作为结果 - ai++; - } - if (notDupIdx.find(val) == notDupIdx.end()) { - midArr.push_back(val); - } - } - while (ai != ae) { - val = *ai++; - if (notDupIdx.find(val) == notDupIdx.end()) { - midArr.push_back(val); - } - } - while (bi != be) { - val = *bi++; - if (notDupIdx.find(val) == notDupIdx.end()) { - midArr.push_back(val); - } - } - // spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size()); - // dupArr = midArr; - dupArr.clear(); - dupArr.assign(midArr.begin(), midArr.end()); - spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size()); -} - // for step 2 generate read ends - // multi-thread generate read ends static void mtGenerateReadEnds(void *data, long idx, int tid) { auto &p = *(PipelineArg *)data; @@ -369,9 +265,6 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { if (bw->GetReadUnmappedFlag()) { if (bw->b->core.tid == -1) // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). - //if (p.genREOrder >= 3719) { - // cout << "inner core.tid check\t" << bw->b->core.tid << '\t' << i << "\t" << start_id << '\t' << end_id << endl; - //} break; } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 ReadEnds fragEnd; @@ -379,9 +272,6 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { frags.push_back(fragEnd); // 添加进frag集合 if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 string key = bw->query_name(); - // if ("ERR194147.780864524" == key) { - // cout << "zzh find: " << key << '\t' << fragEnd.read1IndexInFile << '\t' << fragEnd.read2IndexInFile << endl; - // } if (unpairedDic.find(key) == unpairedDic.end()) { unpairedDic[key] = {taskSeq, fragEnd}; } else { // 找到了pairend @@ -396,25 +286,15 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { PROF_END(tprof[TP_gen][tid], gen); PROF_START(sort_frag); - // sortReadEndsArr(frags); - sort(frags.begin(), frags.end()); + sort(frags.begin(), frags.end(), ReadEnds::FragLittleThan); PROF_END(tprof[TP_sort_frag][tid], sort_frag); PROF_START(sort_pair); - sort(pairs.begin(), pairs.end()); + sort(pairs.begin(), pairs.end(), ReadEnds::PairLittleThan); PROF_END(tprof[TP_sort_pair][tid], sort_pair); - - // if (p.genREOrder >= 3719) { - // cout << "zzh genRE size: " << p.genREOrder << '\t' << tid << '\t' << frags.size() << '\t' << pairs.size() - // << '\t' << start_id << '\t' << end_id << '\t' << bams.size() << endl; - // } } static void doGenRE(PipelineArg &pipeArg) { - // return; - //if (pipeArg.genREOrder < 895) return; - //if (pipeArg.genREOrder < 440) return; - GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM]; ReadData &readData = pipeArg.readData; @@ -434,36 +314,17 @@ static void doGenRE(PipelineArg &pipeArg) { for (auto &val : genREData.unpairedDicArr[i]) { const string &key = val.first; const ReadEnds &fragEnd = val.second.unpairedRE; - - // if ("ERR194147.783448001" == key) { - // cout << "zzh find in doGen: " << key << '\t' << fragEnd.read1IndexInFile << '\t' << fragEnd.read2IndexInFile - // << endl; - // } - if (genREData.unpairedDic.find(key) == genREData.unpairedDic.end()) { genREData.unpairedDic[key] = {readData.taskSeq, fragEnd}; - // if (fragEnd.read1IndexInFile == 1524046492 || fragEnd.read2IndexInFile == 1524046492) { - // cout << "zzh not find paired: " << key << "\t" << fragEnd.read1IndexInFile << '\t' - // << fragEnd.read2IndexInFile << endl; - // } } else { // 找到了pairend auto &pairedEnds = genREData.unpairedDic.at(key).unpairedRE; modifyPairedEnds(fragEnd, &pairedEnds); pairs.push_back(pairedEnds); - // if (pairedEnds.read1IndexInFile == 1524046492 || pairedEnds.read2IndexInFile == 1524046492) { - // cout << "zzh find paired: " << key << '\t' << pairedEnds.read1IndexInFile << '\t' - // << pairedEnds.read2IndexInFile << "\t" << fragEnd.read1IndexInFile << '\t' - // << fragEnd.read2IndexInFile << '\t' << endl; - // } genREData.unpairedDic.erase(key); // 删除找到的pairend } } } - sort(pairs.begin(), pairs.end()); - // if (pipeArg.genREOrder >= 3719) { - // cout << "zzh genRE size: " << pipeArg.genREOrder << '\t' << pairs.size() << '\t' << genREData.unpairedDic.size() - // << '\t' << readData.bams.size() << '\t' << testNum << endl; - // } + sort(pairs.begin(), pairs.end(), ReadEnds::PairLittleThan); } // end for step 2 generate read ends @@ -479,37 +340,26 @@ static void doSort(PipelineArg &pipeArg) { smd.pairs.clear(); smd.frags.clear(); const ReadEnds *pRE; - ReadEndsHeap pairsHeap, fragsHeap; + ReadEndsHeap fragsHeap; + ReadEndsHeap pairsHeap; PROF_START(sort_pair); pairsHeap.Init(&genREData.pairsArr); - // if (pipeArg.sortOrder >= 3719) { - // cout << "zzh sort pair size: " << pairsHeap.Size() << endl; - // } while ((pRE = pairsHeap.Pop()) != nullptr) { smd.pairs.push_back(*pRE); } PROF_END(gprof[GP_sort_pair], sort_pair); PROF_START(sort_frag); fragsHeap.Init(&genREData.fragsArr); - // if (pipeArg.sortOrder >= 3719) { - // cout << "zzh sort frag size: " << fragsHeap.Size() << endl; - // } while ((pRE = fragsHeap.Pop()) != nullptr) { smd.frags.push_back(*pRE); } PROF_END(gprof[GP_sort_frag], sort_frag); - // if (pipeArg.sortOrder >= 929) { - // cout << "zzh sort size: " << pipeArg.sortOrder << '\t' << smd.pairs.size() << '\t' << smd.frags.size() << '\t' - // << genREData.pairsArr.size() << '\t' << genREData.fragsArr.size() << endl; - // } } // for step-4 sort static void doMarkDup(PipelineArg &pipeArg) { - // return; MarkDupData &mdData = pipeArg.markDupData[pipeArg.markDupOrder % pipeArg.MARKBUFNUM]; SortData &sortData = pipeArg.sortData[pipeArg.markDupOrder % pipeArg.SORTBUFNUM]; mdData.taskSeq = pipeArg.markDupOrder; - cout << "zzh markdup: " << mdData.taskSeq << '\t' << pipeArg.markDupOrder << '\t' << pipeArg.intersectOrder << endl; mdData.clear(); auto tmpPtr = mdData.dataPtr; @@ -525,10 +375,6 @@ static void doMarkDup(PipelineArg &pipeArg) { PROF_START(markdup_frag); processFrags(smd.frags, &mdData.fragDupIdx); PROF_END(gprof[GP_markdup_frag], markdup_frag); - // if (mdData.taskSeq >= 929) { - // cout << "zzh markdup size: " << mdData.taskSeq << '\t' << smd.pairs.size() << '\t' << smd.frags.size() << '\t' - // << smd.unpairedDic.size() << '\t' << mdData.pairDupIdx.size() << '\t' << mdData.fragDupIdx.size() << endl; - // } } template @@ -599,7 +445,7 @@ static void processIntersectFragPairs(MarkDupData &lp, MarkDupData &cp) { getIntersectData(lsm.pairs, csm.pairs, &reArr, true); processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx); - refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, lp, cp); + refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, cp, lp); // 放在cp里,因为后面global里可能有相同的dup,防止多次出现 } // 在相邻的数据块之间寻找未匹配的readends, 找到匹配的放到lp里 @@ -607,15 +453,6 @@ static void findUnpairedInDatas(MarkDupData &lp, MarkDupData &cp) { auto &interPairedData = lp.ckeyReadEndsMap; SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; SortMarkData &csm = *(SortMarkData *)cp.dataPtr; - - //if (lp.taskSeq >= 1856) - // return; // - //if (lp.taskSeq >= 1800) { - // cout << "zzh find in findUnpairedInDatas: " << lp.taskSeq << '\t' << cp.taskSeq << '\t' - // << lsm.unpairedDic.size() << '\t' << csm.unpairedDic.size() << '\t' << lp.ckeyReadEndsMap.size() << '\t' - // << cp.ckeyReadEndsMap.size() << endl; - //} - for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end(); ) { // 遍历上一个任务中的每个未匹配的read auto &lastUnpair = *itr; auto &readName = lastUnpair.first; @@ -626,22 +463,8 @@ static void findUnpairedInDatas(MarkDupData &lp, MarkDupData &cp) { auto &curRE = curUnpairInfo.unpairedRE; modifyPairedEnds(curRE, &lastRE); // lastRE当做找到匹配后,完整的ReadEnds CalcKey ck(lastRE); - // if (lp.taskSeq == 1856) { - // cout << "zzh ck: " << ck.Read1Pos() << ' ' << ck.Read2Pos() - // << "\t find: " << (interPairedData.find(ck) != interPairedData.end()) << '\t' << interPairedData.size() << endl; - // } auto &pairArr = interPairedData[ck]; pairArr.push_back(lastRE); - - // if (lastRE.read1IndexInFile == 1555341360 || lastRE.read2IndexInFile == 1555341360 || - // lastRE.read1IndexInFile == 1555341145 || lastRE.read2IndexInFile == 1555341145) { - // cout << "zzh find in lp: " << pairArr.size() << '\t' << readName << '\t' << lastRE.read1IndexInFile - // << '\t' << lastRE.read2IndexInFile << '\t' << ck.Read1Pos() << '\t' << ck.Read2Pos() << endl; - // for (auto &p : pairArr) { - // cout << "reads in arr: " << p.read1IndexInFile << '\t' << p.read2IndexInFile << '\t' << p.score - // << endl; - // } - // } // 从dict中清除配对后的readends csm.unpairedDic.erase(readName); itr = lsm.unpairedDic.erase(itr); @@ -655,12 +478,6 @@ static void findUnpairedInDatas(MarkDupData &lp, MarkDupData &cp) { static void findUnpairedInGlobal(IntersectData &g, MarkDupData &lp) { auto &interPairedData = g.ckeyReadEndsMap; SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; - - if (lp.taskSeq >= 929) { - cout << "zzh findUnpairedInGlobal size: " << lp.taskSeq << '\t' << lsm.unpairedDic.size() << '\t' - << g.unpairedDic.size() << '\t' << g.ckeyReadEndsMap.size() << endl; - } - for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end();) { // 遍历上一个任务中的每个未匹配的read auto &lastUnpair = *itr; auto &readName = lastUnpair.first; @@ -673,14 +490,6 @@ static void findUnpairedInGlobal(IntersectData &g, MarkDupData &lp) { CalcKey ck(gRE); auto &pairArr = interPairedData[ck]; pairArr.push_back(gRE); - // if (gRE.read1IndexInFile == 1555341360 || gRE.read2IndexInFile == 1555341360 || - // gRE.read1IndexInFile == 1555341145 || gRE.read2IndexInFile == 1555341145) { - // cout << "zzh find in global: " << pairArr.size() << '\t' << readName << '\t' << gRE.read1IndexInFile << '\t' << gRE.read2IndexInFile - // << '\t' << ck.Read1Pos() << '\t' << ck.Read2Pos() << endl; - // for (auto &p : pairArr) { - // cout << "reads in arr: " << p.read1IndexInFile << '\t' << p.read2IndexInFile << '\t' << p.score << endl; - // } - // } // 从dict中清除配对后的readends g.unpairedDic.erase(readName); itr = lsm.unpairedDic.erase(itr); @@ -688,9 +497,6 @@ static void findUnpairedInGlobal(IntersectData &g, MarkDupData &lp) { ++itr; } } - for (auto &unPair : lsm.unpairedDic) { - g.unpairedDic.insert(unPair); - } } static void putDupinfoToGlobal(IntersectData &g, MarkDupData &lp) { @@ -714,57 +520,37 @@ static void putDupinfoToGlobal(IntersectData &g, MarkDupData &lp) { // for step-5 handle intersect data static void doIntersect(PipelineArg &pipeArg) { // spdlog::info("intersect order: {}", pipeArg.intersectOrder); - // return; - // last, current, next - const int kInitIntersectOrder = 2; + const int kInitIntersectOrder = 1; IntersectData &g = pipeArg.intersectData; - MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 2) % pipeArg.MARKBUFNUM]; - MarkDupData &cp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; - MarkDupData &np = pipeArg.markDupData[(pipeArg.intersectOrder) % pipeArg.MARKBUFNUM]; + MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; + MarkDupData &cp = pipeArg.markDupData[(pipeArg.intersectOrder) % pipeArg.MARKBUFNUM]; SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; SortMarkData &csm = *(SortMarkData *)cp.dataPtr; - SortMarkData &nsm = *(SortMarkData *)np.dataPtr; - - // if (pipeArg.sortOrder >= 929) { - // cout << "zzh intersect size: " << pipeArg.intersectOrder << '\t' << lp.taskSeq << '\t' - // << lsm.pairs.size() << '\t' << lsm.frags.size() << '\t' << lsm.unpairedDic.size()<< '\t' - // << csm.pairs.size() << '\t' << csm.frags.size() << '\t' << csm.unpairedDic.size()<< '\t' - // << nsm.pairs.size() << '\t' << nsm.frags.size() << '\t' << nsm.unpairedDic.size()<< '\t' - // << endl; - // } // 处理相邻数据块之间重叠的部分 - if (pipeArg.intersectOrder == kInitIntersectOrder) { // 第一次运行,需要处理lp和cp - processIntersectFragPairs(lp, cp); - } - processIntersectFragPairs(cp, np); - // if (pipeArg.sortOrder >= 929) { - // cout << "zzh intersect lp taskSeq: " << lp.taskSeq << '\t' << pipeArg.markDupData[0].taskSeq << '\t' - // << pipeArg.markDupData[1].taskSeq << '\t' << pipeArg.markDupData[2].taskSeq << '\t' - // << pipeArg.markDupData[3].taskSeq << '\t' << endl; - // } + processIntersectFragPairs(lp, cp); // 检查确保lp和np之间没有数据交叉 - int64_t lastRightPos = 0, curLeftPos = INT64_MAX, curRightPos = INT64_MAX, nextLeftPos = INT64_MAX; - if (lsm.frags.size() > 0) lastRightPos = lsm.frags.back().posKey; - if (csm.frags.size() > 0) { - curLeftPos = csm.frags[0].posKey; - curRightPos = csm.frags.back().posKey; + int64_t lastLeft = INT64_MIN, lastRight = INT64_MAX, curLeft = INT64_MAX, curRight = INT64_MAX; + if (lsm.pairs.size() > 0) { + lastLeft = lsm.frags[0].Left(); + lastRight = lsm.frags.back().Left(); } - if (nsm.frags.size() > 0) nextLeftPos = nsm.frags[0].posKey; - if (lastRightPos >= nextLeftPos) { - spdlog::error("previous data can not contain readends included by next data block! {} {}", lastRightPos, nextLeftPos); + if (csm.pairs.size() > 0) { + curLeft = csm.frags[0].Left(); + curRight = csm.frags.back().Left(); } - // 在相邻数据块之间查找之前未匹配的readends - if (pipeArg.intersectOrder == kInitIntersectOrder) { // 第一次运行,需要处理lp和cp - findUnpairedInDatas(lp, cp); + + if (g.rightPos >= curLeft) { + spdlog::error("previous data can not contain readends included by next data block! {} {} {} {} {} {}", + lp.taskSeq, cp.taskSeq, g.rightPos, curLeft, lsm.pairs.size(), csm.pairs.size()); } - findUnpairedInDatas(lp, np); // 找到的匹配放到lp里 - findUnpairedInDatas(cp, np); // 找到的匹配放到cp里 - // 把lp中未匹配的放到global里保存 - findUnpairedInGlobal(g, lp); + g.rightPos = lastRight; + + findUnpairedInDatas(lp, cp); // 找到的匹配放到lp里 + findUnpairedInGlobal(g, cp); // 把cp中未匹配的放到global里保存 // 处理lp中的新找到的匹配 TaskSeqDupInfo t; @@ -772,46 +558,58 @@ static void doIntersect(PipelineArg &pipeArg) { auto &ckVal = *itr; auto &ck = ckVal.first; auto &pairArr = ckVal.second; - getEqualRE(pairArr[0], lsm.pairs, &pairArr); - getEqualRE(pairArr[0], csm.pairs, &pairArr); - // 在cp的ckeyReadEndsMap里找一下 - auto cpKeyItr = cp.ckeyReadEndsMap.find(ck); - if (cpKeyItr != cp.ckeyReadEndsMap.end()) { - auto &cpPairArr = cpKeyItr->second; - pairArr.insert(pairArr.end(), cpPairArr.begin(), cpPairArr.end()); - cp.ckeyReadEndsMap.erase(cpKeyItr); - } - if (ck.Read2Pos() <= curRightPos) { // 必须在当前数据块的范围内,否则放到global里 - sort(pairArr.begin(), pairArr.end()); - processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, - &t.notRepIdx); + getEqualRE(pairArr[0], lsm.pairs, &pairArr); // 如果不在计算范围内,会放在global里 + if (ck.Right() <= lastRight) { // 必须在当前数据块的范围内, 才进行处理 + if (ck.Left() >= curLeft) { // 在交叉的范围内才去加上这些在cp中的数据 + getEqualRE(pairArr[0], csm.pairs, &pairArr); + } + // 在global里找一找ck + auto gitr = g.ckeyReadEndsMap.find(ck); + if (gitr != g.ckeyReadEndsMap.end()) { + auto &gPairArr = gitr->second; + pairArr.insert(pairArr.end(), gPairArr.begin(), gPairArr.end()); + g.ckeyReadEndsMap.erase(gitr); + } + sort(pairArr.begin(), pairArr.end(), ReadEnds::PairLittleThan); + processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx); itr = lp.ckeyReadEndsMap.erase(itr); } else { ++itr; } } + // 处理找到匹配的global数据 for (auto itr = g.ckeyReadEndsMap.begin(); itr != g.ckeyReadEndsMap.end();) { auto &ckVal = *itr; auto &ck = ckVal.first; - if (ck.Read2Pos() < curLeftPos) { // 只有在这个范围内,对应位点的所有reads才完全都包含了 - auto &pairArr = ckVal.second; - sort(pairArr.begin(), pairArr.end()); - processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx); + auto &pairArr = ckVal.second; + if (ck.Left() >= lastLeft) { + getEqualRE(pairArr[0], lsm.pairs, &pairArr); + } + if (ck.Right() <= lastRight) { // 只有在这个范围内,对应位点的所有reads才完全都包含了 + sort(pairArr.begin(), pairArr.end(), ReadEnds::PairLittleThan); + processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx); itr = g.ckeyReadEndsMap.erase(itr); } else { ++itr; } } + + // 剩余的在lp中没处理的放到global里 for (auto &ckVal : lp.ckeyReadEndsMap) { - g.ckeyReadEndsMap.insert(ckVal); + auto &pairArr = g.ckeyReadEndsMap[ckVal.first]; + pairArr.insert(pairArr.end(), ckVal.second.begin(), ckVal.second.end()); + } lp.ckeyReadEndsMap.clear(); // 更新一下冗余结果 refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp, cp); // 处理g中新找到的匹配 - putDupinfoToGlobal(g, lp); + + for (auto &unPair : lsm.unpairedDic) { + g.unpairedDic.insert(unPair); + } } static void *pipeRead(void *data) { @@ -845,10 +643,6 @@ static void *pipeRead(void *data) { inBamBuf.ClearAll(); // 清理上一轮读入的数据 pipeArg.readOrder += 1; // for next yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据 - - if (pipeArg.readOrder >= 3719) { - cout << "zzh read size: " << pipeArg.readData.bams.size() << endl; - } } spdlog::info("total reads processed {}, last order {}", readNumSum, pipeArg.readOrder); return 0; @@ -976,7 +770,7 @@ static void *pipeMarkDup(void *data) { cout << "zzh pipeMarkDup: " << pipeArg.sortOrder << '\t' << pipeArg.markDupOrder << endl; yarn::POSSESS(pipeArg.markDupSig); pipeArg.markDupFinish = 1; - yarn::TWIST(pipeArg.markDupSig, yarn::TO, 3 + pipeArg.MARKBUFNUM); + yarn::TWIST(pipeArg.markDupSig, yarn::TO, 2 + pipeArg.MARKBUFNUM); break; } /* 冗余检测 readends */ @@ -995,11 +789,12 @@ static void *pipeMarkDup(void *data) { } static void *pipeIntersect(void *data) { PipelineArg &pipeArg = *(PipelineArg *)data; - pipeArg.intersectOrder = 2; + const int kInitIntersectOrder = 1; + pipeArg.intersectOrder = kInitIntersectOrder; while (1) { PROF_START(intersect_wait); yarn::POSSESS(pipeArg.markDupSig); - yarn::WAIT_FOR(pipeArg.markDupSig, yarn::TO_BE_MORE_THAN, 2); // 等待有数据 + yarn::WAIT_FOR(pipeArg.markDupSig, yarn::TO_BE_MORE_THAN, kInitIntersectOrder); // 等待有数据 yarn::RELEASE(pipeArg.markDupSig); PROF_END(gprof[GP_intersect_wait], intersect_wait); @@ -1031,49 +826,29 @@ static void *pipeIntersect(void *data) { /* 当所有任务结束后,global data里还有未处理的数据 */ static void processLastData(PipelineArg &pipeArg) { IntersectData &g = pipeArg.intersectData; - MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 2) % pipeArg.MARKBUFNUM]; - MarkDupData &cp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; - + MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; - SortMarkData &csm = *(SortMarkData *)cp.dataPtr; - - spdlog::info("last taskseq: lp {}, cp {}, {}, {}", lp.taskSeq, cp.taskSeq, lp.pairDupIdx.size(), cp.pairDupIdx.size()); - spdlog::info("last data size: g{}-lp{}-cp{}", g.unpairedDic.size(), lsm.unpairedDic.size(), csm.unpairedDic.size()); - spdlog::info("last pair frags: {}, {}, {}, {}", lsm.frags.size(), lsm.pairs.size(), csm.frags.size(), csm.pairs.size()); - - // 把lp中未匹配的放到global里保存 - findUnpairedInGlobal(g, lp); - findUnpairedInGlobal(g, cp); - - spdlog::info("last data size after: g{}-lp{}-cp{}", g.unpairedDic.size(), lsm.unpairedDic.size(), csm.unpairedDic.size()); - - // 处理lp中的新找到的匹配 - TaskSeqDupInfo t; - for (auto &ckVal : lp.ckeyReadEndsMap) { - auto &ck = ckVal.first; - auto &pairArr = ckVal.second; - getEqualRE(pairArr[0], lsm.pairs, &pairArr); - getEqualRE(pairArr[0], csm.pairs, &pairArr); - sort(pairArr.begin(), pairArr.end()); - processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx); + int64_t lastLeft = INT64_MIN; + if (lsm.pairs.size() > 0) { + lastLeft = lsm.frags[0].Left(); } // 处理找到匹配的global数据 + TaskSeqDupInfo t; for (auto itr = g.ckeyReadEndsMap.begin(); itr != g.ckeyReadEndsMap.end();) { auto &ckVal = *itr; auto &ck = ckVal.first; auto &pairArr = ckVal.second; - sort(pairArr.begin(), pairArr.end()); + if (ck.Left() >= lastLeft) { + getEqualRE(pairArr[0], lsm.pairs, &pairArr); + } + sort(pairArr.begin(), pairArr.end(), ReadEnds::PairLittleThan ); processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx); itr = g.ckeyReadEndsMap.erase(itr); } // 更新一下冗余结果 - refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp, cp); + refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp); // 处理g中新找到的匹配 - putDupinfoToGlobal(g, lp); - putDupinfoToGlobal(g, cp); - - spdlog::info("last data size: g{}-lp{}-cp{}", g.unpairedDic.size(), lsm.unpairedDic.size(), csm.unpairedDic.size()); } static void parallelPipeline() { diff --git a/src/markdup/md_pipeline.h b/src/markdup/md_pipeline.h index 5dbfb3b..4cfd189 100644 --- a/src/markdup/md_pipeline.h +++ b/src/markdup/md_pipeline.h @@ -113,6 +113,7 @@ struct DupResult { struct IntersectData { UnpairedNameMap unpairedDic; // 用来寻找pair end CkeyReadEndsMap ckeyReadEndsMap; + int64_t rightPos = 0; // 每个task对应一个vector vector> &dupIdxArr; @@ -141,7 +142,7 @@ struct IntersectData { struct PipelineArg { static const int GENBUFNUM = 2; static const int SORTBUFNUM = 2; - static const int MARKBUFNUM = 4; + static const int MARKBUFNUM = 3; uint64_t readOrder = 0; uint64_t genREOrder = 0; uint64_t sortOrder = 0; @@ -212,14 +213,20 @@ struct REArrIdIdx { const ReadEnds *pE = nullptr; }; -struct REGreaterThan { - bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return *b.pE < *a.pE; } +struct REFragGreaterThan { + bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return ReadEnds::FragLittleThan(*b.pE, *a.pE); } }; +struct REPairGreaterThan { + bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return ReadEnds::PairLittleThan(*b.pE, *a.pE); + } +}; + +template struct ReadEndsHeap { // 将冗余索引和他对应的task vector对应起来 vector> *arr2d; - priority_queue, REGreaterThan> minHeap; + priority_queue, CMP> minHeap; uint64_t popNum = 0; int Init(vector> *_arr2d) { diff --git a/src/markdup/md_types.h b/src/markdup/md_types.h index 064aead..d594cfc 100644 --- a/src/markdup/md_types.h +++ b/src/markdup/md_types.h @@ -32,38 +32,70 @@ struct CalcKey { int32_t read1Coordinate = -1; int32_t read2ReferenceIndex = -1; int32_t read2Coordinate = -1; + int64_t left = -1; + int64_t right = -1; CalcKey() {} + static CalcKey MaxCK() { + CalcKey ck; + ck.left = ck.right = INT64_MAX; + return ck; + } + static CalcKey MinCK() { + CalcKey ck; + ck.left = ck.right = INT64_MIN; + return ck; + } + CalcKey(const ReadEnds &re) { orientation = re.orientation; read1ReferenceIndex = re.read1ReferenceIndex; read1Coordinate = re.read1Coordinate; read2ReferenceIndex = re.read2ReferenceIndex; read2Coordinate = re.read2Coordinate; + left = Read1Pos(); + right = Read2Pos(); } int64_t Read1Pos() const { return BamWrap::bam_global_pos(read1ReferenceIndex, read1Coordinate); } - int64_t Read2Pos() const { return BamWrap::bam_global_pos(read2ReferenceIndex, read2Coordinate); } + int64_t Left() const { return left; } + int64_t Right() const { return right; } bool operator<(const CalcKey &o) const { int comp = read1ReferenceIndex - o.read1ReferenceIndex; if (comp == 0) comp = read1Coordinate - o.read1Coordinate; - // 需要orientation,因为要跟排序的比较方式和顺序一致 - if (comp == 0) - comp = orientation - o.orientation; if (comp == 0) comp = read2ReferenceIndex - o.read2ReferenceIndex; if (comp == 0) comp = read2Coordinate - o.read2Coordinate; + // 需要orientation,因为要跟排序的比较方式和顺序一致 + if (comp == 0) + comp = orientation - o.orientation; return comp < 0; } + bool operator <= (const CalcKey &o) const { + return *this < o || *this == o; + } bool operator==(const CalcKey &o) const { return read1ReferenceIndex == o.read1ReferenceIndex && read1Coordinate == o.read1Coordinate && orientation == o.orientation && read2ReferenceIndex == o.read2ReferenceIndex && read2Coordinate == o.read2Coordinate; } + bool operator<(const ReadEnds &o) const { + int comp = read1ReferenceIndex - o.read1ReferenceIndex; + if (comp == 0) + comp = read1Coordinate - o.read1Coordinate; + if (comp == 0) + comp = read2ReferenceIndex - o.read2ReferenceIndex; + if (comp == 0) + comp = read2Coordinate - o.read2Coordinate; + // 需要orientation,因为要跟排序的比较方式和顺序一致 + if (comp == 0) + comp = orientation - o.orientation; + return comp < 0; + } std::size_t operator()() const { size_t h1 = read1ReferenceIndex; h1 = (h1 << 40) | (read1Coordinate << 8) | orientation; @@ -247,8 +279,7 @@ struct DupIdxQueue { DupInfo nextDup = dupIdx; auto topIdx = minHeap.top(); - ofstream ofs("na12878.txt"); - ofstream ofs1("na12878-all.txt"); + // ofstream ofs("n.dup"); ofstream ofs1("n-all.dup"); while (dupIdx != -1) { ++len; @@ -264,12 +295,14 @@ struct DupIdxQueue { << endl; } } - ofs1 << topIdx.arrId << '\t' << topIdx.arrIdx << '\t' << topIdx.dupIdx << endl; - ofs << topIdx.dupIdx << endl; + + // ofs << topIdx.dupIdx << endl; ofs1 << topIdx.arrId << '\t' << topIdx.arrIdx << '\t' << topIdx.dupIdx << endl; + dupIdx = nextDup; preTop = topIdx; } - ofs.close(); + // ofs.close(); ofs1.close(); + cout << "RealSize: " << len << endl; return len; } }; \ No newline at end of file diff --git a/src/markdup/read_ends.h b/src/markdup/read_ends.h index 17045ce..d227b83 100644 --- a/src/markdup/read_ends.h +++ b/src/markdup/read_ends.h @@ -11,8 +11,11 @@ Date : 2023/11/3 #include #include -#include +#include +#include +#include +using namespace std; /** * Small interface that provides access to the physical location information * about a cluster. All values should be defaulted to -1 if unavailable. @@ -123,55 +126,85 @@ struct ReadEnds : PhysicalLocation { int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; if (comp == 0) comp = a.read1Coordinate - b.read1Coordinate; - if (comp == 0) - comp = a.orientation - b.orientation; if (compareRead2) { if (comp == 0) comp = a.read2ReferenceIndex - b.read2ReferenceIndex; if (comp == 0) comp = a.read2Coordinate - b.read2Coordinate; } + if (comp == 0) + comp = a.orientation - b.orientation; + return comp < 0; } - // 找某一个位置的所有readend时需要, 只对比位点,不对比orientation - static bool PairsLittleThan(const ReadEnds &a, const ReadEnds &b) { + static bool FragLittleThan(const ReadEnds &a, const ReadEnds &b) { int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; if (comp == 0) comp = a.read1Coordinate - b.read1Coordinate; - // 需要orientation,因为要跟排序的比较方式和顺序一致 - if (comp == 0) + if (comp == 0) // 这个放在坐标比较之前,因为在解析bam的时候,可能有的给read2ReferenceIndex赋值了,有的则没赋值 comp = a.orientation - b.orientation; if (comp == 0) comp = a.read2ReferenceIndex - b.read2ReferenceIndex; if (comp == 0) comp = a.read2Coordinate - b.read2Coordinate; + // if (comp == 0) + // comp = a.tile - b.tile; + // if (comp == 0) + // comp = a.x - b.x; // 由于picard的bug,用short类型来表示x,y,导致其可能为负数 + // if (comp == 0) + // comp - a.y - b.y; + if (comp == 0) + comp = (int)(a.read1IndexInFile - b.read1IndexInFile); + if (comp == 0) + comp = (int)(a.read2IndexInFile - b.read2IndexInFile); return comp < 0; } - // 比较函数 - bool operator<(const ReadEnds &o) const { - int comp = read1ReferenceIndex - o.read1ReferenceIndex; + void Print() { + std::cout << read1ReferenceIndex << '\t' << read1Coordinate << '\t' << read2ReferenceIndex << '\t' << read2Coordinate + << '\t' << orientation << '\t' << read1IndexInFile << '\t' << read2IndexInFile << std::endl; + } + + static bool PairLittleThan(const ReadEnds &a, const ReadEnds &b) { + int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; if (comp == 0) - comp = read1Coordinate - o.read1Coordinate; + comp = a.read1Coordinate - b.read1Coordinate; if (comp == 0) - comp = orientation - o.orientation; + comp = a.read2ReferenceIndex - b.read2ReferenceIndex; if (comp == 0) - comp = read2ReferenceIndex - o.read2ReferenceIndex; + comp = a.read2Coordinate - b.read2Coordinate; + if (comp == 0) // 这个放在坐标比较了之后,把坐标范围的放在之前,这样对分段数据块比较好处理 + comp = a.orientation - b.orientation; + // if (comp == 0) + // comp = a.tile - b.tile; + // if (comp == 0) + // comp = a.x - b.x; // 由于picard的bug,用short类型来表示x,y,导致其可能为负数 + // if (comp == 0) + // comp - a.y - b.y; if (comp == 0) - comp = read2Coordinate - o.read2Coordinate; - //if (comp == 0) - // comp = tile - o.tile; - //if (comp == 0) - // comp = x - o.x; - //if (comp == 0) - // comp - y - o.y; + comp = (int)(a.read1IndexInFile - b.read1IndexInFile); if (comp == 0) - comp = (int)(read1IndexInFile - o.read1IndexInFile); - if (comp == 0) - comp = (int)(read2IndexInFile - o.read2IndexInFile); + comp = (int)(a.read2IndexInFile - b.read2IndexInFile); return comp < 0; } + + static bool CorLittleThan(const ReadEnds &a, const ReadEnds &b) { + int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; + if (comp == 0) + comp = a.read1Coordinate - b.read1Coordinate; + if (comp == 0) + comp = a.read2ReferenceIndex - b.read2ReferenceIndex; + if (comp == 0) + comp = a.read2Coordinate - b.read2Coordinate; + if (comp == 0) // 这个放在坐标比较了之后,把坐标范围的放在之前,这样对分段数据块比较好处理 + comp = a.orientation - b.orientation; + return comp < 0; + } + + // for pairs only + int64_t Left() { return BamWrap::bam_global_pos(read1ReferenceIndex, read1Coordinate); } + int64_t Right() { return BamWrap::bam_global_pos(read2ReferenceIndex, read2Coordinate); } }; struct ReadEndsHash {