diff --git a/run.sh b/run.sh index f7323ef..dcd8b97 100755 --- a/run.sh +++ b/run.sh @@ -7,8 +7,8 @@ nthread=32 #nthread=64 #nthread=128 #input=/home/zzh/data/bam/zy_normal.bam -input=/home/zzh/data/bam/zy_tumor.bam -#input=/home/zzh/data/wgs_na12878.bam +#input=/home/zzh/data/bam/zy_tumor.bam +input=/home/zzh/data/wgs_na12878.bam #input=~/data/bam/100w.bam #input=~/data/bam/t100w.sam #input=~/data/bam/1k.sam diff --git a/src/sam/markdups/pipeline_md.cpp b/src/sam/markdups/pipeline_md.cpp index 8032638..9ff11a3 100644 --- a/src/sam/markdups/pipeline_md.cpp +++ b/src/sam/markdups/pipeline_md.cpp @@ -1016,15 +1016,13 @@ static void doGenRE(PipelineArg &pipeArg) { const int numThread = pipeArg.genREThreadNum; // / 4; kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread); - -#if 1 // 用来在genRE阶段计算一些Sort阶段应该计算的数据,保持每个step用时更平衡 // 轮询每个线程中未找到匹配的read,找到匹配的 genREData.unpairedDic.clear(); genREData.unpairedPosArr.clear(); - vector &pairs = genREData.pairsArr[g_gArg.num_threads]; + vector &pairs = genREData.pairsArr[numThread]; pairs.clear(); - for (int i = 0; i < g_gArg.num_threads; ++i) { + for (int i = 0; i < numThread; ++i) { for (auto &val : genREData.unpairedDicArr[i]) { const string &key = val.first; const ReadEnds &fragEnd = val.second.unpairedRE; @@ -1048,8 +1046,6 @@ static void doGenRE(PipelineArg &pipeArg) { unpairArrInfo.taskSeq = readData.taskSeq; unpairArrInfo.readNameSet.insert(e.first); } -#endif - } // end for step 2 generate read ends @@ -1058,57 +1054,23 @@ static void doSort(PipelineArg &pipeArg) { // return; GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM]; SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM]; -// cout << "dosort-gen buf : " << pipeArg.sortOrder % pipeArg.GENBUFNUM << endl; -// cout << "dosort-sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl; + SortMarkData &smd = *(SortMarkData *)sortData.dataPtr; + // cout << "dosort-gen buf : " << pipeArg.sortOrder % pipeArg.GENBUFNUM << endl; + // cout << "dosort-sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl; + smd.unpairedDic = genREData.unpairedDic; + smd.unpairedPosArr = genREData.unpairedPosArr; -#if 1 - sortData.CopyGenREData(genREData); -#else - sortData.unpairedDic.clear(); - sortData.unpairedPosArr.clear(); - const int numThread = pipeArg.genREThreadNum; - // / 4; - - vector &pairs = genREData.pairsArr[numThread]; - pairs.clear(); - for (int i = 0; i < numThread; ++i) { - for (auto &val : genREData.unpairedDicArr[i]) { - const string &key = val.first; - const ReadEnds &fragEnd = val.second.unpairedRE; - if (sortData.unpairedDic.find(key) == sortData.unpairedDic.end()) { - sortData.unpairedDic[key] = {(int64_t)pipeArg.sortOrder, fragEnd}; - } else { // 找到了pairend - auto &pairedEnds = sortData.unpairedDic.at(key).unpairedRE; - modifyPairedEnds(fragEnd, &pairedEnds); - pairs.push_back(pairedEnds); - sortData.unpairedDic.erase(key); // 删除找到的pairend - } - } - } - sort(pairs.begin(), pairs.end()); - - // create unpaired info - for (auto &e : sortData.unpairedDic) { - auto posKey = e.second.unpairedRE.posKey; - auto &unpairArrInfo = sortData.unpairedPosArr[posKey]; - unpairArrInfo.unpairedNum++; - unpairArrInfo.taskSeq = pipeArg.sortOrder; - unpairArrInfo.readNameSet.insert(e.first); - } - -#endif - - sortData.pairs.clear(); - sortData.frags.clear(); + smd.pairs.clear(); + smd.frags.clear(); const ReadEnds *pRE; ReadEndsHeap pairsHeap, fragsHeap; pairsHeap.Init(&genREData.pairsArr); while ((pRE = pairsHeap.Pop()) != nullptr) { - sortData.pairs.push_back(*pRE); + smd.pairs.push_back(*pRE); } fragsHeap.Init(&genREData.fragsArr); while ((pRE = fragsHeap.Pop()) != nullptr) { - sortData.frags.push_back(*pRE); + smd.frags.push_back(*pRE); } } // for step-4 sort @@ -1124,14 +1086,16 @@ static void doMarkDup(PipelineArg &pipeArg) { mdData.pairSingletonIdx.clear(); tm_arr[5].acc_start(); - mdData.CopySortData(sortData); + auto tmpPtr = mdData.dataPtr; + mdData.dataPtr = sortData.dataPtr; + sortData.dataPtr = tmpPtr; tm_arr[5].acc_end(); - + SortMarkData &smd = *(SortMarkData *)mdData.dataPtr; // 先处理 pair - processPairs(mdData.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx, + processPairs(smd.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx, &mdData.pairSingletonIdx); // 再处理frag - processFrags(mdData.frags, &mdData.fragDupIdx); + processFrags(smd.frags, &mdData.fragDupIdx); } template @@ -1172,6 +1136,8 @@ static void doIntersect(PipelineArg &pipeArg) { return; // 要等第二部分数据才能进行计算 MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; MarkDupData &p = pipeArg.markDupData[pipeArg.intersectOrder % pipeArg.MARKBUFNUM]; + SortMarkData &lpSM = *(SortMarkData*)lp.dataPtr; + SortMarkData &pSM = *(SortMarkData *)p.dataPtr; vector reArr; DPSet dupIdx; @@ -1185,7 +1151,7 @@ static void doIntersect(PipelineArg &pipeArg) { // 先处理重叠的frags // cout << "frags: " << lp.pSmd->frags.size() << "\t" << p.pSmd->frags.size() << endl; - getIntersectData(lp.frags, p.frags, &reArr); + getIntersectData(lpSM.frags, pSM.frags, &reArr); processFrags(reArr, &dupIdx, ¬DupIdx); refreshDupIdx(dupIdx, lp.fragDupIdx, p.fragDupIdx); refreshNotDupIdx(notDupIdx, lp.fragDupIdx, p.fragDupIdx); @@ -1195,11 +1161,11 @@ static void doIntersect(PipelineArg &pipeArg) { reArr.clear(); dupIdx.clear(); notDupIdx.clear(); - getIntersectData(lp.pairs, p.pairs, &reArr, true); -// cout << "pairs size: " << lp.pSmd->pairs.size() << "\t" << p.pSmd->pairs.size() << endl; -// cout << "range: " << lp.pSmd->pairs.front().posKey << "\t" << lp.pSmd->pairs.back().posKey << "\t" -// << p.pSmd->pairs.front().posKey << "\t" << p.pSmd->pairs.back().posKey << endl; -// cout << "intersect size: " << reArr.size() << endl; + getIntersectData(lpSM.pairs, pSM.pairs, &reArr, true); + // cout << "pairs size: " << lp.pSmd->pairs.size() << "\t" << p.pSmd->pairs.size() << endl; + // cout << "range: " << lp.pSmd->pairs.front().posKey << "\t" << lp.pSmd->pairs.back().posKey << "\t" + // << p.pSmd->pairs.front().posKey << "\t" << p.pSmd->pairs.back().posKey << endl; + // cout << "intersect size: " << reArr.size() << endl; processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx, ¬SingletonIdx); refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx, @@ -1211,16 +1177,16 @@ static void doIntersect(PipelineArg &pipeArg) { CalcSet alreadyAdd; // 与该位点相同的pair都添加到数组里了 MDSet addToGlobal; int64_t prevLastPos = 0, nextFirstPos = 0; - if (lp.frags.size() > 0) prevLastPos = lp.frags.back().posKey; - if (p.frags.size() > 0) nextFirstPos = p.frags[0].posKey; + if (lpSM.frags.size() > 0) prevLastPos = lpSM.frags.back().posKey; + if (pSM.frags.size() > 0) nextFirstPos = pSM.frags[0].posKey; //cout << "dic size: " << lp.unpairedDic.size() << "\t" << p.unpairedDic.size() << "\t" << g.unpairedDic.size() // << endl; - for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read + for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read auto &readName = prevUnpair.first; auto &prevPosInfo = prevUnpair.second; auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end - if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read - auto &nextPosInfo = p.unpairedDic[readName]; + if (pSM.unpairedDic.find(readName) != pSM.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read + auto &nextPosInfo = pSM.unpairedDic[readName]; auto &nextFragEnd = nextPosInfo.unpairedRE; int64_t prevPosKey = prevFragEnd.posKey; modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下,poskey可能是后面的read @@ -1229,10 +1195,10 @@ static void doIntersect(PipelineArg &pipeArg) { UnpairedPosInfo *prevUnpairInfoP = nullptr; UnpairedPosInfo *nextUnpairInfoP = nullptr; - if (lp.unpairedPosArr.find(prevPosKey) != lp.unpairedPosArr.end()) - prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey]; - if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end()) - nextUnpairInfoP = &p.unpairedPosArr[prevPosKey]; + if (lpSM.unpairedPosArr.find(prevPosKey) != lpSM.unpairedPosArr.end()) + prevUnpairInfoP = &lpSM.unpairedPosArr[prevPosKey]; + if (pSM.unpairedPosArr.find(prevPosKey) != pSM.unpairedPosArr.end()) + nextUnpairInfoP = &pSM.unpairedPosArr[prevPosKey]; // pos分为两种情况,根据poskey(pair中两个read分别的pos)的位置确定 // 1. // prevpos在交叉部分之前,nextpos在交叉部分之后,这种情况不需要获取pairarr中的数据; @@ -1255,23 +1221,23 @@ static void doIntersect(PipelineArg &pipeArg) { auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr prevPairArr.push_back(prevFragEnd); if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况 - getEqualRE(prevFragEnd, lp.pairs, &prevPairArr); + getEqualRE(prevFragEnd, lpSM.pairs, &prevPairArr); } // 第一种情况,第二种情况下都会出现,复杂情况一 auto gPosInfo = g.unpairedPosArr.find(prevPosKey); if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的,刚好和该位点一致 auto &gUnpairInfo = gPosInfo->second; - auto pPosInfo = p.unpairedPosArr.find(nextPosKey); - if (pPosInfo != p.unpairedPosArr.end()) { + auto pPosInfo = pSM.unpairedPosArr.find(nextPosKey); + if (pPosInfo != pSM.unpairedPosArr.end()) { auto &pUnpairInfo = pPosInfo->second; for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname,看是否有匹配的 if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) { auto pe = g.unpairedDic[rn].unpairedRE; - auto fe = p.unpairedDic[rn].unpairedRE; + auto fe = pSM.unpairedDic[rn].unpairedRE; modifyPairedEnds(fe, &pe); prevPairArr.push_back(pe); g.unpairedDic.erase(rn); - p.unpairedDic.erase(rn); + pSM.unpairedDic.erase(rn); } } } @@ -1286,8 +1252,8 @@ static void doIntersect(PipelineArg &pipeArg) { auto &prevPairArr = prevUnpairInfoP->pairArr; prevPairArr.push_back(prevFragEnd); if (addDataToPos) { - getEqualRE(prevFragEnd, p.pairs, &prevPairArr); - getEqualRE(prevFragEnd, p.pairs, &nextPairArr); + getEqualRE(prevFragEnd, pSM.pairs, &prevPairArr); + getEqualRE(prevFragEnd, pSM.pairs, &nextPairArr); } // 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除) recalcPos[ck] = nextPosInfo.taskSeq; @@ -1298,26 +1264,26 @@ static void doIntersect(PipelineArg &pipeArg) { auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr prevPairArr.push_back(prevFragEnd); if (addDataToPos) // 第二种情况 - getEqualRE(prevFragEnd, p.pairs, &prevPairArr); + getEqualRE(prevFragEnd, pSM.pairs, &prevPairArr); recalcPos[ck] = prevPosInfo.taskSeq; std::sort(prevPairArr.begin(), prevPairArr.end()); } } else { // 第四种情况 if (prevUnpairInfoP == nullptr) { - prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey]; + prevUnpairInfoP = &lpSM.unpairedPosArr[prevPosKey]; prevUnpairInfoP->taskSeq = lp.taskSeq; } auto &prevPairArr = prevUnpairInfoP->pairArr; prevPairArr.push_back(prevFragEnd); if (addDataToPos) { - getEqualRE(prevFragEnd, lp.pairs, &prevPairArr); - getEqualRE(prevFragEnd, p.pairs, &prevPairArr); + getEqualRE(prevFragEnd, lpSM.pairs, &prevPairArr); + getEqualRE(prevFragEnd, pSM.pairs, &prevPairArr); } recalcPos[ck] = prevPosInfo.taskSeq; std::sort(prevPairArr.begin(), prevPairArr.end()); } } - p.unpairedDic.erase(readName); // 在next task里删除该read + pSM.unpairedDic.erase(readName); // 在next task里删除该read // p.unpairedPosArr.erase(nextPosKey); // p.unpairedPosArr[nextPosKey]. } else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read @@ -1354,7 +1320,7 @@ static void doIntersect(PipelineArg &pipeArg) { if (taskSeq < lp.taskSeq) pairArrP = &g.unpairedPosArr[posKey].pairArr; else - pairArrP = &lp.unpairedPosArr[posKey].pairArr; + pairArrP = &lpSM.unpairedPosArr[posKey].pairArr; processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); if (taskSeq < lp.taskSeq) @@ -1627,9 +1593,9 @@ static void mergeAllTask(PipelineArg &pipeArg) { // cout << "last task start" << endl; MarkDupData &lp = pipeArg.markDupData[(pipeArg.markDupOrder - 1) % pipeArg.MARKBUFNUM]; IntersectData &g = pipeArg.intersectData; - + SortMarkData &lpSM = *(SortMarkData *)lp.dataPtr; // 遗留的未匹配的pair - for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read + for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read auto &readName = prevUnpair.first; auto &prevPosInfo = prevUnpair.second; auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end diff --git a/src/sam/markdups/pipeline_md.h b/src/sam/markdups/pipeline_md.h index d2b3d60..b2c936c 100644 --- a/src/sam/markdups/pipeline_md.h +++ b/src/sam/markdups/pipeline_md.h @@ -25,19 +25,15 @@ struct GenREData { UnpairedPositionMap unpairedPosArr; // }; -struct SortData { +struct SortMarkData { vector pairs; // 成对的reads vector frags; // 暂未找到配对的reads UnpairedNameMap unpairedDic; // 用来寻找pair end UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储 - void CopyGenREData(GenREData &genREData) { - // unpairedDic.clear(); - // unpairedPosArr.clear(); - // for (auto &val : genREData.unpairedDic) unpairedDic.insert(val); - // for (auto &val : genREData.unpairedPosArr) unpairedPosArr.insert(val); - unpairedDic = genREData.unpairedDic; - unpairedPosArr = genREData.unpairedPosArr; - } +}; + +struct SortData { + volatile void *dataPtr; // SortMarkData pointer }; struct MarkDupData { @@ -48,26 +44,7 @@ struct MarkDupData { DPSet pairRepIdx; // pair的dupset代表read的索引 MDSet pairSingletonIdx; // 某位置只有一对read的所有read pair个数 - // 从sort step拷贝过来的数据 - vector pairs; // 成对的reads - vector frags; // 暂未找到配对的reads - UnpairedNameMap unpairedDic; // 用来寻找pair end - UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储 - - void CopySortData(SortData &sortData) { - // pairs.clear(); - // frags.clear(); - // unpairedDic.clear(); - // unpairedPosArr.clear(); - // for (auto &val : sortData.pairs) pairs.push_back(val); - // for (auto &val : sortData.frags) frags.push_back(val); - // for (auto &val : sortData.unpairedDic) unpairedDic.insert(val); - // for (auto &val : sortData.unpairedPosArr) unpairedPosArr.insert(val); - pairs = sortData.pairs; - frags = sortData.frags; - unpairedDic = sortData.unpairedDic; - unpairedPosArr = sortData.unpairedPosArr; - } + volatile void *dataPtr; // SortMarkData pointer }; struct IntersectData { @@ -89,27 +66,6 @@ struct IntersectData { vector> latterNotOpticalDupIdxArr; vector> latterNotRepIdxArr; vector> latterNotSingletonIdxArr; - - MarkDupData lp; - MarkDupData p; - void copyOneMarkdupData(MarkDupData &src, MarkDupData &dst) { - dst.taskSeq = src.taskSeq; - dst.pairDupIdx = src.pairDupIdx; - dst.pairOpticalDupIdx = src.pairOpticalDupIdx; - dst.fragDupIdx = src.fragDupIdx; - dst.pairRepIdx = src.pairRepIdx; - dst.pairSingletonIdx = src.pairSingletonIdx; - - // 从sort step拷贝过来的数据 - dst.pairs = src.pairs; - dst.frags = src.frags; - dst.unpairedDic = src.unpairedDic; - dst.unpairedPosArr = src.unpairedPosArr; - } - void CopyMarkDupData(MarkDupData &_lp, MarkDupData &_p) { - copyOneMarkdupData(_lp, lp); - copyOneMarkdupData(_p, p); - } }; // 记录流水线状态,task的序号,以及某阶段是否结束 @@ -139,8 +95,12 @@ struct PipelineArg { genRESig = NEW_LOCK(0); // 最大值2, 双buffer sortSig = NEW_LOCK(0); markDupSig = NEW_LOCK(0); + for (int i = 0; i < SORTBUFNUM; ++i) { sortData[i].dataPtr = &sortMarkData[i]; } + for (int i = 0; i < MARKBUFNUM; ++i) { markDupData[i].dataPtr = &sortMarkData[i + SORTBUFNUM]; } } + SortMarkData sortMarkData[SORTBUFNUM + MARKBUFNUM]; + // for step-1 read ReadData readData; // for step-2 generate readends