diff --git a/.vscode/launch.json b/.vscode/launch.json index fcd3c60..2aabd92 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -13,11 +13,11 @@ "program": "${workspaceRoot}/build/bin/picard_cpp", "args": [ "MarkDuplicates", - "--INPUT", "~/data/bam/zy_normal.bam", + "--INPUT", "~/data/bam/100w.bam", "--OUTPUT", "out.bam", "--METRICS_FILE", "metrics.txt", "--num_threads", "1", - "--max_mem", "100G", + "--max_mem", "1G", "--verbosity", "DEBUG", "--asyncio", "true", ], diff --git a/run.sh b/run.sh index b0f8145..c60a5f5 100755 --- a/run.sh +++ b/run.sh @@ -1,6 +1,6 @@ -input=~/data/bam/zy_normal.bam +#input=~/data/bam/zy_normal.bam #input=~/data/bam/zy_tumor.bam -#input=~/data/bam/100w.bam +input=~/data/bam/100w.bam #input=~/data/bam/1kw.sam #input=~/data/bam/n1kw.sam @@ -10,7 +10,7 @@ time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \ --OUTPUT ~/data/bam/out.bam \ --INDEX_FORMAT BAI \ --num_threads 1 \ - --max_mem 1G \ + --max_mem 2G \ --verbosity DEBUG \ --asyncio true #\ #--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$" diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index 32b48a2..a93051e 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -154,14 +154,13 @@ int MarkDuplicates(int argc, char *argv[]) { // BamBufType inBuf(false); BamBufType inBuf(g_gArg.use_asyncio); inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); - DupIdxQueue idxQue; - idxQue.Init(&gData.dupIdxArr); + DupIdxQueue dupIdxQue; + dupIdxQue.Init(&gData.dupIdxArr); Timer tw; - cout << "dupsize: " << idxQue.Size() << endl; + cout << "dupsize: " << dupIdxQue.Size() << endl; uint64_t bamIdx = 0; - uint64_t dupIdx = idxQue.Pop(); - cout << "dup arr size: " << gData.dupIdxArr.size() << endl; - cout << "first dup: " << dupIdx << endl; + DupInfo dupIdx = dupIdxQue.Pop(); + while (inBuf.ReadStat() >= 0) { Timer tw1; size_t readNum = inBuf.ReadBam(); @@ -169,8 +168,12 @@ int MarkDuplicates(int argc, char *argv[]) { for (size_t i = 0; i < inBuf.Size(); ++i) { /* 判断是否冗余 */ if (bamIdx == dupIdx) { - // cerr << bamIdx << endl; - dupIdx = idxQue.Pop(); + cerr << bamIdx << " " << dupIdx.repIdx << " " << dupIdx.dupSet << endl; + // 为了防止小内存运行的时候,有重复的dupidx,这时候dup的repIdx和dupSetSize可能会有不同 + while((dupIdx = dupIdxQue.Pop()) == bamIdx); + } + if (bamIdx == -1) { // repressent + } if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) { Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); @@ -183,7 +186,7 @@ int MarkDuplicates(int argc, char *argv[]) { inBuf.ClearAll(); cout << "write round time: " << tw1.seconds_elapsed() << " s" << endl; } - cout << "dupsize: " << idxQue.Size() << endl; + cout << "dupsize: " << dupIdxQue.Size() << endl; if (sam_idx_save(g_outBamFp) < 0) { Error("writing index failed"); sam_close(g_outBamFp); diff --git a/src/sam/markdups/md_funcs.h b/src/sam/markdups/md_funcs.h index 054c3a6..81dd9a3 100644 --- a/src/sam/markdups/md_funcs.h +++ b/src/sam/markdups/md_funcs.h @@ -71,29 +71,31 @@ struct DupContainer { } }; - /* * 优先队列,用最小堆来实现对所有冗余索引的排序 */ +template struct PairArrIdIdx { int arrId = 0; uint64_t arrIdx = 0; - int64_t dupIdx = 0; + T dupIdx = 0; }; +template struct IdxGreaterThan { - bool operator()(const PairArrIdIdx &a, const PairArrIdIdx &b) { return a.dupIdx > b.dupIdx; } + bool operator()(const PairArrIdIdx &a, const PairArrIdIdx &b) { return a.dupIdx > b.dupIdx; } }; +template struct DupIdxQueue { // 将冗余索引和他对应的task vector对应起来 // 由于是多个task来查找冗余,所以每次找到的冗余index都放在一个独立的vector中,vector之间可能有重叠,所以需要用一个最小堆来维护 - vector> *dupIdx2DArr; - priority_queue, IdxGreaterThan> minHeap; + vector> *dupIdx2DArr; + priority_queue, vector>, IdxGreaterThan> minHeap; uint64_t popNum = 0; - int Init(vector> *_dupIdx2DArr) { + int Init(vector> *_dupIdx2DArr) { dupIdx2DArr = _dupIdx2DArr; if (dupIdx2DArr == nullptr) { return -1; @@ -107,8 +109,8 @@ struct DupIdxQueue { return 0; } - int64_t Pop() { - int64_t ret = -1; + T Pop() { + T ret = -1; if (!minHeap.empty()) { auto idx = minHeap.top(); minHeap.pop(); @@ -133,6 +135,7 @@ struct DupIdxQueue { } }; + /* * 用来检测optical duplication的graph */ diff --git a/src/sam/markdups/serial_md.cpp b/src/sam/markdups/serial_md.cpp index b1895b5..62359b7 100644 --- a/src/sam/markdups/serial_md.cpp +++ b/src/sam/markdups/serial_md.cpp @@ -65,8 +65,8 @@ static inline void sortReadEndsArr(vector &arr) { } /* 处理一组pairend的readends,标记冗余, 这个函数需要串行运行,因为需要做一些统计*/ -static void markDupsForPairs(vector &vpRe, set *dupIdx, set *opticalDupIdx, - set *notDupIdx = nullptr) { +static void markDupsForPairs(vector &vpRe, set *dupIdx, set *opticalDupIdx = nullptr, + set *notDupIdx = nullptr, set *notOpticalDupIdx = nullptr) { if (vpRe.size() < 2) { if (vpRe.size() == 1) { // addSingletonToCount(libraryIdGenerator); @@ -78,11 +78,11 @@ static void markDupsForPairs(vector &vpRe, set *dupId } int maxScore = 0; const ReadEnds *pBest = nullptr; - int maxOperateTime = 0; +// int maxOperateTime = 0; /** All read ends should have orientation FF, FR, RF, or RR **/ for (auto pe : vpRe) { // 找分数最高的readend - maxOperateTime = max(maxOperateTime, pe->oprateTime); - (const_cast(pe))->oprateTime ++; +// maxOperateTime = max(maxOperateTime, pe->oprateTime); +// (const_cast(pe))->oprateTime ++; if (pe->score > maxScore || pBest == nullptr) { maxScore = pe->score; pBest = pe; @@ -94,15 +94,40 @@ static void markDupsForPairs(vector &vpRe, set *dupId notDupIdx->insert(pBest->read1IndexInFile); notDupIdx->insert(pBest->read2IndexInFile); } + //if (false) { if (!g_mdArg.READ_NAME_REGEX.empty()) { // 检查光学冗余 // trackOpticalDuplicates + vector prevOpticalRe; + if (notOpticalDupIdx != nullptr) { + for (auto pe : vpRe) { + if (pe->isOpticalDuplicate) { + prevOpticalRe.push_back(pe); + } + } + } trackOpticalDuplicates(vpRe, pBest); + // 由于重叠问题,可能会更新数据 + if (notOpticalDupIdx != nullptr) { + for (auto pe : prevOpticalRe) { + if (!pe->isOpticalDuplicate) { + notOpticalDupIdx->insert(pe->read1IndexInFile); + notOpticalDupIdx->insert(pe->read2IndexInFile); + } + } + } } - for (auto pe : vpRe) { // 对非best read标记冗余 + for (auto pe : vpRe) { // 对非best read标记冗余 if (pe != pBest) { // 非best - dupIdx->insert(pe->read1IndexInFile); // 添加read1 + dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 if (pe->read2IndexInFile != pe->read1IndexInFile) - dupIdx->insert(pe->read2IndexInFile); // 添加read2 + dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); //read2, + // 注意这里代表是read1的索引 + // 检查是否optical dup + if (pe->isOpticalDuplicate && opticalDupIdx != nullptr) { + opticalDupIdx->insert(pe->read1IndexInFile); + if (pe->read2IndexInFile != pe->read1IndexInFile) + opticalDupIdx->insert(pe->read2IndexInFile); + } } } // 在输出的bam文件中添加tag @@ -112,7 +137,7 @@ static void markDupsForPairs(vector &vpRe, set *dupId } /* 处理一组非paired的readends,标记冗余 */ -static void markDupsForFrags(vector &vpRe, bool containsPairs, set *dupIdx, +static void markDupsForFrags(vector &vpRe, bool containsPairs, set *dupIdx, set *notDupIdx = nullptr) { if (containsPairs) { for (auto pe : vpRe) { @@ -207,25 +232,25 @@ static void generateReadEnds(SerailMarkDupArg *arg) { } /* 处理pairs */ -static void processPairs(vector &readEnds, set *dupIdx, set *opticalDupIdx, - set *notDupIdx = nullptr) { +static void processPairs(vector &readEnds, set *dupIdx, set *opticalDupIdx, + set *notDupIdx = nullptr, set *notOpticalDupIdx = nullptr) { vector vpCache; // 有可能是冗余的reads const ReadEnds *pReadEnd = nullptr; for (auto &re : readEnds) { if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样 vpCache.push_back(&re); // 处理一个潜在的冗余组 else { - markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx); // 不一样 + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx); // 不一样 vpCache.clear(); vpCache.push_back(&re); pReadEnd = &re; } } - markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx); + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx); } /* 处理frags */ -static void processFrags(vector &readEnds, set *dupIdx, set *notDupIdx = nullptr) { +static void processFrags(vector &readEnds, set *dupIdx, set *notDupIdx = nullptr) { bool containsPairs = false; bool containsFrags = false; vector vpCache; // 有可能是冗余的reads @@ -296,7 +321,7 @@ static inline void getIntersectData(vector &leftArr, vector } /* 将frags重叠部分的dup idx放进数据中 */ -static inline void refreshFragDupIdx(set &dupIdx, set ¬DupIdx, SerailMarkDupArg *lastArg, +static inline void refreshFragDupIdx(set &dupIdx, set ¬DupIdx, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) { auto &lp = *lastArg; auto &p = *curArg; @@ -311,8 +336,8 @@ static inline void refreshFragDupIdx(set &dupIdx, set ¬DupI } /* 将pairs重叠部分的dup idx放进数据中 */ -static inline void refreshPairDupIdx(set &dupIdx, set &opticalDupIdx, set ¬DupIdx, - SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) { +static inline void refreshPairDupIdx(set &dupIdx, set &opticalDupIdx, set ¬DupIdx, + set ¬OpticalDupIdx, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) { auto &lp = *lastArg; auto &p = *curArg; for (auto idx : dupIdx) { @@ -325,31 +350,38 @@ static inline void refreshPairDupIdx(set &dupIdx, set &optical } for (auto idx : notDupIdx) { lp.pairDupIdx.erase(idx); - lp.pairOpticalDupIdx.erase(idx); + //lp.pairOpticalDupIdx.erase(idx); p.pairDupIdx.erase(idx); + //p.pairOpticalDupIdx.erase(idx); + } + for (auto idx : notOpticalDupIdx) { + lp.pairOpticalDupIdx.erase(idx); p.pairOpticalDupIdx.erase(idx); } } // 用来分别处理dup和optical dup -static void refeshTaskDupInfo(set &dupIdx, set &opticalDupIdx, set ¬DupIdx, - set &latterDupIdx, set &latterOpticalDupIdx, - set &latterNotDupIdx) { +static void refeshTaskDupInfo(set &dupIdx, set &opticalDupIdx, set ¬DupIdx, + set ¬OpticalDupIdx, set &latterDupIdx, + set &latterOpticalDupIdx, set &latterNotDupIdx, + set &latterNotOpticalDupIdx) { for (auto idx : dupIdx) latterDupIdx.insert(idx); for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx); for (auto idx : notDupIdx) latterNotDupIdx.insert(idx); + for (auto idx : notOpticalDupIdx) latterNotOpticalDupIdx.insert(idx); } /* 最后合并数据并排序 */ -static void refeshFinalTaskDupInfo(set &dupIdx, set ¬DupIdx, vector &dupArr) { - vector midArr; +template +static void refeshFinalTaskDupInfo(set &dupIdx, set ¬DupIdx, vector &dupArr) { + vector midArr; auto ai = dupArr.begin(); auto bi = dupIdx.begin(); auto ae = dupArr.end(); auto be = dupIdx.end(); - int64_t val = 0; + T val = 0; while (ai != ae && bi != be) { if (*ai < *bi) { val = *ai++; @@ -385,7 +417,8 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur auto &g = *gDataArg; vector reArr; - set dupIdx; + set dupIdx; + set notOpticalDupIdx; set notDupIdx; // 先处理重叠的frags getIntersectData(lp.frags, p.frags, &reArr); @@ -398,8 +431,8 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur notDupIdx.clear(); set opticalDupIdx; getIntersectData(lp.pairs, p.pairs, &reArr, true); - processPairs(reArr, &dupIdx, &opticalDupIdx, ¬DupIdx); - refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, &lp, &p); + processPairs(reArr, &dupIdx, &opticalDupIdx, ¬DupIdx, ¬OpticalDupIdx); + refreshPairDupIdx(dupIdx, opticalDupIdx, notDupIdx, notOpticalDupIdx, &lp, &p); // 处理之前未匹配的部分 map recalcPos; @@ -550,7 +583,7 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur pairArrP = &g.unpairedPosArr[posKey].pairArr; else pairArrP = &lp.unpairedPosArr[posKey].pairArr; - processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx); + processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.notDupIdx, &t.notOpticalDupIdx); if (taskSeq < lp.taskSeq) g.unpairedPosArr.erase(posKey); } @@ -564,23 +597,24 @@ static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *cur auto taskSeq = e.first; auto &t = e.second; if (taskSeq < lp.taskSeq) { - refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, g.latterDupIdxArr[taskSeq], - g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq]); + refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, g.latterDupIdxArr[taskSeq], + g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq]); } else if (taskSeq == lp.taskSeq) { - refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, &lp, &p); + refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, &lp, &p); } else { - refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, &p, &lp); // 把结果放到p中 + refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, &p, &lp); // 把结果放到p中 } } // cout << "remain unpaired: " << g.unpairedDic.size() << '\t' << // g.unpairedPosArr.size() << endl; cout << "calc g time: " << // t.seconds_elapsed() << " s" << endl; 将dupidx放进全局数据 - g.latterDupIdxArr.push_back(set()); + g.latterDupIdxArr.push_back(set()); g.latterOpticalDupIdxArr.push_back(set()); g.latterNotDupIdxArr.push_back(set()); + g.latterNotOpticalDupIdxArr.push_back(set()); - g.dupIdxArr.push_back(vector()); + g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); @@ -629,8 +663,8 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) { for (auto &e : taskChanged) { auto taskSeq = e.first; auto &t = e.second; - refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, g.latterDupIdxArr[taskSeq], - g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq]); + refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.notDupIdx, t.notOpticalDupIdx, g.latterDupIdxArr[taskSeq], + g.latterOpticalDupIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq]); } cout << "last unpair info: " << g.unpairedDic.size() << '\t' << g.unpairedPosArr.size() << endl; @@ -641,9 +675,9 @@ static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) { for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i]); for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i) - refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotDupIdxArr[i], g.opticalDupIdxArr[i]); + refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i]); - g.dupIdxArr.push_back(vector()); + g.dupIdxArr.push_back(vector()); auto &vIdx = g.dupIdxArr.back(); lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); @@ -734,19 +768,21 @@ void serialMarkDups() { tm_arr[5].acc_end(); // 统计所有冗余index数量 int64_t dupNum = 0; - map dup; + int64_t opticalDupNum = 0; + + map dup; +// int taskSeq = 0; +// for (auto &arr : gData.dupIdxArr) { +// for (auto idx : arr) { +// if (dup.find(idx.idx) != dup.end()) { +// // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' +// // << idx << endl; +// } +// dup[idx.idx] = taskSeq; +// } +// taskSeq++; +// } - int taskSeq = 0; - for (auto &arr : gData.dupIdxArr) { - for (auto idx : arr) { - if (dup.find(idx) != dup.end()) { - // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' - // << idx << endl; - } - dup[idx] = taskSeq; - } - taskSeq++; - } // #include // ofstream out("tumor_dup.txt"); // for (auto idx : dup) @@ -756,6 +792,7 @@ void serialMarkDups() { // out.close(); for (auto &arr : gData.dupIdxArr) dupNum += arr.size(); + for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size(); cout << "dup num : " << dupNum << '\t' << dup.size() << endl; @@ -774,6 +811,7 @@ void serialMarkDups() { << gMetrics.OpticalDuplicatesCountHist << "\t" << gMetrics.OpticalDuplicatesByLibraryId << endl; cout << "optical dup: " << zzhopticalSet.size() << endl; cout << "optical arr dup: " << zzhopticalArr.size() << endl; + cout << "optical size: " << opticalDupNum << endl; Timer::log_time("serial end "); diff --git a/src/sam/markdups/serial_md.h b/src/sam/markdups/serial_md.h index 4a1f392..78fc550 100644 --- a/src/sam/markdups/serial_md.h +++ b/src/sam/markdups/serial_md.h @@ -30,11 +30,31 @@ struct CalcKey { } }; +/* 用来记录冗余索引相关的信息 */ +struct DupInfo { + int64_t idx; + int64_t repIdx = 0; // 这一批冗余中的非冗余read 代表的索引 + int16_t dupSet = 0; // dup set size + + DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) { } + DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), repIdx(repIdx_), dupSet(dupSet_) {} + bool operator<(const DupInfo &o) const { + return idx < o.idx; + } + bool operator>(const DupInfo &o) const { + return idx > o.idx; + } + operator int64_t() const { + return idx; + } +}; + /* 当遗留数据在当前任务找到了pair read后,进行冗余计算时候存放结果的数据结构 */ struct TaskSeqDupInfo { - set dupIdx; + set dupIdx; set opticalDupIdx; set notDupIdx; + set notOpticalDupIdx; }; /* 保存有未匹配pair位点的信息,包括read end数组和有几个未匹配的read end */ @@ -57,9 +77,9 @@ struct SerailMarkDupArg { vector bams; // 存放待处理的bam read vector pairs; // 成对的reads vector frags; // 暂未找到配对的reads - set pairDupIdx; // pair的冗余read的索引 + set pairDupIdx; // pair的冗余read的索引 set pairOpticalDupIdx; // optical冗余read的索引 - set fragDupIdx; // frag的冗余read的索引 + set fragDupIdx; // frag的冗余read的索引 UnpairedNameMap unpairedDic; // 用来寻找pair end UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储 }; @@ -70,13 +90,14 @@ struct GlobalDataArg { UnpairedPositionMap unpairedPosArr; // 每个task对应一个vector - vector> dupIdxArr; + vector> dupIdxArr; vector> opticalDupIdxArr; // 用来存放后续计算的数据 - vector> latterDupIdxArr; + vector> latterDupIdxArr; vector> latterOpticalDupIdxArr; vector> latterNotDupIdxArr; + vector> latterNotOpticalDupIdxArr; }; // 串行运行mark duplicate