清除了一部分singleton代码

This commit is contained in:
zzh 2024-11-21 23:11:07 +08:00
parent 4ba4a1e670
commit d37ea0afcc
1 changed files with 10 additions and 574 deletions

View File

@ -22,14 +22,6 @@
using std::cout;
using std::vector;
/* 查找 */
// template<class Itr, class T>
// static inline Itr binaryFind(Itr first, Itr last, const T &val)
// {
// first = std::lower_bound(first, last, val);
// return (first != last && *first == val) ? first : last;
// }
/* 排序 */
static inline void sortReadEndsArr(vector<ReadEnds> &arr) {
size_t blockSize = 64 * 1024;
@ -177,150 +169,6 @@ static void getEqualRE(const ReadEnds &re, vector<ReadEnds> &src, vector<ReadEnd
#define LOWER_BOUND(tid, nthread, ndata) ((tid) * (ndata) / (nthread))
#define UPPER_BOUND(tid, nthread, ndata) ((tid + 1) * (ndata) / (nthread))
static void threadGenerateReadEnds(void *data, long idx, int tid) {
auto &p = (*(ParallelDataArg *)data).threadData[idx];
auto &mdArg = *(*(ParallelDataArg *)data).mdArg;
auto &rnParser = g_vRnParser[idx];
int nThread = g_gArg.num_threads;
p.pairs.clear();
p.frags.clear();
p.unpairedDic.clear();
size_t start_id = LOWER_BOUND(idx, nThread, mdArg.bams.size());
size_t end_id = UPPER_BOUND(idx, nThread, mdArg.bams.size());
// if (mdArg.taskSeq == 163)
// cout << "tid: " << tid << "\t" << idx << "\t" << start_id << "\t" << end_id << "\t" << mdArg.bams.size() <<
// endl;
for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read
BamWrap *bw = mdArg.bams[i];
const int64_t bamIdx = mdArg.bamStartIdx + i;
if (bw->GetReadUnmappedFlag()) {
if (bw->b->core.tid == -1)
// When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort).
break;
} else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对
ReadEnds fragEnd;
buildReadEnds(*bw, bamIdx, rnParser, &fragEnd);
p.frags.push_back(fragEnd); // 添加进frag集合
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了
string key = bw->query_name();
if (p.unpairedDic.find(key) == p.unpairedDic.end()) {
p.unpairedDic[key] = {mdArg.taskSeq, fragEnd};
} else { // 找到了pairend
auto &pairedEnds = p.unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds);
p.pairs.push_back(pairedEnds);
p.unpairedDic.erase(key); // 删除找到的pairend
}
}
}
}
// sortReadEndsArr(p.frags);
sort(p.frags.begin(), p.frags.end());
sort(p.pairs.begin(), p.pairs.end());
// cout << tid << "\tthread end" << endl;
// if (mdArg.taskSeq == 163)
// cout << "tid: " << tid << " end\t" << p.pairs.size() << "\t" << p.frags.size() << endl;
}
static void *threadSortParallelPairs(void *data) {
auto &pd = *(ParallelDataArg *)data;
vector<ReadEnds> &pairs = pd.threadData[pd.numThread].pairs;
sort(pairs.begin(), pairs.end());
// 合并所有pairs
pd.pairs2d.clear();
for (int i = 0; i < pd.numThread; ++i) {
pd.pairs2d.push_back(&pd.threadData[i].pairs);
}
pd.pairs2d.push_back(&pd.threadData[pd.numThread].pairs);
ReadEndsQueue pairsQue;
pairsQue.Init(&pd.pairs2d);
const ReadEnds *pRE;
while ((pRE = pairsQue.Pop()) != nullptr) {
pd.mdArg->pairs.push_back(*pRE);
}
return nullptr;
}
static void *threadSortParallelFrags(void *data) {
auto &pd = *(ParallelDataArg *)data;
ReadEndsQueue fragsQue;
const ReadEnds *pRE;
pd.frags2d.clear();
for (int i = 0; i < pd.numThread; ++i) {
pd.frags2d.push_back(&pd.threadData[i].frags);
}
fragsQue.Init(&pd.frags2d);
while ((pRE = fragsQue.Pop()) != nullptr) {
pd.mdArg->frags.push_back(*pRE);
}
return nullptr;
}
static void *threadCreateUnpairInfo(void *data) {
auto &pd = *(ParallelDataArg *)data;
auto &p = *pd.mdArg;
for (auto &e : p.unpairedDic) {
auto posKey = e.second.unpairedRE.posKey;
auto &unpairArrInfo = p.unpairedPosArr[posKey];
unpairArrInfo.unpairedNum++;
unpairArrInfo.taskSeq = p.taskSeq;
unpairArrInfo.readNameSet.insert(e.first);
}
return nullptr;
}
/* 单线程生成readends (第一步)*/
static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) {
auto &p = *arg;
p.pairs.clear();
p.frags.clear();
p.unpairedDic.clear();
p.unpairedPosArr.clear();
pdArg.mdArg = arg;
tm_arr[8].acc_start();
kt_for(g_gArg.num_threads / 2, threadGenerateReadEnds, &pdArg, g_gArg.num_threads);
// tm_arr[8].acc_end();
// 合并各个线程数据
// 查找未匹配的frags
tm_arr[9].acc_start();
vector<ReadEnds> &pairs = pdArg.threadData[g_gArg.num_threads].pairs;
pairs.clear();
for (int i = 0; i < g_gArg.num_threads; ++i) {
for (auto &val : pdArg.threadData[i].unpairedDic) {
const string &key = val.first;
const ReadEnds &fragEnd = val.second.unpairedRE;
if (p.unpairedDic.find(key) == p.unpairedDic.end()) {
p.unpairedDic[key] = {p.taskSeq, fragEnd};
} else { // 找到了pairend
auto &pairedEnds = p.unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds);
pairs.push_back(pairedEnds);
p.unpairedDic.erase(key); // 删除找到的pairend
}
}
}
threadCreateUnpairInfo(&pdArg);
threadSortParallelPairs(&pdArg);
tm_arr[9].acc_end();
tm_arr[8].acc_end();
tm_arr[11].acc_start();
// pthread_t tid;
// pthread_create(&tid, nullptr, threadSortParallelFrags, &pdArg);
threadSortParallelFrags(&pdArg);
// tm_arr[12].acc_start();
// threadSortParallelPairs(&pdArg);
// tm_arr[12].acc_end();
// tm_arr[13].acc_start();
// threadCreateUnpairInfo(&pdArg);
// tm_arr[13].acc_end();
// pthread_join(tid, nullptr);
tm_arr[11].acc_end();
}
/* 处理pairs */
static void processPairs(vector<ReadEnds> &readEnds, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *opticalDupIdx,
DPSet<DupInfo> *repIdx, MDSet<int64_t> *singletonIdx, MDSet<int64_t> *notDupIdx = nullptr,
@ -382,8 +230,6 @@ static void markdups(MarkDupDataArg *arg) {
/* generateDuplicateIndexes计算冗余read在所有read中的位置索引 */
// 先处理 pair
processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx, &p.pairRepIdx, &p.pairSingletonIdx);
// cout << p.pairDupIdx.size() << "\t" << p.pairRepIdx.size() << endl;
// 再处理frag
processFrags(p.frags, &p.fragDupIdx);
}
@ -457,10 +303,6 @@ static inline void refreshPairDupIdx(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opt
p.pairSingletonIdx.erase(idx);
}
for (auto idx : notDupIdx) {
// if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl;
// if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl;
// if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl;
// if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl;
lp.pairDupIdx.erase(idx);
p.pairDupIdx.erase(idx);
}
@ -486,10 +328,7 @@ static void refeshTaskDupInfo(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opticalDup
MDSet<int64_t> &latterSingletonIdx, MDSet<int64_t> &latterNotDupIdx,
MDSet<int64_t> &latterNotOpticalDupIdx, MDSet<int64_t> &latterNotRepIdx,
MDSet<int64_t> &latterNotSingletonIdx) {
for (auto idx : dupIdx) {
latterDupIdx.insert(idx);
// latterNotDupIdx.erase(idx); // 后来的更新为准
}
for (auto idx : dupIdx) latterDupIdx.insert(idx);
for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx);
for (auto idx : repIdx) latterRepIdx.insert(idx);
for (auto idx : singletonIdx) latterSingletonIdx.insert(idx);
@ -512,8 +351,6 @@ static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet<int64_t> &notDupI
auto ae = dupArr.end();
auto bi = cacheDupIdx.begin();
auto be = cacheDupIdx.end();
// auto bi = dupIdx.begin();
// auto be = dupIdx.end();
T val = 0;
while (ai != ae && bi != be) {
@ -544,368 +381,6 @@ static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet<int64_t> &notDupI
dupArr = midArr;
}
static int64_t llp_frag_end_pos = 0;
static int64_t llp_pair_end_pos = 0;
static int64_t gSingletonNum = 0;
/* 处理相邻的两个任务,有相交叉的数据 */
static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, GlobalDataArg *gDataArg) {
auto &lp = *lastArg;
auto &p = *curArg;
auto &g = *gDataArg;
vector<ReadEnds> reArr;
DPSet<DupInfo> dupIdx;
MDSet<int64_t> opticalDupIdx;
DPSet<DupInfo> repIdx;
MDSet<int64_t> singletonIdx;
MDSet<int64_t> notOpticalDupIdx;
MDSet<int64_t> notDupIdx;
MDSet<int64_t> notRepIdx;
MDSet<int64_t> notSingletonIdx;
// 先处理重叠的frags
getIntersectData(lp.frags, p.frags, &reArr);
processFrags(reArr, &dupIdx, &notDupIdx);
refreshFragDupIdx(dupIdx, notDupIdx, &lp, &p);
// cout << "578" << endl;
if (!p.frags.empty())
if (llp_frag_end_pos >= p.frags.begin()->posKey) {
cout << "frags : " << llp_frag_end_pos << "\t" << p.frags[0].posKey << "\t" << p.frags.rbegin()->posKey
<< endl;
}
if (!p.pairs.empty())
if (llp_pair_end_pos >= p.pairs.begin()->posKey) {
cout << "pairs : " << llp_pair_end_pos << "\t" << p.pairs[0].posKey << "\t" << p.pairs.rbegin()->posKey
<< endl;
}
if (!p.frags.empty())
llp_frag_end_pos = lp.frags.rbegin()->posKey;
if (!p.pairs.empty())
llp_pair_end_pos = lp.pairs.rbegin()->posKey;
// 再处理重叠的pairs
reArr.clear();
dupIdx.clear();
notDupIdx.clear();
getIntersectData(lp.pairs, p.pairs, &reArr, true);
// cout << "pairs: " << lp.pairs.size() << "\t" << p.pairs.size() << endl;
processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, &notDupIdx, &notOpticalDupIdx, &notRepIdx,
&notSingletonIdx);
// cout << "size: " << reArr.size() << endl;
refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx,
notSingletonIdx, &lp, &p);
// cout << "pairs size: " << lp.pairs.size() << "\t" << p.pairs.size() << endl;
// cout << "range: " << lp.pairs.front().posKey << "\t" << lp.pairs.back().posKey << "\t"
// << p.pairs.front().posKey << "\t" << p.pairs.back().posKey << endl;
// cout << "intersect size: " << reArr.size() << endl;
// 处理之前未匹配的部分
map<CalcKey, int64_t> recalcPos;
CalcSet<CalcKey> alreadyAdd; // 与该位点相同的pair都添加到数组里了
MDSet<int64_t> addToGlobal;
int64_t prevLastPos = 0, nextFirstPos = 0;
if (lp.frags.size() > 0)
prevLastPos = lp.frags.back().posKey;
if (p.frags.size() > 0)
nextFirstPos = p.frags[0].posKey;
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
auto &readName = prevUnpair.first;
auto &prevPosInfo = prevUnpair.second;
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read
auto &nextPosInfo = p.unpairedDic[readName];
auto &nextFragEnd = nextPosInfo.unpairedRE;
int64_t prevPosKey = prevFragEnd.posKey;
modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下poskey可能是后面的read
int64_t nextPosKey = max(prevPosKey, nextFragEnd.posKey);
CalcKey ck = {prevPosKey, nextPosKey};
UnpairedPosInfo *prevUnpairInfoP = nullptr;
UnpairedPosInfo *nextUnpairInfoP = nullptr;
if (lp.unpairedPosArr.find(prevPosKey) != lp.unpairedPosArr.end())
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end())
nextUnpairInfoP = &p.unpairedPosArr[prevPosKey];
// pos分为两种情况根据poskey(pair中两个read分别的pos)的位置确定
// 1.
// prevpos在交叉部分之前nextpos在交叉部分之后这种情况不需要获取pairarr中的数据;
// 2.
// prevpos在交叉部分之前nextpos在交叉部分需要获取lp中的相等read pair进行重新计算
// 复杂情况1. g中包含prevPosKey对应的unpairp中有对应的pair此时应该把这些pair考虑进去
// 3.
// prevpos在交叉部分nextpos在交叉部分之后需要获取p中的相等read pair进行重新计算
// 复杂情况2. p中是否包含prevPosKey对应的unpair
// 4.
// prevpos在交叉部分nextpos在交叉部分需要获取lp和p中的相等read pair进行重新计算
bool addDataToPos = true;
if (alreadyAdd.find(ck) != alreadyAdd.end()) {
// 之前已经添加过了后面就不用再添加数据了因为同一个位置可能找到两个及以上的unpair数据处理之前的数据时候可能已经添加了这些数据
addDataToPos = false;
} else
alreadyAdd.insert(ck);
if (prevPosKey < nextFirstPos) { // prevpos在交叉部分之前
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
prevPairArr.push_back(prevFragEnd);
if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
}
// 第一种情况,第二种情况下都会出现,复杂情况一
auto gPosInfo = g.unpairedPosArr.find(prevPosKey);
if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的刚好和该位点一致
auto &gUnpairInfo = gPosInfo->second;
auto pPosInfo = p.unpairedPosArr.find(nextPosKey);
if (pPosInfo != p.unpairedPosArr.end()) {
auto &pUnpairInfo = pPosInfo->second;
for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname看是否有匹配的
if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) {
auto pe = g.unpairedDic[rn].unpairedRE;
auto fe = p.unpairedDic[rn].unpairedRE;
modifyPairedEnds(fe, &pe);
prevPairArr.push_back(pe);
g.unpairedDic.erase(rn);
p.unpairedDic.erase(rn);
}
}
}
}
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
} else { // prevpos在交叉部分
if (nextPosKey > prevLastPos) { // nextpos在交叉部分之后 第三种情况
if (nextUnpairInfoP != nullptr) { // 且在pos点next task有unpair这样才把这些数据放到next task里
auto &nextPairArr = nextUnpairInfoP->pairArr;
nextPairArr.push_back(prevFragEnd);
auto &prevPairArr = prevUnpairInfoP->pairArr;
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) {
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
getEqualRE(prevFragEnd, p.pairs, &nextPairArr);
}
// 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除)
recalcPos[ck] = nextPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
std::sort(nextPairArr.begin(), nextPairArr.end());
} else { // next task在该位点没有unpair那就把数据放到prev task里
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) // 第二种情况
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
}
} else { // 第四种情况
if (prevUnpairInfoP == nullptr) {
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
prevUnpairInfoP->taskSeq = lp.taskSeq;
}
auto &prevPairArr = prevUnpairInfoP->pairArr;
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) {
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
}
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
}
}
p.unpairedDic.erase(readName); // 在next task里删除该read
} else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read
auto &remainPosInfo = g.unpairedDic[readName];
auto remainFragEnd = remainPosInfo.unpairedRE;
int64_t remainPosKey = remainFragEnd.posKey;
modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下poskey可能是后面的read
auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey];
auto &remainPairArr = remainUnpairInfo.pairArr;
remainPairArr.push_back(remainFragEnd);
CalcKey ck = {remainPosKey, prevFragEnd.posKey};
recalcPos[ck] = remainPosInfo.taskSeq;
std::sort(remainPairArr.begin(), remainPairArr.end());
g.unpairedDic.erase(readName);
} else { // 都没找到,那就保存到遗留数据里
int64_t prevPosKey = prevFragEnd.posKey;
g.unpairedDic.insert(prevUnpair);
addToGlobal.insert(prevPosKey);
}
}
map<int64_t, TaskSeqDupInfo> taskChanged;
MDSet<int64_t> posProcessed;
for (auto &e : recalcPos) {
auto posKey = e.first.read1Pos;
if (posProcessed.find(posKey) != posProcessed.end())
continue;
posProcessed.insert(posKey);
auto taskSeq = e.second;
auto &t = taskChanged[taskSeq];
// 在对应的任务包含的dup idx里修改结果数据
vector<ReadEnds> *pairArrP = nullptr;
if (taskSeq < lp.taskSeq)
pairArrP = &g.unpairedPosArr[posKey].pairArr;
else
pairArrP = &lp.unpairedPosArr[posKey].pairArr;
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx,
&t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx);
if (taskSeq < lp.taskSeq)
g.unpairedPosArr.erase(posKey);
}
// 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据
// 放在这里因为lp中的unpairedPosArr中的readends可能会被修改比如optical duplicate
for (auto posKey : addToGlobal) {
g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey];
}
// 更新结果
for (auto &e : taskChanged) {
auto taskSeq = e.first;
auto &t = e.second;
if (taskSeq < lp.taskSeq) {
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq],
g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq],
g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq],
g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq],
g.latterNotSingletonIdxArr[taskSeq]);
} else if (taskSeq == lp.taskSeq) {
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
t.notRepIdx, t.notSingletonIdx, &lp, &p);
} else {
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
t.notRepIdx, t.notSingletonIdx, &p, &lp); // 把结果放到p中
}
}
// 将dupidx放进全局数据
g.latterDupIdxArr.push_back(DPSet<DupInfo>());
g.latterOpticalDupIdxArr.push_back(MDSet<int64_t>());
g.latterRepIdxArr.push_back(DPSet<DupInfo>());
g.latterSingletonIdxArr.push_back(MDSet<int64_t>());
g.latterNotDupIdxArr.push_back(MDSet<int64_t>());
g.latterNotOpticalDupIdxArr.push_back(MDSet<int64_t>());
g.latterNotRepIdxArr.push_back(MDSet<int64_t>());
g.latterNotSingletonIdxArr.push_back(MDSet<int64_t>());
g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back();
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
std::sort(vIdx.begin(), vIdx.end());
g.opticalDupIdxArr.push_back(vector<int64_t>());
auto &vOpticalIdx = g.opticalDupIdxArr.back();
vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end());
std::sort(vOpticalIdx.begin(), vOpticalIdx.end());
g.repIdxArr.push_back(vector<DupInfo>());
auto &vRepIdx = g.repIdxArr.back();
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
std::sort(vRepIdx.begin(), vRepIdx.end());
g.singletonIdxArr.push_back(vector<int64_t>());
auto &vSingletonIdx = g.singletonIdxArr.back();
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
gSingletonNum += lp.pairSingletonIdx.size();
}
/* 当所有任务结束后global data里还有未处理的数据 */
static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) {
// cout << "last task start" << endl;
auto &lp = *task;
auto &g = *gDataArg;
// 遗留的未匹配的pair
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
auto &readName = prevUnpair.first;
auto &prevPosInfo = prevUnpair.second;
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read
auto &remainPosInfo = g.unpairedDic[readName];
auto remainFragEnd = remainPosInfo.unpairedRE;
int64_t remainPosKey = remainFragEnd.posKey;
modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下poskey可能是后面的read
auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey];
remainUnpairInfo.pairArr.push_back(remainFragEnd);
g.unpairedDic.erase(readName);
} else {
g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数
}
}
map<int64_t, TaskSeqDupInfo> taskChanged;
for (auto &e : g.unpairedPosArr) {
auto posKey = e.first;
auto taskSeq = e.second.taskSeq;
auto &t = taskChanged[taskSeq];
auto &arr = g.unpairedPosArr[posKey].pairArr;
if (arr.size() > 1) {
std::sort(arr.begin(), arr.end());
processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx,
&t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx);
} else if (arr.size() == 1) {
// gMetrics.singletonReads.insert(arr[0].read1IndexInFile);
t.singletonIdx.insert(arr[0].read1IndexInFile);
}
}
// 更新结果
vector<int64_t> addDup;
map<int64_t, int64_t> ndPosVal;
for (auto &e : taskChanged) {
auto taskSeq = e.first;
auto &t = e.second;
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq],
g.latterRepIdxArr[taskSeq], g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq],
g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq],
g.latterNotSingletonIdxArr[taskSeq]);
}
g.unpairedPosArr.clear();
// 将dupidx放进全局数据
vector<DupInfo> cacheDupIdx;
vector<DupInfo> midArr;
vector<int64_t> intCacheDupIdx;
vector<int64_t> intMidArr;
for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) {
refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i], cacheDupIdx, midArr);
}
for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i],
intCacheDupIdx, intMidArr);
for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr);
for (int i = 0; i < (int)g.singletonIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterSingletonIdxArr[i], g.latterNotSingletonIdxArr[i], g.singletonIdxArr[i],
intCacheDupIdx, intMidArr);
g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back();
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
std::sort(vIdx.begin(), vIdx.end());
g.opticalDupIdxArr.push_back(vector<int64_t>());
auto &vOpticalIdx = g.opticalDupIdxArr.back();
vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end());
std::sort(vOpticalIdx.begin(), vOpticalIdx.end());
g.repIdxArr.push_back(vector<DupInfo>());
auto &vRepIdx = g.repIdxArr.back();
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
std::sort(vRepIdx.begin(), vRepIdx.end());
g.singletonIdxArr.push_back(vector<int64_t>());
auto &vSingletonIdx = g.singletonIdxArr.back();
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
}
// for step 2 generate read ends
// multi-thread generate read ends
@ -951,7 +426,7 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) {
}
}
}
//sortReadEndsArr(frags);
// sortReadEndsArr(frags);
sort(frags.begin(), frags.end());
sort(pairs.begin(), pairs.end());
}
@ -961,13 +436,9 @@ static void doGenRE(PipelineArg &pipeArg) {
GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM];
ReadData &readData = pipeArg.readData;
// cout << "dogen-gen buf : " << pipeArg.genREOrder % pipeArg.GENBUFNUM << endl;
// cout << "dogen-sort buf:" << pipeArg.genREOrder % pipeArg.SORTBUFNUM << endl;
// generate read ends
const int numThread = pipeArg.numThread;
// / 4;
kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread);
// 用来在genRE阶段计算一些Sort阶段应该计算的数据保持每个step用时更平衡
// 轮询每个线程中未找到匹配的read找到匹配的
@ -1008,8 +479,7 @@ static void doSort(PipelineArg &pipeArg) {
GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM];
SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM];
SortMarkData &smd = *(SortMarkData *)sortData.dataPtr;
// cout << "dosort-gen buf : " << pipeArg.sortOrder % pipeArg.GENBUFNUM << endl;
// cout << "dosort-sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl;
smd.unpairedDic = genREData.unpairedDic;
smd.unpairedPosArr = genREData.unpairedPosArr;
@ -1101,28 +571,22 @@ static void doIntersect(PipelineArg &pipeArg) {
MDSet<int64_t> notSingletonIdx;
// 先处理重叠的frags
// cout << "frags: " << lp.pSmd->frags.size() << "\t" << p.pSmd->frags.size() << endl;
getIntersectData(lpSM.frags, pSM.frags, &reArr);
processFrags(reArr, &dupIdx, &notDupIdx);
refreshDupIdx(dupIdx, lp.fragDupIdx, p.fragDupIdx);
refreshNotDupIdx(notDupIdx, lp.fragDupIdx, p.fragDupIdx);
// 再处理重叠的pairs
reArr.clear();
dupIdx.clear();
notDupIdx.clear();
getIntersectData(lpSM.pairs, pSM.pairs, &reArr, true);
// cout << "pairs size: " << lp.pSmd->pairs.size() << "\t" << p.pSmd->pairs.size() << endl;
// cout << "range: " << lp.pSmd->pairs.front().posKey << "\t" << lp.pSmd->pairs.back().posKey << "\t"
// << p.pSmd->pairs.front().posKey << "\t" << p.pSmd->pairs.back().posKey << endl;
// cout << "intersect size: " << reArr.size() << endl;
processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, &notDupIdx, &notOpticalDupIdx, &notRepIdx,
&notSingletonIdx);
refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx,
notSingletonIdx, lp, p);
// return;
// 处理之前未匹配的部分
map<CalcKey, int64_t> recalcPos;
CalcSet<CalcKey> alreadyAdd; // 与该位点相同的pair都添加到数组里了
@ -1130,8 +594,7 @@ static void doIntersect(PipelineArg &pipeArg) {
int64_t prevLastPos = 0, nextFirstPos = 0;
if (lpSM.frags.size() > 0) prevLastPos = lpSM.frags.back().posKey;
if (pSM.frags.size() > 0) nextFirstPos = pSM.frags[0].posKey;
//cout << "dic size: " << lp.unpairedDic.size() << "\t" << p.unpairedDic.size() << "\t" << g.unpairedDic.size()
// << endl;
for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
auto &readName = prevUnpair.first;
auto &prevPosInfo = prevUnpair.second;
@ -1235,8 +698,6 @@ static void doIntersect(PipelineArg &pipeArg) {
}
}
pSM.unpairedDic.erase(readName); // 在next task里删除该read
// p.unpairedPosArr.erase(nextPosKey);
// p.unpairedPosArr[nextPosKey].
} else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read
auto &remainPosInfo = g.unpairedDic[readName];
auto remainFragEnd = remainPosInfo.unpairedRE;
@ -1279,10 +740,11 @@ static void doIntersect(PipelineArg &pipeArg) {
}
// 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据
// 放在这里因为lp中的unpairedPosArr中的readends可能会被修改比如optical duplicate
// for (auto posKey : addToGlobal) {
// g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey];
// }
// for (auto posKey : addToGlobal) {
// g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey];
// }
// return;
/* 不需要把p中找不到lp的unpair放到global中否则最后找到pair后还要再执行一次冗余检测造成重复的冗余索引 */
// 更新结果
for (auto &e : taskChanged) {
@ -1303,21 +765,14 @@ static void doIntersect(PipelineArg &pipeArg) {
t.notRepIdx, t.notSingletonIdx, p, lp); // 把结果放到p中
}
}
// for (auto posKey : addToGlobal) {
// g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey];
// }
// return;
// 将dupidx放进全局数据
g.latterDupIdxArr.push_back(DPSet<DupInfo>());
g.latterOpticalDupIdxArr.push_back(MDSet<int64_t>());
g.latterRepIdxArr.push_back(DPSet<DupInfo>());
g.latterSingletonIdxArr.push_back(MDSet<int64_t>());
g.latterNotDupIdxArr.push_back(MDSet<int64_t>());
g.latterNotOpticalDupIdxArr.push_back(MDSet<int64_t>());
g.latterNotRepIdxArr.push_back(MDSet<int64_t>());
g.latterNotSingletonIdxArr.push_back(MDSet<int64_t>());
g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back();
@ -1334,12 +789,6 @@ static void doIntersect(PipelineArg &pipeArg) {
auto &vRepIdx = g.repIdxArr.back();
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
std::sort(vRepIdx.begin(), vRepIdx.end());
g.singletonIdxArr.push_back(vector<int64_t>());
auto &vSingletonIdx = g.singletonIdxArr.back();
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
// gSingletonNum += lp.pairSingletonIdx.size();
}
static void *pipeRead(void *data) {
@ -1542,7 +991,6 @@ static void *pipeIntersect(void *data) {
/* 当所有任务结束后global data里还有未处理的数据 */
static void mergeAllTask(PipelineArg &pipeArg) {
// cout << "last task start" << endl;
MarkDupData &lp = pipeArg.markDupData[(pipeArg.markDupOrder - 1) % pipeArg.MARKBUFNUM];
IntersectData &g = pipeArg.intersectData;
SortMarkData &lpSM = *(SortMarkData *)lp.dataPtr;
@ -1577,9 +1025,6 @@ static void mergeAllTask(PipelineArg &pipeArg) {
std::sort(arr.begin(), arr.end());
processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx,
&t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx);
} else if (arr.size() == 1) {
// gMetrics.singletonReads.insert(arr[0].read1IndexInFile);
t.singletonIdx.insert(arr[0].read1IndexInFile);
}
}
// 更新结果
@ -1609,9 +1054,6 @@ static void mergeAllTask(PipelineArg &pipeArg) {
intCacheDupIdx, intMidArr);
for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr);
for (int i = 0; i < (int)g.singletonIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterSingletonIdxArr[i], g.latterNotSingletonIdxArr[i], g.singletonIdxArr[i],
intCacheDupIdx, intMidArr);
g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back();
@ -1628,12 +1070,6 @@ static void mergeAllTask(PipelineArg &pipeArg) {
auto &vRepIdx = g.repIdxArr.back();
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
std::sort(vRepIdx.begin(), vRepIdx.end());
g.singletonIdxArr.push_back(vector<int64_t>());
auto &vSingletonIdx = g.singletonIdxArr.back();
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
// gSingletonNum += lp.pairSingletonIdx.size();
}
static void parallelPipeline() {