解决了一个某些情况下冗余不完全的bug,在情况三中,把全部数据都放到后来的task里,就可以了

This commit is contained in:
zzh 2024-09-04 11:08:03 +08:00
parent 85515ac06e
commit f84d7bb0dc
7 changed files with 360 additions and 114 deletions

7
.vscode/launch.json vendored
View File

@ -13,13 +13,14 @@
"program": "${workspaceRoot}/build/bin/picard_cpp", "program": "${workspaceRoot}/build/bin/picard_cpp",
"args": [ "args": [
"MarkDuplicates", "MarkDuplicates",
"--INPUT", "~/data/bam/100w.bam", "--INPUT", "~/data/bam/zy_tumor.bam",
"--OUTPUT", "out.bam", "--OUTPUT", "/dev/null",
"--METRICS_FILE", "metrics.txt", "--METRICS_FILE", "metrics.txt",
"--num_threads", "1", "--num_threads", "1",
"--max_mem", "1G", "--max_mem", "2G",
"--verbosity", "DEBUG", "--verbosity", "DEBUG",
"--asyncio", "true", "--asyncio", "true",
"--READ_NAME_REGEX", ""
], ],
"cwd": "${workspaceFolder}", // "cwd": "${workspaceFolder}", //
} }

View File

@ -95,6 +95,12 @@
"netfwd": "cpp", "netfwd": "cpp",
"timer": "cpp", "timer": "cpp",
"__nullptr": "cpp", "__nullptr": "cpp",
"__node_handle": "c" "__node_handle": "c",
"__hash_table": "cpp",
"__tree": "cpp",
"queue": "cpp",
"stack": "cpp",
"__bit_reference": "cpp",
"__string": "cpp"
} }
} }

11
run.sh
View File

@ -1,20 +1,21 @@
#input=~/data/bam/zy_normal.bam #input=~/data/bam/zy_normal.bam
#input=~/data/bam/zy_tumor.bam input=~/data/bam/zy_tumor.bam
input=~/data/bam/100w.bam #input=~/data/bam/100w.bam
#input=~/data/bam/1kw.sam #input=~/data/bam/1kw.sam
#input=~/data/bam/n1kw.sam #input=~/data/bam/n1kw.sam
time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \ time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \
MarkDuplicates \ MarkDuplicates \
--INPUT $input \ --INPUT $input \
--OUTPUT ~/data/bam/out.bam \ --OUTPUT /dev/null \
--INDEX_FORMAT BAI \ --INDEX_FORMAT BAI \
--num_threads 1 \ --num_threads 1 \
--max_mem 2G \ --max_mem 2G \
--verbosity DEBUG \ --verbosity DEBUG \
--asyncio true #\ --asyncio true -TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$"
#--TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX ""
#--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$" #--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$"
#--READ_NAME_REGEX ""
# --INPUT /mnt/d/data/100w.bam \ # --INPUT /mnt/d/data/100w.bam \
# --INPUT /mnt/d/data/zy_normal.bam \ # --INPUT /mnt/d/data/zy_normal.bam \

View File

@ -154,13 +154,15 @@ int MarkDuplicates(int argc, char *argv[]) {
// BamBufType inBuf(false); // BamBufType inBuf(false);
BamBufType inBuf(g_gArg.use_asyncio); BamBufType inBuf(g_gArg.use_asyncio);
inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
DupIdxQueue<DupInfo> dupIdxQue; DupIdxQueue<DupInfo> dupIdxQue, repIdxQue;
dupIdxQue.Init(&gData.dupIdxArr); dupIdxQue.Init(&gData.dupIdxArr);
repIdxQue.Init(&gData.repIdxArr);
Timer tw; Timer tw;
cout << "dupsize: " << dupIdxQue.Size() << endl; cout << "dupsize: " << dupIdxQue.Size() << endl;
uint64_t bamIdx = 0; uint64_t bamIdx = 0;
DupInfo dupIdx = dupIdxQue.Pop(); DupInfo dupIdx = dupIdxQue.Pop();
DupInfo repIdx = repIdxQue.Pop();
exit(0);
while (inBuf.ReadStat() >= 0) { while (inBuf.ReadStat() >= 0) {
Timer tw1; Timer tw1;
size_t readNum = inBuf.ReadBam(); size_t readNum = inBuf.ReadBam();
@ -168,12 +170,21 @@ int MarkDuplicates(int argc, char *argv[]) {
for (size_t i = 0; i < inBuf.Size(); ++i) { for (size_t i = 0; i < inBuf.Size(); ++i) {
/* 判断是否冗余 */ /* 判断是否冗余 */
if (bamIdx == dupIdx) { if (bamIdx == dupIdx) {
cerr << bamIdx << " " << dupIdx.repIdx << " " << dupIdx.dupSet << endl; //if (dupIdx.dupSet != 0)
// cerr << bamIdx << " " << dupIdx.repIdx << " " << dupIdx.dupSet << endl;
// 为了防止小内存运行的时候有重复的dupidx这时候dup的repIdx和dupSetSize可能会有不同 // 为了防止小内存运行的时候有重复的dupidx这时候dup的repIdx和dupSetSize可能会有不同
while((dupIdx = dupIdxQue.Pop()) == bamIdx); while((dupIdx = dupIdxQue.Pop()) == bamIdx);
} }
if (bamIdx == -1) { // repressent if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && bamIdx == repIdx) { // repressent
// cerr << bamIdx << " " << repIdx.repIdx << " " << repIdx.dupSet << endl;
while (repIdx == bamIdx || repIdx.dupSet == 0) {
if (repIdxQue.Size() > 0)
repIdx = repIdxQue.Pop();
else {
repIdx = -1;
break;
}
}
} }
if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) { if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) {
Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str());

View File

@ -214,6 +214,7 @@ struct MarkDupsArg
/* "The scoring strategy for choosing the non-duplicate among candidates." */ /* "The scoring strategy for choosing the non-duplicate among candidates." */
ns_md::ScoringStrategy DUPLICATE_SCORING_STRATEGY = ns_md::ScoringStrategy::SUM_OF_BASE_QUALITIES; ns_md::ScoringStrategy DUPLICATE_SCORING_STRATEGY = ns_md::ScoringStrategy::SUM_OF_BASE_QUALITIES;
//ns_md::ScoringStrategy DUPLICATE_SCORING_STRATEGY = ns_md::ScoringStrategy::TOTAL_MAPPED_REFERENCE_LENGTH;
/* "The program record ID for the @PG record(s) created by this program. Set to null to disable " + /* "The program record ID for the @PG record(s) created by this program. Set to null to disable " +
"PG record creation. This string may have a suffix appended to avoid collision with other " + "PG record creation. This string may have a suffix appended to avoid collision with other " +

View File

@ -10,7 +10,6 @@
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
#include <set>
#include <vector> #include <vector>
#include "dup_metrics.h" #include "dup_metrics.h"
@ -19,7 +18,6 @@
#include "shared_args.h" #include "shared_args.h"
using std::cout; using std::cout;
using std::set;
using std::vector; using std::vector;
/* 查找 */ /* 查找 */
@ -65,8 +63,9 @@ static inline void sortReadEndsArr(vector<ReadEnds> &arr) {
} }
/* 处理一组pairend的readends标记冗余, 这个函数需要串行运行,因为需要做一些统计*/ /* 处理一组pairend的readends标记冗余, 这个函数需要串行运行,因为需要做一些统计*/
static void markDupsForPairs(vector<const ReadEnds *> &vpRe, set<DupInfo> *dupIdx, set<int64_t> *opticalDupIdx = nullptr, static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *opticalDupIdx,
set<int64_t> *notDupIdx = nullptr, set<int64_t> *notOpticalDupIdx = nullptr) { DPSet<DupInfo> *repIdx, MDSet<int64_t> *notDupIdx = nullptr,
MDSet<int64_t> *notOpticalDupIdx = nullptr, MDSet<int64_t> *notRepIdx = nullptr) {
if (vpRe.size() < 2) { if (vpRe.size() < 2) {
if (vpRe.size() == 1) { if (vpRe.size() == 1) {
// addSingletonToCount(libraryIdGenerator); // addSingletonToCount(libraryIdGenerator);
@ -78,7 +77,8 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, set<DupInfo> *dupId
} }
int maxScore = 0; int maxScore = 0;
const ReadEnds *pBest = nullptr; const ReadEnds *pBest = nullptr;
// int maxOperateTime = 0; // bool print = false;
// int maxOperateTime = 0;
/** All read ends should have orientation FF, FR, RF, or RR **/ /** All read ends should have orientation FF, FR, RF, or RR **/
for (auto pe : vpRe) { // 找分数最高的readend for (auto pe : vpRe) { // 找分数最高的readend
// maxOperateTime = max(maxOperateTime, pe->oprateTime); // maxOperateTime = max(maxOperateTime, pe->oprateTime);
@ -87,7 +87,21 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, set<DupInfo> *dupId
maxScore = pe->score; maxScore = pe->score;
pBest = pe; pBest = pe;
} }
/*
if (pe->read1IndexInFile == 255830545) {
print = true;
}
*/
} }
/*
if (print) {
cout << "mark pair find: " << endl;
for (auto pe : vpRe) {
cout << pe->read1IndexInFile << " " << pe->read2IndexInFile << " " << pe->score << endl;
}
cout << "mark pair end: " << endl;
}
*/
// cerr << zzhtestnum << " best: " << vpRe.size() << " " << pBest->read1IndexInFile << "-" << pBest->read2IndexInFile << endl; // cerr << zzhtestnum << " best: " << vpRe.size() << " " << pBest->read1IndexInFile << "-" << pBest->read2IndexInFile << endl;
// if (maxOperateTime == 0) ++zzhtestnum; // if (maxOperateTime == 0) ++zzhtestnum;
if (notDupIdx != nullptr) { if (notDupIdx != nullptr) {
@ -130,15 +144,25 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, set<DupInfo> *dupId
} }
} }
} }
// 在输出的bam文件中添加tag // 在输出的bam文件中添加tag, best作为dupset的代表
if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS) { if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS) {
// addRepresentativeReadIndex(vpRe); // 每次都更新就行,用最新的覆盖之前的(如果之前有) repIdx->insert(DupInfo(pBest->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size()));
repIdx->insert(DupInfo(pBest->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size()));
if (notRepIdx != nullptr) {
for (auto pe : vpRe) {
if (pe != pBest) {
notRepIdx->insert(pe->read1IndexInFile);
if (pe->read2IndexInFile != pe->read1IndexInFile)
notRepIdx->insert(pe->read2IndexInFile);
}
}
}
} }
} }
/* 处理一组非paired的readends标记冗余 */ /* 处理一组非paired的readends标记冗余 */
static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs, set<DupInfo> *dupIdx, static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs, DPSet<DupInfo> *dupIdx,
set<int64_t> *notDupIdx = nullptr) { MDSet<int64_t> *notDupIdx = nullptr) {
if (containsPairs) { if (containsPairs) {
for (auto pe : vpRe) { for (auto pe : vpRe) {
if (!pe->IsPaired()) { if (!pe->IsPaired()) {
@ -182,7 +206,7 @@ static void generateReadEnds(SerailMarkDupArg *arg) {
p.unpairedPosArr.clear(); p.unpairedPosArr.clear();
/* 处理每个read创建ReadEnd并放入frag和pair中 */ /* 处理每个read创建ReadEnd并放入frag和pair中 */
// set<ReadEnds> reSet; // MDSet<ReadEnds> reSet;
// ReadEnds lastRe; // ReadEnds lastRe;
for (int i = 0; i < p.bams.size(); ++i) { // 循环处理每个read for (int i = 0; i < p.bams.size(); ++i) { // 循环处理每个read
@ -227,30 +251,37 @@ static void generateReadEnds(SerailMarkDupArg *arg) {
unpairArrInfo.taskSeq = p.taskSeq; unpairArrInfo.taskSeq = p.taskSeq;
unpairArrInfo.readNameSet.insert(e.first); unpairArrInfo.readNameSet.insert(e.first);
} }
if (p.taskSeq == 98) {
for (auto &re : p.unpairedPosArr[14293757783047].pairArr) {
cout << "task 99 unpair reads: " << re.read1IndexInFile << "\t" << re.read2IndexInFile << endl;
}
}
// cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() << // cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() <<
// endl; // endl;
} }
/* 处理pairs */ /* 处理pairs */
static void processPairs(vector<ReadEnds> &readEnds, set<DupInfo> *dupIdx, set<int64_t> *opticalDupIdx, static void processPairs(vector<ReadEnds> &readEnds, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *opticalDupIdx,
set<int64_t> *notDupIdx = nullptr, set<int64_t> *notOpticalDupIdx = nullptr) { DPSet<DupInfo> *repIdx, MDSet<int64_t> *notDupIdx = nullptr,
MDSet<int64_t> *notOpticalDupIdx = nullptr, MDSet<int64_t> *notRepIdx = nullptr) {
// return;
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
const ReadEnds *pReadEnd = nullptr; const ReadEnds *pReadEnd = nullptr;
for (auto &re : readEnds) { for (auto &re : readEnds) {
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样 if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样
vpCache.push_back(&re); // 处理一个潜在的冗余组 vpCache.push_back(&re); // 处理一个潜在的冗余组
else { else {
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx); // 不一样 markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx); // 不一样
vpCache.clear(); vpCache.clear();
vpCache.push_back(&re); vpCache.push_back(&re);
pReadEnd = &re; pReadEnd = &re;
} }
} }
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx); markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx);
} }
/* 处理frags */ /* 处理frags */
static void processFrags(vector<ReadEnds> &readEnds, set<DupInfo> *dupIdx, set<int64_t> *notDupIdx = nullptr) { static void processFrags(vector<ReadEnds> &readEnds, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *notDupIdx = nullptr) {
bool containsPairs = false; bool containsPairs = false;
bool containsFrags = false; bool containsFrags = false;
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
@ -282,9 +313,11 @@ static void markdups(SerailMarkDupArg *arg) {
p.pairDupIdx.clear(); p.pairDupIdx.clear();
p.pairOpticalDupIdx.clear(); p.pairOpticalDupIdx.clear();
p.fragDupIdx.clear(); p.fragDupIdx.clear();
p.pairRepIdx.clear();
/* generateDuplicateIndexes计算冗余read在所有read中的位置索引 */ /* generateDuplicateIndexes计算冗余read在所有read中的位置索引 */
// 先处理 pair // 先处理 pair
processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx); processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx, &p.pairRepIdx);
// cout << p.pairDupIdx.size() << "\t" << p.pairRepIdx.size() << endl;
// 再处理frag // 再处理frag
processFrags(p.frags, &p.fragDupIdx); processFrags(p.frags, &p.fragDupIdx);
@ -294,7 +327,7 @@ static void markdups(SerailMarkDupArg *arg) {
static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds> &rightArr, vector<ReadEnds> *dst, static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds> &rightArr, vector<ReadEnds> *dst,
bool isPairCmp = false) { bool isPairCmp = false) {
if (leftArr.empty() || rightArr.empty()) { if (leftArr.empty() || rightArr.empty()) {
cout << "bad size: " << leftArr.size() << '\t' << rightArr.size() << endl; // cout << "bad size: " << leftArr.size() << '\t' << rightArr.size() << endl;
return; return;
} }
const size_t leftEndIdx = leftArr.size() - 1; const size_t leftEndIdx = leftArr.size() - 1;
@ -321,7 +354,7 @@ static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds>
} }
/* 将frags重叠部分的dup idx放进数据中 */ /* 将frags重叠部分的dup idx放进数据中 */
static inline void refreshFragDupIdx(set<DupInfo> &dupIdx, set<int64_t> &notDupIdx, SerailMarkDupArg *lastArg, static inline void refreshFragDupIdx(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &notDupIdx, SerailMarkDupArg *lastArg,
SerailMarkDupArg *curArg) { SerailMarkDupArg *curArg) {
auto &lp = *lastArg; auto &lp = *lastArg;
auto &p = *curArg; auto &p = *curArg;
@ -336,50 +369,56 @@ static inline void refreshFragDupIdx(set<DupInfo> &dupIdx, set<int64_t> &notDupI
} }
/* 将pairs重叠部分的dup idx放进数据中 */ /* 将pairs重叠部分的dup idx放进数据中 */
static inline void refreshPairDupIdx(set<DupInfo> &dupIdx, set<int64_t> &opticalDupIdx, set<int64_t> &notDupIdx, static inline void refreshPairDupIdx(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opticalDupIdx, DPSet<DupInfo> &repIdx,
set<int64_t> &notOpticalDupIdx, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) { MDSet<int64_t> &notDupIdx, MDSet<int64_t> &notOpticalDupIdx, MDSet<int64_t> &notRepIdx,
SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) {
auto &lp = *lastArg; auto &lp = *lastArg;
auto &p = *curArg; auto &p = *curArg;
for (auto idx : dupIdx) { for (auto idx : dupIdx) { lp.pairDupIdx.insert(idx); p.pairDupIdx.erase(idx); }
lp.pairDupIdx.insert(idx); for (auto idx : opticalDupIdx) { lp.pairOpticalDupIdx.insert(idx); p.pairOpticalDupIdx.erase(idx); }
p.pairDupIdx.erase(idx); for (auto idx : repIdx) { lp.pairRepIdx.insert(idx); p.pairRepIdx.erase(idx); }
} // for (auto idx : notDupIdx) {
for (auto idx : opticalDupIdx) { // if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl;
lp.pairOpticalDupIdx.insert(idx); // if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl;
p.pairOpticalDupIdx.erase(idx); // if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl;
} // if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl;
for (auto idx : notDupIdx) { // lp.pairDupIdx.erase(idx); p.pairDupIdx.erase(idx);
lp.pairDupIdx.erase(idx); // }
//lp.pairOpticalDupIdx.erase(idx); for (auto idx : notOpticalDupIdx) { lp.pairOpticalDupIdx.erase(idx); p.pairOpticalDupIdx.erase(idx); }
p.pairDupIdx.erase(idx); for (auto idx : notRepIdx) { lp.pairRepIdx.erase(idx); p.pairRepIdx.erase(idx); }
//p.pairOpticalDupIdx.erase(idx);
}
for (auto idx : notOpticalDupIdx) {
lp.pairOpticalDupIdx.erase(idx);
p.pairOpticalDupIdx.erase(idx);
}
} }
// 用来分别处理dup和optical dup // 用来分别处理dup和optical dup
static void refeshTaskDupInfo(set<DupInfo> &dupIdx, set<int64_t> &opticalDupIdx, set<int64_t> &notDupIdx, static void refeshTaskDupInfo(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opticalDupIdx, DPSet<DupInfo> &repIdx,
set<int64_t> &notOpticalDupIdx, set<DupInfo> &latterDupIdx, MDSet<int64_t> &notDupIdx, MDSet<int64_t> &notOpticalDupIdx, MDSet<int64_t> &notRepIdx,
set<int64_t> &latterOpticalDupIdx, set<int64_t> &latterNotDupIdx, DPSet<DupInfo> &latterDupIdx, MDSet<int64_t> &latterOpticalDupIdx, DPSet<DupInfo> &latterRepIdx,
set<int64_t> &latterNotOpticalDupIdx) { MDSet<int64_t> &latterNotDupIdx, MDSet<int64_t> &latterNotOpticalDupIdx, MDSet<int64_t> &latterNotRepIdx) {
for (auto idx : dupIdx) latterDupIdx.insert(idx); for (auto idx : dupIdx) {
latterDupIdx.insert(idx);
// latterNotDupIdx.erase(idx); // 后来的更新为准
}
for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx); for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx);
for (auto idx : repIdx) latterRepIdx.insert(idx);
for (auto idx : notDupIdx) latterNotDupIdx.insert(idx); for (auto idx : notDupIdx) latterNotDupIdx.insert(idx);
for (auto idx : notOpticalDupIdx) latterNotOpticalDupIdx.insert(idx); for (auto idx : notOpticalDupIdx) latterNotOpticalDupIdx.insert(idx);
for (auto idx : notRepIdx) latterNotRepIdx.insert(idx);
} }
/* 最后合并数据并排序 */ /* 最后合并数据并排序 */
template<typename T> template<typename DupContainer, typename T>
static void refeshFinalTaskDupInfo(set<T> &dupIdx, set<int64_t> &notDupIdx, vector<T> &dupArr) { static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet<int64_t> &notDupIdx, vector<T> &dupArr,
vector<T> midArr; vector<T> &cacheDupIdx, vector<T> &midArr) {
midArr.resize(0);
cacheDupIdx.resize(0);
cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end());
std::sort(cacheDupIdx.begin(), cacheDupIdx.end());
auto ai = dupArr.begin(); auto ai = dupArr.begin();
auto bi = dupIdx.begin();
auto ae = dupArr.end(); auto ae = dupArr.end();
auto be = dupIdx.end(); auto bi = cacheDupIdx.begin();
auto be = cacheDupIdx.end();
//auto bi = dupIdx.begin();
//auto be = dupIdx.end();
T val = 0; T val = 0;
while (ai != ae && bi != be) { while (ai != ae && bi != be) {
@ -388,8 +427,8 @@ static void refeshFinalTaskDupInfo(set<T> &dupIdx, set<int64_t> &notDupIdx, vect
} else if (*bi < *ai) { } else if (*bi < *ai) {
val = *bi++; val = *bi++;
} else { } else {
val = *ai++; val = *bi++; // 相等的时候取后面的作为结果
bi++; ai++;
} }
if (notDupIdx.find(val) == notDupIdx.end()) { if (notDupIdx.find(val) == notDupIdx.end()) {
midArr.push_back(val); midArr.push_back(val);
@ -417,9 +456,12 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
auto &g = *gDataArg; auto &g = *gDataArg;
vector<ReadEnds> reArr; vector<ReadEnds> reArr;
set<DupInfo> dupIdx; DPSet<DupInfo> dupIdx;
set<int64_t> notOpticalDupIdx; MDSet<int64_t> opticalDupIdx;
set<int64_t> notDupIdx; DPSet<DupInfo> repIdx;
MDSet<int64_t> notOpticalDupIdx;
MDSet<int64_t> notDupIdx;
MDSet<int64_t> notRepIdx;
// 先处理重叠的frags // 先处理重叠的frags
getIntersectData(lp.frags, p.frags, &reArr); getIntersectData(lp.frags, p.frags, &reArr);
processFrags(reArr, &dupIdx, &notDupIdx); processFrags(reArr, &dupIdx, &notDupIdx);
@ -429,15 +471,16 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
reArr.clear(); reArr.clear();
dupIdx.clear(); dupIdx.clear();
notDupIdx.clear(); notDupIdx.clear();
set<int64_t> opticalDupIdx;
getIntersectData(lp.pairs, p.pairs, &reArr, true); getIntersectData(lp.pairs, p.pairs, &reArr, true);
processPairs(reArr, &dupIdx, &opticalDupIdx, &notDupIdx, &notOpticalDupIdx); processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &notDupIdx, &notOpticalDupIdx, &notRepIdx);
refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx, &lp, &p); refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, &lp, &p);
//cout << (g.unpairedDic.find("A01415:368:HL7NTDSX3:3:1104:5195:34757") != g.unpairedDic.end()) << endl;
//cout << (g.unpairedPosArr.find(14293757783047) != g.unpairedPosArr.end()) << endl;
// 处理之前未匹配的部分 // 处理之前未匹配的部分
map<CalcKey, int64_t> recalcPos; map<CalcKey, int64_t> recalcPos;
set<CalcKey> alreadyAdd; // 与该位点相同的pair都添加到数组里了 CalcSet<CalcKey> alreadyAdd; // 与该位点相同的pair都添加到数组里了
set<int64_t> addToGlobal; MDSet<int64_t> addToGlobal;
int64_t prevLastPos = 0, nextFirstPos = 0; int64_t prevLastPos = 0, nextFirstPos = 0;
if (lp.frags.size() > 0) if (lp.frags.size() > 0)
prevLastPos = lp.frags.back().posKey; prevLastPos = lp.frags.back().posKey;
@ -446,8 +489,16 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
// cout << "range: " << nextFirstPos << '\t' << prevLastPos << endl; // cout << "range: " << nextFirstPos << '\t' << prevLastPos << endl;
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
auto &readName = prevUnpair.first; auto &readName = prevUnpair.first;
// if (readName == "A01415:368:HL7NTDSX3:3:1104:5195:34757" ||
// readName == "A01415:368:HL7NTDSX3:3:1104:4887:32095") {
// cout << "read name found: " << lp.taskSeq << endl;
// }
auto &prevPosInfo = prevUnpair.second; auto &prevPosInfo = prevUnpair.second;
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
// if (prevFragEnd.read1IndexInFile == 255830545 || prevFragEnd.read1IndexInFile == 255830546
// || prevFragEnd.read1IndexInFile == 255832599 || prevFragEnd.read1IndexInFile == 255832601) {
// cout << prevFragEnd.read1IndexInFile << "\t" << readName << endl;
// }
if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read
auto &nextPosInfo = p.unpairedDic[readName]; auto &nextPosInfo = p.unpairedDic[readName];
@ -463,6 +514,16 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end()) if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end())
nextUnpairInfoP = &p.unpairedPosArr[prevPosKey]; nextUnpairInfoP = &p.unpairedPosArr[prevPosKey];
// if (prevFragEnd.read1IndexInFile == 255830545 || prevFragEnd.read1IndexInFile == 255830546 ||
// prevFragEnd.read1IndexInFile == 255832599 || prevFragEnd.read1IndexInFile == 255832601) {
// cout << "find in p: " << lp.taskSeq << "\t" << prevFragEnd.read1IndexInFile << "\t" << readName << endl;
// if (nextUnpairInfoP != nullptr)
// cout << "next p: " << nextUnpairInfoP->unpairedNum << endl;
// if (prevUnpairInfoP != nullptr)
// cout << "prev p: " << prevUnpairInfoP->unpairedNum << endl;
// cout << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl;
// }
// pos分为两种情况根据poskey(pair中两个read分别的pos)的位置确定 // pos分为两种情况根据poskey(pair中两个read分别的pos)的位置确定
// 1. // 1.
// prevpos在交叉部分之前nextpos在交叉部分之后这种情况不需要获取pairarr中的数据; // prevpos在交叉部分之前nextpos在交叉部分之后这种情况不需要获取pairarr中的数据;
@ -519,11 +580,13 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
prevPairArr.push_back(prevFragEnd); prevPairArr.push_back(prevFragEnd);
if (addDataToPos) { if (addDataToPos) {
getEqualRE(prevFragEnd, p.pairs, &prevPairArr); getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
getEqualRE(prevFragEnd, p.pairs, &nextPairArr);
} }
// 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除) // 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除)
recalcPos[ck] = nextPosInfo.taskSeq; recalcPos[ck] = nextPosInfo.taskSeq;
std::sort(prevPairArr.begin(), prevPairArr.end()); std::sort(prevPairArr.begin(), prevPairArr.end());
std::sort(nextPairArr.begin(), nextPairArr.end());
} else { // next task在该位点没有unpair那就把数据放到prev task里 } else { // next task在该位点没有unpair那就把数据放到prev task里
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
prevPairArr.push_back(prevFragEnd); prevPairArr.push_back(prevFragEnd);
@ -549,6 +612,10 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
} }
p.unpairedDic.erase(readName); // 在next task里删除该read p.unpairedDic.erase(readName); // 在next task里删除该read
} else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read } else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read
// if (readName == "A01415:368:HL7NTDSX3:3:1104:5195:34757" ||
// readName == "A01415:368:HL7NTDSX3:3:1104:4887:32095") {
// cout << "find in g: " << prevFragEnd.read1IndexInFile << "\t" << readName << endl;
// }
auto &remainPosInfo = g.unpairedDic[readName]; auto &remainPosInfo = g.unpairedDic[readName];
auto remainFragEnd = remainPosInfo.unpairedRE; auto remainFragEnd = remainPosInfo.unpairedRE;
int64_t remainPosKey = remainFragEnd.posKey; int64_t remainPosKey = remainFragEnd.posKey;
@ -565,11 +632,14 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
int64_t prevPosKey = prevFragEnd.posKey; int64_t prevPosKey = prevFragEnd.posKey;
g.unpairedDic.insert(prevUnpair); g.unpairedDic.insert(prevUnpair);
addToGlobal.insert(prevPosKey); addToGlobal.insert(prevPosKey);
// if (prevFragEnd.read1IndexInFile == 255830545 || prevFragEnd.read1IndexInFile == 255830546) {
// cout << "here find" << endl;
// }
} }
} }
map<int64_t, TaskSeqDupInfo> taskChanged; map<int64_t, TaskSeqDupInfo> taskChanged;
set<int64_t> posProcessed; MDSet<int64_t> posProcessed;
for (auto &e : recalcPos) { for (auto &e : recalcPos) {
auto posKey = e.first.read1Pos; auto posKey = e.first.read1Pos;
if (posProcessed.find(posKey) != posProcessed.end()) if (posProcessed.find(posKey) != posProcessed.end())
@ -583,49 +653,79 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
pairArrP = &g.unpairedPosArr[posKey].pairArr; pairArrP = &g.unpairedPosArr[posKey].pairArr;
else else
pairArrP = &lp.unpairedPosArr[posKey].pairArr; pairArrP = &lp.unpairedPosArr[posKey].pairArr;
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx, &t.notOpticalDupIdx); // if (taskSeq == 98) cout << "handle recalc pairs: " << taskSeq << "\t" << posKey << endl;
// if (p.taskSeq == 163) {
// cout << "final" << endl;
// }
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx);
if (taskSeq < lp.taskSeq) if (taskSeq < lp.taskSeq)
g.unpairedPosArr.erase(posKey); g.unpairedPosArr.erase(posKey);
} }
// if (lp.taskSeq > 98) {
// // exit(0);
// }
// 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据 // 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据
// 放在这里因为lp中的unpairedPosArr中的readends可能会被修改比如optical duplicate // 放在这里因为lp中的unpairedPosArr中的readends可能会被修改比如optical duplicate
for (auto posKey : addToGlobal) g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey]; for (auto posKey : addToGlobal) {
// if (posKey == 14293757783047) {
// for (auto &re: lp.unpairedPosArr[posKey].pairArr) {
// cout << "lp reads: " << re.read1IndexInFile << "\t" << re.read2IndexInFile << endl;
// }
// cout << "found in g: " << lp.taskSeq << "\t" << lp.unpairedPosArr[posKey].unpairedNum << "\t" << lp.unpairedPosArr[posKey].pairArr.size() << endl;
// }
g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey];
}
// 更新结果 // 更新结果
for (auto &e : taskChanged) { for (auto &e : taskChanged) {
auto taskSeq = e.first; auto taskSeq = e.first;
auto &t = e.second; auto &t = e.second;
// cout << t.dupIdx.size() << "\t" << t.notDupIdx.size() << endl;
if (taskSeq < lp.taskSeq) { if (taskSeq < lp.taskSeq) {
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, g.latterDupIdxArr[taskSeq], refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx,
g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq]); g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq],
g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq]);
} else if (taskSeq == lp.taskSeq) { } else if (taskSeq == lp.taskSeq) {
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, &lp, &p); refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, &lp, &p);
} else { } else {
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, &p, &lp); // 把结果放到p中 refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, &p, &lp); // 把结果放到p中
} }
} }
// cout << "remain unpaired: " << g.unpairedDic.size() << '\t' << // cout << "remain unpaired: " << g.unpairedDic.size() << '\t' <<
// g.unpairedPosArr.size() << endl; cout << "calc g time: " << // g.unpairedPosArr.size() << endl; cout << "calc g time: " <<
// t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据 // t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据
g.latterDupIdxArr.push_back(set<DupInfo>()); g.latterDupIdxArr.push_back(DPSet<DupInfo>());
g.latterOpticalDupIdxArr.push_back(set<int64_t>()); g.latterOpticalDupIdxArr.push_back(MDSet<int64_t>());
g.latterNotDupIdxArr.push_back(set<int64_t>()); g.latterRepIdxArr.push_back(DPSet<DupInfo>());
g.latterNotOpticalDupIdxArr.push_back(set<int64_t>()); g.latterNotDupIdxArr.push_back(MDSet<int64_t>());
g.latterNotOpticalDupIdxArr.push_back(MDSet<int64_t>());
g.latterNotRepIdxArr.push_back(MDSet<int64_t>());
g.dupIdxArr.push_back(vector<DupInfo>()); g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back(); auto &vIdx = g.dupIdxArr.back();
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
std::sort(vIdx.begin(), vIdx.end());
// cout << vIdx.size() << endl;
// zzhtestnum += vIdx.size();
g.opticalDupIdxArr.push_back(vector<int64_t>()); g.opticalDupIdxArr.push_back(vector<int64_t>());
auto &vOpticalIdx = g.opticalDupIdxArr.back(); auto &vOpticalIdx = g.opticalDupIdxArr.back();
vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end());
std::sort(vOpticalIdx.begin(), vOpticalIdx.end());
g.repIdxArr.push_back(vector<DupInfo>());
auto &vRepIdx = g.repIdxArr.back();
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
std::sort(vRepIdx.begin(), vRepIdx.end());
} }
/* 当所有任务结束后global data里还有未处理的数据 */ /* 当所有任务结束后global data里还有未处理的数据 */
static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) { static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) {
// cout << "last task start" << endl;
auto &lp = *task; auto &lp = *task;
auto &g = *gDataArg; auto &g = *gDataArg;
// 遗留的未匹配的pair // 遗留的未匹配的pair
@ -643,6 +743,8 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) {
remainUnpairInfo.pairArr.push_back(remainFragEnd); remainUnpairInfo.pairArr.push_back(remainFragEnd);
g.unpairedDic.erase(readName); g.unpairedDic.erase(readName);
} else {
g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数
} }
} }
map<int64_t, TaskSeqDupInfo> taskChanged; map<int64_t, TaskSeqDupInfo> taskChanged;
@ -654,7 +756,8 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) {
if (arr.size() > 1) { if (arr.size() > 1) {
std::sort(arr.begin(), arr.end()); std::sort(arr.begin(), arr.end());
processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx); // cout << "last task before mark pair" << endl;
processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notRepIdx);
} }
} }
// 更新结果 // 更新结果
@ -663,28 +766,104 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) {
for (auto &e : taskChanged) { for (auto &e : taskChanged) {
auto taskSeq = e.first; auto taskSeq = e.first;
auto &t = e.second; auto &t = e.second;
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, g.latterDupIdxArr[taskSeq], // cout << t.dupIdx.size() << "\t" << t.notDupIdx.size() << endl;
g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq]); refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx,
g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq],
g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq]);
} }
cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl; // cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl;
g.unpairedPosArr.clear(); g.unpairedPosArr.clear();
g.unpairedDic.clear(); g.unpairedDic.clear();
/*
int taskSeq = 0;
for (auto &arr : g.dupIdxArr) {
cout << taskSeq << "\t" << arr.size();
if (taskSeq < (int)g.dupIdxArr.size() - 1)
cout << "\t" << g.latterDupIdxArr[taskSeq].size() << "\t" << g.latterNotDupIdxArr[taskSeq].size() << endl;
else
cout << endl;
// if (taskSeq == 98) {
// vector<DupInfo> v;
// v.insert(v.end(), g.latterDupIdxArr[taskSeq].begin(), g.latterDupIdxArr[taskSeq].end());
// std::sort(v.begin(), v.end());
// for (auto &idx : v)
// cout << idx.idx << " " << idx.repIdx << " " << idx.dupSet << endl;
// v.clear();
// v.insert(v.end(), g.latterNotDupIdxArr[taskSeq].begin(), g.latterNotDupIdxArr[taskSeq].end());
// std::sort(v.begin(), v.end());
// for (auto &idx : v) cout << idx << endl;
// }
taskSeq++;
}
*/
// 将dupidx放进全局数据 // 将dupidx放进全局数据
for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) vector<DupInfo> cacheDupIdx;
refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i]); vector<DupInfo> midArr;
vector<int64_t> intCacheDupIdx;
vector<int64_t> intMidArr;
for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) {
/*
if (i == 98) {
cout << "98: " << g.latterDupIdxArr[i].size() << "\t" << g.dupIdxArr[i].size() << endl;
int inlatter = 0;
int innotdup = 0;
int latterinnotdup = 0;
for (auto idx : g.dupIdxArr[i]) {
if (g.latterDupIdxArr[i].find(idx) != g.latterDupIdxArr[i].end()) {
++inlatter;
}
}
for (auto idx : g.dupIdxArr[i]) {
if (g.latterNotDupIdxArr[i].find(idx) != g.latterNotDupIdxArr[i].end()) {
cout << idx.idx << endl;
++innotdup;
}
}
for (auto idx : g.latterDupIdxArr[i]) {
if (g.latterNotDupIdxArr[i].find(idx) != g.latterNotDupIdxArr[i].end()) {
++latterinnotdup;
}
}
cout << inlatter << "\t" << innotdup << "\t" << latterinnotdup << endl;
}
*/
refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i], cacheDupIdx, midArr);
/*
if (i == 98) {
cout << "98: " << g.latterDupIdxArr[i].size() << "\t" << g.dupIdxArr[i].size() << endl;
}
*/
}
for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i) for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i]); refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i], intCacheDupIdx, intMidArr);
for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr);
g.dupIdxArr.push_back(vector<DupInfo>()); g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back(); auto &vIdx = g.dupIdxArr.back();
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
std::sort(vIdx.begin(), vIdx.end());
// cout << "last " << vIdx.size() << endl;
// zzhtestnum += vIdx.size();
/*
for (auto &arr : g.dupIdxArr) {
cout << taskSeq << "\t" << arr.size() << endl;
taskSeq++;
}
*/
g.opticalDupIdxArr.push_back(vector<int64_t>()); g.opticalDupIdxArr.push_back(vector<int64_t>());
auto &vOpticalIdx = g.opticalDupIdxArr.back(); auto &vOpticalIdx = g.opticalDupIdxArr.back();
vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end());
std::sort(vOpticalIdx.begin(), vOpticalIdx.end());
g.repIdxArr.push_back(vector<DupInfo>());
auto &vRepIdx = g.repIdxArr.back();
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
std::sort(vRepIdx.begin(), vRepIdx.end());
} }
/* 串行处理数据,标记冗余 */ /* 串行处理数据,标记冗余 */
@ -771,17 +950,20 @@ void serialMarkDups() {
int64_t opticalDupNum = 0; int64_t opticalDupNum = 0;
map<int64_t, int> dup; map<int64_t, int> dup;
// int taskSeq = 0;
// for (auto &arr : gData.dupIdxArr) { int taskSeq = 0;
// for (auto idx : arr) { for (auto &arr : gData.dupIdxArr) {
// if (dup.find(idx.idx) != dup.end()) { for (auto idx : arr) {
// // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' if (dup.find(idx.idx) != dup.end()) {
// // << idx << endl; // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t'
// } // << idx << endl;
// dup[idx.idx] = taskSeq; }
// } dup[idx.idx] = taskSeq;
// taskSeq++; }
// } // cout << taskSeq << "\t" << arr.size() << endl;
taskSeq++;
}
// #include <fstream> // #include <fstream>
// ofstream out("tumor_dup.txt"); // ofstream out("tumor_dup.txt");
@ -794,7 +976,7 @@ void serialMarkDups() {
for (auto &arr : gData.dupIdxArr) dupNum += arr.size(); for (auto &arr : gData.dupIdxArr) dupNum += arr.size();
for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size(); for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size();
cout << "dup num : " << dupNum << '\t' << dup.size() << endl; cout << "dup num : " << dupNum << '\t' << dup.size() << "\t" << zzhtestnum << endl;
cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl; cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl;
cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl; cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl;

View File

@ -2,13 +2,16 @@
#include <common/hts/bam_buf.h> #include <common/hts/bam_buf.h>
#include <robin-map/include/tsl/robin_map.h> #include <robin-map/include/tsl/robin_map.h>
#include <robin-map/include/tsl/robin_set.h>
#include <sam/utils/read_ends.h> #include <sam/utils/read_ends.h>
#include <set> #include <set>
#include <string> #include <string>
#include <vector> #include <vector>
#include <unordered_set>
using std::set; using std::set;
using std::unordered_set;
using std::string; using std::string;
using std::vector; using std::vector;
@ -28,6 +31,16 @@ struct CalcKey {
comp = (int)(read2Pos - o.read2Pos); comp = (int)(read2Pos - o.read2Pos);
return comp < 0; return comp < 0;
} }
bool operator==(const CalcKey &o) const { return read1Pos == o.read1Pos && read2Pos == o.read2Pos; }
std::size_t operator()(const CalcKey &o) const {
return std::hash<int64_t>()(read1Pos) ^ std::hash<int64_t>()(read2Pos);
}
};
struct CalcKeyHash {
std::size_t operator()(const CalcKey &o) const {
return std::hash<int64_t>()(o.read1Pos) ^ std::hash<int64_t>()(o.read2Pos);
}
}; };
/* 用来记录冗余索引相关的信息 */ /* 用来记录冗余索引相关的信息 */
@ -36,6 +49,7 @@ struct DupInfo {
int64_t repIdx = 0; // 这一批冗余中的非冗余read 代表的索引 int64_t repIdx = 0; // 这一批冗余中的非冗余read 代表的索引
int16_t dupSet = 0; // dup set size int16_t dupSet = 0; // dup set size
DupInfo() : DupInfo(-1, 0, 0) { }
DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) { } DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) { }
DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), repIdx(repIdx_), dupSet(dupSet_) {} DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), repIdx(repIdx_), dupSet(dupSet_) {}
bool operator<(const DupInfo &o) const { bool operator<(const DupInfo &o) const {
@ -49,12 +63,38 @@ struct DupInfo {
} }
}; };
struct DupInfoHash {
std::size_t operator()(const DupInfo &o) const { return std::hash<int64_t>()(o.idx); }
};
struct DupInfoEqual {
bool operator()(const DupInfo &o1, const DupInfo &o2) const { return o1.idx == o2.idx; }
bool operator()(const DupInfo &o1, const int64_t &o2) const { return o1.idx == o2; }
bool operator()(const int64_t &o1, const DupInfo &o2) const { return o1 == o2.idx; }
};
template<typename T>
// using MDSet = set<T>;
// using MDSet = unordered_set<T>;
using MDSet = tsl::robin_set<T>;
template <typename T>
// using DPSet = set<T>;
// using DPSet = unordered_set<T, DupInfoHash, DupInfoEqual>;
using DPSet = tsl::robin_set<T, DupInfoHash, DupInfoEqual>;
template <typename T>
//using CalcSet = set<T>;
using CalcSet = tsl::robin_set<T, CalcKeyHash>;
/* 当遗留数据在当前任务找到了pair read后进行冗余计算时候存放结果的数据结构 */ /* 当遗留数据在当前任务找到了pair read后进行冗余计算时候存放结果的数据结构 */
struct TaskSeqDupInfo { struct TaskSeqDupInfo {
set<DupInfo> dupIdx; DPSet<DupInfo> dupIdx;
set<int64_t> opticalDupIdx; MDSet<int64_t> opticalDupIdx;
set<int64_t> notDupIdx; DPSet<DupInfo> repIdx;
set<int64_t> notOpticalDupIdx; MDSet<int64_t> notDupIdx;
MDSet<int64_t> notOpticalDupIdx;
MDSet<int64_t> notRepIdx;
}; };
/* 保存有未匹配pair位点的信息包括read end数组和有几个未匹配的read end */ /* 保存有未匹配pair位点的信息包括read end数组和有几个未匹配的read end */
@ -62,7 +102,7 @@ struct UnpairedPosInfo {
int unpairedNum = 0; int unpairedNum = 0;
int64_t taskSeq; int64_t taskSeq;
vector<ReadEnds> pairArr; vector<ReadEnds> pairArr;
set<string> readNameSet; MDSet<string> readNameSet;
}; };
// typedef unordered_map<string, UnpairedREInfo> UnpairedNameMap; // typedef unordered_map<string, UnpairedREInfo> UnpairedNameMap;
// typedef unordered_map<int64_t, UnpairedPosInfo> UnpairedPositionMap; // typedef unordered_map<int64_t, UnpairedPosInfo> UnpairedPositionMap;
@ -77,9 +117,10 @@ struct SerailMarkDupArg {
vector<BamWrap *> bams; // 存放待处理的bam read vector<BamWrap *> bams; // 存放待处理的bam read
vector<ReadEnds> pairs; // 成对的reads vector<ReadEnds> pairs; // 成对的reads
vector<ReadEnds> frags; // 暂未找到配对的reads vector<ReadEnds> frags; // 暂未找到配对的reads
set<DupInfo> pairDupIdx; // pair的冗余read的索引 DPSet<DupInfo> pairDupIdx; // pair的冗余read的索引
set<int64_t> pairOpticalDupIdx; // optical冗余read的索引 MDSet<int64_t> pairOpticalDupIdx; // optical冗余read的索引
set<DupInfo> fragDupIdx; // frag的冗余read的索引 DPSet<DupInfo> fragDupIdx; // frag的冗余read的索引
DPSet<DupInfo> pairRepIdx; // pair的dupset代表read的索引
UnpairedNameMap unpairedDic; // 用来寻找pair end UnpairedNameMap unpairedDic; // 用来寻找pair end
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd为了避免重复存储 UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd为了避免重复存储
}; };
@ -92,12 +133,15 @@ struct GlobalDataArg {
// 每个task对应一个vector // 每个task对应一个vector
vector<vector<DupInfo>> dupIdxArr; vector<vector<DupInfo>> dupIdxArr;
vector<vector<int64_t>> opticalDupIdxArr; vector<vector<int64_t>> opticalDupIdxArr;
vector<vector<DupInfo>> repIdxArr;
// 用来存放后续计算的数据 // 用来存放后续计算的数据
vector<set<DupInfo>> latterDupIdxArr; vector<DPSet<DupInfo>> latterDupIdxArr;
vector<set<int64_t>> latterOpticalDupIdxArr; vector<MDSet<int64_t>> latterOpticalDupIdxArr;
vector<set<int64_t>> latterNotDupIdxArr; vector<DPSet<DupInfo>> latterRepIdxArr;
vector<set<int64_t>> latterNotOpticalDupIdxArr; vector<MDSet<int64_t>> latterNotDupIdxArr;
vector<MDSet<int64_t>> latterNotOpticalDupIdxArr;
vector<MDSet<int64_t>> latterNotRepIdxArr;
}; };
// 串行运行mark duplicate // 串行运行mark duplicate