picard_cpp/src/sam/markdups/serial_md.h

943 lines
34 KiB
C
Raw Normal View History

#include <algorithm>
#include <robin-map/include/tsl/robin_map.h>
/* 存放未匹配readend相同位点的所有readend */
struct UnpairedREInfo
{
int64_t taskSeq;
ReadEnds unpairedRE;
};
/* 对于一个pair数据一个完整的计算点包含read1的比对位置和read2的比对位置 */
struct CalcKey
{
int64_t read1Pos;
int64_t read2Pos;
bool operator<(const CalcKey &o) const
{
int comp = (int)(read1Pos - o.read1Pos);
if (comp == 0)
comp = (int)(read2Pos - o.read2Pos);
return comp < 0;
}
};
/* 当遗留数据在当前任务找到了pair read后进行冗余计算时候存放结果的数据结构 */
struct TaskSeqDupInfo
{
set<int64_t> dupIdx;
set<int64_t> opticalDupIdx;
set<int64_t> notDupIdx;
};
/* 保存有未匹配pair位点的信息包括read end数组和有几个未匹配的read end */
struct UnpairedPosInfo
{
int unpairedNum = 0;
int64_t taskSeq;
vector<ReadEnds> pairArr;
set<string> readNameSet;
};
// typedef unordered_map<string, UnpairedREInfo> UnpairedNameMap;
// typedef unordered_map<int64_t, UnpairedPosInfo> UnpairedPositionMap;
typedef tsl::robin_map<string, UnpairedREInfo> UnpairedNameMap; // 以read name为索引保存未匹配的pair read
typedef tsl::robin_map<int64_t, UnpairedPosInfo> UnpairedPositionMap; // 以位点为索引保存该位点包含的对应的所有read和该位点包含的剩余未匹配的read的数量
/* 单线程处理冗余参数结构体 */
struct SerailMarkDupArg
{
int64_t taskSeq;
int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置
vector<BamWrap *> bams; // 存放待处理的bam read
vector<ReadEnds> pairs;
vector<ReadEnds> frags;
set<int64_t> pairDupIdx; // pair的冗余read的索引
set<int64_t> pairOpticalDupIdx; // optical冗余read的索引
set<int64_t> fragDupIdx; // frag的冗余read的索引
UnpairedNameMap unpairedDic; // 用来寻找pair end
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd为了避免重复存储
};
/* 全局保留的数据因为有些paired数据比对到了不同的染色体相距甚远 */
struct GlobalDataArg
{
UnpairedNameMap unpairedDic; // 用来寻找pair end
UnpairedPositionMap unpairedPosArr;
// 每个task对应一个vector
vector<vector<int64_t>> dupIdxArr;
vector<vector<int64_t>> opticalDupIdxArr;
// 用来存放后续计算的数据
vector<set<int64_t>> latterDupIdxArr;
vector<set<int64_t>> latterOpticalDupIdxArr;
vector<set<int64_t>> latterNotDupIdxArr;
};
static GlobalDataArg gData;
/* 查找 */
// template<class Itr, class T>
// static inline Itr binaryFind(Itr first, Itr last, const T &val)
// {
// first = std::lower_bound(first, last, val);
// return (first != last && *first == val) ? first : last;
// }
/* 排序 */
static inline void sortReadEndsArr(vector<ReadEnds> &arr)
{
size_t blockSize = 64 * 1024;
blockSize = min(blockSize, arr.size());
size_t blockNum = (arr.size() + blockSize - 1) / blockSize;
size_t crossNum = 1024;
size_t start, end, i, left, right;
std::sort(arr.begin(), arr.begin() + blockSize);
for (i = 1; i < blockNum; ++i)
{
start = i * blockSize;
end = min(start + blockSize, arr.size());
std::sort(arr.begin() + start, arr.begin() + end);
left = crossNum;
while (!(arr[start - left] < arr[start]))
{
left = left << 1;
if (left >= blockSize)
{
std::sort(arr.begin(), arr.end()); // 退化到普通排序
return;
}
}
right = min(crossNum, end - start - 1);
while (!(arr[start - 1] < arr[start + right]))
{
right = min(right << 1, end - start - 1);
if (right == end - start - 1)
break;
}
std::sort(arr.begin() + start - left, arr.begin() + start + right);
}
}
/* 处理一组pairend的readends标记冗余 */
static void markDupsForPairs(vector<const ReadEnds *> &vpRe,
set<int64_t> *dupIdx,
set<int64_t> *opticalDupIdx,
set<int64_t> *notDupIdx = nullptr)
{
if (vpRe.size() < 2)
{
if (vpRe.size() == 1)
{
// addSingletonToCount(libraryIdGenerator);
}
return;
}
int maxScore = 0;
const ReadEnds *pBest = nullptr;
/** All read ends should have orientation FF, FR, RF, or RR **/
for (auto pe : vpRe) // 找分数最高的readend
{
if (pe->score > maxScore || pBest == nullptr)
{
maxScore = pe->score;
pBest = pe;
}
}
if (notDupIdx != nullptr)
{
notDupIdx->insert(pBest->read1IndexInFile);
notDupIdx->insert(pBest->read2IndexInFile);
}
if (!g_mdArg.READ_NAME_REGEX.empty()) // 检查光学冗余
{
// trackOpticalDuplicates
}
for (auto pe : vpRe) // 对非best read标记冗余
{
if (pe != pBest) // 非best
{
dupIdx->insert(pe->read1IndexInFile); // 添加read1
if (pe->read2IndexInFile != pe->read1IndexInFile)
dupIdx->insert(pe->read2IndexInFile); // 添加read2
}
}
// if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS)
// {
// addRepresentativeReadIndex(vpRe);
// }
}
/* 处理一组非paired的readends标记冗余 */
static void markDupsForFrags(vector<const ReadEnds *> &vpRe,
bool containsPairs,
set<int64_t> *dupIdx,
set<int64_t> *notDupIdx = nullptr)
{
if (containsPairs)
{
for (auto pe : vpRe)
{
if (!pe->IsPaired())
{
dupIdx->insert(pe->read1IndexInFile);
}
}
}
else
{
int maxScore = 0;
const ReadEnds *pBest = nullptr;
for (auto pe : vpRe)
{
if (pe->score > maxScore || pBest == nullptr)
{
maxScore = pe->score;
pBest = pe;
}
}
if (notDupIdx != nullptr)
{
notDupIdx->insert(pBest->read1IndexInFile);
}
for (auto pe : vpRe)
{
if (pe != pBest)
{
dupIdx->insert(pe->read1IndexInFile);
}
}
}
}
/* 找到与readend pos相等的所有readend */
static void getEqualRE(const ReadEnds &re, vector<ReadEnds> &src, vector<ReadEnds> *dst)
{
auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::pairsLittleThan); // 只比对位点
dst->insert(dst->end(), range.first, range.second);
}
/* 单线程生成readends (第一步)*/
static void generateReadEnds(SerailMarkDupArg *arg)
{
auto &p = *arg;
auto &rnParser = g_vRnParser[0];
p.pairs.clear();
p.frags.clear();
p.unpairedDic.clear();
p.unpairedPosArr.clear();
/* 处理每个read创建ReadEnd并放入frag和pair中 */
set<ReadEnds> reSet;
ReadEnds lastRe;
for (int i = 0; i < p.bams.size(); ++i) // 循环处理每个read
{
BamWrap *bw = p.bams[i];
const int64_t bamIdx = p.bamStartIdx + i;
if (bw->GetReadUnmappedFlag())
{
if (bw->b->core.tid == -1)
// When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort).
break;
}
else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对
{
ReadEnds fragEnd;
tm_arr[8].acc_start();
buildReadEnds(*bw, bamIdx, rnParser, &fragEnd);
tm_arr[8].acc_end();
p.frags.push_back(fragEnd); // 添加进frag集合
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了
{
string key = bw->query_name();
if (p.unpairedDic.find(key) == p.unpairedDic.end())
{
p.unpairedDic[key] = {p.taskSeq, fragEnd};
}
else // 找到了pairend
{
auto &pairedEnds = p.unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds);
p.pairs.push_back(pairedEnds);
p.unpairedDic.erase(key); // 删除找到的pairend
}
}
}
}
tm_arr[9].acc_start();
sortReadEndsArr(p.frags);
// sort(p.frags.begin(), p.frags.end());
tm_arr[9].acc_end();
// cout << "sort pairs" << endl;
tm_arr[10].acc_start();
sort(p.pairs.begin(), p.pairs.end());
tm_arr[10].acc_end();
// 记录位点上的未匹配的read个数
for (auto &e : p.unpairedDic) {
auto posKey = e.second.unpairedRE.posKey;
auto &unpairArrInfo = p.unpairedPosArr[posKey];
unpairArrInfo.unpairedNum++;
unpairArrInfo.taskSeq = p.taskSeq;
unpairArrInfo.readNameSet.insert(e.first);
}
cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() << endl;
}
/* 处理pairs */
static void processPairs(vector<ReadEnds> &readEnds,
set<int64_t> *dupIdx,
set<int64_t> *opticalDupIdx,
set<int64_t> *notDupIdx = nullptr)
{
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
const ReadEnds *pReadEnd = nullptr;
for (auto &re : readEnds)
{
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样
vpCache.push_back(&re); // 处理一个潜在的冗余组
else
{
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx); // 不一样
vpCache.clear();
vpCache.push_back(&re);
pReadEnd = &re;
}
}
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx);
}
/* 处理frags */
static void processFrags(vector<ReadEnds> &readEnds,
set<int64_t> *dupIdx,
set<int64_t> *notDupIdx = nullptr)
{
bool containsPairs = false;
bool containsFrags = false;
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
const ReadEnds *pReadEnd = nullptr;
for (auto &re : readEnds)
{
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false))
{
vpCache.push_back(&re);
containsPairs = containsPairs || re.IsPaired();
containsFrags = containsFrags || !re.IsPaired();
}
else
{
if (vpCache.size() > 1 && containsFrags)
{
markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx);
}
vpCache.clear();
vpCache.push_back(&re);
pReadEnd = &re;
containsPairs = re.IsPaired();
containsFrags = !re.IsPaired();
}
}
if (vpCache.size() > 1 && containsFrags)
{
markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx);
}
}
/* 单线程markdup (第二步)*/
static void markdups(SerailMarkDupArg *arg)
{
auto &p = *arg;
p.pairDupIdx.clear();
p.pairOpticalDupIdx.clear();
p.fragDupIdx.clear();
/* generateDuplicateIndexes计算冗余read在所有read中的位置索引 */
// 先处理 pair
processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx);
// 再处理frag
processFrags(p.frags, &p.fragDupIdx);
}
/* 获取交叉部分的数据 */
static inline void getIntersectData(vector<ReadEnds> &leftArr,
vector<ReadEnds> &rightArr,
vector<ReadEnds> *dst,
bool isPairCmp = false)
{
const size_t leftEndIdx = leftArr.size() - 1;
const size_t rightStartIdx = 0;
size_t leftSpan = 0;
size_t rightSpan = 0;
while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp))
{
leftSpan += 1;
if (leftSpan > leftEndIdx)
{
leftSpan = leftArr.size() - 1;
break;
}
}
while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp))
{
rightSpan += 1;
if (rightSpan == rightArr.size() - 1)
break;
}
dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end());
dst->insert(dst->end(), rightArr.begin(), rightArr.begin() + rightSpan);
std::sort(dst->begin(), dst->end());
}
/* 将frags重叠部分的dup idx放进数据中 */
static inline void refreshFragDupIdx(set<int64_t> &dupIdx,
set<int64_t> &notDupIdx,
SerailMarkDupArg * lastArg,
SerailMarkDupArg *curArg)
{
auto &lp = *lastArg;
auto &p = *curArg;
for (auto idx : dupIdx)
{
lp.fragDupIdx.insert(idx);
p.fragDupIdx.erase(idx);
}
for (auto idx : notDupIdx)
{
lp.fragDupIdx.erase(idx);
p.fragDupIdx.erase(idx);
}
}
/* 将pairs重叠部分的dup idx放进数据中 */
static inline void refreshPairDupIdx(set<int64_t> &dupIdx,
set<int64_t> &opticalDupIdx,
set<int64_t> &notDupIdx,
SerailMarkDupArg *lastArg,
SerailMarkDupArg *curArg)
{
auto &lp = *lastArg;
auto &p = *curArg;
for (auto idx : dupIdx)
{
lp.pairDupIdx.insert(idx);
p.pairDupIdx.erase(idx);
}
for (auto idx : opticalDupIdx)
{
lp.pairOpticalDupIdx.insert(idx);
p.pairOpticalDupIdx.erase(idx);
}
for (auto idx : notDupIdx)
{
lp.pairDupIdx.erase(idx);
lp.pairOpticalDupIdx.erase(idx);
p.pairDupIdx.erase(idx);
p.pairOpticalDupIdx.erase(idx);
}
}
// 用来分别处理dup和optical dup
static void refeshTaskDupInfo(set<int64_t> &dupIdx,
set<int64_t> &opticalDupIdx,
set<int64_t> &notDupIdx,
set<int64_t> &latterDupIdx,
set<int64_t> &latterOpticalDupIdx,
set<int64_t> &latterNotDupIdx)
{
for (auto idx : dupIdx)
latterDupIdx.insert(idx);
for (auto idx : opticalDupIdx)
latterOpticalDupIdx.insert(idx);
for (auto idx : notDupIdx)
latterNotDupIdx.insert(idx);
}
/* 最后合并数据并排序 */
static void refeshFinalTaskDupInfo(set<int64_t> &dupIdx,
set<int64_t> &notDupIdx,
vector<int64_t> &dupArr)
{
vector<int64_t> midArr;
auto ai = dupArr.begin();
auto bi = dupIdx.begin();
auto ae = dupArr.end();
auto be = dupIdx.end();
int64_t val = 0;
while (ai != ae && bi != be)
{
if (*ai < *bi)
{
val = *ai++;
}
else if (*bi < *ai)
{
val = *bi++;
}
else
{
val = *ai++;
bi++;
}
if (notDupIdx.find(val) == notDupIdx.end())
{
midArr.push_back(val);
}
}
while (ai != ae)
{
val = *ai++;
if (notDupIdx.find(val) == notDupIdx.end())
{
midArr.push_back(val);
}
}
while (bi != be)
{
val = *bi++;
if (notDupIdx.find(val) == notDupIdx.end())
{
midArr.push_back(val);
}
}
dupArr = midArr;
}
/* 处理相邻的两个任务,有相交叉的数据 */
static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg, GlobalDataArg *gDataArg)
{
auto &lp = *lastArg;
auto &p = *curArg;
auto &g = *gDataArg;
vector<ReadEnds> reArr;
set<int64_t> dupIdx;
set<int64_t> notDupIdx;
// 先处理重叠的frags
getIntersectData(lp.frags, p.frags, &reArr);
processFrags(reArr, &dupIdx, &notDupIdx);
refreshFragDupIdx(dupIdx, notDupIdx, &lp, &p);
// 再处理重叠的pairs
reArr.clear();
dupIdx.clear();
notDupIdx.clear();
set<int64_t> opticalDupIdx;
getIntersectData(lp.pairs, p.pairs, &reArr, true);
processPairs(reArr, &dupIdx, &opticalDupIdx, &notDupIdx);
refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, &lp, &p);
// 处理之前未匹配的部分
map<CalcKey, int64_t> recalcPos;
set<CalcKey> alreadyAdd; // 与该位点相同的pair都添加到数组里了
set<int64_t> addToGlobal;
int64_t prevLastPos = 0, nextFirstPos = 0;
if (lp.frags.size() > 0)
prevLastPos = lp.frags.back().posKey;
if (p.frags.size() > 0)
nextFirstPos = p.frags[0].posKey;
// cout << "range: " << nextFirstPos << '\t' << prevLastPos << endl;
for (auto &prevUnpair : lp.unpairedDic) // 遍历上一个任务中的每个未匹配的read
{
auto &readName = prevUnpair.first;
auto &prevPosInfo = prevUnpair.second;
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
if (p.unpairedDic.find(readName) != p.unpairedDic.end()) // 在当前这个任务里找到了这个未匹配的read
{
auto &nextPosInfo = p.unpairedDic[readName];
auto &nextFragEnd = nextPosInfo.unpairedRE;
int64_t prevPosKey = prevFragEnd.posKey;
modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下poskey可能是后面的read
int64_t nextPosKey = max(prevPosKey, nextFragEnd.posKey);
CalcKey ck = {prevPosKey, nextPosKey};
UnpairedPosInfo *prevUnpairInfoP = nullptr;
UnpairedPosInfo *nextUnpairInfoP = nullptr;
if (lp.unpairedPosArr.find(prevPosKey) != lp.unpairedPosArr.end())
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end())
nextUnpairInfoP = &p.unpairedPosArr[prevPosKey];
// pos分为两种情况根据poskey(pair中两个read分别的pos)的位置确定
// 1. prevpos在交叉部分之前nextpos在交叉部分之后这种情况不需要获取pairarr中的数据;
// 2. prevpos在交叉部分之前nextpos在交叉部分需要获取lp中的相等read pair进行重新计算
// 复杂情况1. g中包含prevPosKey对应的unpairp中有对应的pair此时应该把这些pair考虑进去
// 3. prevpos在交叉部分nextpos在交叉部分之后需要获取p中的相等read pair进行重新计算
// 复杂情况2. p中是否包含prevPosKey对应的unpair
// 4. prevpos在交叉部分nextpos在交叉部分需要获取lp和p中的相等read pair进行重新计算
bool addDataToPos = true;
if (alreadyAdd.find(ck) != alreadyAdd.end())
{
addDataToPos = false; // 之前已经添加过了,后面就不用再添加数据了
}
else
alreadyAdd.insert(ck);
if (prevPosKey < nextFirstPos) // prevpos在交叉部分之前
{
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
prevPairArr.push_back(prevFragEnd);
if (nextPosKey <= prevLastPos && addDataToPos) // 第二种情况
{
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
}
// 第一种情况,第二种情况下都会出现,复杂情况一
auto gPosInfo = g.unpairedPosArr.find(prevPosKey);
if (gPosInfo != g.unpairedPosArr.end()) // 可能g和p有匹配的刚好和该位点一致
{
auto &gUnpairInfo = gPosInfo->second;
auto pPosInfo = p.unpairedPosArr.find(nextPosKey);
if (pPosInfo != p.unpairedPosArr.end())
{
auto &pUnpairInfo = pPosInfo->second;
for (auto &rn : gUnpairInfo.readNameSet) // 遍历每一个readname看是否有匹配的
{
if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end())
{
auto pe = g.unpairedDic[rn].unpairedRE;
auto fe = p.unpairedDic[rn].unpairedRE;
modifyPairedEnds(fe, &pe);
prevPairArr.push_back(pe);
g.unpairedDic.erase(rn);
p.unpairedDic.erase(rn);
// cout << "找到了!" << rn << endl;
}
}
}
}
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
}
else // prevpos在交叉部分
{
if (nextPosKey > prevLastPos) // nextpos在交叉部分之后
{ // 第三种情况
if (nextUnpairInfoP != nullptr) // 且在pos点next task有unpair这样才把这些数据放到next task里
{
auto &nextPairArr = nextUnpairInfoP->pairArr;
nextPairArr.push_back(prevFragEnd);
auto &prevPairArr = prevUnpairInfoP->pairArr;
prevPairArr.push_back(prevFragEnd);
if (addDataToPos)
{
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
}
recalcPos[ck] = nextPosInfo.taskSeq; // 将数据放到next task里, (这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除)
std::sort(prevPairArr.begin(), prevPairArr.end());
}
else // next task在该位点没有unpair那就把数据放到prev task里
{
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) // 第二种情况
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
}
}
else
{ // 第四种情况
if (prevUnpairInfoP == nullptr) {
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
prevUnpairInfoP->taskSeq = lp.taskSeq;
}
auto &prevPairArr = prevUnpairInfoP->pairArr;
prevPairArr.push_back(prevFragEnd);
if (addDataToPos)
{
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
}
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
}
}
p.unpairedDic.erase(readName); // 在next task里删除该read
}
else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) // 在遗留数据中找到了匹配的read
{
auto &remainPosInfo = g.unpairedDic[readName];
auto remainFragEnd = remainPosInfo.unpairedRE;
int64_t remainPosKey = remainFragEnd.posKey;
modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下poskey可能是后面的read
auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey];
auto &remainPairArr = remainUnpairInfo.pairArr;
remainPairArr.push_back(remainFragEnd);
CalcKey ck = {remainPosKey, prevFragEnd.posKey};
recalcPos[ck] = remainPosInfo.taskSeq;
std::sort(remainPairArr.begin(), remainPairArr.end());
g.unpairedDic.erase(readName);
}
else // 都没找到,那就保存到遗留数据里
{
int64_t prevPosKey = prevFragEnd.posKey;
g.unpairedDic.insert(prevUnpair);
addToGlobal.insert(prevPosKey);
}
}
for (auto posKey : addToGlobal) // 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据
g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey];
map<int64_t, TaskSeqDupInfo> taskChanged;
set<int64_t> posProcessed;
for (auto &e : recalcPos)
{
auto posKey = e.first.read1Pos;
if (posProcessed.find(posKey) != posProcessed.end())
continue;
posProcessed.insert(posKey);
auto taskSeq = e.second;
auto &t = taskChanged[taskSeq];
// 在对应的任务包含的dup idx里修改结果数据
vector<ReadEnds> *pairArrP = nullptr;
if (taskSeq < lp.taskSeq)
pairArrP = &g.unpairedPosArr[posKey].pairArr;
else
pairArrP = &lp.unpairedPosArr[posKey].pairArr;
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx);
if (taskSeq < lp.taskSeq)
g.unpairedPosArr.erase(posKey);
}
// 更新结果
for (auto &e : taskChanged)
{
auto taskSeq = e.first;
auto &t = e.second;
if (taskSeq < lp.taskSeq)
{
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx,
g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq]);
}
else if (taskSeq == lp.taskSeq)
{
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, &lp, &p);
}
else
{
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, &p, &lp); // 把结果放到p中
}
}
// cout << "remain unpaired: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl;
// cout << "calc g time: " << t.seconds_elapsed() << " s" << endl;
// 将dupidx放进全局数据
g.latterDupIdxArr.push_back(set<int64_t>());
g.latterOpticalDupIdxArr.push_back(set<int64_t>());
g.latterNotDupIdxArr.push_back(set<int64_t>());
g.dupIdxArr.push_back(vector<int64_t>());
auto &vIdx = g.dupIdxArr.back();
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
g.opticalDupIdxArr.push_back(vector<int64_t>());
auto &vOpticalIdx = g.opticalDupIdxArr.back();
vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end());
}
/* 当所有任务结束后global data里还有未处理的数据 */
static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg)
{
auto &lp = *task;
auto &g = *gDataArg;
// 遗留的未匹配的pair
for (auto &prevUnpair : lp.unpairedDic) // 遍历上一个任务中的每个未匹配的read
{
auto &readName = prevUnpair.first;
auto &prevPosInfo = prevUnpair.second;
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
if (g.unpairedDic.find(readName) != g.unpairedDic.end()) // 在遗留数据中找到了匹配的read
{
auto &remainPosInfo = g.unpairedDic[readName];
auto remainFragEnd = remainPosInfo.unpairedRE;
int64_t remainPosKey = remainFragEnd.posKey;
modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下poskey可能是后面的read
auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey];
remainUnpairInfo.pairArr.push_back(remainFragEnd);
g.unpairedDic.erase(readName);
}
}
map<int64_t, TaskSeqDupInfo> taskChanged;
for (auto &e : g.unpairedPosArr)
{
auto posKey = e.first;
auto taskSeq = e.second.taskSeq;
auto &t = taskChanged[taskSeq];
auto &arr = g.unpairedPosArr[posKey].pairArr;
if (arr.size() > 1)
{
std::sort(arr.begin(), arr.end());
processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx);
}
}
// 更新结果
vector<int64_t> addDup;
map<int64_t, int64_t> ndPosVal;
for (auto &e : taskChanged)
{
auto taskSeq = e.first;
auto &t = e.second;
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx,
g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq]);
}
cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl;
g.unpairedPosArr.clear();
g.unpairedDic.clear();
// 将dupidx放进全局数据
for (int i = 0; i < g.dupIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i]);
for (int i = 0; i < g.opticalDupIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotDupIdxArr[i], g.opticalDupIdxArr[i]);
g.dupIdxArr.push_back(vector<int64_t>());
auto &vIdx = g.dupIdxArr.back();
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
g.opticalDupIdxArr.push_back(vector<int64_t>());
auto &vOpticalIdx = g.opticalDupIdxArr.back();
vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end());
}
/* 串行处理数据,标记冗余 */
static void serialMarkDups()
{
tm_arr[5].acc_start();
Timer::log_time("serial start");
// 读取缓存初始化
BamBufType inBamBuf(g_gArg.use_asyncio);
inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
// BamBufType inBamBuf(false);
// inBamBuf.Init(g_inBamFp, g_inBamHeader, 100 * 1024 * 1024);
int64_t processedBamNum = 0;
SerailMarkDupArg smdArg1, smdArg2;
SerailMarkDupArg *lastArgP = &smdArg1;
SerailMarkDupArg *curArgP = &smdArg2;
bool isFirstRound = true;
int roundNum = 0;
while (inBamBuf.ReadStat() >= 0)
{
Timer t_round;
// 读取bam文件中的read
tm_arr[4].acc_start();
size_t readNum = inBamBuf.ReadBam();
tm_arr[4].acc_end();
cout << "read num: " << readNum << endl;
// lastArgP = curArgP;
tm_arr[6].acc_start();
curArgP->taskSeq = roundNum;
curArgP->bamStartIdx = processedBamNum;
curArgP->bams = inBamBuf.GetBamArr();
tm_arr[6].acc_end();
tm_arr[0].acc_start();
Timer t1;
generateReadEnds(curArgP);
//cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl;
tm_arr[0].acc_end();
tm_arr[1].acc_start();
t1.reinit();
markdups(curArgP);
//cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl;
tm_arr[1].acc_end();
if (!isFirstRound)
{
tm_arr[2].acc_start();
t1.reinit();
handleIntersectData(lastArgP, curArgP, &gData);
//cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl;
// addTaskIdxToSet(lastArgP, &gData);
tm_arr[2].acc_end();
}
else
{
isFirstRound = false;
}
inBamBuf.ClearAll(); // 清理上一轮读入的数据
processedBamNum += readNum;
// 交换
auto tmp = lastArgP;
lastArgP = curArgP;
curArgP = tmp;
cout << "round time: " << t_round.seconds_elapsed() << endl;
roundNum++;
if (roundNum > 9){
// break;
}
}
// cout << "here" << endl;
tm_arr[3].acc_start();
// 处理剩下的全局数据
handleLastTask(lastArgP, &gData);
// cout << "here 2" << endl;
tm_arr[3].acc_end();
tm_arr[5].acc_end();
// 统计所有冗余index数量
int64_t dupNum = 0;
set<int64_t> dup;
// int taskSeq = 0;
// for (auto &arr : gData.dupIdxArr)
// {
// for (auto idx : arr) {
// if (dup.find(idx) != dup.end())
// {
// cout << "dup index: " << taskSeq << '\t' << idx << endl;
// }
// dup.insert(idx);
// }
// taskSeq++;
// }
// #include <fstream>
// ofstream out("tumor_dup.txt");
// for (auto idx : dup)
// {
// out << idx << endl;
// }
// out.close();
for (auto &arr : gData.dupIdxArr)
dupNum += arr.size();
cout << "dup num : " << dupNum << '\t' << dup.size() << endl;
cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl;
cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl;
cout << "handle tail : " << tm_arr[2].acc_seconds_elapsed() << endl;
cout << "handle last : " << tm_arr[3].acc_seconds_elapsed() << endl;
cout << "read bam : " << tm_arr[4].acc_seconds_elapsed() << endl;
cout << "new arg : " << tm_arr[6].acc_seconds_elapsed() << endl;
cout << "del arg : " << tm_arr[7].acc_seconds_elapsed() << endl;
cout << "build ends : " << tm_arr[8].acc_seconds_elapsed() << endl;
cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl;
cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl;
cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl;
Timer::log_time("serial end ");
//for (auto i : gData.dupArr)
// cout << i << endl;
}