并行实现的初步版本,结果一致

This commit is contained in:
zzh 2024-11-14 23:16:17 +08:00
parent fb1bd812ce
commit 9688710573
7 changed files with 314 additions and 94 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
# for fast-markdup # for fast-markdup
*.bai
core.* core.*
*.sam *.sam
*.bam *.bam

10
run.sh
View File

@ -1,3 +1,11 @@
#nthread=1
#nthread=2
#nthread=4
#nthread=8
#nthread=16
nthread=32
#nthread=64
#nthread=128
#input=~/data/bam/zy_normal.bam #input=~/data/bam/zy_normal.bam
input=~/data/bam/zy_tumor.bam input=~/data/bam/zy_tumor.bam
#input=~/data/bam/100w.bam #input=~/data/bam/100w.bam
@ -14,7 +22,7 @@ time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \
--INPUT $input \ --INPUT $input \
--OUTPUT ./out.bam \ --OUTPUT ./out.bam \
--INDEX_FORMAT BAI \ --INDEX_FORMAT BAI \
--num_threads 1 \ --num_threads $nthread \
--max_mem 2G \ --max_mem 2G \
--verbosity DEBUG \ --verbosity DEBUG \
--asyncio true -TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX null # --asyncio true -TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX null #

View File

@ -9,6 +9,8 @@ AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/common/hts HTS)
AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/sam SAM_SRC) AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/sam SAM_SRC)
AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/sam/markdups SAM_MARKDUPS_SRC) AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/sam/markdups SAM_MARKDUPS_SRC)
set(KTHREAD_FILE ${PROJECT_SOURCE_DIR}/lib/klib/kthread.c)
# #
INCLUDE_DIRECTORIES("${PROJECT_SOURCE_DIR}/src") INCLUDE_DIRECTORIES("${PROJECT_SOURCE_DIR}/src")
INCLUDE_DIRECTORIES("${PROJECT_SOURCE_DIR}/lib") INCLUDE_DIRECTORIES("${PROJECT_SOURCE_DIR}/lib")
@ -24,7 +26,7 @@ set(PG_NAME "picard_cpp")
# #
ADD_EXECUTABLE(${PG_NAME} ${MAIN_SRC} ${COMMON} ${UTILS} ${HTS} ADD_EXECUTABLE(${PG_NAME} ${MAIN_SRC} ${COMMON} ${UTILS} ${HTS}
${SAM_SRC} ${SAM_MARKDUPS_SRC}) ${SAM_SRC} ${SAM_MARKDUPS_SRC} ${KTHREAD_FILE})
# #
TARGET_LINK_LIBRARIES(${PG_NAME} libhts.a) TARGET_LINK_LIBRARIES(${PG_NAME} libhts.a)

View File

@ -177,7 +177,8 @@ int MarkDuplicates(int argc, char *argv[]) {
/* 冗余检查和标记 */ /* 冗余检查和标记 */
if (g_gArg.num_threads == 1) { if (g_gArg.num_threads == 1) {
serialMarkDups(); // 串行运行 // serialMarkDups(); // 串行运行
parallelMarkDups(); // 并行运行
} else { } else {
parallelMarkDups(); // 并行运行 parallelMarkDups(); // 并行运行
} }
@ -242,7 +243,7 @@ int MarkDuplicates(int argc, char *argv[]) {
uint32_t duplicateSetSize = 0; uint32_t duplicateSetSize = 0;
int64_t realDupSize = 0; int64_t realDupSize = 0;
// exit(0); exit(0);
while (inBuf.ReadStat() >= 0) { while (inBuf.ReadStat() >= 0) {
Timer tw1; Timer tw1;
size_t readNum = inBuf.ReadBam(); size_t readNum = inBuf.ReadBam();
@ -277,7 +278,8 @@ int MarkDuplicates(int argc, char *argv[]) {
/* 判断是否冗余 */ /* 判断是否冗余 */
if (bamIdx == dupIdx) { if (bamIdx == dupIdx) {
++realDupSize; // for test // cerr << bamIdx << endl;
++realDupSize; // for test
isDup = true; isDup = true;
if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) { if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) {
isInDuplicateSet = true; isInDuplicateSet = true;

View File

@ -5,11 +5,14 @@
#include <robin-map/include/tsl/robin_set.h> #include <robin-map/include/tsl/robin_set.h>
#include <sam/utils/read_ends.h> #include <sam/utils/read_ends.h>
#include <queue>
#include <set> #include <set>
#include <string> #include <string>
#include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
using std::priority_queue;
using std::set; using std::set;
using std::string; using std::string;
using std::unordered_set; using std::unordered_set;
@ -127,6 +130,24 @@ struct MarkDupDataArg {
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd为了避免重复存储 UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd为了避免重复存储
}; };
/* 并行计算相关参数 */
struct ParallelData {
vector<ReadEnds> pairs; // 成对的reads
vector<ReadEnds> frags; // 暂未找到配对的reads
UnpairedNameMap unpairedDic; // 用来寻找pair end
};
struct ParallelDataArg {
vector<ParallelData> threadData;
vector<vector<ReadEnds>*> pairs2d; // 成对的reads
vector<vector<ReadEnds>*> frags2d; // 暂未找到配对的reads
MarkDupDataArg *mdArg;
void Init(int nThread) {
for (int i = 0; i <= nThread; ++i) { // pairs需要多一个数组
threadData.push_back(ParallelData());
}
}
};
/* 全局保留的数据因为有些paired数据比对到了不同的染色体相距甚远 */ /* 全局保留的数据因为有些paired数据比对到了不同的染色体相距甚远 */
struct GlobalDataArg { struct GlobalDataArg {
UnpairedNameMap unpairedDic; // 用来寻找pair end UnpairedNameMap unpairedDic; // 用来寻找pair end
@ -147,4 +168,59 @@ struct GlobalDataArg {
vector<MDSet<int64_t>> latterNotOpticalDupIdxArr; vector<MDSet<int64_t>> latterNotOpticalDupIdxArr;
vector<MDSet<int64_t>> latterNotRepIdxArr; vector<MDSet<int64_t>> latterNotRepIdxArr;
vector<MDSet<int64_t>> latterNotSingletonIdxArr; vector<MDSet<int64_t>> latterNotSingletonIdxArr;
};
struct ReadEndsArrIdIdx {
int arrId = 0;
uint64_t arrIdx = 0;
const ReadEnds *pE = nullptr;
};
struct ReadEndsGreaterThan {
bool operator()(const ReadEndsArrIdIdx &a, const ReadEndsArrIdIdx &b) { return *b.pE < *a.pE; }
};
struct ReadEndsQueue {
// 将冗余索引和他对应的task vector对应起来
vector<vector<ReadEnds>*> *arr2d;
priority_queue<ReadEndsArrIdIdx, vector<ReadEndsArrIdIdx>, ReadEndsGreaterThan> minHeap;
uint64_t popNum = 0;
int Init(vector<vector<ReadEnds>*> *_arr2d) {
arr2d = _arr2d;
if (arr2d == nullptr) {
return -1;
}
for (int i = 0; i < arr2d->size(); ++i) {
auto v = (*arr2d)[i];
if (!v->empty()) {
minHeap.push({i, 1, &(*v)[0]});
}
}
return 0;
}
const ReadEnds *Pop() {
const ReadEnds *ret = nullptr;
if (!minHeap.empty()) {
auto idx = minHeap.top();
minHeap.pop();
++popNum;
ret = idx.pE;
auto v = (*arr2d)[idx.arrId];
if (v->size() > idx.arrIdx) {
minHeap.push({idx.arrId, idx.arrIdx + 1, &(*v)[idx.arrIdx]});
}
}
return ret;
}
uint64_t Size() {
uint64_t len = 0;
if (arr2d != nullptr) {
for (auto v : *arr2d) {
len += v->size();
}
}
return len - popNum;
}
}; };

View File

@ -5,6 +5,7 @@
#include <common/utils/timer.h> #include <common/utils/timer.h>
#include <common/utils/util.h> #include <common/utils/util.h>
#include <sam/utils/read_ends.h> #include <sam/utils/read_ends.h>
#include <klib/kthread.h>
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
@ -71,15 +72,19 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
// 这个统计可能会有误差因为当前位点可能还有没匹配上的read导致当前位点的readpaired数量为1 // 这个统计可能会有误差因为当前位点可能还有没匹配上的read导致当前位点的readpaired数量为1
// 可以通过后续的补充计算来解决这个问题,有必要么?好像没必要 // 可以通过后续的补充计算来解决这个问题,有必要么?好像没必要
// gMetrics.singletonReads.insert(vpRe[0]->read1IndexInFile); // gMetrics.singletonReads.insert(vpRe[0]->read1IndexInFile);
// singletonIdx->insert(vpRe[0]->read1IndexInFile); singletonIdx->insert(vpRe[0]->read1IndexInFile);
} }
return; return;
} }
// if (notSingletonIdx != nullptr) { for (auto pe : vpRe) {
// for (auto pe : vpRe) { // gMetrics.singletonReads.erase(pe->read1IndexInFile);
// notSingletonIdx->insert(pe->read1IndexInFile); }
// }
// } if (notSingletonIdx != nullptr) {
for (auto pe : vpRe) {
notSingletonIdx->insert(pe->read1IndexInFile);
}
}
int maxScore = 0; int maxScore = 0;
const ReadEnds *pBest = nullptr; const ReadEnds *pBest = nullptr;
@ -139,6 +144,8 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
} }
for (auto pe : vpRe) { // 对非best read标记冗余 for (auto pe : vpRe) { // 对非best read标记冗余
if (pe != pBest) { // 非best if (pe != pBest) { // 非best
// gMetrics.dupReads.insert(pe->read1IndexInFile);
// gMetrics.dupReads.insert(pe->read2IndexInFile);
dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1
if (pe->read2IndexInFile != pe->read1IndexInFile) if (pe->read2IndexInFile != pe->read1IndexInFile)
dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2,
@ -149,6 +156,9 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
if (pe->read2IndexInFile != pe->read1IndexInFile) if (pe->read2IndexInFile != pe->read1IndexInFile)
opticalDupIdx->insert(pe->read2IndexInFile); opticalDupIdx->insert(pe->read2IndexInFile);
} }
} else {
// gMetrics.dupReads.erase(pe->read1IndexInFile); // for test
// gMetrics.dupReads.erase(pe->read2IndexInFile);
} }
} }
// 在输出的bam文件中添加tag, best作为dupset的代表 // 在输出的bam文件中添加tag, best作为dupset的代表
@ -174,6 +184,7 @@ static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs,
for (auto pe : vpRe) { for (auto pe : vpRe) {
if (!pe->IsPaired()) { if (!pe->IsPaired()) {
dupIdx->insert(pe->read1IndexInFile); dupIdx->insert(pe->read1IndexInFile);
// gMetrics.dupReads.insert(pe->read1IndexInFile);
} }
} }
} else { } else {
@ -191,6 +202,9 @@ static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs,
for (auto pe : vpRe) { for (auto pe : vpRe) {
if (pe != pBest) { if (pe != pBest) {
dupIdx->insert(pe->read1IndexInFile); dupIdx->insert(pe->read1IndexInFile);
// gMetrics.dupReads.insert(pe->read1IndexInFile);
} else {
// gMetrics.dupReads.erase(pe->read1IndexInFile);
} }
} }
} }
@ -202,39 +216,37 @@ static void getEqualRE(const ReadEnds &re, vector<ReadEnds> &src, vector<ReadEnd
dst->insert(dst->end(), range.first, range.second); dst->insert(dst->end(), range.first, range.second);
} }
/* 单线程生成readends (第一步)*/ #define LOWER_BOUND(tid, nthread, ndata) ((tid) * (ndata) / (nthread))
static void generateReadEnds(MarkDupDataArg *arg) { #define UPPER_BOUND(tid, nthread, ndata) ((tid + 1) * (ndata) / (nthread))
auto &p = *arg;
auto &rnParser = g_vRnParser[0];
static void threadGenerateReadEnds(void *data, long idx, int tid) {
auto &p = (*(ParallelDataArg *)data).threadData[idx];
auto &mdArg = *(*(ParallelDataArg *)data).mdArg;
auto &rnParser = g_vRnParser[idx];
int nThread = g_gArg.num_threads;
p.pairs.clear(); p.pairs.clear();
p.frags.clear(); p.frags.clear();
p.unpairedDic.clear(); p.unpairedDic.clear();
p.unpairedPosArr.clear(); size_t start_id = LOWER_BOUND(idx, nThread, mdArg.bams.size());
size_t end_id = UPPER_BOUND(idx, nThread, mdArg.bams.size());
// if (mdArg.taskSeq == 163)
// cout << "tid: " << tid << "\t" << idx << "\t" << start_id << "\t" << end_id << "\t" << mdArg.bams.size() << endl;
// bam_num1 += p.bams.size(); for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read
/* 处理每个read创建ReadEnd并放入frag和pair中 */ BamWrap *bw = mdArg.bams[i];
// MDSet<ReadEnds> reSet; const int64_t bamIdx = mdArg.bamStartIdx + i;
// ReadEnds lastRe;
for (size_t i = 0; i < p.bams.size(); ++i) { // 循环处理每个read
BamWrap *bw = p.bams[i];
// ++bam_num1;
const int64_t bamIdx = p.bamStartIdx + i;
if (bw->GetReadUnmappedFlag()) { if (bw->GetReadUnmappedFlag()) {
if (bw->b->core.tid == -1) if (bw->b->core.tid == -1)
// When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort).
break; break;
} else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对
ReadEnds fragEnd; ReadEnds fragEnd;
tm_arr[8].acc_start();
buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); buildReadEnds(*bw, bamIdx, rnParser, &fragEnd);
tm_arr[8].acc_end();
p.frags.push_back(fragEnd); // 添加进frag集合 p.frags.push_back(fragEnd); // 添加进frag集合
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了
string key = bw->query_name(); string key = bw->query_name();
if (p.unpairedDic.find(key) == p.unpairedDic.end()) { if (p.unpairedDic.find(key) == p.unpairedDic.end()) {
p.unpairedDic[key] = {p.taskSeq, fragEnd}; p.unpairedDic[key] = {mdArg.taskSeq, fragEnd};
} else { // 找到了pairend } else { // 找到了pairend
auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; auto &pairedEnds = p.unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds); modifyPairedEnds(fragEnd, &pairedEnds);
@ -244,14 +256,92 @@ static void generateReadEnds(MarkDupDataArg *arg) {
} }
} }
} }
tm_arr[9].acc_start(); // sortReadEndsArr(p.frags);
sortReadEndsArr(p.frags); sort(p.frags.begin(), p.frags.end());
// sort(p.frags.begin(), p.frags.end());
tm_arr[9].acc_end();
// cout << "sort pairs" << endl;
tm_arr[10].acc_start();
sort(p.pairs.begin(), p.pairs.end()); sort(p.pairs.begin(), p.pairs.end());
tm_arr[10].acc_end(); // cout << tid << "\tthread end" << endl;
// if (mdArg.taskSeq == 163)
// cout << "tid: " << tid << " end\t" << p.pairs.size() << "\t" << p.frags.size() << endl;
}
/* 单线程生成readends (第一步)*/
static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) {
auto &p = *arg;
auto &rnParser = g_vRnParser[0];
p.pairs.clear();
p.frags.clear();
p.unpairedDic.clear();
p.unpairedPosArr.clear();
// cout << "read num: " << p.bams.size() << endl;
pdArg.mdArg = arg;
tm_arr[8].acc_start();
kt_for(g_gArg.num_threads, threadGenerateReadEnds, &pdArg, g_gArg.num_threads);
tm_arr[8].acc_end();
// 合并各个线程数据
// 查找未匹配的frags
vector<ReadEnds> &pairs = pdArg.threadData[g_gArg.num_threads].pairs;
pairs.clear();
for (int i = 0; i < g_gArg.num_threads; ++i) {
for (auto &val : pdArg.threadData[i].unpairedDic) {
const string &key = val.first;
const ReadEnds &fragEnd = val.second.unpairedRE;
if (p.unpairedDic.find(key) == p.unpairedDic.end()) {
p.unpairedDic[key] = {p.taskSeq, fragEnd};
} else { // 找到了pairend
auto &pairedEnds = p.unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds);
pairs.push_back(pairedEnds);
p.unpairedDic.erase(key); // 删除找到的pairend
}
}
}
sort(pairs.begin(), pairs.end());
// 合并所有frags和pairs
pdArg.frags2d.clear();
pdArg.pairs2d.clear();
for (int i = 0; i < g_gArg.num_threads; ++i) {
pdArg.frags2d.push_back(&pdArg.threadData[i].frags);
pdArg.pairs2d.push_back(&pdArg.threadData[i].pairs);
}
if (false) {
cout << "taskseq: " << p.taskSeq << endl;
for (int i = 0; i < g_gArg.num_threads; ++i) {
cout << "thread pair num: " << pdArg.threadData[i].pairs.size() << endl;
cout << "thread frag num: " << pdArg.threadData[i].frags.size() << endl;
cout << "thread dic num: " << pdArg.threadData[i].unpairedDic.size() << endl;
}
}
pdArg.pairs2d.push_back(&pdArg.threadData[g_gArg.num_threads].pairs);
ReadEndsQueue fragsQue, pairsQue;
fragsQue.Init(&pdArg.frags2d);
pairsQue.Init(&pdArg.pairs2d);
tm_arr[9].acc_start();
const ReadEnds *pRE;
while((pRE = fragsQue.Pop()) != nullptr) {
p.frags.push_back(*pRE);
//cout << pRE->posKey << endl;
}
tm_arr[9].acc_end();
tm_arr[10].acc_start();
while ((pRE = pairsQue.Pop()) != nullptr) {
p.pairs.push_back(*pRE);
//cout << pRE->posKey << endl;
}
// 记录位点上的未匹配的read个数 // 记录位点上的未匹配的read个数
for (auto &e : p.unpairedDic) { for (auto &e : p.unpairedDic) {
auto posKey = e.second.unpairedRE.posKey; auto posKey = e.second.unpairedRE.posKey;
@ -260,13 +350,11 @@ static void generateReadEnds(MarkDupDataArg *arg) {
unpairArrInfo.taskSeq = p.taskSeq; unpairArrInfo.taskSeq = p.taskSeq;
unpairArrInfo.readNameSet.insert(e.first); unpairArrInfo.readNameSet.insert(e.first);
} }
if (p.taskSeq == 98) { tm_arr[10].acc_end();
for (auto &re : p.unpairedPosArr[14293757783047].pairArr) { // cout << "frags num: " << p.frags.size() << endl;
cout << "task 99 unpair reads: " << re.read1IndexInFile << "\t" << re.read2IndexInFile << endl; // cout << "pairs num: " << p.pairs.size() << endl;
} // cout << "dic num : " << p.unpairedDic.size() << endl;
} // cout << "generate end" << endl;
// cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() <<
// endl;
} }
/* 处理pairs */ /* 处理pairs */
@ -404,13 +492,14 @@ static inline void refreshPairDupIdx(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opt
lp.pairSingletonIdx.insert(idx); lp.pairSingletonIdx.insert(idx);
p.pairSingletonIdx.erase(idx); p.pairSingletonIdx.erase(idx);
} }
// for (auto idx : notDupIdx) { for (auto idx : notDupIdx) {
// if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl; // if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl;
// if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl; // if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl;
// if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl; // if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl;
// if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl; // if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl;
// lp.pairDupIdx.erase(idx); p.pairDupIdx.erase(idx); lp.pairDupIdx.erase(idx);
// } p.pairDupIdx.erase(idx);
}
for (auto idx : notOpticalDupIdx) { for (auto idx : notOpticalDupIdx) {
lp.pairOpticalDupIdx.erase(idx); lp.pairOpticalDupIdx.erase(idx);
p.pairOpticalDupIdx.erase(idx); p.pairOpticalDupIdx.erase(idx);
@ -491,6 +580,9 @@ static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet<int64_t> &notDupI
dupArr = midArr; dupArr = midArr;
} }
static int64_t llp_frag_end_pos = 0;
static int64_t llp_pair_end_pos = 0;
static int64_t gSingletonNum = 0;
/* 处理相邻的两个任务,有相交叉的数据 */ /* 处理相邻的两个任务,有相交叉的数据 */
static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, GlobalDataArg *gDataArg) { static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, GlobalDataArg *gDataArg) {
auto &lp = *lastArg; auto &lp = *lastArg;
@ -510,7 +602,25 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg,
getIntersectData(lp.frags, p.frags, &reArr); getIntersectData(lp.frags, p.frags, &reArr);
processFrags(reArr, &dupIdx, &notDupIdx); processFrags(reArr, &dupIdx, &notDupIdx);
refreshFragDupIdx(dupIdx, notDupIdx, &lp, &p); refreshFragDupIdx(dupIdx, notDupIdx, &lp, &p);
// cout << "578" << endl;
if (!p.frags.empty())
if (llp_frag_end_pos >= p.frags.begin()->posKey) {
cout << "frags : " << llp_frag_end_pos << "\t" << p.frags[0].posKey << "\t" << p.frags.rbegin()->posKey
<< endl;
}
if (!p.pairs.empty())
if (llp_pair_end_pos >= p.pairs.begin()->posKey) {
cout << "pairs : " << llp_pair_end_pos << "\t" << p.pairs[0].posKey << "\t" << p.pairs.rbegin()->posKey
<< endl;
}
if (!p.frags.empty())
llp_frag_end_pos = lp.frags.rbegin()->posKey;
if (!p.pairs.empty())
llp_pair_end_pos = lp.pairs.rbegin()->posKey;
// cout << "frags lp: " << lp.frags[0].posKey << "\t" << lp.frags.rbegin()->posKey << endl;
// cout << "frags p : " << p.frags[0].posKey << "\t" << p.frags.rbegin()->posKey << endl;
// cout << "pairs lp: " << lp.pairs[0].posKey << "\t" << lp.pairs.rbegin()->posKey << endl;
// cout << "pairs p : " << p.pairs[0].posKey << "\t" << p.pairs.rbegin()->posKey << endl;
// 再处理重叠的pairs // 再处理重叠的pairs
reArr.clear(); reArr.clear();
dupIdx.clear(); dupIdx.clear();
@ -518,9 +628,9 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg,
getIntersectData(lp.pairs, p.pairs, &reArr, true); getIntersectData(lp.pairs, p.pairs, &reArr, true);
processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, &notDupIdx, &notOpticalDupIdx, &notRepIdx, processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, &notDupIdx, &notOpticalDupIdx, &notRepIdx,
&notSingletonIdx); &notSingletonIdx);
refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx, singletonIdx, refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx,
&lp, &p); notSingletonIdx, &lp, &p);
// cout << "606" << endl;
// cout << (g.unpairedDic.find("A01415:368:HL7NTDSX3:3:1104:5195:34757") != g.unpairedDic.end()) << endl; // cout << (g.unpairedDic.find("A01415:368:HL7NTDSX3:3:1104:5195:34757") != g.unpairedDic.end()) << endl;
// cout << (g.unpairedPosArr.find(14293757783047) != g.unpairedPosArr.end()) << endl; // cout << (g.unpairedPosArr.find(14293757783047) != g.unpairedPosArr.end()) << endl;
// 处理之前未匹配的部分 // 处理之前未匹配的部分
@ -683,7 +793,7 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg,
// } // }
} }
} }
// cout << "769" << endl;
map<int64_t, TaskSeqDupInfo> taskChanged; map<int64_t, TaskSeqDupInfo> taskChanged;
MDSet<int64_t> posProcessed; MDSet<int64_t> posProcessed;
for (auto &e : recalcPos) { for (auto &e : recalcPos) {
@ -746,7 +856,7 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg,
t.notRepIdx, t.notSingletonIdx, &p, &lp); // 把结果放到p中 t.notRepIdx, t.notSingletonIdx, &p, &lp); // 把结果放到p中
} }
} }
// cout << "832" << endl;
// cout << "remain unpaired: " << g.unpairedDic.size() << '\t' << // cout << "remain unpaired: " << g.unpairedDic.size() << '\t' <<
// g.unpairedPosArr.size() << endl; cout << "calc g time: " << // g.unpairedPosArr.size() << endl; cout << "calc g time: " <<
// t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据 // t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据
@ -781,6 +891,7 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg,
auto &vSingletonIdx = g.singletonIdxArr.back(); auto &vSingletonIdx = g.singletonIdxArr.back();
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
gSingletonNum += lp.pairSingletonIdx.size();
} }
/* 当所有任务结束后global data里还有未处理的数据 */ /* 当所有任务结束后global data里还有未处理的数据 */
@ -821,7 +932,8 @@ static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) {
processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx,
&t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx);
} else if (arr.size() == 1) { } else if (arr.size() == 1) {
// gMetrics.singletonReads.insert(arr[0].read1IndexInFile); // gMetrics.singletonReads.insert(arr[0].read1IndexInFile);
t.singletonIdx.insert(arr[0].read1IndexInFile);
} }
} }
// 更新结果 // 更新结果
@ -909,6 +1021,9 @@ static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) {
intCacheDupIdx, intMidArr); intCacheDupIdx, intMidArr);
for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr);
for (int i = 0; i < (int)g.singletonIdxArr.size() - 1; ++i)
refeshFinalTaskDupInfo(g.latterSingletonIdxArr[i], g.latterNotSingletonIdxArr[i], g.singletonIdxArr[i],
intCacheDupIdx, intMidArr);
g.dupIdxArr.push_back(vector<DupInfo>()); g.dupIdxArr.push_back(vector<DupInfo>());
auto &vIdx = g.dupIdxArr.back(); auto &vIdx = g.dupIdxArr.back();
@ -938,18 +1053,7 @@ static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) {
auto &vSingletonIdx = g.singletonIdxArr.back(); auto &vSingletonIdx = g.singletonIdxArr.back();
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
} gSingletonNum += lp.pairSingletonIdx.size();
static void calculateMetrics(MarkDupDataArg &lp, MarkDupDataArg &p, GlobalDataArg &g, DuplicationMetrics *pgMetrics) {
DuplicationMetrics &gMetrics = *pgMetrics;
// gMetrics.READ_PAIRS_EXAMINED /= 2;
cout << "calculateMetrics start: " << endl;
cout << lp.unpairedDic.size() << "\t" << p.unpairedDic.size() << "\t" << g.unpairedDic.size() << endl;
cout << "all: " << (lp.unpairedDic.size() + p.unpairedDic.size() + g.unpairedDic.size()) << endl;
cout << "calculateMetrics end" << endl;
} }
/* 串行处理数据,标记冗余 */ /* 串行处理数据,标记冗余 */
@ -966,6 +1070,8 @@ void parallelMarkDups() {
MarkDupDataArg smdArg1, smdArg2; MarkDupDataArg smdArg1, smdArg2;
MarkDupDataArg *lastArgP = &smdArg1; MarkDupDataArg *lastArgP = &smdArg1;
MarkDupDataArg *curArgP = &smdArg2; MarkDupDataArg *curArgP = &smdArg2;
ParallelDataArg pdArg;
pdArg.Init(g_gArg.num_threads);
bool isFirstRound = true; bool isFirstRound = true;
int roundNum = 0; int roundNum = 0;
@ -975,6 +1081,8 @@ void parallelMarkDups() {
// 读取bam文件中的read // 读取bam文件中的read
tm_arr[4].acc_start(); tm_arr[4].acc_start();
size_t readNum = inBamBuf.ReadBam(); size_t readNum = inBamBuf.ReadBam();
if (readNum < 1)
break;
readNumSum += readNum; readNumSum += readNum;
// readNumSum += inBamBuf.GetBamArr().size(); // readNumSum += inBamBuf.GetBamArr().size();
tm_arr[4].acc_end(); tm_arr[4].acc_end();
@ -988,23 +1096,21 @@ void parallelMarkDups() {
tm_arr[0].acc_start(); tm_arr[0].acc_start();
Timer t1; Timer t1;
generateReadEnds(curArgP); generateReadEnds(pdArg, curArgP);
// cout << "calc read end time: " << t1.seconds_elapsed() << " s" << cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl;
// endl;
tm_arr[0].acc_end(); tm_arr[0].acc_end();
tm_arr[1].acc_start(); tm_arr[1].acc_start();
t1.reinit(); t1.reinit();
markdups(curArgP); markdups(curArgP);
// cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl;
tm_arr[1].acc_end(); tm_arr[1].acc_end();
if (!isFirstRound) { if (!isFirstRound) {
tm_arr[2].acc_start(); tm_arr[2].acc_start();
t1.reinit(); t1.reinit();
handleIntersectData(lastArgP, curArgP, &gData); handleIntersectData(lastArgP, curArgP, &gData);
// cout << "intersect time: " << t1.seconds_elapsed() << " s" << cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl;
// endl;
// addTaskIdxToSet(lastArgP, &gData); // addTaskIdxToSet(lastArgP, &gData);
tm_arr[2].acc_end(); tm_arr[2].acc_end();
} else { } else {
@ -1024,14 +1130,11 @@ void parallelMarkDups() {
// cout << "round time: " << t_round.seconds_elapsed() * 100 << " s" << endl; // cout << "round time: " << t_round.seconds_elapsed() * 100 << " s" << endl;
} }
} }
// cout << "here" << endl; cout << "here" << endl;
tm_arr[3].acc_start(); tm_arr[3].acc_start();
// 处理剩下的全局数据 // 处理剩下的全局数据
handleLastTask(lastArgP, &gData); handleLastTask(lastArgP, &gData);
// 计算各种统计指标
// calculateMetrics(*lastArgP, *curArgP, gData, &gMetrics);
// cout << "here 2" << endl; // cout << "here 2" << endl;
tm_arr[3].acc_end(); tm_arr[3].acc_end();
@ -1041,20 +1144,44 @@ void parallelMarkDups() {
int64_t opticalDupNum = 0; int64_t opticalDupNum = 0;
int64_t singletonNum = 0; int64_t singletonNum = 0;
int64_t dupNumDic = 0;
int64_t singletonNumDic = 0;
map<int64_t, int> dup; map<int64_t, int> dup;
int taskSeq = 0; int taskSeq = 0;
// for (auto &arr : gData.singletonIdxArr) { // for (auto &arr : gData.dupIdxArr) {
// for (auto idx : arr) { // for (auto idx : arr) {
// if (dup.find(idx) != dup.end()) { // if (dup.find(idx.idx) != dup.end()) {
// // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' // // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t'
// // << idx << endl; // // << idx << endl;
// } // }
// dup[idx] = taskSeq; // dup[idx.idx] = taskSeq;
// } // }
// // cout << taskSeq << "\t" << arr.size() << endl; // // cout << taskSeq << "\t" << arr.size() << endl;
// taskSeq++; // taskSeq++;
// } // }
// dupNumDic = dup.size();
// dup.clear();
// int notInMetrics = 0;
// cout << "gmetrics single count: " << gMetrics.singletonReads.size() << endl;
// for (auto &arr : gData.singletonIdxArr) {
// for (auto idx : arr) {
// dup[idx] = 1;
// if (gMetrics.singletonReads.find(idx) == gMetrics.singletonReads.end()) {
//// cout << "not in gmetrics: " << idx << endl;
// ++notInMetrics;
// } else {
// gMetrics.singletonReads.erase(idx);
// }
// }
// }
// singletonNumDic = dup.size();
// cout << "not in arr: " << endl;
// for (auto idx : gMetrics.singletonReads) {
//// cout << idx << endl;
// }
// cout << "count: " << notInMetrics << "\t" << gMetrics.singletonReads.size() << endl;
// #include <fstream> // #include <fstream>
// ofstream out("tumor_dup.txt"); // ofstream out("tumor_dup.txt");
@ -1068,8 +1195,11 @@ void parallelMarkDups() {
for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size(); for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size();
for (auto &arr : gData.singletonIdxArr) singletonNum += arr.size(); for (auto &arr : gData.singletonIdxArr) singletonNum += arr.size();
cout << "dup num : " << dupNum << '\t' << dup.size() << "\t" << zzhtestnum << endl; cout << "dup num : " << dupNum << '\t' << dupNumDic << "\t" << zzhtestnum << endl;
cout << "singleton : " << singletonNum << endl; cout << "singleton : " << singletonNum << "\t" << singletonNumDic << "\t" << gSingletonNum << endl;
cout << "singleton size: " << gMetrics.singletonReads.size() << endl;
cout << "dup read size: " << gMetrics.dupReads.size() << endl;
cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl; cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl;
cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl; cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl;
cout << "handle tail : " << tm_arr[2].acc_seconds_elapsed() << endl; cout << "handle tail : " << tm_arr[2].acc_seconds_elapsed() << endl;
@ -1082,7 +1212,6 @@ void parallelMarkDups() {
cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl; cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl;
cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl;
cout << "optical size: " << opticalDupNum << "\t" << opticalDupNum / 2 << endl; cout << "optical size: " << opticalDupNum << "\t" << opticalDupNum / 2 << endl;
cout << "singleton size: " << gMetrics.singletonReads.size() << endl;
Timer::log_time("parallel end "); Timer::log_time("parallel end ");
cout << "read num sum: " << readNumSum << endl; cout << "read num sum: " << readNumSum << endl;

View File

@ -144,8 +144,8 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
} }
for (auto pe : vpRe) { // 对非best read标记冗余 for (auto pe : vpRe) { // 对非best read标记冗余
if (pe != pBest) { // 非best if (pe != pBest) { // 非best
gMetrics.dupReads.insert(pe->read1IndexInFile); // gMetrics.dupReads.insert(pe->read1IndexInFile);
gMetrics.dupReads.insert(pe->read2IndexInFile); // gMetrics.dupReads.insert(pe->read2IndexInFile);
dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1
if (pe->read2IndexInFile != pe->read1IndexInFile) if (pe->read2IndexInFile != pe->read1IndexInFile)
dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2,
@ -157,8 +157,8 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
opticalDupIdx->insert(pe->read2IndexInFile); opticalDupIdx->insert(pe->read2IndexInFile);
} }
} else { } else {
gMetrics.dupReads.erase(pe->read1IndexInFile); // for test // gMetrics.dupReads.erase(pe->read1IndexInFile); // for test
gMetrics.dupReads.erase(pe->read2IndexInFile); // gMetrics.dupReads.erase(pe->read2IndexInFile);
} }
} }
// 在输出的bam文件中添加tag, best作为dupset的代表 // 在输出的bam文件中添加tag, best作为dupset的代表
@ -184,7 +184,7 @@ static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs,
for (auto pe : vpRe) { for (auto pe : vpRe) {
if (!pe->IsPaired()) { if (!pe->IsPaired()) {
dupIdx->insert(pe->read1IndexInFile); dupIdx->insert(pe->read1IndexInFile);
gMetrics.dupReads.insert(pe->read1IndexInFile); // gMetrics.dupReads.insert(pe->read1IndexInFile);
} }
} }
} else { } else {
@ -202,9 +202,9 @@ static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs,
for (auto pe : vpRe) { for (auto pe : vpRe) {
if (pe != pBest) { if (pe != pBest) {
dupIdx->insert(pe->read1IndexInFile); dupIdx->insert(pe->read1IndexInFile);
gMetrics.dupReads.insert(pe->read1IndexInFile); // gMetrics.dupReads.insert(pe->read1IndexInFile);
} else { } else {
gMetrics.dupReads.erase(pe->read1IndexInFile); // gMetrics.dupReads.erase(pe->read1IndexInFile);
} }
} }
} }
@ -281,6 +281,9 @@ static void generateReadEnds(MarkDupDataArg *arg) {
} }
// cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() << // cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() <<
// endl; // endl;
cout << "frags num: " << p.frags.size() << endl;
cout << "pairs num: " << p.pairs.size() << endl;
cout << "dic num : " << p.unpairedDic.size() << endl;
} }
/* 处理pairs */ /* 处理pairs */
@ -1013,14 +1016,13 @@ void serialMarkDups() {
tm_arr[0].acc_start(); tm_arr[0].acc_start();
Timer t1; Timer t1;
generateReadEnds(curArgP); generateReadEnds(curArgP);
// cout << "calc read end time: " << t1.seconds_elapsed() << " s" << cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl;
// endl;
tm_arr[0].acc_end(); tm_arr[0].acc_end();
tm_arr[1].acc_start(); tm_arr[1].acc_start();
t1.reinit(); t1.reinit();
markdups(curArgP); markdups(curArgP);
// cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl;
tm_arr[1].acc_end(); tm_arr[1].acc_end();
if (!isFirstRound) { if (!isFirstRound) {