添加了dupset信息,还不完善
This commit is contained in:
parent
7352a00f2c
commit
85515ac06e
|
|
@ -13,11 +13,11 @@
|
||||||
"program": "${workspaceRoot}/build/bin/picard_cpp",
|
"program": "${workspaceRoot}/build/bin/picard_cpp",
|
||||||
"args": [
|
"args": [
|
||||||
"MarkDuplicates",
|
"MarkDuplicates",
|
||||||
"--INPUT", "~/data/bam/zy_normal.bam",
|
"--INPUT", "~/data/bam/100w.bam",
|
||||||
"--OUTPUT", "out.bam",
|
"--OUTPUT", "out.bam",
|
||||||
"--METRICS_FILE", "metrics.txt",
|
"--METRICS_FILE", "metrics.txt",
|
||||||
"--num_threads", "1",
|
"--num_threads", "1",
|
||||||
"--max_mem", "100G",
|
"--max_mem", "1G",
|
||||||
"--verbosity", "DEBUG",
|
"--verbosity", "DEBUG",
|
||||||
"--asyncio", "true",
|
"--asyncio", "true",
|
||||||
],
|
],
|
||||||
|
|
|
||||||
6
run.sh
6
run.sh
|
|
@ -1,6 +1,6 @@
|
||||||
input=~/data/bam/zy_normal.bam
|
#input=~/data/bam/zy_normal.bam
|
||||||
#input=~/data/bam/zy_tumor.bam
|
#input=~/data/bam/zy_tumor.bam
|
||||||
#input=~/data/bam/100w.bam
|
input=~/data/bam/100w.bam
|
||||||
#input=~/data/bam/1kw.sam
|
#input=~/data/bam/1kw.sam
|
||||||
#input=~/data/bam/n1kw.sam
|
#input=~/data/bam/n1kw.sam
|
||||||
|
|
||||||
|
|
@ -10,7 +10,7 @@ time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \
|
||||||
--OUTPUT ~/data/bam/out.bam \
|
--OUTPUT ~/data/bam/out.bam \
|
||||||
--INDEX_FORMAT BAI \
|
--INDEX_FORMAT BAI \
|
||||||
--num_threads 1 \
|
--num_threads 1 \
|
||||||
--max_mem 1G \
|
--max_mem 2G \
|
||||||
--verbosity DEBUG \
|
--verbosity DEBUG \
|
||||||
--asyncio true #\
|
--asyncio true #\
|
||||||
#--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$"
|
#--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$"
|
||||||
|
|
|
||||||
|
|
@ -154,14 +154,13 @@ int MarkDuplicates(int argc, char *argv[]) {
|
||||||
// BamBufType inBuf(false);
|
// BamBufType inBuf(false);
|
||||||
BamBufType inBuf(g_gArg.use_asyncio);
|
BamBufType inBuf(g_gArg.use_asyncio);
|
||||||
inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
|
inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
|
||||||
DupIdxQueue idxQue;
|
DupIdxQueue<DupInfo> dupIdxQue;
|
||||||
idxQue.Init(&gData.dupIdxArr);
|
dupIdxQue.Init(&gData.dupIdxArr);
|
||||||
Timer tw;
|
Timer tw;
|
||||||
cout << "dupsize: " << idxQue.Size() << endl;
|
cout << "dupsize: " << dupIdxQue.Size() << endl;
|
||||||
uint64_t bamIdx = 0;
|
uint64_t bamIdx = 0;
|
||||||
uint64_t dupIdx = idxQue.Pop();
|
DupInfo dupIdx = dupIdxQue.Pop();
|
||||||
cout << "dup arr size: " << gData.dupIdxArr.size() << endl;
|
|
||||||
cout << "first dup: " << dupIdx << endl;
|
|
||||||
while (inBuf.ReadStat() >= 0) {
|
while (inBuf.ReadStat() >= 0) {
|
||||||
Timer tw1;
|
Timer tw1;
|
||||||
size_t readNum = inBuf.ReadBam();
|
size_t readNum = inBuf.ReadBam();
|
||||||
|
|
@ -169,8 +168,12 @@ int MarkDuplicates(int argc, char *argv[]) {
|
||||||
for (size_t i = 0; i < inBuf.Size(); ++i) {
|
for (size_t i = 0; i < inBuf.Size(); ++i) {
|
||||||
/* 判断是否冗余 */
|
/* 判断是否冗余 */
|
||||||
if (bamIdx == dupIdx) {
|
if (bamIdx == dupIdx) {
|
||||||
// cerr << bamIdx << endl;
|
cerr << bamIdx << " " << dupIdx.repIdx << " " << dupIdx.dupSet << endl;
|
||||||
dupIdx = idxQue.Pop();
|
// 为了防止小内存运行的时候,有重复的dupidx,这时候dup的repIdx和dupSetSize可能会有不同
|
||||||
|
while((dupIdx = dupIdxQue.Pop()) == bamIdx);
|
||||||
|
}
|
||||||
|
if (bamIdx == -1) { // repressent
|
||||||
|
|
||||||
}
|
}
|
||||||
if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) {
|
if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) {
|
||||||
Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str());
|
Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str());
|
||||||
|
|
@ -183,7 +186,7 @@ int MarkDuplicates(int argc, char *argv[]) {
|
||||||
inBuf.ClearAll();
|
inBuf.ClearAll();
|
||||||
cout << "write round time: " << tw1.seconds_elapsed() << " s" << endl;
|
cout << "write round time: " << tw1.seconds_elapsed() << " s" << endl;
|
||||||
}
|
}
|
||||||
cout << "dupsize: " << idxQue.Size() << endl;
|
cout << "dupsize: " << dupIdxQue.Size() << endl;
|
||||||
if (sam_idx_save(g_outBamFp) < 0) {
|
if (sam_idx_save(g_outBamFp) < 0) {
|
||||||
Error("writing index failed");
|
Error("writing index failed");
|
||||||
sam_close(g_outBamFp);
|
sam_close(g_outBamFp);
|
||||||
|
|
|
||||||
|
|
@ -71,29 +71,31 @@ struct DupContainer {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* 优先队列,用最小堆来实现对所有冗余索引的排序
|
* 优先队列,用最小堆来实现对所有冗余索引的排序
|
||||||
*/
|
*/
|
||||||
|
template <typename T>
|
||||||
struct PairArrIdIdx {
|
struct PairArrIdIdx {
|
||||||
int arrId = 0;
|
int arrId = 0;
|
||||||
uint64_t arrIdx = 0;
|
uint64_t arrIdx = 0;
|
||||||
int64_t dupIdx = 0;
|
T dupIdx = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
struct IdxGreaterThan {
|
struct IdxGreaterThan {
|
||||||
bool operator()(const PairArrIdIdx &a, const PairArrIdIdx &b) { return a.dupIdx > b.dupIdx; }
|
bool operator()(const PairArrIdIdx<T> &a, const PairArrIdIdx<T> &b) { return a.dupIdx > b.dupIdx; }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
struct DupIdxQueue {
|
struct DupIdxQueue {
|
||||||
// 将冗余索引和他对应的task vector对应起来
|
// 将冗余索引和他对应的task vector对应起来
|
||||||
|
|
||||||
// 由于是多个task来查找冗余,所以每次找到的冗余index都放在一个独立的vector中,vector之间可能有重叠,所以需要用一个最小堆来维护
|
// 由于是多个task来查找冗余,所以每次找到的冗余index都放在一个独立的vector中,vector之间可能有重叠,所以需要用一个最小堆来维护
|
||||||
vector<vector<int64_t>> *dupIdx2DArr;
|
vector<vector<T>> *dupIdx2DArr;
|
||||||
priority_queue<PairArrIdIdx, vector<PairArrIdIdx>, IdxGreaterThan> minHeap;
|
priority_queue<PairArrIdIdx<T>, vector<PairArrIdIdx<T>>, IdxGreaterThan<T>> minHeap;
|
||||||
uint64_t popNum = 0;
|
uint64_t popNum = 0;
|
||||||
|
|
||||||
int Init(vector<vector<int64_t>> *_dupIdx2DArr) {
|
int Init(vector<vector<T>> *_dupIdx2DArr) {
|
||||||
dupIdx2DArr = _dupIdx2DArr;
|
dupIdx2DArr = _dupIdx2DArr;
|
||||||
if (dupIdx2DArr == nullptr) {
|
if (dupIdx2DArr == nullptr) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
@ -107,8 +109,8 @@ struct DupIdxQueue {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t Pop() {
|
T Pop() {
|
||||||
int64_t ret = -1;
|
T ret = -1;
|
||||||
if (!minHeap.empty()) {
|
if (!minHeap.empty()) {
|
||||||
auto idx = minHeap.top();
|
auto idx = minHeap.top();
|
||||||
minHeap.pop();
|
minHeap.pop();
|
||||||
|
|
@ -133,6 +135,7 @@ struct DupIdxQueue {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* 用来检测optical duplication的graph
|
* 用来检测optical duplication的graph
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -65,8 +65,8 @@ static inline void sortReadEndsArr(vector<ReadEnds> &arr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 处理一组pairend的readends,标记冗余, 这个函数需要串行运行,因为需要做一些统计*/
|
/* 处理一组pairend的readends,标记冗余, 这个函数需要串行运行,因为需要做一些统计*/
|
||||||
static void markDupsForPairs(vector<const ReadEnds *> &vpRe, set<int64_t> *dupIdx, set<int64_t> *opticalDupIdx,
|
static void markDupsForPairs(vector<const ReadEnds *> &vpRe, set<DupInfo> *dupIdx, set<int64_t> *opticalDupIdx = nullptr,
|
||||||
set<int64_t> *notDupIdx = nullptr) {
|
set<int64_t> *notDupIdx = nullptr, set<int64_t> *notOpticalDupIdx = nullptr) {
|
||||||
if (vpRe.size() < 2) {
|
if (vpRe.size() < 2) {
|
||||||
if (vpRe.size() == 1) {
|
if (vpRe.size() == 1) {
|
||||||
// addSingletonToCount(libraryIdGenerator);
|
// addSingletonToCount(libraryIdGenerator);
|
||||||
|
|
@ -78,11 +78,11 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, set<int64_t> *dupId
|
||||||
}
|
}
|
||||||
int maxScore = 0;
|
int maxScore = 0;
|
||||||
const ReadEnds *pBest = nullptr;
|
const ReadEnds *pBest = nullptr;
|
||||||
int maxOperateTime = 0;
|
// int maxOperateTime = 0;
|
||||||
/** All read ends should have orientation FF, FR, RF, or RR **/
|
/** All read ends should have orientation FF, FR, RF, or RR **/
|
||||||
for (auto pe : vpRe) { // 找分数最高的readend
|
for (auto pe : vpRe) { // 找分数最高的readend
|
||||||
maxOperateTime = max(maxOperateTime, pe->oprateTime);
|
// maxOperateTime = max(maxOperateTime, pe->oprateTime);
|
||||||
(const_cast<ReadEnds *>(pe))->oprateTime ++;
|
// (const_cast<ReadEnds *>(pe))->oprateTime ++;
|
||||||
if (pe->score > maxScore || pBest == nullptr) {
|
if (pe->score > maxScore || pBest == nullptr) {
|
||||||
maxScore = pe->score;
|
maxScore = pe->score;
|
||||||
pBest = pe;
|
pBest = pe;
|
||||||
|
|
@ -94,15 +94,40 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, set<int64_t> *dupId
|
||||||
notDupIdx->insert(pBest->read1IndexInFile);
|
notDupIdx->insert(pBest->read1IndexInFile);
|
||||||
notDupIdx->insert(pBest->read2IndexInFile);
|
notDupIdx->insert(pBest->read2IndexInFile);
|
||||||
}
|
}
|
||||||
|
//if (false) {
|
||||||
if (!g_mdArg.READ_NAME_REGEX.empty()) { // 检查光学冗余
|
if (!g_mdArg.READ_NAME_REGEX.empty()) { // 检查光学冗余
|
||||||
// trackOpticalDuplicates
|
// trackOpticalDuplicates
|
||||||
|
vector<const ReadEnds *> prevOpticalRe;
|
||||||
|
if (notOpticalDupIdx != nullptr) {
|
||||||
|
for (auto pe : vpRe) {
|
||||||
|
if (pe->isOpticalDuplicate) {
|
||||||
|
prevOpticalRe.push_back(pe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
trackOpticalDuplicates(vpRe, pBest);
|
trackOpticalDuplicates(vpRe, pBest);
|
||||||
|
// 由于重叠问题,可能会更新数据
|
||||||
|
if (notOpticalDupIdx != nullptr) {
|
||||||
|
for (auto pe : prevOpticalRe) {
|
||||||
|
if (!pe->isOpticalDuplicate) {
|
||||||
|
notOpticalDupIdx->insert(pe->read1IndexInFile);
|
||||||
|
notOpticalDupIdx->insert(pe->read2IndexInFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (auto pe : vpRe) { // 对非best read标记冗余
|
for (auto pe : vpRe) { // 对非best read标记冗余
|
||||||
if (pe != pBest) { // 非best
|
if (pe != pBest) { // 非best
|
||||||
dupIdx->insert(pe->read1IndexInFile); // 添加read1
|
dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1
|
||||||
if (pe->read2IndexInFile != pe->read1IndexInFile)
|
if (pe->read2IndexInFile != pe->read1IndexInFile)
|
||||||
dupIdx->insert(pe->read2IndexInFile); // 添加read2
|
dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); //read2,
|
||||||
|
// 注意这里代表是read1的索引
|
||||||
|
// 检查是否optical dup
|
||||||
|
if (pe->isOpticalDuplicate && opticalDupIdx != nullptr) {
|
||||||
|
opticalDupIdx->insert(pe->read1IndexInFile);
|
||||||
|
if (pe->read2IndexInFile != pe->read1IndexInFile)
|
||||||
|
opticalDupIdx->insert(pe->read2IndexInFile);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 在输出的bam文件中添加tag
|
// 在输出的bam文件中添加tag
|
||||||
|
|
@ -112,7 +137,7 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, set<int64_t> *dupId
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 处理一组非paired的readends,标记冗余 */
|
/* 处理一组非paired的readends,标记冗余 */
|
||||||
static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs, set<int64_t> *dupIdx,
|
static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs, set<DupInfo> *dupIdx,
|
||||||
set<int64_t> *notDupIdx = nullptr) {
|
set<int64_t> *notDupIdx = nullptr) {
|
||||||
if (containsPairs) {
|
if (containsPairs) {
|
||||||
for (auto pe : vpRe) {
|
for (auto pe : vpRe) {
|
||||||
|
|
@ -207,25 +232,25 @@ static void generateReadEnds(SerailMarkDupArg *arg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 处理pairs */
|
/* 处理pairs */
|
||||||
static void processPairs(vector<ReadEnds> &readEnds, set<int64_t> *dupIdx, set<int64_t> *opticalDupIdx,
|
static void processPairs(vector<ReadEnds> &readEnds, set<DupInfo> *dupIdx, set<int64_t> *opticalDupIdx,
|
||||||
set<int64_t> *notDupIdx = nullptr) {
|
set<int64_t> *notDupIdx = nullptr, set<int64_t> *notOpticalDupIdx = nullptr) {
|
||||||
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
|
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
|
||||||
const ReadEnds *pReadEnd = nullptr;
|
const ReadEnds *pReadEnd = nullptr;
|
||||||
for (auto &re : readEnds) {
|
for (auto &re : readEnds) {
|
||||||
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样
|
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样
|
||||||
vpCache.push_back(&re); // 处理一个潜在的冗余组
|
vpCache.push_back(&re); // 处理一个潜在的冗余组
|
||||||
else {
|
else {
|
||||||
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx); // 不一样
|
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx); // 不一样
|
||||||
vpCache.clear();
|
vpCache.clear();
|
||||||
vpCache.push_back(&re);
|
vpCache.push_back(&re);
|
||||||
pReadEnd = &re;
|
pReadEnd = &re;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx);
|
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 处理frags */
|
/* 处理frags */
|
||||||
static void processFrags(vector<ReadEnds> &readEnds, set<int64_t> *dupIdx, set<int64_t> *notDupIdx = nullptr) {
|
static void processFrags(vector<ReadEnds> &readEnds, set<DupInfo> *dupIdx, set<int64_t> *notDupIdx = nullptr) {
|
||||||
bool containsPairs = false;
|
bool containsPairs = false;
|
||||||
bool containsFrags = false;
|
bool containsFrags = false;
|
||||||
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
|
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
|
||||||
|
|
@ -296,7 +321,7 @@ static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds>
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 将frags重叠部分的dup idx放进数据中 */
|
/* 将frags重叠部分的dup idx放进数据中 */
|
||||||
static inline void refreshFragDupIdx(set<int64_t> &dupIdx, set<int64_t> ¬DupIdx, SerailMarkDupArg *lastArg,
|
static inline void refreshFragDupIdx(set<DupInfo> &dupIdx, set<int64_t> ¬DupIdx, SerailMarkDupArg *lastArg,
|
||||||
SerailMarkDupArg *curArg) {
|
SerailMarkDupArg *curArg) {
|
||||||
auto &lp = *lastArg;
|
auto &lp = *lastArg;
|
||||||
auto &p = *curArg;
|
auto &p = *curArg;
|
||||||
|
|
@ -311,8 +336,8 @@ static inline void refreshFragDupIdx(set<int64_t> &dupIdx, set<int64_t> ¬DupI
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 将pairs重叠部分的dup idx放进数据中 */
|
/* 将pairs重叠部分的dup idx放进数据中 */
|
||||||
static inline void refreshPairDupIdx(set<int64_t> &dupIdx, set<int64_t> &opticalDupIdx, set<int64_t> ¬DupIdx,
|
static inline void refreshPairDupIdx(set<DupInfo> &dupIdx, set<int64_t> &opticalDupIdx, set<int64_t> ¬DupIdx,
|
||||||
SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) {
|
set<int64_t> ¬OpticalDupIdx, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) {
|
||||||
auto &lp = *lastArg;
|
auto &lp = *lastArg;
|
||||||
auto &p = *curArg;
|
auto &p = *curArg;
|
||||||
for (auto idx : dupIdx) {
|
for (auto idx : dupIdx) {
|
||||||
|
|
@ -325,31 +350,38 @@ static inline void refreshPairDupIdx(set<int64_t> &dupIdx, set<int64_t> &optical
|
||||||
}
|
}
|
||||||
for (auto idx : notDupIdx) {
|
for (auto idx : notDupIdx) {
|
||||||
lp.pairDupIdx.erase(idx);
|
lp.pairDupIdx.erase(idx);
|
||||||
lp.pairOpticalDupIdx.erase(idx);
|
//lp.pairOpticalDupIdx.erase(idx);
|
||||||
p.pairDupIdx.erase(idx);
|
p.pairDupIdx.erase(idx);
|
||||||
|
//p.pairOpticalDupIdx.erase(idx);
|
||||||
|
}
|
||||||
|
for (auto idx : notOpticalDupIdx) {
|
||||||
|
lp.pairOpticalDupIdx.erase(idx);
|
||||||
p.pairOpticalDupIdx.erase(idx);
|
p.pairOpticalDupIdx.erase(idx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 用来分别处理dup和optical dup
|
// 用来分别处理dup和optical dup
|
||||||
static void refeshTaskDupInfo(set<int64_t> &dupIdx, set<int64_t> &opticalDupIdx, set<int64_t> ¬DupIdx,
|
static void refeshTaskDupInfo(set<DupInfo> &dupIdx, set<int64_t> &opticalDupIdx, set<int64_t> ¬DupIdx,
|
||||||
set<int64_t> &latterDupIdx, set<int64_t> &latterOpticalDupIdx,
|
set<int64_t> ¬OpticalDupIdx, set<DupInfo> &latterDupIdx,
|
||||||
set<int64_t> &latterNotDupIdx) {
|
set<int64_t> &latterOpticalDupIdx, set<int64_t> &latterNotDupIdx,
|
||||||
|
set<int64_t> &latterNotOpticalDupIdx) {
|
||||||
for (auto idx : dupIdx) latterDupIdx.insert(idx);
|
for (auto idx : dupIdx) latterDupIdx.insert(idx);
|
||||||
for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx);
|
for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx);
|
||||||
for (auto idx : notDupIdx) latterNotDupIdx.insert(idx);
|
for (auto idx : notDupIdx) latterNotDupIdx.insert(idx);
|
||||||
|
for (auto idx : notOpticalDupIdx) latterNotOpticalDupIdx.insert(idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 最后合并数据并排序 */
|
/* 最后合并数据并排序 */
|
||||||
static void refeshFinalTaskDupInfo(set<int64_t> &dupIdx, set<int64_t> ¬DupIdx, vector<int64_t> &dupArr) {
|
template<typename T>
|
||||||
vector<int64_t> midArr;
|
static void refeshFinalTaskDupInfo(set<T> &dupIdx, set<int64_t> ¬DupIdx, vector<T> &dupArr) {
|
||||||
|
vector<T> midArr;
|
||||||
|
|
||||||
auto ai = dupArr.begin();
|
auto ai = dupArr.begin();
|
||||||
auto bi = dupIdx.begin();
|
auto bi = dupIdx.begin();
|
||||||
auto ae = dupArr.end();
|
auto ae = dupArr.end();
|
||||||
auto be = dupIdx.end();
|
auto be = dupIdx.end();
|
||||||
|
|
||||||
int64_t val = 0;
|
T val = 0;
|
||||||
while (ai != ae && bi != be) {
|
while (ai != ae && bi != be) {
|
||||||
if (*ai < *bi) {
|
if (*ai < *bi) {
|
||||||
val = *ai++;
|
val = *ai++;
|
||||||
|
|
@ -385,7 +417,8 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
|
||||||
auto &g = *gDataArg;
|
auto &g = *gDataArg;
|
||||||
|
|
||||||
vector<ReadEnds> reArr;
|
vector<ReadEnds> reArr;
|
||||||
set<int64_t> dupIdx;
|
set<DupInfo> dupIdx;
|
||||||
|
set<int64_t> notOpticalDupIdx;
|
||||||
set<int64_t> notDupIdx;
|
set<int64_t> notDupIdx;
|
||||||
// 先处理重叠的frags
|
// 先处理重叠的frags
|
||||||
getIntersectData(lp.frags, p.frags, &reArr);
|
getIntersectData(lp.frags, p.frags, &reArr);
|
||||||
|
|
@ -398,8 +431,8 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
|
||||||
notDupIdx.clear();
|
notDupIdx.clear();
|
||||||
set<int64_t> opticalDupIdx;
|
set<int64_t> opticalDupIdx;
|
||||||
getIntersectData(lp.pairs, p.pairs, &reArr, true);
|
getIntersectData(lp.pairs, p.pairs, &reArr, true);
|
||||||
processPairs(reArr, &dupIdx, &opticalDupIdx, ¬DupIdx);
|
processPairs(reArr, &dupIdx, &opticalDupIdx, ¬DupIdx, ¬OpticalDupIdx);
|
||||||
refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, &lp, &p);
|
refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx, &lp, &p);
|
||||||
|
|
||||||
// 处理之前未匹配的部分
|
// 处理之前未匹配的部分
|
||||||
map<CalcKey, int64_t> recalcPos;
|
map<CalcKey, int64_t> recalcPos;
|
||||||
|
|
@ -550,7 +583,7 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
|
||||||
pairArrP = &g.unpairedPosArr[posKey].pairArr;
|
pairArrP = &g.unpairedPosArr[posKey].pairArr;
|
||||||
else
|
else
|
||||||
pairArrP = &lp.unpairedPosArr[posKey].pairArr;
|
pairArrP = &lp.unpairedPosArr[posKey].pairArr;
|
||||||
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx);
|
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx, &t.notOpticalDupIdx);
|
||||||
if (taskSeq < lp.taskSeq)
|
if (taskSeq < lp.taskSeq)
|
||||||
g.unpairedPosArr.erase(posKey);
|
g.unpairedPosArr.erase(posKey);
|
||||||
}
|
}
|
||||||
|
|
@ -564,23 +597,24 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur
|
||||||
auto taskSeq = e.first;
|
auto taskSeq = e.first;
|
||||||
auto &t = e.second;
|
auto &t = e.second;
|
||||||
if (taskSeq < lp.taskSeq) {
|
if (taskSeq < lp.taskSeq) {
|
||||||
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, g.latterDupIdxArr[taskSeq],
|
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, g.latterDupIdxArr[taskSeq],
|
||||||
g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq]);
|
g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq]);
|
||||||
} else if (taskSeq == lp.taskSeq) {
|
} else if (taskSeq == lp.taskSeq) {
|
||||||
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, &lp, &p);
|
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, &lp, &p);
|
||||||
} else {
|
} else {
|
||||||
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, &p, &lp); // 把结果放到p中
|
refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, &p, &lp); // 把结果放到p中
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// cout << "remain unpaired: " << g.unpairedDic.size() << '\t' <<
|
// cout << "remain unpaired: " << g.unpairedDic.size() << '\t' <<
|
||||||
// g.unpairedPosArr.size() << endl; cout << "calc g time: " <<
|
// g.unpairedPosArr.size() << endl; cout << "calc g time: " <<
|
||||||
// t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据
|
// t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据
|
||||||
g.latterDupIdxArr.push_back(set<int64_t>());
|
g.latterDupIdxArr.push_back(set<DupInfo>());
|
||||||
g.latterOpticalDupIdxArr.push_back(set<int64_t>());
|
g.latterOpticalDupIdxArr.push_back(set<int64_t>());
|
||||||
g.latterNotDupIdxArr.push_back(set<int64_t>());
|
g.latterNotDupIdxArr.push_back(set<int64_t>());
|
||||||
|
g.latterNotOpticalDupIdxArr.push_back(set<int64_t>());
|
||||||
|
|
||||||
g.dupIdxArr.push_back(vector<int64_t>());
|
g.dupIdxArr.push_back(vector<DupInfo>());
|
||||||
auto &vIdx = g.dupIdxArr.back();
|
auto &vIdx = g.dupIdxArr.back();
|
||||||
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
|
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
|
||||||
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
|
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
|
||||||
|
|
@ -629,8 +663,8 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) {
|
||||||
for (auto &e : taskChanged) {
|
for (auto &e : taskChanged) {
|
||||||
auto taskSeq = e.first;
|
auto taskSeq = e.first;
|
||||||
auto &t = e.second;
|
auto &t = e.second;
|
||||||
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, g.latterDupIdxArr[taskSeq],
|
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, g.latterDupIdxArr[taskSeq],
|
||||||
g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq]);
|
g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq]);
|
||||||
}
|
}
|
||||||
|
|
||||||
cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl;
|
cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl;
|
||||||
|
|
@ -641,9 +675,9 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) {
|
||||||
for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i)
|
for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i)
|
||||||
refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i]);
|
refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i]);
|
||||||
for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i)
|
for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i)
|
||||||
refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotDupIdxArr[i], g.opticalDupIdxArr[i]);
|
refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i]);
|
||||||
|
|
||||||
g.dupIdxArr.push_back(vector<int64_t>());
|
g.dupIdxArr.push_back(vector<DupInfo>());
|
||||||
auto &vIdx = g.dupIdxArr.back();
|
auto &vIdx = g.dupIdxArr.back();
|
||||||
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
|
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
|
||||||
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
|
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
|
||||||
|
|
@ -734,19 +768,21 @@ void serialMarkDups() {
|
||||||
tm_arr[5].acc_end();
|
tm_arr[5].acc_end();
|
||||||
// 统计所有冗余index数量
|
// 统计所有冗余index数量
|
||||||
int64_t dupNum = 0;
|
int64_t dupNum = 0;
|
||||||
map<int64_t, int> dup;
|
int64_t opticalDupNum = 0;
|
||||||
|
|
||||||
|
map<int64_t, int> dup;
|
||||||
|
// int taskSeq = 0;
|
||||||
|
// for (auto &arr : gData.dupIdxArr) {
|
||||||
|
// for (auto idx : arr) {
|
||||||
|
// if (dup.find(idx.idx) != dup.end()) {
|
||||||
|
// // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t'
|
||||||
|
// // << idx << endl;
|
||||||
|
// }
|
||||||
|
// dup[idx.idx] = taskSeq;
|
||||||
|
// }
|
||||||
|
// taskSeq++;
|
||||||
|
// }
|
||||||
|
|
||||||
int taskSeq = 0;
|
|
||||||
for (auto &arr : gData.dupIdxArr) {
|
|
||||||
for (auto idx : arr) {
|
|
||||||
if (dup.find(idx) != dup.end()) {
|
|
||||||
// cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t'
|
|
||||||
// << idx << endl;
|
|
||||||
}
|
|
||||||
dup[idx] = taskSeq;
|
|
||||||
}
|
|
||||||
taskSeq++;
|
|
||||||
}
|
|
||||||
// #include <fstream>
|
// #include <fstream>
|
||||||
// ofstream out("tumor_dup.txt");
|
// ofstream out("tumor_dup.txt");
|
||||||
// for (auto idx : dup)
|
// for (auto idx : dup)
|
||||||
|
|
@ -756,6 +792,7 @@ void serialMarkDups() {
|
||||||
// out.close();
|
// out.close();
|
||||||
|
|
||||||
for (auto &arr : gData.dupIdxArr) dupNum += arr.size();
|
for (auto &arr : gData.dupIdxArr) dupNum += arr.size();
|
||||||
|
for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size();
|
||||||
|
|
||||||
cout << "dup num : " << dupNum << '\t' << dup.size() << endl;
|
cout << "dup num : " << dupNum << '\t' << dup.size() << endl;
|
||||||
|
|
||||||
|
|
@ -774,6 +811,7 @@ void serialMarkDups() {
|
||||||
<< gMetrics.OpticalDuplicatesCountHist << "\t" << gMetrics.OpticalDuplicatesByLibraryId << endl;
|
<< gMetrics.OpticalDuplicatesCountHist << "\t" << gMetrics.OpticalDuplicatesByLibraryId << endl;
|
||||||
cout << "optical dup: " << zzhopticalSet.size() << endl;
|
cout << "optical dup: " << zzhopticalSet.size() << endl;
|
||||||
cout << "optical arr dup: " << zzhopticalArr.size() << endl;
|
cout << "optical arr dup: " << zzhopticalArr.size() << endl;
|
||||||
|
cout << "optical size: " << opticalDupNum << endl;
|
||||||
|
|
||||||
Timer::log_time("serial end ");
|
Timer::log_time("serial end ");
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,11 +30,31 @@ struct CalcKey {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* 用来记录冗余索引相关的信息 */
|
||||||
|
struct DupInfo {
|
||||||
|
int64_t idx;
|
||||||
|
int64_t repIdx = 0; // 这一批冗余中的非冗余read 代表的索引
|
||||||
|
int16_t dupSet = 0; // dup set size
|
||||||
|
|
||||||
|
DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) { }
|
||||||
|
DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), repIdx(repIdx_), dupSet(dupSet_) {}
|
||||||
|
bool operator<(const DupInfo &o) const {
|
||||||
|
return idx < o.idx;
|
||||||
|
}
|
||||||
|
bool operator>(const DupInfo &o) const {
|
||||||
|
return idx > o.idx;
|
||||||
|
}
|
||||||
|
operator int64_t() const {
|
||||||
|
return idx;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/* 当遗留数据在当前任务找到了pair read后,进行冗余计算时候存放结果的数据结构 */
|
/* 当遗留数据在当前任务找到了pair read后,进行冗余计算时候存放结果的数据结构 */
|
||||||
struct TaskSeqDupInfo {
|
struct TaskSeqDupInfo {
|
||||||
set<int64_t> dupIdx;
|
set<DupInfo> dupIdx;
|
||||||
set<int64_t> opticalDupIdx;
|
set<int64_t> opticalDupIdx;
|
||||||
set<int64_t> notDupIdx;
|
set<int64_t> notDupIdx;
|
||||||
|
set<int64_t> notOpticalDupIdx;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* 保存有未匹配pair位点的信息,包括read end数组和有几个未匹配的read end */
|
/* 保存有未匹配pair位点的信息,包括read end数组和有几个未匹配的read end */
|
||||||
|
|
@ -57,9 +77,9 @@ struct SerailMarkDupArg {
|
||||||
vector<BamWrap *> bams; // 存放待处理的bam read
|
vector<BamWrap *> bams; // 存放待处理的bam read
|
||||||
vector<ReadEnds> pairs; // 成对的reads
|
vector<ReadEnds> pairs; // 成对的reads
|
||||||
vector<ReadEnds> frags; // 暂未找到配对的reads
|
vector<ReadEnds> frags; // 暂未找到配对的reads
|
||||||
set<int64_t> pairDupIdx; // pair的冗余read的索引
|
set<DupInfo> pairDupIdx; // pair的冗余read的索引
|
||||||
set<int64_t> pairOpticalDupIdx; // optical冗余read的索引
|
set<int64_t> pairOpticalDupIdx; // optical冗余read的索引
|
||||||
set<int64_t> fragDupIdx; // frag的冗余read的索引
|
set<DupInfo> fragDupIdx; // frag的冗余read的索引
|
||||||
UnpairedNameMap unpairedDic; // 用来寻找pair end
|
UnpairedNameMap unpairedDic; // 用来寻找pair end
|
||||||
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储
|
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储
|
||||||
};
|
};
|
||||||
|
|
@ -70,13 +90,14 @@ struct GlobalDataArg {
|
||||||
UnpairedPositionMap unpairedPosArr;
|
UnpairedPositionMap unpairedPosArr;
|
||||||
|
|
||||||
// 每个task对应一个vector
|
// 每个task对应一个vector
|
||||||
vector<vector<int64_t>> dupIdxArr;
|
vector<vector<DupInfo>> dupIdxArr;
|
||||||
vector<vector<int64_t>> opticalDupIdxArr;
|
vector<vector<int64_t>> opticalDupIdxArr;
|
||||||
|
|
||||||
// 用来存放后续计算的数据
|
// 用来存放后续计算的数据
|
||||||
vector<set<int64_t>> latterDupIdxArr;
|
vector<set<DupInfo>> latterDupIdxArr;
|
||||||
vector<set<int64_t>> latterOpticalDupIdxArr;
|
vector<set<int64_t>> latterOpticalDupIdxArr;
|
||||||
vector<set<int64_t>> latterNotDupIdxArr;
|
vector<set<int64_t>> latterNotDupIdxArr;
|
||||||
|
vector<set<int64_t>> latterNotOpticalDupIdxArr;
|
||||||
};
|
};
|
||||||
|
|
||||||
// 串行运行mark duplicate
|
// 串行运行mark duplicate
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue