diff --git a/src/sam/markdups/pipeline_md.cpp b/src/sam/markdups/pipeline_md.cpp index 36e6588..5877929 100644 --- a/src/sam/markdups/pipeline_md.cpp +++ b/src/sam/markdups/pipeline_md.cpp @@ -22,14 +22,6 @@ using std::cout; using std::vector; -/* 查找 */ -// template -// static inline Itr binaryFind(Itr first, Itr last, const T &val) -// { -// first = std::lower_bound(first, last, val); -// return (first != last && *first == val) ? first : last; -// } - /* 排序 */ static inline void sortReadEndsArr(vector &arr) { size_t blockSize = 64 * 1024; @@ -177,150 +169,6 @@ static void getEqualRE(const ReadEnds &re, vector &src, vectorGetReadUnmappedFlag()) { - if (bw->b->core.tid == -1) - // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). - break; - } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 - ReadEnds fragEnd; - buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); - p.frags.push_back(fragEnd); // 添加进frag集合 - if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 - string key = bw->query_name(); - if (p.unpairedDic.find(key) == p.unpairedDic.end()) { - p.unpairedDic[key] = {mdArg.taskSeq, fragEnd}; - } else { // 找到了pairend - auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; - modifyPairedEnds(fragEnd, &pairedEnds); - p.pairs.push_back(pairedEnds); - p.unpairedDic.erase(key); // 删除找到的pairend - } - } - } - } - // sortReadEndsArr(p.frags); - sort(p.frags.begin(), p.frags.end()); - sort(p.pairs.begin(), p.pairs.end()); - // cout << tid << "\tthread end" << endl; - // if (mdArg.taskSeq == 163) - // cout << "tid: " << tid << " end\t" << p.pairs.size() << "\t" << p.frags.size() << endl; -} - -static void *threadSortParallelPairs(void *data) { - auto &pd = *(ParallelDataArg *)data; - vector &pairs = pd.threadData[pd.numThread].pairs; - sort(pairs.begin(), pairs.end()); - // 合并所有pairs - pd.pairs2d.clear(); - for (int i = 0; i < pd.numThread; ++i) { - pd.pairs2d.push_back(&pd.threadData[i].pairs); - } - pd.pairs2d.push_back(&pd.threadData[pd.numThread].pairs); - ReadEndsQueue pairsQue; - pairsQue.Init(&pd.pairs2d); - const ReadEnds *pRE; - while ((pRE = pairsQue.Pop()) != nullptr) { - pd.mdArg->pairs.push_back(*pRE); - } - return nullptr; -} - -static void *threadSortParallelFrags(void *data) { - auto &pd = *(ParallelDataArg *)data; - ReadEndsQueue fragsQue; - const ReadEnds *pRE; - pd.frags2d.clear(); - for (int i = 0; i < pd.numThread; ++i) { - pd.frags2d.push_back(&pd.threadData[i].frags); - } - fragsQue.Init(&pd.frags2d); - while ((pRE = fragsQue.Pop()) != nullptr) { - pd.mdArg->frags.push_back(*pRE); - } - return nullptr; -} - -static void *threadCreateUnpairInfo(void *data) { - auto &pd = *(ParallelDataArg *)data; - auto &p = *pd.mdArg; - for (auto &e : p.unpairedDic) { - auto posKey = e.second.unpairedRE.posKey; - auto &unpairArrInfo = p.unpairedPosArr[posKey]; - unpairArrInfo.unpairedNum++; - unpairArrInfo.taskSeq = p.taskSeq; - unpairArrInfo.readNameSet.insert(e.first); - } - return nullptr; -} - -/* 单线程生成readends (第一步)*/ -static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) { - auto &p = *arg; - p.pairs.clear(); - p.frags.clear(); - p.unpairedDic.clear(); - p.unpairedPosArr.clear(); - - pdArg.mdArg = arg; - tm_arr[8].acc_start(); - kt_for(g_gArg.num_threads / 2, threadGenerateReadEnds, &pdArg, g_gArg.num_threads); - // tm_arr[8].acc_end(); - // 合并各个线程数据 - // 查找未匹配的frags - - tm_arr[9].acc_start(); - vector &pairs = pdArg.threadData[g_gArg.num_threads].pairs; - pairs.clear(); - for (int i = 0; i < g_gArg.num_threads; ++i) { - for (auto &val : pdArg.threadData[i].unpairedDic) { - const string &key = val.first; - const ReadEnds &fragEnd = val.second.unpairedRE; - if (p.unpairedDic.find(key) == p.unpairedDic.end()) { - p.unpairedDic[key] = {p.taskSeq, fragEnd}; - } else { // 找到了pairend - auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; - modifyPairedEnds(fragEnd, &pairedEnds); - pairs.push_back(pairedEnds); - p.unpairedDic.erase(key); // 删除找到的pairend - } - } - } - threadCreateUnpairInfo(&pdArg); - threadSortParallelPairs(&pdArg); - tm_arr[9].acc_end(); - tm_arr[8].acc_end(); - - tm_arr[11].acc_start(); -// pthread_t tid; -// pthread_create(&tid, nullptr, threadSortParallelFrags, &pdArg); - threadSortParallelFrags(&pdArg); - // tm_arr[12].acc_start(); - // threadSortParallelPairs(&pdArg); - // tm_arr[12].acc_end(); - // tm_arr[13].acc_start(); - // threadCreateUnpairInfo(&pdArg); - // tm_arr[13].acc_end(); -// pthread_join(tid, nullptr); - tm_arr[11].acc_end(); -} - /* 处理pairs */ static void processPairs(vector &readEnds, DPSet *dupIdx, MDSet *opticalDupIdx, DPSet *repIdx, MDSet *singletonIdx, MDSet *notDupIdx = nullptr, @@ -382,8 +230,6 @@ static void markdups(MarkDupDataArg *arg) { /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ // 先处理 pair processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx, &p.pairRepIdx, &p.pairSingletonIdx); - // cout << p.pairDupIdx.size() << "\t" << p.pairRepIdx.size() << endl; - // 再处理frag processFrags(p.frags, &p.fragDupIdx); } @@ -457,10 +303,6 @@ static inline void refreshPairDupIdx(DPSet &dupIdx, MDSet &opt p.pairSingletonIdx.erase(idx); } for (auto idx : notDupIdx) { - // if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl; - // if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl; - // if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl; - // if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl; lp.pairDupIdx.erase(idx); p.pairDupIdx.erase(idx); } @@ -486,10 +328,7 @@ static void refeshTaskDupInfo(DPSet &dupIdx, MDSet &opticalDup MDSet &latterSingletonIdx, MDSet &latterNotDupIdx, MDSet &latterNotOpticalDupIdx, MDSet &latterNotRepIdx, MDSet &latterNotSingletonIdx) { - for (auto idx : dupIdx) { - latterDupIdx.insert(idx); - // latterNotDupIdx.erase(idx); // 后来的更新为准 - } + for (auto idx : dupIdx) latterDupIdx.insert(idx); for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx); for (auto idx : repIdx) latterRepIdx.insert(idx); for (auto idx : singletonIdx) latterSingletonIdx.insert(idx); @@ -512,8 +351,6 @@ static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupI auto ae = dupArr.end(); auto bi = cacheDupIdx.begin(); auto be = cacheDupIdx.end(); - // auto bi = dupIdx.begin(); - // auto be = dupIdx.end(); T val = 0; while (ai != ae && bi != be) { @@ -544,368 +381,6 @@ static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupI dupArr = midArr; } -static int64_t llp_frag_end_pos = 0; -static int64_t llp_pair_end_pos = 0; -static int64_t gSingletonNum = 0; -/* 处理相邻的两个任务,有相交叉的数据 */ -static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, GlobalDataArg *gDataArg) { - auto &lp = *lastArg; - auto &p = *curArg; - auto &g = *gDataArg; - - vector reArr; - DPSet dupIdx; - MDSet opticalDupIdx; - DPSet repIdx; - MDSet singletonIdx; - MDSet notOpticalDupIdx; - MDSet notDupIdx; - MDSet notRepIdx; - MDSet notSingletonIdx; - // 先处理重叠的frags - getIntersectData(lp.frags, p.frags, &reArr); - processFrags(reArr, &dupIdx, ¬DupIdx); - refreshFragDupIdx(dupIdx, notDupIdx, &lp, &p); - // cout << "578" << endl; - if (!p.frags.empty()) - if (llp_frag_end_pos >= p.frags.begin()->posKey) { - cout << "frags : " << llp_frag_end_pos << "\t" << p.frags[0].posKey << "\t" << p.frags.rbegin()->posKey - << endl; - } - if (!p.pairs.empty()) - if (llp_pair_end_pos >= p.pairs.begin()->posKey) { - cout << "pairs : " << llp_pair_end_pos << "\t" << p.pairs[0].posKey << "\t" << p.pairs.rbegin()->posKey - << endl; - } - if (!p.frags.empty()) - llp_frag_end_pos = lp.frags.rbegin()->posKey; - if (!p.pairs.empty()) - llp_pair_end_pos = lp.pairs.rbegin()->posKey; - - // 再处理重叠的pairs - reArr.clear(); - dupIdx.clear(); - notDupIdx.clear(); - getIntersectData(lp.pairs, p.pairs, &reArr, true); - -// cout << "pairs: " << lp.pairs.size() << "\t" << p.pairs.size() << endl; - processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx, - ¬SingletonIdx); -// cout << "size: " << reArr.size() << endl; - refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx, - notSingletonIdx, &lp, &p); - -// cout << "pairs size: " << lp.pairs.size() << "\t" << p.pairs.size() << endl; -// cout << "range: " << lp.pairs.front().posKey << "\t" << lp.pairs.back().posKey << "\t" -// << p.pairs.front().posKey << "\t" << p.pairs.back().posKey << endl; -// cout << "intersect size: " << reArr.size() << endl; - - // 处理之前未匹配的部分 - map recalcPos; - CalcSet alreadyAdd; // 与该位点相同的pair都添加到数组里了 - MDSet addToGlobal; - int64_t prevLastPos = 0, nextFirstPos = 0; - if (lp.frags.size() > 0) - prevLastPos = lp.frags.back().posKey; - if (p.frags.size() > 0) - nextFirstPos = p.frags[0].posKey; - for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read - auto &readName = prevUnpair.first; - auto &prevPosInfo = prevUnpair.second; - auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end - - if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read - auto &nextPosInfo = p.unpairedDic[readName]; - auto &nextFragEnd = nextPosInfo.unpairedRE; - int64_t prevPosKey = prevFragEnd.posKey; - modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下,poskey可能是后面的read - int64_t nextPosKey = max(prevPosKey, nextFragEnd.posKey); - CalcKey ck = {prevPosKey, nextPosKey}; - UnpairedPosInfo *prevUnpairInfoP = nullptr; - UnpairedPosInfo *nextUnpairInfoP = nullptr; - if (lp.unpairedPosArr.find(prevPosKey) != lp.unpairedPosArr.end()) - prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey]; - if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end()) - nextUnpairInfoP = &p.unpairedPosArr[prevPosKey]; - - // pos分为两种情况,根据poskey(pair中两个read分别的pos)的位置确定 - // 1. - // prevpos在交叉部分之前,nextpos在交叉部分之后,这种情况不需要获取pairarr中的数据; - // 2. - // prevpos在交叉部分之前,nextpos在交叉部分,需要获取lp中的相等read pair进行重新计算 - // 复杂情况1. g中包含prevPosKey对应的unpair,p中有对应的pair,此时应该把这些pair考虑进去 - // 3. - // prevpos在交叉部分,nextpos在交叉部分之后,需要获取p中的相等read pair进行重新计算 - // 复杂情况2. p中是否包含prevPosKey对应的unpair - // 4. - // prevpos在交叉部分,nextpos在交叉部分,需要获取lp和p中的相等read pair进行重新计算 - - bool addDataToPos = true; - if (alreadyAdd.find(ck) != alreadyAdd.end()) { - // 之前已经添加过了,后面就不用再添加数据了,因为同一个位置可能找到两个及以上的unpair数据,处理之前的数据时候可能已经添加了这些数据 - addDataToPos = false; - } else - alreadyAdd.insert(ck); - - if (prevPosKey < nextFirstPos) { // prevpos在交叉部分之前 - auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr - prevPairArr.push_back(prevFragEnd); - if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况 - getEqualRE(prevFragEnd, lp.pairs, &prevPairArr); - } - // 第一种情况,第二种情况下都会出现,复杂情况一 - auto gPosInfo = g.unpairedPosArr.find(prevPosKey); - if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的,刚好和该位点一致 - auto &gUnpairInfo = gPosInfo->second; - auto pPosInfo = p.unpairedPosArr.find(nextPosKey); - if (pPosInfo != p.unpairedPosArr.end()) { - auto &pUnpairInfo = pPosInfo->second; - for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname,看是否有匹配的 - if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) { - auto pe = g.unpairedDic[rn].unpairedRE; - auto fe = p.unpairedDic[rn].unpairedRE; - modifyPairedEnds(fe, &pe); - prevPairArr.push_back(pe); - g.unpairedDic.erase(rn); - p.unpairedDic.erase(rn); - } - } - } - } - recalcPos[ck] = prevPosInfo.taskSeq; - std::sort(prevPairArr.begin(), prevPairArr.end()); - } else { // prevpos在交叉部分 - if (nextPosKey > prevLastPos) { // nextpos在交叉部分之后 第三种情况 - if (nextUnpairInfoP != nullptr) { // 且在pos点,next task有unpair,这样才把这些数据放到next task里 - auto &nextPairArr = nextUnpairInfoP->pairArr; - nextPairArr.push_back(prevFragEnd); - auto &prevPairArr = prevUnpairInfoP->pairArr; - prevPairArr.push_back(prevFragEnd); - if (addDataToPos) { - getEqualRE(prevFragEnd, p.pairs, &prevPairArr); - getEqualRE(prevFragEnd, p.pairs, &nextPairArr); - } - // 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除) - recalcPos[ck] = nextPosInfo.taskSeq; - - std::sort(prevPairArr.begin(), prevPairArr.end()); - std::sort(nextPairArr.begin(), nextPairArr.end()); - } else { // next task在该位点没有unpair,那就把数据放到prev task里 - auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr - prevPairArr.push_back(prevFragEnd); - if (addDataToPos) // 第二种情况 - getEqualRE(prevFragEnd, p.pairs, &prevPairArr); - recalcPos[ck] = prevPosInfo.taskSeq; - std::sort(prevPairArr.begin(), prevPairArr.end()); - } - } else { // 第四种情况 - if (prevUnpairInfoP == nullptr) { - prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey]; - prevUnpairInfoP->taskSeq = lp.taskSeq; - } - auto &prevPairArr = prevUnpairInfoP->pairArr; - prevPairArr.push_back(prevFragEnd); - if (addDataToPos) { - getEqualRE(prevFragEnd, lp.pairs, &prevPairArr); - getEqualRE(prevFragEnd, p.pairs, &prevPairArr); - } - recalcPos[ck] = prevPosInfo.taskSeq; - std::sort(prevPairArr.begin(), prevPairArr.end()); - } - } - p.unpairedDic.erase(readName); // 在next task里删除该read - } else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read - auto &remainPosInfo = g.unpairedDic[readName]; - auto remainFragEnd = remainPosInfo.unpairedRE; - int64_t remainPosKey = remainFragEnd.posKey; - modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read - auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; - auto &remainPairArr = remainUnpairInfo.pairArr; - remainPairArr.push_back(remainFragEnd); - CalcKey ck = {remainPosKey, prevFragEnd.posKey}; - recalcPos[ck] = remainPosInfo.taskSeq; - std::sort(remainPairArr.begin(), remainPairArr.end()); - - g.unpairedDic.erase(readName); - } else { // 都没找到,那就保存到遗留数据里 - int64_t prevPosKey = prevFragEnd.posKey; - g.unpairedDic.insert(prevUnpair); - addToGlobal.insert(prevPosKey); - } - } - map taskChanged; - MDSet posProcessed; - for (auto &e : recalcPos) { - auto posKey = e.first.read1Pos; - if (posProcessed.find(posKey) != posProcessed.end()) - continue; - posProcessed.insert(posKey); - auto taskSeq = e.second; - auto &t = taskChanged[taskSeq]; - // 在对应的任务包含的dup idx里修改结果数据 - vector *pairArrP = nullptr; - if (taskSeq < lp.taskSeq) - pairArrP = &g.unpairedPosArr[posKey].pairArr; - else - pairArrP = &lp.unpairedPosArr[posKey].pairArr; - processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, - &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); - if (taskSeq < lp.taskSeq) - g.unpairedPosArr.erase(posKey); - } - - // 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据 - // 放在这里,因为lp中的unpairedPosArr中的readends可能会被修改(比如optical duplicate) - for (auto posKey : addToGlobal) { - g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey]; - } - - // 更新结果 - for (auto &e : taskChanged) { - auto taskSeq = e.first; - auto &t = e.second; - if (taskSeq < lp.taskSeq) { - refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, - t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq], - g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq], - g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], - g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq], - g.latterNotSingletonIdxArr[taskSeq]); - } else if (taskSeq == lp.taskSeq) { - refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, - t.notRepIdx, t.notSingletonIdx, &lp, &p); - } else { - refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, - t.notRepIdx, t.notSingletonIdx, &p, &lp); // 把结果放到p中 - } - } - // 将dupidx放进全局数据 - g.latterDupIdxArr.push_back(DPSet()); - g.latterOpticalDupIdxArr.push_back(MDSet()); - g.latterRepIdxArr.push_back(DPSet()); - g.latterSingletonIdxArr.push_back(MDSet()); - g.latterNotDupIdxArr.push_back(MDSet()); - g.latterNotOpticalDupIdxArr.push_back(MDSet()); - g.latterNotRepIdxArr.push_back(MDSet()); - g.latterNotSingletonIdxArr.push_back(MDSet()); - - g.dupIdxArr.push_back(vector()); - auto &vIdx = g.dupIdxArr.back(); - lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); - vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); - std::sort(vIdx.begin(), vIdx.end()); - - g.opticalDupIdxArr.push_back(vector()); - auto &vOpticalIdx = g.opticalDupIdxArr.back(); - vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); - std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); - - g.repIdxArr.push_back(vector()); - auto &vRepIdx = g.repIdxArr.back(); - vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); - std::sort(vRepIdx.begin(), vRepIdx.end()); - - g.singletonIdxArr.push_back(vector()); - auto &vSingletonIdx = g.singletonIdxArr.back(); - vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); - std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); - gSingletonNum += lp.pairSingletonIdx.size(); -} - -/* 当所有任务结束后,global data里还有未处理的数据 */ -static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) { - // cout << "last task start" << endl; - auto &lp = *task; - auto &g = *gDataArg; - // 遗留的未匹配的pair - for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read - auto &readName = prevUnpair.first; - auto &prevPosInfo = prevUnpair.second; - auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end - - if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read - auto &remainPosInfo = g.unpairedDic[readName]; - auto remainFragEnd = remainPosInfo.unpairedRE; - int64_t remainPosKey = remainFragEnd.posKey; - modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read - auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; - - remainUnpairInfo.pairArr.push_back(remainFragEnd); - g.unpairedDic.erase(readName); - } else { - g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数 - } - } - - map taskChanged; - for (auto &e : g.unpairedPosArr) { - auto posKey = e.first; - auto taskSeq = e.second.taskSeq; - auto &t = taskChanged[taskSeq]; - auto &arr = g.unpairedPosArr[posKey].pairArr; - - if (arr.size() > 1) { - std::sort(arr.begin(), arr.end()); - processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, - &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); - } else if (arr.size() == 1) { - // gMetrics.singletonReads.insert(arr[0].read1IndexInFile); - t.singletonIdx.insert(arr[0].read1IndexInFile); - } - } - // 更新结果 - vector addDup; - map ndPosVal; - for (auto &e : taskChanged) { - auto taskSeq = e.first; - auto &t = e.second; - refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, - t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], - g.latterRepIdxArr[taskSeq], g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], - g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq], - g.latterNotSingletonIdxArr[taskSeq]); - } - g.unpairedPosArr.clear(); - - // 将dupidx放进全局数据 - vector cacheDupIdx; - vector midArr; - vector intCacheDupIdx; - vector intMidArr; - for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) { - refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i], cacheDupIdx, midArr); - } - for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i) - refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i], - intCacheDupIdx, intMidArr); - for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) - refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); - for (int i = 0; i < (int)g.singletonIdxArr.size() - 1; ++i) - refeshFinalTaskDupInfo(g.latterSingletonIdxArr[i], g.latterNotSingletonIdxArr[i], g.singletonIdxArr[i], - intCacheDupIdx, intMidArr); - - g.dupIdxArr.push_back(vector()); - auto &vIdx = g.dupIdxArr.back(); - lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); - vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); - std::sort(vIdx.begin(), vIdx.end()); - - g.opticalDupIdxArr.push_back(vector()); - auto &vOpticalIdx = g.opticalDupIdxArr.back(); - vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); - std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); - - g.repIdxArr.push_back(vector()); - auto &vRepIdx = g.repIdxArr.back(); - vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); - std::sort(vRepIdx.begin(), vRepIdx.end()); - - g.singletonIdxArr.push_back(vector()); - auto &vSingletonIdx = g.singletonIdxArr.back(); - vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); - std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); -} - // for step 2 generate read ends // multi-thread generate read ends @@ -951,7 +426,7 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { } } } - //sortReadEndsArr(frags); + // sortReadEndsArr(frags); sort(frags.begin(), frags.end()); sort(pairs.begin(), pairs.end()); } @@ -961,13 +436,9 @@ static void doGenRE(PipelineArg &pipeArg) { GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM]; ReadData &readData = pipeArg.readData; - -// cout << "dogen-gen buf : " << pipeArg.genREOrder % pipeArg.GENBUFNUM << endl; -// cout << "dogen-sort buf:" << pipeArg.genREOrder % pipeArg.SORTBUFNUM << endl; - // generate read ends const int numThread = pipeArg.numThread; - // / 4; + kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread); // 用来在genRE阶段计算一些Sort阶段应该计算的数据,保持每个step用时更平衡 // 轮询每个线程中未找到匹配的read,找到匹配的 @@ -1008,8 +479,7 @@ static void doSort(PipelineArg &pipeArg) { GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM]; SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM]; SortMarkData &smd = *(SortMarkData *)sortData.dataPtr; - // cout << "dosort-gen buf : " << pipeArg.sortOrder % pipeArg.GENBUFNUM << endl; - // cout << "dosort-sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl; + smd.unpairedDic = genREData.unpairedDic; smd.unpairedPosArr = genREData.unpairedPosArr; @@ -1101,28 +571,22 @@ static void doIntersect(PipelineArg &pipeArg) { MDSet notSingletonIdx; // 先处理重叠的frags - // cout << "frags: " << lp.pSmd->frags.size() << "\t" << p.pSmd->frags.size() << endl; getIntersectData(lpSM.frags, pSM.frags, &reArr); processFrags(reArr, &dupIdx, ¬DupIdx); refreshDupIdx(dupIdx, lp.fragDupIdx, p.fragDupIdx); refreshNotDupIdx(notDupIdx, lp.fragDupIdx, p.fragDupIdx); - // 再处理重叠的pairs reArr.clear(); dupIdx.clear(); notDupIdx.clear(); getIntersectData(lpSM.pairs, pSM.pairs, &reArr, true); - // cout << "pairs size: " << lp.pSmd->pairs.size() << "\t" << p.pSmd->pairs.size() << endl; - // cout << "range: " << lp.pSmd->pairs.front().posKey << "\t" << lp.pSmd->pairs.back().posKey << "\t" - // << p.pSmd->pairs.front().posKey << "\t" << p.pSmd->pairs.back().posKey << endl; - // cout << "intersect size: " << reArr.size() << endl; + processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx, ¬SingletonIdx); refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx, notSingletonIdx, lp, p); -// return; // 处理之前未匹配的部分 map recalcPos; CalcSet alreadyAdd; // 与该位点相同的pair都添加到数组里了 @@ -1130,8 +594,7 @@ static void doIntersect(PipelineArg &pipeArg) { int64_t prevLastPos = 0, nextFirstPos = 0; if (lpSM.frags.size() > 0) prevLastPos = lpSM.frags.back().posKey; if (pSM.frags.size() > 0) nextFirstPos = pSM.frags[0].posKey; - //cout << "dic size: " << lp.unpairedDic.size() << "\t" << p.unpairedDic.size() << "\t" << g.unpairedDic.size() - // << endl; + for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read auto &readName = prevUnpair.first; auto &prevPosInfo = prevUnpair.second; @@ -1235,8 +698,6 @@ static void doIntersect(PipelineArg &pipeArg) { } } pSM.unpairedDic.erase(readName); // 在next task里删除该read - // p.unpairedPosArr.erase(nextPosKey); - // p.unpairedPosArr[nextPosKey]. } else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read auto &remainPosInfo = g.unpairedDic[readName]; auto remainFragEnd = remainPosInfo.unpairedRE; @@ -1279,10 +740,11 @@ static void doIntersect(PipelineArg &pipeArg) { } // 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据 // 放在这里,因为lp中的unpairedPosArr中的readends可能会被修改(比如optical duplicate) -// for (auto posKey : addToGlobal) { -// g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey]; -// } + // for (auto posKey : addToGlobal) { + // g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey]; + // } // return; + /* 不需要把p中找不到lp的unpair,放到global中,否则最后找到pair后,还要再执行一次冗余检测,造成重复的冗余索引 */ // 更新结果 for (auto &e : taskChanged) { @@ -1303,21 +765,14 @@ static void doIntersect(PipelineArg &pipeArg) { t.notRepIdx, t.notSingletonIdx, p, lp); // 把结果放到p中 } } -// for (auto posKey : addToGlobal) { -// g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey]; -// } - -// return; // 将dupidx放进全局数据 g.latterDupIdxArr.push_back(DPSet()); g.latterOpticalDupIdxArr.push_back(MDSet()); g.latterRepIdxArr.push_back(DPSet()); - g.latterSingletonIdxArr.push_back(MDSet()); g.latterNotDupIdxArr.push_back(MDSet()); g.latterNotOpticalDupIdxArr.push_back(MDSet()); g.latterNotRepIdxArr.push_back(MDSet()); - g.latterNotSingletonIdxArr.push_back(MDSet()); g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); @@ -1334,12 +789,6 @@ static void doIntersect(PipelineArg &pipeArg) { auto &vRepIdx = g.repIdxArr.back(); vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); std::sort(vRepIdx.begin(), vRepIdx.end()); - - g.singletonIdxArr.push_back(vector()); - auto &vSingletonIdx = g.singletonIdxArr.back(); - vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); - std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); -// gSingletonNum += lp.pairSingletonIdx.size(); } static void *pipeRead(void *data) { @@ -1542,7 +991,6 @@ static void *pipeIntersect(void *data) { /* 当所有任务结束后,global data里还有未处理的数据 */ static void mergeAllTask(PipelineArg &pipeArg) { - // cout << "last task start" << endl; MarkDupData &lp = pipeArg.markDupData[(pipeArg.markDupOrder - 1) % pipeArg.MARKBUFNUM]; IntersectData &g = pipeArg.intersectData; SortMarkData &lpSM = *(SortMarkData *)lp.dataPtr; @@ -1577,9 +1025,6 @@ static void mergeAllTask(PipelineArg &pipeArg) { std::sort(arr.begin(), arr.end()); processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); - } else if (arr.size() == 1) { - // gMetrics.singletonReads.insert(arr[0].read1IndexInFile); - t.singletonIdx.insert(arr[0].read1IndexInFile); } } // 更新结果 @@ -1609,9 +1054,6 @@ static void mergeAllTask(PipelineArg &pipeArg) { intCacheDupIdx, intMidArr); for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); - for (int i = 0; i < (int)g.singletonIdxArr.size() - 1; ++i) - refeshFinalTaskDupInfo(g.latterSingletonIdxArr[i], g.latterNotSingletonIdxArr[i], g.singletonIdxArr[i], - intCacheDupIdx, intMidArr); g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); @@ -1628,12 +1070,6 @@ static void mergeAllTask(PipelineArg &pipeArg) { auto &vRepIdx = g.repIdxArr.back(); vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); std::sort(vRepIdx.begin(), vRepIdx.end()); - - g.singletonIdxArr.push_back(vector()); - auto &vSingletonIdx = g.singletonIdxArr.back(); - vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); - std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); -// gSingletonNum += lp.pairSingletonIdx.size(); } static void parallelPipeline() {