picard_cpp/src/sam/markdups/serial_md.cpp

1047 lines
45 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include "serial_md.h"
#include <common/hts/bam_buf.h>
#include <common/utils/debug.h>
#include <common/utils/global_arg.h>
#include <common/utils/profiling.h>
#include <common/utils/timer.h>
#include <common/utils/util.h>
#include <sam/utils/read_ends.h>
#include <algorithm>
#include <iostream>
#include <vector>
#include "dup_metrics.h"
#include "markdups_arg.h"
#include "md_funcs.h"
#include "shared_args.h"
using std::cout;
using std::vector;
/* 查找 */
// template<class Itr, class T>
// static inline Itr binaryFind(Itr first, Itr last, const T &val)
// {
// first = std::lower_bound(first, last, val);
// return (first != last && *first == val) ? first : last;
// }
/* 排序 */
static inline void sortReadEndsArr(vector<ReadEnds> &arr) {
size_t blockSize = 64 * 1024;
if (arr.size() < blockSize) {
std::sort(arr.begin(), arr.end());
return;
}
size_t blockNum = (arr.size() + blockSize - 1) / blockSize;
size_t crossNum = 1024;
size_t start, end, i, left, right;
std::sort(arr.begin(), arr.begin() + blockSize);
for (i = 1; i < blockNum; ++i) {
start = i * blockSize;
end = min(start + blockSize, arr.size());
std::sort(arr.begin() + start, arr.begin() + end);
left = crossNum;
while (!(arr[start - left] < arr[start])) {
left = left << 1;
if (left >= blockSize) {
std::sort(arr.begin(), arr.end()); // 退化到普通排序
return;
}
}
right = min(crossNum, end - start - 1);
while (!(arr[start - 1] < arr[start + right])) {
right = min(right << 1, end - start - 1);
if (right == end - start - 1)
break;
}
std::sort(arr.begin() + start - left, arr.begin() + start + right);
}
}
/* 处理一组pairend的readends标记冗余, 这个函数需要串行运行,因为需要做一些统计*/
static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *opticalDupIdx,
DPSet<DupInfo> *repIdx, MDSet<int64_t> *notDupIdx = nullptr,
MDSet<int64_t> *notOpticalDupIdx = nullptr, MDSet<int64_t> *notRepIdx = nullptr) {
if (vpRe.size() < 2) {
if (vpRe.size() == 1) {
// 这个统计可能会有误差因为当前位点可能还有没匹配上的read导致当前位点的readpaired数量为1
// 可以通过后续的补充计算来解决这个问题,有必要么?好像没必要
gMetrics.singletonReads.insert(vpRe[0]->read1IndexInFile);
}
return;
}
int maxScore = 0;
const ReadEnds *pBest = nullptr;
// bool print = false;
// int maxOperateTime = 0;
/** All read ends should have orientation FF, FR, RF, or RR **/
for (auto pe : vpRe) { // 找分数最高的readend
// maxOperateTime = max(maxOperateTime, pe->oprateTime);
// (const_cast<ReadEnds *>(pe))->oprateTime ++;
if (pe->score > maxScore || pBest == nullptr) {
maxScore = pe->score;
pBest = pe;
}
/*
if (pe->read1IndexInFile == 255830545) {
print = true;
}
*/
}
/*
if (print) {
cout << "mark pair find: " << endl;
for (auto pe : vpRe) {
cout << pe->read1IndexInFile << " " << pe->read2IndexInFile << " " << pe->score << endl;
}
cout << "mark pair end: " << endl;
}
*/
// cerr << zzhtestnum << " best: " << vpRe.size() << " " << pBest->read1IndexInFile << "-" <<
// pBest->read2IndexInFile << endl; if (maxOperateTime == 0) ++zzhtestnum;
if (notDupIdx != nullptr) {
notDupIdx->insert(pBest->read1IndexInFile);
notDupIdx->insert(pBest->read2IndexInFile);
}
// if (false) {
// if (true) {
if (!g_mdArg.READ_NAME_REGEX.empty()) { // 检查光学冗余
// trackOpticalDuplicates
vector<const ReadEnds *> prevOpticalRe;
if (notOpticalDupIdx != nullptr) {
for (auto pe : vpRe) {
if (pe->isOpticalDuplicate) {
prevOpticalRe.push_back(pe);
}
}
}
trackOpticalDuplicates(vpRe, pBest);
// 由于重叠问题,可能会更新数据
if (notOpticalDupIdx != nullptr) {
for (auto pe : prevOpticalRe) {
if (!pe->isOpticalDuplicate) {
notOpticalDupIdx->insert(pe->read1IndexInFile);
notOpticalDupIdx->insert(pe->read2IndexInFile);
}
}
}
}
for (auto pe : vpRe) { // 对非best read标记冗余
if (pe != pBest) { // 非best
dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1
if (pe->read2IndexInFile != pe->read1IndexInFile)
dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2,
// 注意这里代表是read1的索引
// 检查是否optical dup
if (pe->isOpticalDuplicate && opticalDupIdx != nullptr) {
opticalDupIdx->insert(pe->read1IndexInFile);
if (pe->read2IndexInFile != pe->read1IndexInFile)
opticalDupIdx->insert(pe->read2IndexInFile);
}
}
}
// 在输出的bam文件中添加tag, best作为dupset的代表
if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS) {
repIdx->insert(DupInfo(pBest->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size()));
repIdx->insert(DupInfo(pBest->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size()));
if (notRepIdx != nullptr) {
for (auto pe : vpRe) {
if (pe != pBest) {
notRepIdx->insert(pe->read1IndexInFile);
if (pe->read2IndexInFile != pe->read1IndexInFile)
notRepIdx->insert(pe->read2IndexInFile);
}
}
}
}
}
/* 处理一组非paired的readends标记冗余 */
static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs, DPSet<DupInfo> *dupIdx,
MDSet<int64_t> *notDupIdx = nullptr) {
if (containsPairs) {
for (auto pe : vpRe) {
if (!pe->IsPaired()) {
dupIdx->insert(pe->read1IndexInFile);
}
}
} else {
int maxScore = 0;
const ReadEnds *pBest = nullptr;
for (auto pe : vpRe) {
if (pe->score > maxScore || pBest == nullptr) {
maxScore = pe->score;
pBest = pe;
}
}
if (notDupIdx != nullptr) {
notDupIdx->insert(pBest->read1IndexInFile);
}
for (auto pe : vpRe) {
if (pe != pBest) {
dupIdx->insert(pe->read1IndexInFile);
}
}
}
}
/* 找到与readend pos相等的所有readend */
static void getEqualRE(const ReadEnds &re, vector<ReadEnds> &src, vector<ReadEnds> *dst) {
auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::PairsLittleThan); // 只比对位点
dst->insert(dst->end(), range.first, range.second);
}
/* 单线程生成readends (第一步)*/
static void generateReadEnds(MarkDupDataArg *arg) {
auto &p = *arg;
auto &rnParser = g_vRnParser[0];
p.pairs.clear();
p.frags.clear();
p.unpairedDic.clear();
p.unpairedPosArr.clear();
// bam_num1 += p.bams.size();
/* 处理每个read创建ReadEnd并放入frag和pair中 */
// MDSet<ReadEnds> reSet;
// ReadEnds lastRe;
for (size_t i = 0; i < p.bams.size(); ++i) { // 循环处理每个read
BamWrap *bw = p.bams[i];
// ++bam_num1;
const int64_t bamIdx = p.bamStartIdx + i;
if (bw->GetReadUnmappedFlag()) {
if (bw->b->core.tid == -1)
// When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort).
break;
} else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对
ReadEnds fragEnd;
tm_arr[8].acc_start();
buildReadEnds(*bw, bamIdx, rnParser, &fragEnd);
tm_arr[8].acc_end();
p.frags.push_back(fragEnd); // 添加进frag集合
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了
string key = bw->query_name();
if (p.unpairedDic.find(key) == p.unpairedDic.end()) {
p.unpairedDic[key] = {p.taskSeq, fragEnd};
} else { // 找到了pairend
auto &pairedEnds = p.unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds);
p.pairs.push_back(pairedEnds);
p.unpairedDic.erase(key); // 删除找到的pairend
}
}
}
}
tm_arr[9].acc_start();
sortReadEndsArr(p.frags);
// sort(p.frags.begin(), p.frags.end());
tm_arr[9].acc_end();
// cout << "sort pairs" << endl;
tm_arr[10].acc_start();
sort(p.pairs.begin(), p.pairs.end());
tm_arr[10].acc_end();
// 记录位点上的未匹配的read个数
for (auto &e : p.unpairedDic) {
auto posKey = e.second.unpairedRE.posKey;
auto &unpairArrInfo = p.unpairedPosArr[posKey];
unpairArrInfo.unpairedNum++;
unpairArrInfo.taskSeq = p.taskSeq;
unpairArrInfo.readNameSet.insert(e.first);
}
if (p.taskSeq == 98) {
for (auto &re : p.unpairedPosArr[14293757783047].pairArr) {
cout << "task 99 unpair reads: " << re.read1IndexInFile << "\t" << re.read2IndexInFile << endl;
}
}
// cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() <<
// endl;
}
/* 处理pairs */
static void processPairs(vector<ReadEnds> &readEnds, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *opticalDupIdx,
DPSet<DupInfo> *repIdx, MDSet<int64_t> *notDupIdx = nullptr,
MDSet<int64_t> *notOpticalDupIdx = nullptr, MDSet<int64_t> *notRepIdx = nullptr) {
// return;
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
const ReadEnds *pReadEnd = nullptr;
for (auto &re : readEnds) {
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样
vpCache.push_back(&re); // 处理一个潜在的冗余组
else {
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx); // 不一样
vpCache.clear();
vpCache.push_back(&re);
pReadEnd = &re;
}
}
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx);
}
/* 处理frags */
static void processFrags(vector<ReadEnds> &readEnds, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *notDupIdx = nullptr) {
bool containsPairs = false;
bool containsFrags = false;
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
const ReadEnds *pReadEnd = nullptr;
for (auto &re : readEnds) {
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) {
vpCache.push_back(&re);
containsPairs = containsPairs || re.IsPaired();
containsFrags = containsFrags || !re.IsPaired();
} else {
if (vpCache.size() > 1 && containsFrags) {
markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx);
}
vpCache.clear();
vpCache.push_back(&re);
pReadEnd = &re;
containsPairs = re.IsPaired();
containsFrags = !re.IsPaired();
}
}
if (vpCache.size() > 1 && containsFrags) {
markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx);
}
}
/* 单线程markdup (第二步)*/
static void markdups(MarkDupDataArg *arg) {
auto &p = *arg;
p.pairDupIdx.clear();
p.pairOpticalDupIdx.clear();
p.fragDupIdx.clear();
p.pairRepIdx.clear();
/* generateDuplicateIndexes计算冗余read在所有read中的位置索引 */
// 先处理 pair
processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx, &p.pairRepIdx);
// cout << p.pairDupIdx.size() << "\t" << p.pairRepIdx.size() << endl;
// 再处理frag
processFrags(p.frags, &p.fragDupIdx);
}
/* 获取交叉部分的数据 */
static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds> &rightArr, vector<ReadEnds> *dst,
bool isPairCmp = false) {
if (leftArr.empty() || rightArr.empty()) {
// cout << "bad size: " << leftArr.size() << '\t' << rightArr.size() << endl;
return;
}
const size_t leftEndIdx = leftArr.size() - 1;
const size_t rightStartIdx = 0;
size_t leftSpan = 0;
size_t rightSpan = 0;
while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp)) {
leftSpan += 1;
if (leftSpan > leftEndIdx) { // 上一个的范围被下一个全部包围了可能会有bug上上个也与下一个有交集呢
leftSpan = leftArr.size() - 1;
break;
}
}
while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp)) {
rightSpan += 1;
if (rightSpan == rightArr.size() - 1) // 同上可能会有bug
break;
}
dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end());
dst->insert(dst->end(), rightArr.begin(), rightArr.begin() + rightSpan);
std::sort(dst->begin(), dst->end());
}
/* 将frags重叠部分的dup idx放进数据中 */
static inline void refreshFragDupIdx(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &notDupIdx, MarkDupDataArg *lastArg,
MarkDupDataArg *curArg) {
auto &lp = *lastArg;
auto &p = *curArg;
for (auto idx : dupIdx) {
lp.fragDupIdx.insert(idx);
p.fragDupIdx.erase(idx);
}
for (auto idx : notDupIdx) {
lp.fragDupIdx.erase(idx);
p.fragDupIdx.erase(idx);
}
}
/* 将pairs重叠部分的dup idx放进数据中 */
static inline void refreshPairDupIdx(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opticalDupIdx, DPSet<DupInfo> &repIdx,
MDSet<int64_t> &notDupIdx, MDSet<int64_t> &notOpticalDupIdx,
MDSet<int64_t> &notRepIdx, MarkDupDataArg *lastArg, MarkDupDataArg *curArg) {
auto &lp = *lastArg;
auto &p = *curArg;
for (auto idx : dupIdx) {
lp.pairDupIdx.insert(idx);
p.pairDupIdx.erase(idx);
}
for (auto idx : opticalDupIdx) {
lp.pairOpticalDupIdx.insert(idx);
p.pairOpticalDupIdx.erase(idx);
}
for (auto idx : repIdx) {
lp.pairRepIdx.insert(idx);
p.pairRepIdx.erase(idx);
}
// for (auto idx : notDupIdx) {
// if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl;
// if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl;
// if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl;
// if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl;
// lp.pairDupIdx.erase(idx); p.pairDupIdx.erase(idx);
// }
for (auto idx : notOpticalDupIdx) {
lp.pairOpticalDupIdx.erase(idx);
p.pairOpticalDupIdx.erase(idx);
}
for (auto idx : notRepIdx) {
lp.pairRepIdx.erase(idx);
p.pairRepIdx.erase(idx);
}
}
// 用来分别处理dup和optical dup
static void refeshTaskDupInfo(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opticalDupIdx, DPSet<DupInfo> &repIdx,
MDSet<int64_t> &notDupIdx, MDSet<int64_t> &notOpticalDupIdx, MDSet<int64_t> &notRepIdx,
DPSet<DupInfo> &latterDupIdx, MDSet<int64_t> &latterOpticalDupIdx,
DPSet<DupInfo> &latterRepIdx, MDSet<int64_t> &latterNotDupIdx,
MDSet<int64_t> &latterNotOpticalDupIdx, MDSet<int64_t> &latterNotRepIdx) {
for (auto idx : dupIdx) {
latterDupIdx.insert(idx);
// latterNotDupIdx.erase(idx); // 后来的更新为准
}
for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx);
for (auto idx : repIdx) latterRepIdx.insert(idx);
for (auto idx : notDupIdx) latterNotDupIdx.insert(idx);
for (auto idx : notOpticalDupIdx) latterNotOpticalDupIdx.insert(idx);
for (auto idx : notRepIdx) latterNotRepIdx.insert(idx);
}
/* 最后合并数据并排序 */
template <typename DupContainer, typename T>
static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet<int64_t> &notDupIdx, vector<T> &dupArr,
vector<T> &cacheDupIdx, vector<T> &midArr) {
midArr.resize(0);
cacheDupIdx.resize(0);
cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end());
std::sort(cacheDupIdx.begin(), cacheDupIdx.end());
auto ai = dupArr.begin();
auto ae = dupArr.end();
auto bi = cacheDupIdx.begin();
auto be = cacheDupIdx.end();
// auto bi = dupIdx.begin();
// auto be = dupIdx.end();
T val = 0;
while (ai != ae && bi != be) {
if (*ai < *bi) {
val = *ai++;
} else if (*bi < *ai) {
val = *bi++;
} else {
val = *bi++; // 相等的时候取后面的作为结果
ai++;
}
if (notDupIdx.find(val) == notDupIdx.end()) {
midArr.push_back(val);
}
}
while (ai != ae) {
val = *ai++;
if (notDupIdx.find(val) == notDupIdx.end()) {
midArr.push_back(val);
}
}
while (bi != be) {
val = *bi++;
if (notDupIdx.find(val) == notDupIdx.end()) {
midArr.push_back(val);
}
}
dupArr = midArr;
}
/* 处理相邻的两个任务,有相交叉的数据 */
static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, GlobalDataArg *gDataArg) {
auto &lp = *lastArg;
auto &p = *curArg;
auto &g = *gDataArg;
vector<ReadEnds> reArr;
DPSet<DupInfo> dupIdx;
MDSet<int64_t> opticalDupIdx;
DPSet<DupInfo> repIdx;
MDSet<int64_t> notOpticalDupIdx;
MDSet<int64_t> notDupIdx;
MDSet<int64_t> notRepIdx;
// 先处理重叠的frags
getIntersectData(lp.frags, p.frags, &reArr);
processFrags(reArr, &dupIdx, &notDupIdx);
refreshFragDupIdx(dupIdx, notDupIdx, &lp, &p);
// 再处理重叠的pairs
reArr.clear();
dupIdx.clear();
notDupIdx.clear();
getIntersectData(lp.pairs, p.pairs, &reArr, true);
processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &notDupIdx, &notOpticalDupIdx, &notRepIdx);
refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, &lp, &p);
// cout << (g.unpairedDic.find("A01415:368:HL7NTDSX3:3:1104:5195:34757") != g.unpairedDic.end()) << endl;
// cout << (g.unpairedPosArr.find(14293757783047) != g.unpairedPosArr.end()) << endl;
// 处理之前未匹配的部分
map<CalcKey, int64_t> recalcPos;
CalcSet<CalcKey> alreadyAdd; // 与该位点相同的pair都添加到数组里了
MDSet<int64_t> addToGlobal;
int64_t prevLastPos = 0, nextFirstPos = 0;
if (lp.frags.size() > 0)
prevLastPos = lp.frags.back().posKey;
if (p.frags.size() > 0)
nextFirstPos = p.frags[0].posKey;
// cout << "range: " << nextFirstPos << '\t' << prevLastPos << endl;
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
auto &readName = prevUnpair.first;
// if (readName == "A01415:368:HL7NTDSX3:3:1104:5195:34757" ||
// readName == "A01415:368:HL7NTDSX3:3:1104:4887:32095") {
// cout << "read name found: " << lp.taskSeq << endl;
// }
auto &prevPosInfo = prevUnpair.second;
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
// if (prevFragEnd.read1IndexInFile == 255830545 || prevFragEnd.read1IndexInFile == 255830546
// || prevFragEnd.read1IndexInFile == 255832599 || prevFragEnd.read1IndexInFile == 255832601) {
// cout << prevFragEnd.read1IndexInFile << "\t" << readName << endl;
// }
if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read
auto &nextPosInfo = p.unpairedDic[readName];
auto &nextFragEnd = nextPosInfo.unpairedRE;
int64_t prevPosKey = prevFragEnd.posKey;
modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下poskey可能是后面的read
int64_t nextPosKey = max(prevPosKey, nextFragEnd.posKey);
CalcKey ck = {prevPosKey, nextPosKey};
UnpairedPosInfo *prevUnpairInfoP = nullptr;
UnpairedPosInfo *nextUnpairInfoP = nullptr;
if (lp.unpairedPosArr.find(prevPosKey) != lp.unpairedPosArr.end())
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end())
nextUnpairInfoP = &p.unpairedPosArr[prevPosKey];
// if (prevFragEnd.read1IndexInFile == 255830545 || prevFragEnd.read1IndexInFile == 255830546 ||
// prevFragEnd.read1IndexInFile == 255832599 || prevFragEnd.read1IndexInFile == 255832601) {
// cout << "find in p: " << lp.taskSeq << "\t" << prevFragEnd.read1IndexInFile << "\t" << readName <<
// endl; if (nextUnpairInfoP != nullptr)
// cout << "next p: " << nextUnpairInfoP->unpairedNum << endl;
// if (prevUnpairInfoP != nullptr)
// cout << "prev p: " << prevUnpairInfoP->unpairedNum << endl;
// cout << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl;
// }
// pos分为两种情况根据poskey(pair中两个read分别的pos)的位置确定
// 1.
// prevpos在交叉部分之前nextpos在交叉部分之后这种情况不需要获取pairarr中的数据;
// 2.
// prevpos在交叉部分之前nextpos在交叉部分需要获取lp中的相等read pair进行重新计算
// 复杂情况1. g中包含prevPosKey对应的unpairp中有对应的pair此时应该把这些pair考虑进去
// 3.
// prevpos在交叉部分nextpos在交叉部分之后需要获取p中的相等read pair进行重新计算
// 复杂情况2. p中是否包含prevPosKey对应的unpair
// 4.
// prevpos在交叉部分nextpos在交叉部分需要获取lp和p中的相等read pair进行重新计算
bool addDataToPos = true;
if (alreadyAdd.find(ck) != alreadyAdd.end()) {
// 之前已经添加过了后面就不用再添加数据了因为同一个位置可能找到两个及以上的unpair数据处理之前的数据时候可能已经添加了这些数据
addDataToPos = false;
} else
alreadyAdd.insert(ck);
if (prevPosKey < nextFirstPos) { // prevpos在交叉部分之前
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
prevPairArr.push_back(prevFragEnd);
if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
}
// 第一种情况,第二种情况下都会出现,复杂情况一
auto gPosInfo = g.unpairedPosArr.find(prevPosKey);
if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的刚好和该位点一致
auto &gUnpairInfo = gPosInfo->second;
auto pPosInfo = p.unpairedPosArr.find(nextPosKey);
if (pPosInfo != p.unpairedPosArr.end()) {
auto &pUnpairInfo = pPosInfo->second;
for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname看是否有匹配的
if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) {
auto pe = g.unpairedDic[rn].unpairedRE;
auto fe = p.unpairedDic[rn].unpairedRE;
modifyPairedEnds(fe, &pe);
prevPairArr.push_back(pe);
g.unpairedDic.erase(rn);
p.unpairedDic.erase(rn);
// cout << "找到了!" << rn << endl;
}
}
}
}
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
} else { // prevpos在交叉部分
if (nextPosKey > prevLastPos) { // nextpos在交叉部分之后 第三种情况
if (nextUnpairInfoP != nullptr) { // 且在pos点next task有unpair这样才把这些数据放到next task里
auto &nextPairArr = nextUnpairInfoP->pairArr;
nextPairArr.push_back(prevFragEnd);
auto &prevPairArr = prevUnpairInfoP->pairArr;
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) {
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
getEqualRE(prevFragEnd, p.pairs, &nextPairArr);
}
// 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除)
recalcPos[ck] = nextPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
std::sort(nextPairArr.begin(), nextPairArr.end());
} else { // next task在该位点没有unpair那就把数据放到prev task里
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) // 第二种情况
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
}
} else { // 第四种情况
if (prevUnpairInfoP == nullptr) {
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
prevUnpairInfoP->taskSeq = lp.taskSeq;
}
auto &prevPairArr = prevUnpairInfoP->pairArr;
prevPairArr.push_back(prevFragEnd);
if (addDataToPos) {
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
}
recalcPos[ck] = prevPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end());
}
}
p.unpairedDic.erase(readName); // 在next task里删除该read
} else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read
// if (readName == "A01415:368:HL7NTDSX3:3:1104:5195:34757" ||
// readName == "A01415:368:HL7NTDSX3:3:1104:4887:32095") {
// cout << "find in g: " << prevFragEnd.read1IndexInFile << "\t" << readName << endl;
// }
auto &remainPosInfo = g.unpairedDic[readName];
auto remainFragEnd = remainPosInfo.unpairedRE;
int64_t remainPosKey = remainFragEnd.posKey;
modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下poskey可能是后面的read
auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey];
auto &remainPairArr = remainUnpairInfo.pairArr;
remainPairArr.push_back(remainFragEnd);
CalcKey ck = {remainPosKey, prevFragEnd.posKey};
recalcPos[ck] = remainPosInfo.taskSeq;
std::sort(remainPairArr.begin(), remainPairArr.end());
g.unpairedDic.erase(readName);
} else { // 都没找到,那就保存到遗留数据里
int64_t prevPosKey = prevFragEnd.posKey;
g.unpairedDic.insert(prevUnpair);
addToGlobal.insert(prevPosKey);
// if (prevFragEnd.read1IndexInFile == 255830545 || prevFragEnd.read1IndexInFile == 255830546) {
// cout << "here find" << endl;
// }
}
}
map<int64_t, TaskSeqDupInfo> taskChanged;
MDSet<int64_t> posProcessed;
for (auto &e : recalcPos) {
auto posKey = e.first.read1Pos;
if (posProcessed.find(posKey) != posProcessed.end())
continue;
posProcessed.insert(posKey);
auto taskSeq = e.second;
auto &t = taskChanged[taskSeq];
// 在对应的任务包含的dup idx里修改结果数据
vector<ReadEnds> *pairArrP = nullptr;
if (taskSeq < lp.taskSeq)
pairArrP = &g.unpairedPosArr[posKey].pairArr;
else
pairArrP = &lp.unpairedPosArr[posKey].pairArr;
// if (taskSeq == 98) cout << "handle recalc pairs: " << taskSeq << "\t" << posKey << endl;
// if (p.taskSeq == 163) {
// cout << "final" << endl;
// }
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx,
&t.notRepIdx);
if (taskSeq < lp.taskSeq)
g.unpairedPosArr.erase(posKey);
}
// if (lp.taskSeq > 98) {
// // exit(0);
// }
// 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据
// 放在这里因为lp中的unpairedPosArr中的readends可能会被修改比如optical duplicate
for (auto posKey : addToGlobal) {
// if (posKey == 14293757783047) {
// for (auto &re: lp.unpairedPosArr[posKey].pairArr) {
// cout << "lp reads: " << re.read1IndexInFile << "\t" << re.read2IndexInFile << endl;
// }
// cout << "found in g: " << lp.taskSeq << "\t" << lp.unpairedPosArr[posKey].unpairedNum << "\t" <<
// lp.unpairedPosArr[posKey].pairArr.size() << endl;
// }
g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey];
}
// 更新结果
for (auto &e : taskChanged) {
auto taskSeq = e.first;
auto &t = e.second;
// cout << t.dupIdx.size() << "\t" << t.notDupIdx.size() << endl;
if (taskSeq < lp.taskSeq) {
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx,
g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq],
g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq],
g.latterNotRepIdxArr[taskSeq]);
} else if (taskSeq == lp.taskSeq) {
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, &lp,
&p);
} else {
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, &p,
&lp); // 把结果放到p中
}
}
// cout << "remain unpaired: " << g.unpairedDic.size() << '\t' <<
// g.unpairedPosArr.size() << endl; cout << "calc g time: " <<
// t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据
g.latterDupIdxArr.push_back(DPSet<DupInfo>());
g.latterOpticalDupIdxArr.push_back(MDSet<int64_t>());
g.latterRepIdxArr.push_back(DPSet<DupInfo>());
g.latterNotDupIdxArr.push_back(MDSet<int64_t>());
g.latterNotOpticalDupIdxArr.push_back(MDSet<int64_t>());
g.latterNotRepIdxArr.push_back(MDSet<int64_t>());
g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back();
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
std::sort(vIdx.begin(), vIdx.end());
// cout << vIdx.size() << endl;
// zzhtestnum += vIdx.size();
g.opticalDupIdxArr.push_back(vector<int64_t>());
auto &vOpticalIdx = g.opticalDupIdxArr.back();
vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end());
std::sort(vOpticalIdx.begin(), vOpticalIdx.end());
g.repIdxArr.push_back(vector<DupInfo>());
auto &vRepIdx = g.repIdxArr.back();
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
std::sort(vRepIdx.begin(), vRepIdx.end());
}
/* 当所有任务结束后global data里还有未处理的数据 */
static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) {
// cout << "last task start" << endl;
auto &lp = *task;
auto &g = *gDataArg;
// 遗留的未匹配的pair
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
auto &readName = prevUnpair.first;
auto &prevPosInfo = prevUnpair.second;
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read
auto &remainPosInfo = g.unpairedDic[readName];
auto remainFragEnd = remainPosInfo.unpairedRE;
int64_t remainPosKey = remainFragEnd.posKey;
modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下poskey可能是后面的read
auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey];
remainUnpairInfo.pairArr.push_back(remainFragEnd);
g.unpairedDic.erase(readName);
} else {
g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数
}
}
map<int64_t, TaskSeqDupInfo> taskChanged;
for (auto &e : g.unpairedPosArr) {
auto posKey = e.first;
auto taskSeq = e.second.taskSeq;
auto &t = taskChanged[taskSeq];
auto &arr = g.unpairedPosArr[posKey].pairArr;
if (arr.size() > 1) {
std::sort(arr.begin(), arr.end());
// cout << "last task before mark pair" << endl;
processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notRepIdx);
} else if (arr.size() == 1) {
gMetrics.singletonReads.insert(arr[0].read1IndexInFile);
}
}
// 更新结果
vector<int64_t> addDup;
map<int64_t, int64_t> ndPosVal;
for (auto &e : taskChanged) {
auto taskSeq = e.first;
auto &t = e.second;
// cout << t.dupIdx.size() << "\t" << t.notDupIdx.size() << endl;
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx,
g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq],
g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq],
g.latterNotRepIdxArr[taskSeq]);
}
// cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl;
g.unpairedPosArr.clear();
// for (auto &rn : g.unpairedDic) {
// cout << rn.first << endl;
// }
/*
int taskSeq = 0;
for (auto &arr : g.dupIdxArr) {
cout << taskSeq << "\t" << arr.size();
if (taskSeq < (int)g.dupIdxArr.size() - 1)
cout << "\t" << g.latterDupIdxArr[taskSeq].size() << "\t" << g.latterNotDupIdxArr[taskSeq].size() <<
endl; else cout << endl;
// if (taskSeq == 98) {
// vector<DupInfo> v;
// v.insert(v.end(), g.latterDupIdxArr[taskSeq].begin(), g.latterDupIdxArr[taskSeq].end());
// std::sort(v.begin(), v.end());
// for (auto &idx : v)
// cout << idx.idx << " " << idx.repIdx << " " << idx.dupSet << endl;
// v.clear();
// v.insert(v.end(), g.latterNotDupIdxArr[taskSeq].begin(), g.latterNotDupIdxArr[taskSeq].end());
// std::sort(v.begin(), v.end());
// for (auto &idx : v) cout << idx << endl;
// }
taskSeq++;
}
*/
// 将dupidx放进全局数据
vector<DupInfo> cacheDupIdx;
vector<DupInfo> midArr;
vector<int64_t> intCacheDupIdx;
vector<int64_t> intMidArr;
for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) {
/*
if (i == 98) {
cout << "98: " << g.latterDupIdxArr[i].size() << "\t" << g.dupIdxArr[i].size() << endl;
int inlatter = 0;
int innotdup = 0;
int latterinnotdup = 0;
for (auto idx : g.dupIdxArr[i]) {
if (g.latterDupIdxArr[i].find(idx) != g.latterDupIdxArr[i].end()) {
++inlatter;
}
}
for (auto idx : g.dupIdxArr[i]) {
if (g.latterNotDupIdxArr[i].find(idx) != g.latterNotDupIdxArr[i].end()) {
cout << idx.idx << endl;
++innotdup;
}
}
for (auto idx : g.latterDupIdxArr[i]) {
if (g.latterNotDupIdxArr[i].find(idx) != g.latterNotDupIdxArr[i].end()) {
++latterinnotdup;
}
}
cout << inlatter << "\t" << innotdup << "\t" << latterinnotdup << endl;
}
*/
refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i], cacheDupIdx, midArr);
/*
if (i == 98) {
cout << "98: " << g.latterDupIdxArr[i].size() << "\t" << g.dupIdxArr[i].size() << endl;
}
*/
}
for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i],
intCacheDupIdx, intMidArr);
for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr);
g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back();
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
std::sort(vIdx.begin(), vIdx.end());
// cout << "last " << vIdx.size() << endl;
// zzhtestnum += vIdx.size();
/*
for (auto &arr : g.dupIdxArr) {
cout << taskSeq << "\t" << arr.size() << endl;
taskSeq++;
}
*/
g.opticalDupIdxArr.push_back(vector<int64_t>());
auto &vOpticalIdx = g.opticalDupIdxArr.back();
vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end());
std::sort(vOpticalIdx.begin(), vOpticalIdx.end());
g.repIdxArr.push_back(vector<DupInfo>());
auto &vRepIdx = g.repIdxArr.back();
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
std::sort(vRepIdx.begin(), vRepIdx.end());
}
void calculateMetrics(MarkDupDataArg &lp, MarkDupDataArg &p, GlobalDataArg &g, DuplicationMetrics *pgMetrics) {
DuplicationMetrics &gMetrics = *pgMetrics;
// gMetrics.READ_PAIRS_EXAMINED /= 2;
cout << "calculateMetrics start: " << endl;
cout << lp.unpairedDic.size() << "\t" << p.unpairedDic.size() << "\t" << g.unpairedDic.size() << endl;
cout << "all: " << (lp.unpairedDic.size() + p.unpairedDic.size() + g.unpairedDic.size()) << endl;
cout << "calculateMetrics end" << endl;
}
/* 串行处理数据,标记冗余 */
void serialMarkDups() {
tm_arr[5].acc_start();
Timer::log_time("serial start");
// 读取缓存初始化
BamBufType inBamBuf(g_gArg.use_asyncio);
inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
// BamBufType inBamBuf(false);
// inBamBuf.Init(g_inBamFp, g_inBamHeader, 100 * 1024 * 1024);
int64_t processedBamNum = 0;
MarkDupDataArg smdArg1, smdArg2;
MarkDupDataArg *lastArgP = &smdArg1;
MarkDupDataArg *curArgP = &smdArg2;
bool isFirstRound = true;
int roundNum = 0;
int64_t readNumSum = 0;
while (inBamBuf.ReadStat() >= 0) {
Timer t_round;
// 读取bam文件中的read
tm_arr[4].acc_start();
size_t readNum = inBamBuf.ReadBam();
readNumSum += readNum;
// readNumSum += inBamBuf.GetBamArr().size();
tm_arr[4].acc_end();
cout << "read num: " << readNum << '\t' << roundNum << endl;
// lastArgP = curArgP;
tm_arr[6].acc_start();
curArgP->taskSeq = roundNum;
curArgP->bamStartIdx = processedBamNum;
curArgP->bams = inBamBuf.GetBamArr();
tm_arr[6].acc_end();
tm_arr[0].acc_start();
Timer t1;
generateReadEnds(curArgP);
// cout << "calc read end time: " << t1.seconds_elapsed() << " s" <<
// endl;
tm_arr[0].acc_end();
tm_arr[1].acc_start();
t1.reinit();
markdups(curArgP);
// cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl;
tm_arr[1].acc_end();
if (!isFirstRound) {
tm_arr[2].acc_start();
t1.reinit();
handleIntersectData(lastArgP, curArgP, &gData);
// cout << "intersect time: " << t1.seconds_elapsed() << " s" <<
// endl;
// addTaskIdxToSet(lastArgP, &gData);
tm_arr[2].acc_end();
} else {
isFirstRound = false;
}
inBamBuf.ClearAll(); // 清理上一轮读入的数据
processedBamNum += readNum;
// 交换
auto tmp = lastArgP;
lastArgP = curArgP;
curArgP = tmp;
// cout << "round time: " << t_round.seconds_elapsed() << endl;
roundNum++;
if (roundNum % 100 == 0) {
// cout << "read sum: " << readNumSum << endl;
// cout << "round time: " << t_round.seconds_elapsed() * 100 << " s" << endl;
}
}
// cout << "here" << endl;
tm_arr[3].acc_start();
// 处理剩下的全局数据
handleLastTask(lastArgP, &gData);
// 计算各种统计指标
// calculateMetrics(*lastArgP, *curArgP, gData, &gMetrics);
// cout << "here 2" << endl;
tm_arr[3].acc_end();
tm_arr[5].acc_end();
// 统计所有冗余index数量
int64_t dupNum = 0;
int64_t opticalDupNum = 0;
map<int64_t, int> dup;
int taskSeq = 0;
/* for (auto &arr : gData.dupIdxArr) {
for (auto idx : arr) {
if (dup.find(idx.idx) != dup.end()) {
// cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t'
// << idx << endl;
}
dup[idx.idx] = taskSeq;
}
// cout << taskSeq << "\t" << arr.size() << endl;
taskSeq++;
}
*/
// #include <fstream>
// ofstream out("tumor_dup.txt");
// for (auto idx : dup)
// {
// out << idx << endl;
// }
// out.close();
for (auto &arr : gData.dupIdxArr) dupNum += arr.size();
for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size();
cout << "dup num : " << dupNum << '\t' << dup.size() << "\t" << zzhtestnum << endl;
cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl;
cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl;
cout << "handle tail : " << tm_arr[2].acc_seconds_elapsed() << endl;
cout << "handle last : " << tm_arr[3].acc_seconds_elapsed() << endl;
cout << "read bam : " << tm_arr[4].acc_seconds_elapsed() << endl;
cout << "new arg : " << tm_arr[6].acc_seconds_elapsed() << endl;
cout << "del arg : " << tm_arr[7].acc_seconds_elapsed() << endl;
cout << "build ends : " << tm_arr[8].acc_seconds_elapsed() << endl;
cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl;
cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl;
cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl;
cout << "optical size: " << opticalDupNum << endl;
cout << "singleton size: " << gMetrics.singletonReads.size() << endl;
Timer::log_time("serial end ");
cout << "read num sum: " << readNumSum << endl;
// for (auto i : gData.dupArr)
// cout << i << endl;
}