From 96887105735e8c18ec3ae00e4dd5d34992d1e631 Mon Sep 17 00:00:00 2001 From: zzh Date: Thu, 14 Nov 2024 23:16:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B9=B6=E8=A1=8C=E5=AE=9E=E7=8E=B0=E7=9A=84?= =?UTF-8?q?=E5=88=9D=E6=AD=A5=E7=89=88=E6=9C=AC=EF=BC=8C=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E4=B8=80=E8=87=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + run.sh | 10 +- src/CMakeLists.txt | 4 +- src/sam/markdups/markdups.cpp | 8 +- src/sam/markdups/md_types.h | 76 ++++++++ src/sam/markdups/parallel_md.cpp | 287 ++++++++++++++++++++++--------- src/sam/markdups/serial_md.cpp | 22 +-- 7 files changed, 314 insertions(+), 94 deletions(-) diff --git a/.gitignore b/.gitignore index b04fd59..9dcbadc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ # for fast-markdup +*.bai core.* *.sam *.bam diff --git a/run.sh b/run.sh index c7b814f..4e8d418 100755 --- a/run.sh +++ b/run.sh @@ -1,3 +1,11 @@ +#nthread=1 +#nthread=2 +#nthread=4 +#nthread=8 +#nthread=16 +nthread=32 +#nthread=64 +#nthread=128 #input=~/data/bam/zy_normal.bam input=~/data/bam/zy_tumor.bam #input=~/data/bam/100w.bam @@ -14,7 +22,7 @@ time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \ --INPUT $input \ --OUTPUT ./out.bam \ --INDEX_FORMAT BAI \ - --num_threads 1 \ + --num_threads $nthread \ --max_mem 2G \ --verbosity DEBUG \ --asyncio true -TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX null # diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e521568..9fa1ce6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -9,6 +9,8 @@ AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/common/hts HTS) AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/sam SAM_SRC) AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/sam/markdups SAM_MARKDUPS_SRC) +set(KTHREAD_FILE ${PROJECT_SOURCE_DIR}/lib/klib/kthread.c) + # 头文件目录 INCLUDE_DIRECTORIES("${PROJECT_SOURCE_DIR}/src") INCLUDE_DIRECTORIES("${PROJECT_SOURCE_DIR}/lib") @@ -24,7 +26,7 @@ set(PG_NAME "picard_cpp") # 为程序添加依赖关系 ADD_EXECUTABLE(${PG_NAME} ${MAIN_SRC} ${COMMON} ${UTILS} ${HTS} - ${SAM_SRC} ${SAM_MARKDUPS_SRC}) + ${SAM_SRC} ${SAM_MARKDUPS_SRC} ${KTHREAD_FILE}) # 链接库 TARGET_LINK_LIBRARIES(${PG_NAME} libhts.a) diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index 849d42b..e9e5194 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -177,7 +177,8 @@ int MarkDuplicates(int argc, char *argv[]) { /* 冗余检查和标记 */ if (g_gArg.num_threads == 1) { - serialMarkDups(); // 串行运行 + // serialMarkDups(); // 串行运行 + parallelMarkDups(); // 并行运行 } else { parallelMarkDups(); // 并行运行 } @@ -242,7 +243,7 @@ int MarkDuplicates(int argc, char *argv[]) { uint32_t duplicateSetSize = 0; int64_t realDupSize = 0; - // exit(0); + exit(0); while (inBuf.ReadStat() >= 0) { Timer tw1; size_t readNum = inBuf.ReadBam(); @@ -277,7 +278,8 @@ int MarkDuplicates(int argc, char *argv[]) { /* 判断是否冗余 */ if (bamIdx == dupIdx) { - ++realDupSize; // for test + // cerr << bamIdx << endl; + ++realDupSize; // for test isDup = true; if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) { isInDuplicateSet = true; diff --git a/src/sam/markdups/md_types.h b/src/sam/markdups/md_types.h index 6f51a08..492c691 100644 --- a/src/sam/markdups/md_types.h +++ b/src/sam/markdups/md_types.h @@ -5,11 +5,14 @@ #include #include +#include #include #include +#include #include #include +using std::priority_queue; using std::set; using std::string; using std::unordered_set; @@ -127,6 +130,24 @@ struct MarkDupDataArg { UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储 }; +/* 并行计算相关参数 */ +struct ParallelData { + vector pairs; // 成对的reads + vector frags; // 暂未找到配对的reads + UnpairedNameMap unpairedDic; // 用来寻找pair end +}; +struct ParallelDataArg { + vector threadData; + vector*> pairs2d; // 成对的reads + vector*> frags2d; // 暂未找到配对的reads + MarkDupDataArg *mdArg; + void Init(int nThread) { + for (int i = 0; i <= nThread; ++i) { // pairs需要多一个数组 + threadData.push_back(ParallelData()); + } + } +}; + /* 全局保留的数据,因为有些paired数据比对到了不同的染色体,相距甚远 */ struct GlobalDataArg { UnpairedNameMap unpairedDic; // 用来寻找pair end @@ -147,4 +168,59 @@ struct GlobalDataArg { vector> latterNotOpticalDupIdxArr; vector> latterNotRepIdxArr; vector> latterNotSingletonIdxArr; +}; + +struct ReadEndsArrIdIdx { + int arrId = 0; + uint64_t arrIdx = 0; + const ReadEnds *pE = nullptr; +}; + +struct ReadEndsGreaterThan { + bool operator()(const ReadEndsArrIdIdx &a, const ReadEndsArrIdIdx &b) { return *b.pE < *a.pE; } +}; +struct ReadEndsQueue { + // 将冗余索引和他对应的task vector对应起来 + vector*> *arr2d; + priority_queue, ReadEndsGreaterThan> minHeap; + uint64_t popNum = 0; + + int Init(vector*> *_arr2d) { + arr2d = _arr2d; + if (arr2d == nullptr) { + return -1; + } + for (int i = 0; i < arr2d->size(); ++i) { + auto v = (*arr2d)[i]; + if (!v->empty()) { + minHeap.push({i, 1, &(*v)[0]}); + } + } + return 0; + } + + const ReadEnds *Pop() { + const ReadEnds *ret = nullptr; + if (!minHeap.empty()) { + auto idx = minHeap.top(); + minHeap.pop(); + ++popNum; + ret = idx.pE; + auto v = (*arr2d)[idx.arrId]; + if (v->size() > idx.arrIdx) { + minHeap.push({idx.arrId, idx.arrIdx + 1, &(*v)[idx.arrIdx]}); + } + } + return ret; + } + + uint64_t Size() { + uint64_t len = 0; + if (arr2d != nullptr) { + for (auto v : *arr2d) { + len += v->size(); + } + } + return len - popNum; + } }; \ No newline at end of file diff --git a/src/sam/markdups/parallel_md.cpp b/src/sam/markdups/parallel_md.cpp index d00ca22..9519f15 100644 --- a/src/sam/markdups/parallel_md.cpp +++ b/src/sam/markdups/parallel_md.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -71,15 +72,19 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup // 这个统计可能会有误差,因为当前位点可能还有没匹配上的read,导致当前位点的read(paired)数量为1 // 可以通过后续的补充计算来解决这个问题,有必要么?好像没必要 // gMetrics.singletonReads.insert(vpRe[0]->read1IndexInFile); - // singletonIdx->insert(vpRe[0]->read1IndexInFile); + singletonIdx->insert(vpRe[0]->read1IndexInFile); } return; } - // if (notSingletonIdx != nullptr) { - // for (auto pe : vpRe) { - // notSingletonIdx->insert(pe->read1IndexInFile); - // } - // } + for (auto pe : vpRe) { + // gMetrics.singletonReads.erase(pe->read1IndexInFile); + } + + if (notSingletonIdx != nullptr) { + for (auto pe : vpRe) { + notSingletonIdx->insert(pe->read1IndexInFile); + } + } int maxScore = 0; const ReadEnds *pBest = nullptr; @@ -139,6 +144,8 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup } for (auto pe : vpRe) { // 对非best read标记冗余 if (pe != pBest) { // 非best +// gMetrics.dupReads.insert(pe->read1IndexInFile); +// gMetrics.dupReads.insert(pe->read2IndexInFile); dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 if (pe->read2IndexInFile != pe->read1IndexInFile) dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, @@ -149,6 +156,9 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup if (pe->read2IndexInFile != pe->read1IndexInFile) opticalDupIdx->insert(pe->read2IndexInFile); } + } else { +// gMetrics.dupReads.erase(pe->read1IndexInFile); // for test +// gMetrics.dupReads.erase(pe->read2IndexInFile); } } // 在输出的bam文件中添加tag, best作为dupset的代表 @@ -174,6 +184,7 @@ static void markDupsForFrags(vector &vpRe, bool containsPairs, for (auto pe : vpRe) { if (!pe->IsPaired()) { dupIdx->insert(pe->read1IndexInFile); +// gMetrics.dupReads.insert(pe->read1IndexInFile); } } } else { @@ -191,6 +202,9 @@ static void markDupsForFrags(vector &vpRe, bool containsPairs, for (auto pe : vpRe) { if (pe != pBest) { dupIdx->insert(pe->read1IndexInFile); +// gMetrics.dupReads.insert(pe->read1IndexInFile); + } else { +// gMetrics.dupReads.erase(pe->read1IndexInFile); } } } @@ -202,39 +216,37 @@ static void getEqualRE(const ReadEnds &re, vector &src, vectorinsert(dst->end(), range.first, range.second); } -/* 单线程生成readends (第一步)*/ -static void generateReadEnds(MarkDupDataArg *arg) { - auto &p = *arg; - auto &rnParser = g_vRnParser[0]; +#define LOWER_BOUND(tid, nthread, ndata) ((tid) * (ndata) / (nthread)) +#define UPPER_BOUND(tid, nthread, ndata) ((tid + 1) * (ndata) / (nthread)) +static void threadGenerateReadEnds(void *data, long idx, int tid) { + auto &p = (*(ParallelDataArg *)data).threadData[idx]; + auto &mdArg = *(*(ParallelDataArg *)data).mdArg; + auto &rnParser = g_vRnParser[idx]; + int nThread = g_gArg.num_threads; p.pairs.clear(); p.frags.clear(); p.unpairedDic.clear(); - p.unpairedPosArr.clear(); + size_t start_id = LOWER_BOUND(idx, nThread, mdArg.bams.size()); + size_t end_id = UPPER_BOUND(idx, nThread, mdArg.bams.size()); + // if (mdArg.taskSeq == 163) + // cout << "tid: " << tid << "\t" << idx << "\t" << start_id << "\t" << end_id << "\t" << mdArg.bams.size() << endl; - // bam_num1 += p.bams.size(); - /* 处理每个read,创建ReadEnd,并放入frag和pair中 */ - // MDSet reSet; - // ReadEnds lastRe; - - for (size_t i = 0; i < p.bams.size(); ++i) { // 循环处理每个read - BamWrap *bw = p.bams[i]; - // ++bam_num1; - const int64_t bamIdx = p.bamStartIdx + i; + for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read + BamWrap *bw = mdArg.bams[i]; + const int64_t bamIdx = mdArg.bamStartIdx + i; if (bw->GetReadUnmappedFlag()) { if (bw->b->core.tid == -1) // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). break; } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 ReadEnds fragEnd; - tm_arr[8].acc_start(); buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); - tm_arr[8].acc_end(); p.frags.push_back(fragEnd); // 添加进frag集合 if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 string key = bw->query_name(); if (p.unpairedDic.find(key) == p.unpairedDic.end()) { - p.unpairedDic[key] = {p.taskSeq, fragEnd}; + p.unpairedDic[key] = {mdArg.taskSeq, fragEnd}; } else { // 找到了pairend auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; modifyPairedEnds(fragEnd, &pairedEnds); @@ -244,14 +256,92 @@ static void generateReadEnds(MarkDupDataArg *arg) { } } } - tm_arr[9].acc_start(); - sortReadEndsArr(p.frags); - // sort(p.frags.begin(), p.frags.end()); - tm_arr[9].acc_end(); - // cout << "sort pairs" << endl; - tm_arr[10].acc_start(); + // sortReadEndsArr(p.frags); + sort(p.frags.begin(), p.frags.end()); sort(p.pairs.begin(), p.pairs.end()); - tm_arr[10].acc_end(); + // cout << tid << "\tthread end" << endl; +// if (mdArg.taskSeq == 163) +// cout << "tid: " << tid << " end\t" << p.pairs.size() << "\t" << p.frags.size() << endl; +} + +/* 单线程生成readends (第一步)*/ +static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) { + auto &p = *arg; + auto &rnParser = g_vRnParser[0]; + + p.pairs.clear(); + p.frags.clear(); + p.unpairedDic.clear(); + p.unpairedPosArr.clear(); + +// cout << "read num: " << p.bams.size() << endl; + + pdArg.mdArg = arg; + tm_arr[8].acc_start(); + kt_for(g_gArg.num_threads, threadGenerateReadEnds, &pdArg, g_gArg.num_threads); + tm_arr[8].acc_end(); + // 合并各个线程数据 + // 查找未匹配的frags + + + vector &pairs = pdArg.threadData[g_gArg.num_threads].pairs; + pairs.clear(); + for (int i = 0; i < g_gArg.num_threads; ++i) { + for (auto &val : pdArg.threadData[i].unpairedDic) { + const string &key = val.first; + const ReadEnds &fragEnd = val.second.unpairedRE; + if (p.unpairedDic.find(key) == p.unpairedDic.end()) { + p.unpairedDic[key] = {p.taskSeq, fragEnd}; + } else { // 找到了pairend + auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; + modifyPairedEnds(fragEnd, &pairedEnds); + pairs.push_back(pairedEnds); + p.unpairedDic.erase(key); // 删除找到的pairend + } + } + } + + + + sort(pairs.begin(), pairs.end()); + + // 合并所有frags和pairs + pdArg.frags2d.clear(); + pdArg.pairs2d.clear(); + for (int i = 0; i < g_gArg.num_threads; ++i) { + pdArg.frags2d.push_back(&pdArg.threadData[i].frags); + pdArg.pairs2d.push_back(&pdArg.threadData[i].pairs); + } + + if (false) { + cout << "taskseq: " << p.taskSeq << endl; + for (int i = 0; i < g_gArg.num_threads; ++i) { + cout << "thread pair num: " << pdArg.threadData[i].pairs.size() << endl; + cout << "thread frag num: " << pdArg.threadData[i].frags.size() << endl; + cout << "thread dic num: " << pdArg.threadData[i].unpairedDic.size() << endl; + } + } + + pdArg.pairs2d.push_back(&pdArg.threadData[g_gArg.num_threads].pairs); + + ReadEndsQueue fragsQue, pairsQue; + fragsQue.Init(&pdArg.frags2d); + pairsQue.Init(&pdArg.pairs2d); + + tm_arr[9].acc_start(); + const ReadEnds *pRE; + while((pRE = fragsQue.Pop()) != nullptr) { + p.frags.push_back(*pRE); + //cout << pRE->posKey << endl; + } + tm_arr[9].acc_end(); + + tm_arr[10].acc_start(); + while ((pRE = pairsQue.Pop()) != nullptr) { + p.pairs.push_back(*pRE); + //cout << pRE->posKey << endl; + } + // 记录位点上的未匹配的read个数 for (auto &e : p.unpairedDic) { auto posKey = e.second.unpairedRE.posKey; @@ -260,13 +350,11 @@ static void generateReadEnds(MarkDupDataArg *arg) { unpairArrInfo.taskSeq = p.taskSeq; unpairArrInfo.readNameSet.insert(e.first); } - if (p.taskSeq == 98) { - for (auto &re : p.unpairedPosArr[14293757783047].pairArr) { - cout << "task 99 unpair reads: " << re.read1IndexInFile << "\t" << re.read2IndexInFile << endl; - } - } - // cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() << - // endl; + tm_arr[10].acc_end(); + // cout << "frags num: " << p.frags.size() << endl; + // cout << "pairs num: " << p.pairs.size() << endl; + // cout << "dic num : " << p.unpairedDic.size() << endl; + // cout << "generate end" << endl; } /* 处理pairs */ @@ -404,13 +492,14 @@ static inline void refreshPairDupIdx(DPSet &dupIdx, MDSet &opt lp.pairSingletonIdx.insert(idx); p.pairSingletonIdx.erase(idx); } - // for (auto idx : notDupIdx) { - // if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl; - // if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl; - // if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl; - // if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl; - // lp.pairDupIdx.erase(idx); p.pairDupIdx.erase(idx); - // } + for (auto idx : notDupIdx) { + // if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl; + // if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl; + // if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl; + // if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl; + lp.pairDupIdx.erase(idx); + p.pairDupIdx.erase(idx); + } for (auto idx : notOpticalDupIdx) { lp.pairOpticalDupIdx.erase(idx); p.pairOpticalDupIdx.erase(idx); @@ -491,6 +580,9 @@ static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupI dupArr = midArr; } +static int64_t llp_frag_end_pos = 0; +static int64_t llp_pair_end_pos = 0; +static int64_t gSingletonNum = 0; /* 处理相邻的两个任务,有相交叉的数据 */ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, GlobalDataArg *gDataArg) { auto &lp = *lastArg; @@ -510,7 +602,25 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, getIntersectData(lp.frags, p.frags, &reArr); processFrags(reArr, &dupIdx, ¬DupIdx); refreshFragDupIdx(dupIdx, notDupIdx, &lp, &p); - + // cout << "578" << endl; + if (!p.frags.empty()) + if (llp_frag_end_pos >= p.frags.begin()->posKey) { + cout << "frags : " << llp_frag_end_pos << "\t" << p.frags[0].posKey << "\t" << p.frags.rbegin()->posKey + << endl; + } + if (!p.pairs.empty()) + if (llp_pair_end_pos >= p.pairs.begin()->posKey) { + cout << "pairs : " << llp_pair_end_pos << "\t" << p.pairs[0].posKey << "\t" << p.pairs.rbegin()->posKey + << endl; + } + if (!p.frags.empty()) + llp_frag_end_pos = lp.frags.rbegin()->posKey; + if (!p.pairs.empty()) + llp_pair_end_pos = lp.pairs.rbegin()->posKey; + // cout << "frags lp: " << lp.frags[0].posKey << "\t" << lp.frags.rbegin()->posKey << endl; + // cout << "frags p : " << p.frags[0].posKey << "\t" << p.frags.rbegin()->posKey << endl; + // cout << "pairs lp: " << lp.pairs[0].posKey << "\t" << lp.pairs.rbegin()->posKey << endl; + // cout << "pairs p : " << p.pairs[0].posKey << "\t" << p.pairs.rbegin()->posKey << endl; // 再处理重叠的pairs reArr.clear(); dupIdx.clear(); @@ -518,9 +628,9 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, getIntersectData(lp.pairs, p.pairs, &reArr, true); processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx, ¬SingletonIdx); - refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx, singletonIdx, - &lp, &p); - + refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx, + notSingletonIdx, &lp, &p); +// cout << "606" << endl; // cout << (g.unpairedDic.find("A01415:368:HL7NTDSX3:3:1104:5195:34757") != g.unpairedDic.end()) << endl; // cout << (g.unpairedPosArr.find(14293757783047) != g.unpairedPosArr.end()) << endl; // 处理之前未匹配的部分 @@ -683,7 +793,7 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, // } } } - +// cout << "769" << endl; map taskChanged; MDSet posProcessed; for (auto &e : recalcPos) { @@ -746,7 +856,7 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, t.notRepIdx, t.notSingletonIdx, &p, &lp); // 把结果放到p中 } } - +// cout << "832" << endl; // cout << "remain unpaired: " << g.unpairedDic.size() << '\t' << // g.unpairedPosArr.size() << endl; cout << "calc g time: " << // t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据 @@ -781,6 +891,7 @@ static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, auto &vSingletonIdx = g.singletonIdxArr.back(); vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); + gSingletonNum += lp.pairSingletonIdx.size(); } /* 当所有任务结束后,global data里还有未处理的数据 */ @@ -821,7 +932,8 @@ static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) { processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); } else if (arr.size() == 1) { - // gMetrics.singletonReads.insert(arr[0].read1IndexInFile); + // gMetrics.singletonReads.insert(arr[0].read1IndexInFile); + t.singletonIdx.insert(arr[0].read1IndexInFile); } } // 更新结果 @@ -909,6 +1021,9 @@ static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) { intCacheDupIdx, intMidArr); for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); + for (int i = 0; i < (int)g.singletonIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterSingletonIdxArr[i], g.latterNotSingletonIdxArr[i], g.singletonIdxArr[i], + intCacheDupIdx, intMidArr); g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); @@ -938,18 +1053,7 @@ static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) { auto &vSingletonIdx = g.singletonIdxArr.back(); vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); -} - -static void calculateMetrics(MarkDupDataArg &lp, MarkDupDataArg &p, GlobalDataArg &g, DuplicationMetrics *pgMetrics) { - DuplicationMetrics &gMetrics = *pgMetrics; - - // gMetrics.READ_PAIRS_EXAMINED /= 2; - - cout << "calculateMetrics start: " << endl; - - cout << lp.unpairedDic.size() << "\t" << p.unpairedDic.size() << "\t" << g.unpairedDic.size() << endl; - cout << "all: " << (lp.unpairedDic.size() + p.unpairedDic.size() + g.unpairedDic.size()) << endl; - cout << "calculateMetrics end" << endl; + gSingletonNum += lp.pairSingletonIdx.size(); } /* 串行处理数据,标记冗余 */ @@ -966,6 +1070,8 @@ void parallelMarkDups() { MarkDupDataArg smdArg1, smdArg2; MarkDupDataArg *lastArgP = &smdArg1; MarkDupDataArg *curArgP = &smdArg2; + ParallelDataArg pdArg; + pdArg.Init(g_gArg.num_threads); bool isFirstRound = true; int roundNum = 0; @@ -975,6 +1081,8 @@ void parallelMarkDups() { // 读取bam文件中的read tm_arr[4].acc_start(); size_t readNum = inBamBuf.ReadBam(); + if (readNum < 1) + break; readNumSum += readNum; // readNumSum += inBamBuf.GetBamArr().size(); tm_arr[4].acc_end(); @@ -988,23 +1096,21 @@ void parallelMarkDups() { tm_arr[0].acc_start(); Timer t1; - generateReadEnds(curArgP); - // cout << "calc read end time: " << t1.seconds_elapsed() << " s" << - // endl; + generateReadEnds(pdArg, curArgP); + cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl; tm_arr[0].acc_end(); tm_arr[1].acc_start(); t1.reinit(); markdups(curArgP); - // cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; + cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; tm_arr[1].acc_end(); if (!isFirstRound) { tm_arr[2].acc_start(); t1.reinit(); handleIntersectData(lastArgP, curArgP, &gData); - // cout << "intersect time: " << t1.seconds_elapsed() << " s" << - // endl; + cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl; // addTaskIdxToSet(lastArgP, &gData); tm_arr[2].acc_end(); } else { @@ -1024,14 +1130,11 @@ void parallelMarkDups() { // cout << "round time: " << t_round.seconds_elapsed() * 100 << " s" << endl; } } - // cout << "here" << endl; + cout << "here" << endl; tm_arr[3].acc_start(); // 处理剩下的全局数据 handleLastTask(lastArgP, &gData); - // 计算各种统计指标 - // calculateMetrics(*lastArgP, *curArgP, gData, &gMetrics); - // cout << "here 2" << endl; tm_arr[3].acc_end(); @@ -1041,20 +1144,44 @@ void parallelMarkDups() { int64_t opticalDupNum = 0; int64_t singletonNum = 0; + int64_t dupNumDic = 0; + int64_t singletonNumDic = 0; + map dup; int taskSeq = 0; - // for (auto &arr : gData.singletonIdxArr) { + // for (auto &arr : gData.dupIdxArr) { // for (auto idx : arr) { - // if (dup.find(idx) != dup.end()) { + // if (dup.find(idx.idx) != dup.end()) { // // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' // // << idx << endl; // } - // dup[idx] = taskSeq; + // dup[idx.idx] = taskSeq; // } // // cout << taskSeq << "\t" << arr.size() << endl; // taskSeq++; // } + // dupNumDic = dup.size(); + // dup.clear(); + // int notInMetrics = 0; + // cout << "gmetrics single count: " << gMetrics.singletonReads.size() << endl; + // for (auto &arr : gData.singletonIdxArr) { + // for (auto idx : arr) { + // dup[idx] = 1; + // if (gMetrics.singletonReads.find(idx) == gMetrics.singletonReads.end()) { + //// cout << "not in gmetrics: " << idx << endl; + // ++notInMetrics; + // } else { + // gMetrics.singletonReads.erase(idx); + // } + // } + // } + // singletonNumDic = dup.size(); + // cout << "not in arr: " << endl; + // for (auto idx : gMetrics.singletonReads) { + //// cout << idx << endl; + // } + // cout << "count: " << notInMetrics << "\t" << gMetrics.singletonReads.size() << endl; // #include // ofstream out("tumor_dup.txt"); @@ -1068,8 +1195,11 @@ void parallelMarkDups() { for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size(); for (auto &arr : gData.singletonIdxArr) singletonNum += arr.size(); - cout << "dup num : " << dupNum << '\t' << dup.size() << "\t" << zzhtestnum << endl; - cout << "singleton : " << singletonNum << endl; + cout << "dup num : " << dupNum << '\t' << dupNumDic << "\t" << zzhtestnum << endl; + cout << "singleton : " << singletonNum << "\t" << singletonNumDic << "\t" << gSingletonNum << endl; + cout << "singleton size: " << gMetrics.singletonReads.size() << endl; + cout << "dup read size: " << gMetrics.dupReads.size() << endl; + cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl; cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl; cout << "handle tail : " << tm_arr[2].acc_seconds_elapsed() << endl; @@ -1082,7 +1212,6 @@ void parallelMarkDups() { cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl; cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; cout << "optical size: " << opticalDupNum << "\t" << opticalDupNum / 2 << endl; - cout << "singleton size: " << gMetrics.singletonReads.size() << endl; Timer::log_time("parallel end "); cout << "read num sum: " << readNumSum << endl; diff --git a/src/sam/markdups/serial_md.cpp b/src/sam/markdups/serial_md.cpp index 5c3b7d9..218ab32 100644 --- a/src/sam/markdups/serial_md.cpp +++ b/src/sam/markdups/serial_md.cpp @@ -144,8 +144,8 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup } for (auto pe : vpRe) { // 对非best read标记冗余 if (pe != pBest) { // 非best - gMetrics.dupReads.insert(pe->read1IndexInFile); - gMetrics.dupReads.insert(pe->read2IndexInFile); +// gMetrics.dupReads.insert(pe->read1IndexInFile); +// gMetrics.dupReads.insert(pe->read2IndexInFile); dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 if (pe->read2IndexInFile != pe->read1IndexInFile) dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, @@ -157,8 +157,8 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup opticalDupIdx->insert(pe->read2IndexInFile); } } else { - gMetrics.dupReads.erase(pe->read1IndexInFile); // for test - gMetrics.dupReads.erase(pe->read2IndexInFile); +// gMetrics.dupReads.erase(pe->read1IndexInFile); // for test +// gMetrics.dupReads.erase(pe->read2IndexInFile); } } // 在输出的bam文件中添加tag, best作为dupset的代表 @@ -184,7 +184,7 @@ static void markDupsForFrags(vector &vpRe, bool containsPairs, for (auto pe : vpRe) { if (!pe->IsPaired()) { dupIdx->insert(pe->read1IndexInFile); - gMetrics.dupReads.insert(pe->read1IndexInFile); +// gMetrics.dupReads.insert(pe->read1IndexInFile); } } } else { @@ -202,9 +202,9 @@ static void markDupsForFrags(vector &vpRe, bool containsPairs, for (auto pe : vpRe) { if (pe != pBest) { dupIdx->insert(pe->read1IndexInFile); - gMetrics.dupReads.insert(pe->read1IndexInFile); +// gMetrics.dupReads.insert(pe->read1IndexInFile); } else { - gMetrics.dupReads.erase(pe->read1IndexInFile); +// gMetrics.dupReads.erase(pe->read1IndexInFile); } } } @@ -281,6 +281,9 @@ static void generateReadEnds(MarkDupDataArg *arg) { } // cout << "依赖比例:" << (float)p.unpairedDic.size() / p.frags.size() << // endl; + cout << "frags num: " << p.frags.size() << endl; + cout << "pairs num: " << p.pairs.size() << endl; + cout << "dic num : " << p.unpairedDic.size() << endl; } /* 处理pairs */ @@ -1013,14 +1016,13 @@ void serialMarkDups() { tm_arr[0].acc_start(); Timer t1; generateReadEnds(curArgP); - // cout << "calc read end time: " << t1.seconds_elapsed() << " s" << - // endl; + cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl; tm_arr[0].acc_end(); tm_arr[1].acc_start(); t1.reinit(); markdups(curArgP); - // cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; + cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; tm_arr[1].acc_end(); if (!isFirstRound) {