diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json new file mode 100644 index 0000000..febf709 --- /dev/null +++ b/.vscode/c_cpp_properties.json @@ -0,0 +1,15 @@ +{ + "configurations": [ + { + "name": "Linux", + "includePath": [ + "${workspaceFolder}/**" + ], + "defines": [], + "cStandard": "c17", + "cppStandard": "gnu++17", + "intelliSenseMode": "linux-gcc-x64" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/run.sh b/run.sh index 3ee5811..a2fc5c3 100755 --- a/run.sh +++ b/run.sh @@ -12,3 +12,4 @@ time /home/zzh/work/GeneKit/picard_cpp/build/bin/picard_cpp \ # --INPUT /mnt/d/data/100w.bam \ # --INPUT /mnt/d/data/zy_normal.bam \ # zy_tumor +# tumor_region \ No newline at end of file diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index 8ec880c..6dba1d5 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -39,7 +39,7 @@ using std::cout; #define BAM_BLOCK_SIZE 2 * 1024 * 1024 #define NO_SUCH_INDEX INT64_MAX -static Timer tm_arr[10]; // 用来测试性能 +static Timer tm_arr[20]; // 用来测试性能 /* 全局本地变量 */ static vector g_vRnParser; // 每个线程一个read name parser static samFile *g_inBamFp; // 输入的bam文件 @@ -91,7 +91,8 @@ int MarkDuplicates(int argc, char *argv[]) htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池 // htsPoolRead.pool = hts_tpool_init(g_gArg.num_threads); htsPoolRead.pool = hts_tpool_init(16); - htsPoolWrite.pool = hts_tpool_init(g_gArg.num_threads); + // htsPoolWrite.pool = hts_tpool_init(g_gArg.num_threads); + htsPoolWrite.pool = hts_tpool_init(16); if (!htsPoolRead.pool || !htsPoolWrite.pool) { Error("[%d] failed to set up thread pool", __LINE__); @@ -150,30 +151,34 @@ int MarkDuplicates(int argc, char *argv[]) // BamBufType inBuf(false); // inBuf(g_gArg.use_asyncio); BamBufType inBuf(g_gArg.use_asyncio); inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); -// while(inBuf.ReadStat() >= 0) -// { -// size_t readNum = inBuf.ReadBam(); -// cout << "read: " << readNum << endl; -// for (size_t i = 0; i < inBuf.Size(); ++i) -// { -// /* 判断是否冗余 */ -// if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) -// { -// Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); -// sam_close(g_outBamFp); -// sam_close(g_inBamFp); -// return -1; -// } -// } -// inBuf.ClearAll(); -// } -// if (sam_idx_save(g_outBamFp) < 0) -// { -// Error("writing index failed"); -// sam_close(g_outBamFp); -// sam_close(g_inBamFp); -// return -1; -// } + Timer tw; + while (inBuf.ReadStat() >= 0) + { + Timer tw1; + size_t readNum = inBuf.ReadBam(); + cout << "read: " << readNum << endl; + for (size_t i = 0; i < inBuf.Size(); ++i) + { + /* 判断是否冗余 */ + if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) + { + Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); + sam_close(g_outBamFp); + sam_close(g_inBamFp); + return -1; + } + } + inBuf.ClearAll(); + cout << "write round time: " << tw1.seconds_elapsed() << " s" << endl; + } + if (sam_idx_save(g_outBamFp) < 0) + { + Error("writing index failed"); + sam_close(g_outBamFp); + sam_close(g_inBamFp); + return -1; + } + cout << "write time: " << tw.seconds_elapsed() << " s" << endl; /* 关闭文件,收尾清理 */ sam_close(g_outBamFp); diff --git a/src/sam/markdups/serial_md.h b/src/sam/markdups/serial_md.h index d5ddd6d..5aa7f56 100644 --- a/src/sam/markdups/serial_md.h +++ b/src/sam/markdups/serial_md.h @@ -8,12 +8,21 @@ struct UnpairedREInfo ReadEnds unpairedRE; }; -struct GlobalUnpairedInfo +/* 对于一个pair数据,一个完整的计算点,包含read1的比对位置和read2的比对位置 */ +struct CalcKey { - int64_t taskSeq; - vector reArr; + int64_t read1Pos; + int64_t read2Pos; + bool operator<(const CalcKey &o) const + { + int comp = (int)(read1Pos - o.read1Pos); + if (comp == 0) + comp = (int)(read2Pos - o.read2Pos); + return comp < 0; + } }; +/* 当遗留数据在当前任务找到了pair read后,进行冗余计算时候存放结果的数据结构 */ struct TaskSeqDupInfo { set dupIdx; @@ -21,11 +30,19 @@ struct TaskSeqDupInfo set notDupIdx; }; +/* 保存有未匹配pair位点的信息,包括read end数组和有几个未匹配的read end */ +struct UnpairedPosInfo +{ + int unpairedNum = 0; + int64_t taskSeq; + vector pairArr; + set readNameSet; +}; // typedef unordered_map UnpairedNameMap; -// typedef unordered_map> UnpairedPositionMap; +// typedef unordered_map UnpairedPositionMap; -typedef tsl::robin_map UnpairedNameMap; -typedef tsl::robin_map> UnpairedPositionMap; +typedef tsl::robin_map UnpairedNameMap; // 以read name为索引,保存未匹配的pair read +typedef tsl::robin_map UnpairedPositionMap; // 以位点为索引,保存该位点包含的对应的所有read和该位点包含的剩余未匹配的read的数量 /* 单线程处理冗余参数结构体 */ struct SerailMarkDupArg @@ -45,29 +62,29 @@ struct SerailMarkDupArg /* 全局保留的数据,因为有些paired数据比对到了不同的染色体,相距甚远 */ struct GlobalDataArg { - set pairDupIdx; // pair的冗余read的索引 - set pairOpticalDupIdx; // optical冗余read的索引 - set notDupIdx; // 不是冗余 - //unordered_map unpairedDic; // 用来寻找pair end - //unordered_map> unpairedPosArr; UnpairedNameMap unpairedDic; // 用来寻找pair end UnpairedPositionMap unpairedPosArr; // 每个task对应一个vector vector> dupIdxArr; vector> opticalDupIdxArr; + + // 用来存放后续计算的数据 + vector> latterDupIdxArr; + vector> latterOpticalDupIdxArr; + vector> latterNotDupIdxArr; }; static GlobalDataArg gData; /* 查找 */ -template -static inline Itr binaryFind(Itr first, Itr last, const T &val) -{ - first = std::lower_bound(first, last, val); - return (first != last && *first == val) ? first : last; -} +// template +// static inline Itr binaryFind(Itr first, Itr last, const T &val) +// { +// first = std::lower_bound(first, last, val); +// return (first != last && *first == val) ? first : last; +// } /* 排序 */ static inline void sortReadEndsArr(vector &arr) @@ -101,9 +118,6 @@ static inline void sortReadEndsArr(vector &arr) if (right == end - start - 1) break; } - // cout << "sort: " << left << ' ' << right << ' ' - // << arr[start - left].posKey << ' ' << arr[start - 1].posKey << ' ' - // << arr[start].posKey << ' ' << arr[start + right].posKey << endl; std::sort(arr.begin() + start - left, arr.begin() + start + right); } } @@ -203,7 +217,7 @@ static void markDupsForFrags(vector &vpRe, /* 找到与readend pos相等的所有readend */ static void getEqualRE(const ReadEnds &re, vector &src, vector *dst) { - auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::pairsCmp); + auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::pairsLittleThan); // 只比对位点 dst->insert(dst->end(), range.first, range.second); } @@ -251,40 +265,29 @@ static void generateReadEnds(SerailMarkDupArg *arg) { auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; modifyPairedEnds(fragEnd, &pairedEnds); - // if (pairedEnds.read1IndexInFile == 94 || pairedEnds.read1IndexInFile == 95) - // { - // cout << "pair score: " << pairedEnds.read1IndexInFile << ' ' << pairedEnds.score << endl; - // } - // if (pairedEnds.read1IndexInFile == 94) - // { - // lastRe = pairedEnds; - // } - // if (pairedEnds.read1IndexInFile == 95) - // { - // cout << "compare: " << (lastRe < pairedEnds) << ' ' << (pairedEnds < lastRe) << endl; - // - // } p.pairs.push_back(pairedEnds); p.unpairedDic.erase(key); // 删除找到的pairend } } } } - // cout << "sort frags" << endl; + tm_arr[9].acc_start(); sortReadEndsArr(p.frags); // sort(p.frags.begin(), p.frags.end()); + tm_arr[9].acc_end(); // cout << "sort pairs" << endl; - // sortReadEndsArr(p.pairs); + tm_arr[10].acc_start(); sort(p.pairs.begin(), p.pairs.end()); - // cout << "unpaired num: " << p.unpairedDic.size() << endl; - - // 把未匹配的pair对应的每个位点的pairs记录下来 + tm_arr[10].acc_end(); + // 记录位点上的未匹配的read个数 for (auto &e : p.unpairedDic) { - auto &unpair = e.second; - auto posKey = unpair.unpairedRE.posKey; - if (p.unpairedPosArr.find(posKey) == p.unpairedPosArr.end()) - getEqualRE(unpair.unpairedRE, p.pairs, &p.unpairedPosArr[posKey]); + auto posKey = e.second.unpairedRE.posKey; + auto &unpairArrInfo = p.unpairedPosArr[posKey]; + unpairArrInfo.unpairedNum++; + unpairArrInfo.taskSeq = p.taskSeq; + unpairArrInfo.readNameSet.insert(e.first); } + cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() << endl; } /* 处理pairs */ @@ -362,14 +365,17 @@ static void markdups(SerailMarkDupArg *arg) } /* 获取交叉部分的数据 */ -static inline void getIntersectData(vector &leftArr, vector &rightArr, vector *dst) +static inline void getIntersectData(vector &leftArr, + vector &rightArr, + vector *dst, + bool isPairCmp = false) { const size_t leftEndIdx = leftArr.size() - 1; const size_t rightStartIdx = 0; size_t leftSpan = 0; size_t rightSpan = 0; - while (!(leftArr[leftEndIdx - leftSpan] < rightArr[rightStartIdx])) + while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp)) { leftSpan += 1; if (leftSpan > leftEndIdx) @@ -379,7 +385,7 @@ static inline void getIntersectData(vector &leftArr, vector } } - while (!(leftArr[leftEndIdx] < rightArr[rightSpan])) + while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp)) { rightSpan += 1; if (rightSpan == rightArr.size() - 1) @@ -390,7 +396,7 @@ static inline void getIntersectData(vector &leftArr, vector std::sort(dst->begin(), dst->end()); } -/* 将重叠部分的dup idx放进数据中 */ +/* 将frags重叠部分的dup idx放进数据中 */ static inline void refreshFragDupIdx(set &dupIdx, set ¬DupIdx, SerailMarkDupArg * lastArg, @@ -410,6 +416,7 @@ static inline void refreshFragDupIdx(set &dupIdx, } } +/* 将pairs重叠部分的dup idx放进数据中 */ static inline void refreshPairDupIdx(set &dupIdx, set &opticalDupIdx, set ¬DupIdx, @@ -437,104 +444,72 @@ static inline void refreshPairDupIdx(set &dupIdx, } } -/* 处理未匹配的部分 */ -static inline void processUnpairedPosForCalc(UnpairedNameMap &lpUnpairedDic, - UnpairedPositionMap &lpUnpairedPosArr, - UnpairedNameMap &pUnpairedDic, - UnpairedPositionMap &pUnpairedPosArr, - vector &pairs, - map &recalcPos, - bool addToLast = false) -{ - recalcPos.clear(); - for (auto itr = pUnpairedDic.begin(); itr != pUnpairedDic.end();) - { - auto &readName = itr->first; - - if (lpUnpairedDic.find(readName) != lpUnpairedDic.end()) - { - auto &posInfo = lpUnpairedDic[readName]; - auto posKey = posInfo.unpairedRE.posKey; - auto &posReArr = lpUnpairedPosArr[posKey]; - - modifyPairedEnds(itr->second.unpairedRE, &posInfo.unpairedRE); - posKey = posInfo.unpairedRE.posKey; - if (recalcPos.find(posKey) == recalcPos.end()) // 如果之前没有这个位点 - getEqualRE(posInfo.unpairedRE, pairs, &posReArr); - recalcPos[posKey] = posInfo.taskSeq; - posReArr.push_back(posInfo.unpairedRE); - std::sort(posReArr.begin(), posReArr.end()); - - lpUnpairedDic.erase(readName); - itr = pUnpairedDic.erase(itr); - } - else - { - if (addToLast) // 将数据添加进遗留数据中 - { - auto posKey = itr->second.unpairedRE.posKey; - lpUnpairedDic[readName] = itr->second; - lpUnpairedPosArr[posKey] = pUnpairedPosArr[posKey]; - } - ++itr; - } - } -} - // 用来分别处理dup和optical dup -static void refeshTaskDupInfo(vector &addDup, - map &ndPosVal, - set &dupIdx, +static void refeshTaskDupInfo(set &dupIdx, + set &opticalDupIdx, set ¬DupIdx, - vector &dupArr) + set &latterDupIdx, + set &latterOpticalDupIdx, + set &latterNotDupIdx) { - addDup.clear(); - ndPosVal.clear(); - // 去除之前有的,重复的 - for (auto i = dupIdx.begin(); i != dupIdx.end();) + for (auto idx : dupIdx) + latterDupIdx.insert(idx); + for (auto idx : opticalDupIdx) + latterOpticalDupIdx.insert(idx); + for (auto idx : notDupIdx) + latterNotDupIdx.insert(idx); +} + +/* 最后合并数据并排序 */ +static void refeshFinalTaskDupInfo(set &dupIdx, + set ¬DupIdx, + vector &dupArr) +{ + vector midArr; + + auto ai = dupArr.begin(); + auto bi = dupIdx.begin(); + auto ae = dupArr.end(); + auto be = dupIdx.end(); + + int64_t val = 0; + while (ai != ae && bi != be) { - auto itr = binaryFind(dupArr.begin(), dupArr.end(), *i); - if (itr != dupArr.end()) + if (*ai < *bi) { - i = dupIdx.erase(i); + val = *ai++; + } + else if (*bi < *ai) + { + val = *bi++; } else { - ++i; + val = *ai++; + bi++; } - } - // 添加现有的 - auto di = dupIdx.begin(); - for (auto nidx : notDupIdx) - { - auto itr = binaryFind(dupArr.begin(), dupArr.end(), nidx); - if (itr != dupArr.end()) + if (notDupIdx.find(val) == notDupIdx.end()) { - ndPosVal[itr - dupArr.begin()] = *di++; + midArr.push_back(val); } } - - while (di != dupIdx.end()) - addDup.push_back(*di++); - - for (auto pos : ndPosVal) - dupArr[pos.first] = pos.second; - dupArr.insert(dupArr.end(), addDup.begin(), addDup.end()); - std::sort(dupArr.begin(), dupArr.end()); -} - -/* 将遗留的冗余信息添加进对应的任务数据中 */ -static void addDupInfoToTask(map &seqTaskChanged, GlobalDataArg *gDataArg) -{ - auto &g = *gDataArg; - // 更新遗留的结果 - vector addDup; - map ndPosVal; - for (auto &e : seqTaskChanged) + while (ai != ae) { - refeshTaskDupInfo(addDup, ndPosVal, e.second.dupIdx, e.second.notDupIdx, g.dupIdxArr[e.first]); - refeshTaskDupInfo(addDup, ndPosVal, e.second.opticalDupIdx, e.second.notDupIdx, g.opticalDupIdxArr[e.first]); + val = *ai++; + if (notDupIdx.find(val) == notDupIdx.end()) + { + midArr.push_back(val); + } } + while (bi != be) + { + val = *bi++; + if (notDupIdx.find(val) == notDupIdx.end()) + { + midArr.push_back(val); + } + } + dupArr = midArr; } /* 处理相邻的两个任务,有相交叉的数据 */ @@ -557,51 +532,211 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur dupIdx.clear(); notDupIdx.clear(); set opticalDupIdx; - getIntersectData(lp.pairs, p.pairs, &reArr); + getIntersectData(lp.pairs, p.pairs, &reArr, true); processPairs(reArr, &dupIdx, &opticalDupIdx, ¬DupIdx); refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, &lp, &p); // 处理之前未匹配的部分 - map recalcPos; - processUnpairedPosForCalc(lp.unpairedDic, - lp.unpairedPosArr, - p.unpairedDic, - p.unpairedPosArr, - p.pairs, - recalcPos); - for (auto &e : recalcPos) + map recalcPos; + set alreadyAdd; // 与该位点相同的pair都添加到数组里了 + set addToGlobal; + int64_t prevLastPos = 0, nextFirstPos = 0; + if (lp.frags.size() > 0) + prevLastPos = lp.frags.back().posKey; + if (p.frags.size() > 0) + nextFirstPos = p.frags[0].posKey; + // cout << "range: " << nextFirstPos << '\t' << prevLastPos << endl; + for (auto &prevUnpair : lp.unpairedDic) // 遍历上一个任务中的每个未匹配的read { - auto posKey = e.first; - dupIdx.clear(); - notDupIdx.clear(); - opticalDupIdx.clear(); - processPairs(lp.unpairedPosArr[posKey], &dupIdx, &opticalDupIdx, ¬DupIdx); - refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, &lp, &p); - } + auto &readName = prevUnpair.first; + auto &prevPosInfo = prevUnpair.second; + auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end - // 遗留的未匹配的pair - processUnpairedPosForCalc(g.unpairedDic, - g.unpairedPosArr, - lp.unpairedDic, - lp.unpairedPosArr, - lp.pairs, - recalcPos, - true); - map seqTaskChanged; + if (p.unpairedDic.find(readName) != p.unpairedDic.end()) // 在当前这个任务里找到了这个未匹配的read + { + auto &nextPosInfo = p.unpairedDic[readName]; + auto &nextFragEnd = nextPosInfo.unpairedRE; + int64_t prevPosKey = prevFragEnd.posKey; + modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下,poskey可能是后面的read + int64_t nextPosKey = max(prevPosKey, nextFragEnd.posKey); + CalcKey ck = {prevPosKey, nextPosKey}; + UnpairedPosInfo *prevUnpairInfoP = nullptr; + UnpairedPosInfo *nextUnpairInfoP = nullptr; + if (lp.unpairedPosArr.find(prevPosKey) != lp.unpairedPosArr.end()) + prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey]; + if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end()) + nextUnpairInfoP = &p.unpairedPosArr[prevPosKey]; + + // pos分为两种情况,根据poskey(pair中两个read分别的pos)的位置确定 + // 1. prevpos在交叉部分之前,nextpos在交叉部分之后,这种情况不需要获取pairarr中的数据; + // 2. prevpos在交叉部分之前,nextpos在交叉部分,需要获取lp中的相等read pair进行重新计算 + // 复杂情况1. g中包含prevPosKey对应的unpair,p中有对应的pair,此时应该把这些pair考虑进去 + // 3. prevpos在交叉部分,nextpos在交叉部分之后,需要获取p中的相等read pair进行重新计算 + // 复杂情况2. p中是否包含prevPosKey对应的unpair + // 4. prevpos在交叉部分,nextpos在交叉部分,需要获取lp和p中的相等read pair进行重新计算 + + bool addDataToPos = true; + if (alreadyAdd.find(ck) != alreadyAdd.end()) + { + addDataToPos = false; // 之前已经添加过了,后面就不用再添加数据了 + } + else + alreadyAdd.insert(ck); + + if (prevPosKey < nextFirstPos) // prevpos在交叉部分之前 + { + auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr + prevPairArr.push_back(prevFragEnd); + if (nextPosKey <= prevLastPos && addDataToPos) // 第二种情况 + { + getEqualRE(prevFragEnd, lp.pairs, &prevPairArr); + } + // 第一种情况,第二种情况下都会出现,复杂情况一 + auto gPosInfo = g.unpairedPosArr.find(prevPosKey); + if (gPosInfo != g.unpairedPosArr.end()) // 可能g和p有匹配的,刚好和该位点一致 + { + auto &gUnpairInfo = gPosInfo->second; + auto pPosInfo = p.unpairedPosArr.find(nextPosKey); + if (pPosInfo != p.unpairedPosArr.end()) + { + auto &pUnpairInfo = pPosInfo->second; + for (auto &rn : gUnpairInfo.readNameSet) // 遍历每一个readname,看是否有匹配的 + { + if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) + { + auto pe = g.unpairedDic[rn].unpairedRE; + auto fe = p.unpairedDic[rn].unpairedRE; + modifyPairedEnds(fe, &pe); + prevPairArr.push_back(pe); + g.unpairedDic.erase(rn); + p.unpairedDic.erase(rn); + // cout << "找到了!" << rn << endl; + } + } + } + } + recalcPos[ck] = prevPosInfo.taskSeq; + std::sort(prevPairArr.begin(), prevPairArr.end()); + } + else // prevpos在交叉部分 + { + if (nextPosKey > prevLastPos) // nextpos在交叉部分之后 + { // 第三种情况 + if (nextUnpairInfoP != nullptr) // 且在pos点,next task有unpair,这样才把这些数据放到next task里 + { + auto &nextPairArr = nextUnpairInfoP->pairArr; + nextPairArr.push_back(prevFragEnd); + auto &prevPairArr = prevUnpairInfoP->pairArr; + prevPairArr.push_back(prevFragEnd); + if (addDataToPos) + { + getEqualRE(prevFragEnd, p.pairs, &prevPairArr); + } + recalcPos[ck] = nextPosInfo.taskSeq; // 将数据放到next task里, (这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除) + std::sort(prevPairArr.begin(), prevPairArr.end()); + } + else // next task在该位点没有unpair,那就把数据放到prev task里 + { + auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr + prevPairArr.push_back(prevFragEnd); + if (addDataToPos) // 第二种情况 + getEqualRE(prevFragEnd, p.pairs, &prevPairArr); + recalcPos[ck] = prevPosInfo.taskSeq; + std::sort(prevPairArr.begin(), prevPairArr.end()); + } + } + else + { // 第四种情况 + if (prevUnpairInfoP == nullptr) { + prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey]; + prevUnpairInfoP->taskSeq = lp.taskSeq; + } + auto &prevPairArr = prevUnpairInfoP->pairArr; + prevPairArr.push_back(prevFragEnd); + if (addDataToPos) + { + getEqualRE(prevFragEnd, lp.pairs, &prevPairArr); + getEqualRE(prevFragEnd, p.pairs, &prevPairArr); + } + recalcPos[ck] = prevPosInfo.taskSeq; + std::sort(prevPairArr.begin(), prevPairArr.end()); + } + } + p.unpairedDic.erase(readName); // 在next task里删除该read + } + else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) // 在遗留数据中找到了匹配的read + { + auto &remainPosInfo = g.unpairedDic[readName]; + auto remainFragEnd = remainPosInfo.unpairedRE; + int64_t remainPosKey = remainFragEnd.posKey; + modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read + auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; + auto &remainPairArr = remainUnpairInfo.pairArr; + remainPairArr.push_back(remainFragEnd); + CalcKey ck = {remainPosKey, prevFragEnd.posKey}; + recalcPos[ck] = remainPosInfo.taskSeq; + std::sort(remainPairArr.begin(), remainPairArr.end()); + + g.unpairedDic.erase(readName); + } + else // 都没找到,那就保存到遗留数据里 + { + int64_t prevPosKey = prevFragEnd.posKey; + g.unpairedDic.insert(prevUnpair); + addToGlobal.insert(prevPosKey); + } + } + for (auto posKey : addToGlobal) // 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据 + g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey]; + + map taskChanged; + set posProcessed; for (auto &e : recalcPos) { - auto posKey = e.first; - auto seqNum = e.second; - auto &t = seqTaskChanged[seqNum]; + auto posKey = e.first.read1Pos; + if (posProcessed.find(posKey) != posProcessed.end()) + continue; + posProcessed.insert(posKey); + auto taskSeq = e.second; + auto &t = taskChanged[taskSeq]; // 在对应的任务包含的dup idx里修改结果数据 - processPairs(g.unpairedPosArr[posKey], &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx); - g.unpairedPosArr.erase(posKey); + vector *pairArrP = nullptr; + if (taskSeq < lp.taskSeq) + pairArrP = &g.unpairedPosArr[posKey].pairArr; + else + pairArrP = &lp.unpairedPosArr[posKey].pairArr; + processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx); + if (taskSeq < lp.taskSeq) + g.unpairedPosArr.erase(posKey); } - addDupInfoToTask(seqTaskChanged, &g); + // 更新结果 - cout << "remain unpaired: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl; + for (auto &e : taskChanged) + { + auto taskSeq = e.first; + auto &t = e.second; + if (taskSeq < lp.taskSeq) + { + refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, + g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq]); + } + else if (taskSeq == lp.taskSeq) + { + refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, &lp, &p); + } + else + { + refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, &p, &lp); // 把结果放到p中 + } + } + // cout << "remain unpaired: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl; + // cout << "calc g time: " << t.seconds_elapsed() << " s" << endl; // 将dupidx放进全局数据 + g.latterDupIdxArr.push_back(set()); + g.latterOpticalDupIdxArr.push_back(set()); + g.latterNotDupIdxArr.push_back(set()); + g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); @@ -617,32 +752,60 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) { auto &lp = *task; auto &g = *gDataArg; - - map recalcPos; // 遗留的未匹配的pair - processUnpairedPosForCalc(g.unpairedDic, - g.unpairedPosArr, - lp.unpairedDic, - lp.unpairedPosArr, - lp.pairs, - recalcPos, - true); - map seqTaskChanged; - for (auto &e : recalcPos) + for (auto &prevUnpair : lp.unpairedDic) // 遍历上一个任务中的每个未匹配的read + { + auto &readName = prevUnpair.first; + auto &prevPosInfo = prevUnpair.second; + auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end + + if (g.unpairedDic.find(readName) != g.unpairedDic.end()) // 在遗留数据中找到了匹配的read + { + auto &remainPosInfo = g.unpairedDic[readName]; + auto remainFragEnd = remainPosInfo.unpairedRE; + int64_t remainPosKey = remainFragEnd.posKey; + modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read + auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; + + remainUnpairInfo.pairArr.push_back(remainFragEnd); + g.unpairedDic.erase(readName); + } + } + map taskChanged; + for (auto &e : g.unpairedPosArr) { auto posKey = e.first; - auto seqNum = e.second; - auto &t = seqTaskChanged[seqNum]; - // 在对应的任务包含的dup idx里修改结果数据 - processPairs(g.unpairedPosArr[posKey], &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx); - g.unpairedPosArr.erase(posKey); - } - // 更新遗留的结果 - addDupInfoToTask(seqTaskChanged, &g); + auto taskSeq = e.second.taskSeq; + auto &t = taskChanged[taskSeq]; + auto &arr = g.unpairedPosArr[posKey].pairArr; - cout << "last unpair info: " << g.unpairedPosArr.size() << '\t' << g.unpairedDic.size() << endl; + if (arr.size() > 1) + { + std::sort(arr.begin(), arr.end()); + processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx); + } + } + // 更新结果 + vector addDup; + map ndPosVal; + for (auto &e : taskChanged) + { + auto taskSeq = e.first; + auto &t = e.second; + refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, + g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq]); + } + + cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl; + g.unpairedPosArr.clear(); + g.unpairedDic.clear(); // 将dupidx放进全局数据 + for (int i = 0; i < g.dupIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i]); + for (int i = 0; i < g.opticalDupIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotDupIdxArr[i], g.opticalDupIdxArr[i]); + g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); @@ -662,7 +825,7 @@ static void serialMarkDups() BamBufType inBamBuf(g_gArg.use_asyncio); inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); // BamBufType inBamBuf(false); - // inBamBuf.Init(g_inBamFp, g_inBamHeader, 20 * 1024 * 1024); + // inBamBuf.Init(g_inBamFp, g_inBamHeader, 100 * 1024 * 1024); int64_t processedBamNum = 0; SerailMarkDupArg smdArg1, smdArg2; @@ -689,13 +852,13 @@ static void serialMarkDups() tm_arr[0].acc_start(); Timer t1; generateReadEnds(curArgP); - cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl; + //cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl; tm_arr[0].acc_end(); tm_arr[1].acc_start(); t1.reinit(); markdups(curArgP); - cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; + //cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; tm_arr[1].acc_end(); if (!isFirstRound) @@ -703,7 +866,7 @@ static void serialMarkDups() tm_arr[2].acc_start(); t1.reinit(); handleIntersectData(lastArgP, curArgP, &gData); - cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl; + //cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl; // addTaskIdxToSet(lastArgP, &gData); tm_arr[2].acc_end(); } @@ -724,20 +887,42 @@ static void serialMarkDups() // break; } } + // cout << "here" << endl; tm_arr[3].acc_start(); // 处理剩下的全局数据 handleLastTask(lastArgP, &gData); + // cout << "here 2" << endl; tm_arr[3].acc_end(); tm_arr[5].acc_end(); // 统计所有冗余index数量 int64_t dupNum = 0; - unordered_set dup; + set dup; + + // int taskSeq = 0; + // for (auto &arr : gData.dupIdxArr) + // { + // for (auto idx : arr) { + // if (dup.find(idx) != dup.end()) + // { + // cout << "dup index: " << taskSeq << '\t' << idx << endl; + // } + // dup.insert(idx); + // } + // taskSeq++; + // } + // #include + // ofstream out("tumor_dup.txt"); + // for (auto idx : dup) + // { + // out << idx << endl; + // } + // out.close(); + for (auto &arr : gData.dupIdxArr) - for (auto idx : arr) - dup.insert(idx); - dupNum += dup.size(); - cout << "dup num : " << dupNum << endl; + dupNum += arr.size(); + + cout << "dup num : " << dupNum << '\t' << dup.size() << endl; cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl; cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl; @@ -747,6 +932,8 @@ static void serialMarkDups() cout << "new arg : " << tm_arr[6].acc_seconds_elapsed() << endl; cout << "del arg : " << tm_arr[7].acc_seconds_elapsed() << endl; cout << "build ends : " << tm_arr[8].acc_seconds_elapsed() << endl; + cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl; + cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl; cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; Timer::log_time("serial end "); diff --git a/src/sam/utils/read_ends.h b/src/sam/utils/read_ends.h index d9d76ce..1ec6b0e 100644 --- a/src/sam/utils/read_ends.h +++ b/src/sam/utils/read_ends.h @@ -101,15 +101,6 @@ struct ReadEnds : PhysicalLocation return areComparable; } - // 找某一个位置的所有readend时需要 - static bool pairsCmp(const ReadEnds &lhs, const ReadEnds &rhs) - { - int comp = lhs.read1ReferenceIndex - rhs.read1ReferenceIndex; - if (comp == 0) - comp = lhs.read1Coordinate - rhs.read1Coordinate; - return comp < 0; - } - /* 比对方向是否正向 */ bool IsPositiveStrand() const { @@ -127,6 +118,30 @@ struct ReadEnds : PhysicalLocation return orientation == R; } + // 对于相交的数据进行比对,a是否小于b,根据AreComparableForDuplicates函数得来 + static inline bool ReadLittleThan(const ReadEnds &a, const ReadEnds &b, bool compareRead2 = false) + { + int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; + if (comp == 0) + comp = a.read1Coordinate - b.read1Coordinate; + if (comp == 0) + comp = a.orientation - b.orientation; + if (compareRead2) + { + if (comp == 0) + comp = a.read2ReferenceIndex - b.read2ReferenceIndex; + if (comp == 0) + comp = a.read2Coordinate - b.read2Coordinate; + } + return comp < 0; + } + + // 找某一个位置的所有readend时需要 + static bool pairsLittleThan(const ReadEnds &lhs, const ReadEnds &rhs) + { + return ReadLittleThan(lhs, rhs, true); + } + // 比较函数 bool operator < (const ReadEnds &o) const {