用指针交换代替拷贝,好像性能没有啥提升

This commit is contained in:
zzh 2024-11-21 12:10:03 +08:00
parent c74e581e63
commit e348f96504
3 changed files with 61 additions and 135 deletions

4
run.sh
View File

@ -7,8 +7,8 @@ nthread=32
#nthread=64
#nthread=128
#input=/home/zzh/data/bam/zy_normal.bam
input=/home/zzh/data/bam/zy_tumor.bam
#input=/home/zzh/data/wgs_na12878.bam
#input=/home/zzh/data/bam/zy_tumor.bam
input=/home/zzh/data/wgs_na12878.bam
#input=~/data/bam/100w.bam
#input=~/data/bam/t100w.sam
#input=~/data/bam/1k.sam

View File

@ -1016,15 +1016,13 @@ static void doGenRE(PipelineArg &pipeArg) {
const int numThread = pipeArg.genREThreadNum;
// / 4;
kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread);
#if 1
// 用来在genRE阶段计算一些Sort阶段应该计算的数据保持每个step用时更平衡
// 轮询每个线程中未找到匹配的read找到匹配的
genREData.unpairedDic.clear();
genREData.unpairedPosArr.clear();
vector<ReadEnds> &pairs = genREData.pairsArr[g_gArg.num_threads];
vector<ReadEnds> &pairs = genREData.pairsArr[numThread];
pairs.clear();
for (int i = 0; i < g_gArg.num_threads; ++i) {
for (int i = 0; i < numThread; ++i) {
for (auto &val : genREData.unpairedDicArr[i]) {
const string &key = val.first;
const ReadEnds &fragEnd = val.second.unpairedRE;
@ -1048,8 +1046,6 @@ static void doGenRE(PipelineArg &pipeArg) {
unpairArrInfo.taskSeq = readData.taskSeq;
unpairArrInfo.readNameSet.insert(e.first);
}
#endif
}
// end for step 2 generate read ends
@ -1058,57 +1054,23 @@ static void doSort(PipelineArg &pipeArg) {
// return;
GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM];
SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM];
SortMarkData &smd = *(SortMarkData *)sortData.dataPtr;
// cout << "dosort-gen buf : " << pipeArg.sortOrder % pipeArg.GENBUFNUM << endl;
// cout << "dosort-sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl;
smd.unpairedDic = genREData.unpairedDic;
smd.unpairedPosArr = genREData.unpairedPosArr;
#if 1
sortData.CopyGenREData(genREData);
#else
sortData.unpairedDic.clear();
sortData.unpairedPosArr.clear();
const int numThread = pipeArg.genREThreadNum;
// / 4;
vector<ReadEnds> &pairs = genREData.pairsArr[numThread];
pairs.clear();
for (int i = 0; i < numThread; ++i) {
for (auto &val : genREData.unpairedDicArr[i]) {
const string &key = val.first;
const ReadEnds &fragEnd = val.second.unpairedRE;
if (sortData.unpairedDic.find(key) == sortData.unpairedDic.end()) {
sortData.unpairedDic[key] = {(int64_t)pipeArg.sortOrder, fragEnd};
} else { // 找到了pairend
auto &pairedEnds = sortData.unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds);
pairs.push_back(pairedEnds);
sortData.unpairedDic.erase(key); // 删除找到的pairend
}
}
}
sort(pairs.begin(), pairs.end());
// create unpaired info
for (auto &e : sortData.unpairedDic) {
auto posKey = e.second.unpairedRE.posKey;
auto &unpairArrInfo = sortData.unpairedPosArr[posKey];
unpairArrInfo.unpairedNum++;
unpairArrInfo.taskSeq = pipeArg.sortOrder;
unpairArrInfo.readNameSet.insert(e.first);
}
#endif
sortData.pairs.clear();
sortData.frags.clear();
smd.pairs.clear();
smd.frags.clear();
const ReadEnds *pRE;
ReadEndsHeap pairsHeap, fragsHeap;
pairsHeap.Init(&genREData.pairsArr);
while ((pRE = pairsHeap.Pop()) != nullptr) {
sortData.pairs.push_back(*pRE);
smd.pairs.push_back(*pRE);
}
fragsHeap.Init(&genREData.fragsArr);
while ((pRE = fragsHeap.Pop()) != nullptr) {
sortData.frags.push_back(*pRE);
smd.frags.push_back(*pRE);
}
}
// for step-4 sort
@ -1124,14 +1086,16 @@ static void doMarkDup(PipelineArg &pipeArg) {
mdData.pairSingletonIdx.clear();
tm_arr[5].acc_start();
mdData.CopySortData(sortData);
auto tmpPtr = mdData.dataPtr;
mdData.dataPtr = sortData.dataPtr;
sortData.dataPtr = tmpPtr;
tm_arr[5].acc_end();
SortMarkData &smd = *(SortMarkData *)mdData.dataPtr;
// 先处理 pair
processPairs(mdData.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx,
processPairs(smd.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx,
&mdData.pairSingletonIdx);
// 再处理frag
processFrags(mdData.frags, &mdData.fragDupIdx);
processFrags(smd.frags, &mdData.fragDupIdx);
}
template <typename T>
@ -1172,6 +1136,8 @@ static void doIntersect(PipelineArg &pipeArg) {
return; // 要等第二部分数据才能进行计算
MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM];
MarkDupData &p = pipeArg.markDupData[pipeArg.intersectOrder % pipeArg.MARKBUFNUM];
SortMarkData &lpSM = *(SortMarkData*)lp.dataPtr;
SortMarkData &pSM = *(SortMarkData *)p.dataPtr;
vector<ReadEnds> reArr;
DPSet<DupInfo> dupIdx;
@ -1185,7 +1151,7 @@ static void doIntersect(PipelineArg &pipeArg) {
// 先处理重叠的frags
// cout << "frags: " << lp.pSmd->frags.size() << "\t" << p.pSmd->frags.size() << endl;
getIntersectData(lp.frags, p.frags, &reArr);
getIntersectData(lpSM.frags, pSM.frags, &reArr);
processFrags(reArr, &dupIdx, &notDupIdx);
refreshDupIdx(dupIdx, lp.fragDupIdx, p.fragDupIdx);
refreshNotDupIdx(notDupIdx, lp.fragDupIdx, p.fragDupIdx);
@ -1195,7 +1161,7 @@ static void doIntersect(PipelineArg &pipeArg) {
reArr.clear();
dupIdx.clear();
notDupIdx.clear();
getIntersectData(lp.pairs, p.pairs, &reArr, true);
getIntersectData(lpSM.pairs, pSM.pairs, &reArr, true);
// cout << "pairs size: " << lp.pSmd->pairs.size() << "\t" << p.pSmd->pairs.size() << endl;
// cout << "range: " << lp.pSmd->pairs.front().posKey << "\t" << lp.pSmd->pairs.back().posKey << "\t"
// << p.pSmd->pairs.front().posKey << "\t" << p.pSmd->pairs.back().posKey << endl;
@ -1211,16 +1177,16 @@ static void doIntersect(PipelineArg &pipeArg) {
CalcSet<CalcKey> alreadyAdd; // 与该位点相同的pair都添加到数组里了
MDSet<int64_t> addToGlobal;
int64_t prevLastPos = 0, nextFirstPos = 0;
if (lp.frags.size() > 0) prevLastPos = lp.frags.back().posKey;
if (p.frags.size() > 0) nextFirstPos = p.frags[0].posKey;
if (lpSM.frags.size() > 0) prevLastPos = lpSM.frags.back().posKey;
if (pSM.frags.size() > 0) nextFirstPos = pSM.frags[0].posKey;
//cout << "dic size: " << lp.unpairedDic.size() << "\t" << p.unpairedDic.size() << "\t" << g.unpairedDic.size()
// << endl;
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
auto &readName = prevUnpair.first;
auto &prevPosInfo = prevUnpair.second;
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read
auto &nextPosInfo = p.unpairedDic[readName];
if (pSM.unpairedDic.find(readName) != pSM.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read
auto &nextPosInfo = pSM.unpairedDic[readName];
auto &nextFragEnd = nextPosInfo.unpairedRE;
int64_t prevPosKey = prevFragEnd.posKey;
modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下poskey可能是后面的read
@ -1229,10 +1195,10 @@ static void doIntersect(PipelineArg &pipeArg) {
UnpairedPosInfo *prevUnpairInfoP = nullptr;
UnpairedPosInfo *nextUnpairInfoP = nullptr;
if (lp.unpairedPosArr.find(prevPosKey) != lp.unpairedPosArr.end())
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end())
nextUnpairInfoP = &p.unpairedPosArr[prevPosKey];
if (lpSM.unpairedPosArr.find(prevPosKey) != lpSM.unpairedPosArr.end())
prevUnpairInfoP = &lpSM.unpairedPosArr[prevPosKey];
if (pSM.unpairedPosArr.find(prevPosKey) != pSM.unpairedPosArr.end())
nextUnpairInfoP = &pSM.unpairedPosArr[prevPosKey];
// pos分为两种情况根据poskey(pair中两个read分别的pos)的位置确定
// 1.
// prevpos在交叉部分之前nextpos在交叉部分之后这种情况不需要获取pairarr中的数据;
@ -1255,23 +1221,23 @@ static void doIntersect(PipelineArg &pipeArg) {
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
prevPairArr.push_back(prevFragEnd);
if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
getEqualRE(prevFragEnd, lpSM.pairs, &prevPairArr);
}
// 第一种情况,第二种情况下都会出现,复杂情况一
auto gPosInfo = g.unpairedPosArr.find(prevPosKey);
if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的刚好和该位点一致
auto &gUnpairInfo = gPosInfo->second;
auto pPosInfo = p.unpairedPosArr.find(nextPosKey);
if (pPosInfo != p.unpairedPosArr.end()) {
auto pPosInfo = pSM.unpairedPosArr.find(nextPosKey);
if (pPosInfo != pSM.unpairedPosArr.end()) {
auto &pUnpairInfo = pPosInfo->second;
for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname看是否有匹配的
if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) {
auto pe = g.unpairedDic[rn].unpairedRE;
auto fe = p.unpairedDic[rn].unpairedRE;
auto fe = pSM.unpairedDic[rn].unpairedRE;
modifyPairedEnds(fe, &pe);
prevPairArr.push_back(pe);
g.unpairedDic.erase(rn);
p.unpairedDic.erase(rn);
pSM.unpairedDic.erase(rn);
}
}
}
@ -1286,8 +1252,8 @@ static void doIntersect(PipelineArg &pipeArg) {
auto &prevPairArr = prevUnpairInfoP->pairArr;
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) {
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
getEqualRE(prevFragEnd, p.pairs, &nextPairArr);
getEqualRE(prevFragEnd, pSM.pairs, &prevPairArr);
getEqualRE(prevFragEnd, pSM.pairs, &nextPairArr);
}
// 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除)
recalcPos[ck] = nextPosInfo.taskSeq;
@ -1298,26 +1264,26 @@ static void doIntersect(PipelineArg &pipeArg) {
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) // 第二种情况
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
getEqualRE(prevFragEnd, pSM.pairs, &prevPairArr);
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
}
} else { // 第四种情况
if (prevUnpairInfoP == nullptr) {
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
prevUnpairInfoP = &lpSM.unpairedPosArr[prevPosKey];
prevUnpairInfoP->taskSeq = lp.taskSeq;
}
auto &prevPairArr = prevUnpairInfoP->pairArr;
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) {
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
getEqualRE(prevFragEnd, lpSM.pairs, &prevPairArr);
getEqualRE(prevFragEnd, pSM.pairs, &prevPairArr);
}
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
}
}
p.unpairedDic.erase(readName); // 在next task里删除该read
pSM.unpairedDic.erase(readName); // 在next task里删除该read
// p.unpairedPosArr.erase(nextPosKey);
// p.unpairedPosArr[nextPosKey].
} else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read
@ -1354,7 +1320,7 @@ static void doIntersect(PipelineArg &pipeArg) {
if (taskSeq < lp.taskSeq)
pairArrP = &g.unpairedPosArr[posKey].pairArr;
else
pairArrP = &lp.unpairedPosArr[posKey].pairArr;
pairArrP = &lpSM.unpairedPosArr[posKey].pairArr;
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx,
&t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx);
if (taskSeq < lp.taskSeq)
@ -1627,9 +1593,9 @@ static void mergeAllTask(PipelineArg &pipeArg) {
// cout << "last task start" << endl;
MarkDupData &lp = pipeArg.markDupData[(pipeArg.markDupOrder - 1) % pipeArg.MARKBUFNUM];
IntersectData &g = pipeArg.intersectData;
SortMarkData &lpSM = *(SortMarkData *)lp.dataPtr;
// 遗留的未匹配的pair
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
auto &readName = prevUnpair.first;
auto &prevPosInfo = prevUnpair.second;
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end

View File

@ -25,19 +25,15 @@ struct GenREData {
UnpairedPositionMap unpairedPosArr; //
};
struct SortData {
struct SortMarkData {
vector<ReadEnds> pairs; // 成对的reads
vector<ReadEnds> frags; // 暂未找到配对的reads
UnpairedNameMap unpairedDic; // 用来寻找pair end
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd为了避免重复存储
void CopyGenREData(GenREData &genREData) {
// unpairedDic.clear();
// unpairedPosArr.clear();
// for (auto &val : genREData.unpairedDic) unpairedDic.insert(val);
// for (auto &val : genREData.unpairedPosArr) unpairedPosArr.insert(val);
unpairedDic = genREData.unpairedDic;
unpairedPosArr = genREData.unpairedPosArr;
}
};
struct SortData {
volatile void *dataPtr; // SortMarkData pointer
};
struct MarkDupData {
@ -48,26 +44,7 @@ struct MarkDupData {
DPSet<DupInfo> pairRepIdx; // pair的dupset代表read的索引
MDSet<int64_t> pairSingletonIdx; // 某位置只有一对read的所有read pair个数
// 从sort step拷贝过来的数据
vector<ReadEnds> pairs; // 成对的reads
vector<ReadEnds> frags; // 暂未找到配对的reads
UnpairedNameMap unpairedDic; // 用来寻找pair end
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd为了避免重复存储
void CopySortData(SortData &sortData) {
// pairs.clear();
// frags.clear();
// unpairedDic.clear();
// unpairedPosArr.clear();
// for (auto &val : sortData.pairs) pairs.push_back(val);
// for (auto &val : sortData.frags) frags.push_back(val);
// for (auto &val : sortData.unpairedDic) unpairedDic.insert(val);
// for (auto &val : sortData.unpairedPosArr) unpairedPosArr.insert(val);
pairs = sortData.pairs;
frags = sortData.frags;
unpairedDic = sortData.unpairedDic;
unpairedPosArr = sortData.unpairedPosArr;
}
volatile void *dataPtr; // SortMarkData pointer
};
struct IntersectData {
@ -89,27 +66,6 @@ struct IntersectData {
vector<MDSet<int64_t>> latterNotOpticalDupIdxArr;
vector<MDSet<int64_t>> latterNotRepIdxArr;
vector<MDSet<int64_t>> latterNotSingletonIdxArr;
MarkDupData lp;
MarkDupData p;
void copyOneMarkdupData(MarkDupData &src, MarkDupData &dst) {
dst.taskSeq = src.taskSeq;
dst.pairDupIdx = src.pairDupIdx;
dst.pairOpticalDupIdx = src.pairOpticalDupIdx;
dst.fragDupIdx = src.fragDupIdx;
dst.pairRepIdx = src.pairRepIdx;
dst.pairSingletonIdx = src.pairSingletonIdx;
// 从sort step拷贝过来的数据
dst.pairs = src.pairs;
dst.frags = src.frags;
dst.unpairedDic = src.unpairedDic;
dst.unpairedPosArr = src.unpairedPosArr;
}
void CopyMarkDupData(MarkDupData &_lp, MarkDupData &_p) {
copyOneMarkdupData(_lp, lp);
copyOneMarkdupData(_p, p);
}
};
// 记录流水线状态task的序号以及某阶段是否结束
@ -139,8 +95,12 @@ struct PipelineArg {
genRESig = NEW_LOCK(0); // 最大值2, 双buffer
sortSig = NEW_LOCK(0);
markDupSig = NEW_LOCK(0);
for (int i = 0; i < SORTBUFNUM; ++i) { sortData[i].dataPtr = &sortMarkData[i]; }
for (int i = 0; i < MARKBUFNUM; ++i) { markDupData[i].dataPtr = &sortMarkData[i + SORTBUFNUM]; }
}
SortMarkData sortMarkData[SORTBUFNUM + MARKBUFNUM];
// for step-1 read
ReadData readData;
// for step-2 generate readends