diff --git a/src/markdup/markdup.cpp b/src/markdup/markdup.cpp index d44c2cb..6ec50ef 100644 --- a/src/markdup/markdup.cpp +++ b/src/markdup/markdup.cpp @@ -18,7 +18,8 @@ Date : 2023/10/23 #include "fastdup_version.h" #include "md_args.h" #include "md_funcs.h" -#include "pipeline_md.h" +// #include "pipeline_md.h" +#include "new_pipe.h" #include "read_name_parser.h" #include "util/profiling.h" @@ -33,9 +34,9 @@ sam_hdr_t *gInBamHeader; // 输入的bam文件头信息 samFile *gOutBamFp; // 输出文件, sam或者bam格式 sam_hdr_t *gOutBamHeader; // 输出文件的header DuplicationMetrics gMetrics; // 统计信息 -PipelineArg gPipe; - -}; +DupResult gDupRes; +PipelineArg gPipe(&gDupRes); +}; // namespace nsgv // 字节缓冲区 struct ByteBuf { @@ -98,7 +99,8 @@ int MarkDuplicates() { /* 冗余检查和标记 */ PROF_START(markdup_all); - pipelineMarkDups(); + //pipelineMarkDups(); + NewPipeMarkDups(); PROF_END(gprof[GP_markdup_all], markdup_all); /* 标记冗余, 将处理后的结果写入文件 */ @@ -157,10 +159,15 @@ int MarkDuplicates() { DupIdxQueue dupIdxQue, repIdxQue; DupIdxQueue opticalIdxQue; - dupIdxQue.Init(&nsgv::gPipe.intersectData.dupIdxArr); - repIdxQue.Init(&nsgv::gPipe.intersectData.repIdxArr); - opticalIdxQue.Init(&nsgv::gPipe.intersectData.opticalDupIdxArr); + dupIdxQue.Init(&nsgv::gDupRes.dupIdxArr); + repIdxQue.Init(&nsgv::gDupRes.repIdxArr); + opticalIdxQue.Init(&nsgv::gDupRes.opticalDupIdxArr); spdlog::info("{} duplicate reads found", dupIdxQue.Size()); + spdlog::info("{} optical reads found", opticalIdxQue.Size()); + spdlog::info("{} represent reads found", repIdxQue.Size()); + spdlog::info("real dup size: {}", dupIdxQue.RealSize()); + + return 0; uint64_t bamIdx = 0; DupInfo dupIdx = dupIdxQue.Pop(); @@ -177,7 +184,12 @@ int MarkDuplicates() { int64_t realDupSize = 0; - return 0; + ofstream ofs("newdup.txt"); + + // return 0; + // for debug + int64_t maxDiff = 0; + int64_t minDiff = 0; PROF_START(write); while (inBuf.ReadStat() >= 0) { @@ -214,15 +226,25 @@ int MarkDuplicates() { } /* 判断是否冗余 */ + // cout << dupIdx << endl; if (bamIdx == dupIdx) { + // ofs << bamIdx << endl; ++realDupSize; // for test isDup = true; if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) { isInDuplicateSet = true; - representativeReadIndexInFile = dupIdx.repIdx; + representativeReadIndexInFile = dupIdx.GetRepIdx(); duplicateSetSize = dupIdx.dupSet; } +#if 1 + // spdlog::info("diff: {}", dupIdx.idx - dupIdx.repIdx); + //maxDiff = std::max(maxDiff, dupIdx.idx - dupIdx.repIdx); + //minDiff = std::min(minDiff, dupIdx.idx - dupIdx.repIdx); + //spdlog::info("min: {}, max: {}", minDiff, maxDiff); + +#endif + // 为了防止小内存运行的时候,有重复的dupidx,这时候dup的repIdx和dupSetSize可能会有不同 while ((dupIdx = dupIdxQue.Pop()) == bamIdx); if (opticalIdx == bamIdx) @@ -262,7 +284,7 @@ int MarkDuplicates() { } if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && bamIdx == repIdx) { // repressent isInDuplicateSet = true; - representativeReadIndexInFile = repIdx.repIdx; + representativeReadIndexInFile = repIdx.GetRepIdx(); duplicateSetSize = repIdx.dupSet; while (repIdx == bamIdx || repIdx.dupSet == 0) { if (repIdxQue.Size() > 0) @@ -277,6 +299,7 @@ int MarkDuplicates() { if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) { if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) { // cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl; + // ofs << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl; uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_SET_INDEX_TAG.c_str()); if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); @@ -304,7 +327,7 @@ int MarkDuplicates() { bam_aux_append(bw->b, "PG", 'Z', nsgv::gMdArg.PROGRAM_RECORD_ID.size() + 1, (const uint8_t *)nsgv::gMdArg.PROGRAM_RECORD_ID.c_str()); } -#if 1 +#if 0 if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) { spdlog::error("failed writing sam record to \"{}\"", nsgv::gMdArg.OUTPUT_FILE.c_str()); sam_close(nsgv::gOutBamFp); @@ -322,8 +345,7 @@ int MarkDuplicates() { // 计算统计信息 nsgv::gMetrics.READ_PAIRS_EXAMINED /= 2; nsgv::gMetrics.READ_PAIR_DUPLICATES /= 2; - for (auto &arr : nsgv::gPipe.intersectData.opticalDupIdxArr) - nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size(); + for (auto &arr : nsgv::gDupRes.opticalDupIdxArr) nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size(); nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES = nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES / 2; nsgv::gMetrics.ESTIMATED_LIBRARY_SIZE = estimateLibrarySize(nsgv::gMetrics.READ_PAIRS_EXAMINED - nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES, @@ -376,5 +398,7 @@ int MarkDuplicates() { sam_close(nsgv::gOutBamFp); sam_close(nsgv::gInBamFp); + ofs.close(); + return 0; } \ No newline at end of file diff --git a/src/markdup/md_args.h b/src/markdup/md_args.h index d629eed..e68b11f 100644 --- a/src/markdup/md_args.h +++ b/src/markdup/md_args.h @@ -64,7 +64,7 @@ struct MarkDupsArg { int NUM_THREADS = 1; - size_t MAX_MEM = ((size_t)1) << 31; // 最小2G + size_t MAX_MEM = ((size_t)1) << 30; // << 31 // 最小2G bool DUPLEX_IO = true; // 同时读写 diff --git a/src/markdup/md_types.h b/src/markdup/md_types.h index 805843c..db5d31c 100644 --- a/src/markdup/md_types.h +++ b/src/markdup/md_types.h @@ -49,13 +49,22 @@ struct CalcKeyHash { /* 用来记录冗余索引相关的信息 */ struct DupInfo { - int64_t idx; - int64_t repIdx = 0; // 这一批冗余中的非冗余read 代表的索引 int16_t dupSet = 0; // dup set size + uint16_t repIdxHigh = 0; // 这一批冗余中的非冗余read 代表的索引 + uint32_t repIdxLow = 0; + int64_t idx; DupInfo() : DupInfo(-1, 0, 0) {} DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) {} - DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), repIdx(repIdx_), dupSet(dupSet_) {} + DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), dupSet(dupSet_) { + repIdxHigh = repIdx_ >> 32; + repIdxLow = (uint32_t)repIdx_; + } + int64_t GetRepIdx() { + int64_t repIdx = repIdxHigh; + repIdx = (repIdx << 32) | repIdxLow; + return repIdx; + } bool operator<(const DupInfo &o) const { return idx < o.idx; } bool operator>(const DupInfo &o) const { return idx > o.idx; } operator int64_t() const { return idx; } @@ -110,8 +119,8 @@ struct UnpairedPosInfo { // typedef unordered_map UnpairedPositionMap; typedef tsl::robin_map UnpairedNameMap; // 以read name为索引,保存未匹配的pair read -typedef tsl::robin_map - UnpairedPositionMap; // 以位点为索引,保存该位点包含的对应的所有read和该位点包含的剩余未匹配的read的数量 +typedef tsl::robin_map UnpairedPositionMap; // 以位点为索引,保存该位点包含的对应的所有read和该位点包含的剩余未匹配的read的数量 +typedef map> CkeyReadEndsMap; // 以calckey为关键字,保存在相邻数据块之前找到的匹配readEnds /* 单线程处理冗余参数结构体 */ struct MarkDupDataArg { @@ -191,4 +200,36 @@ struct DupIdxQueue { } return len - popNum; } + + uint64_t RealSize() { + uint64_t len = 0; + auto preTop = minHeap.top(); + DupInfo dupIdx = this->Pop(); + DupInfo nextDup = dupIdx; + auto topIdx = minHeap.top(); + + ofstream ofs("dupn-noxyz.txt"); + + while (dupIdx != -1) { + ++len; + while (true) { + topIdx = minHeap.top(); + nextDup = this->Pop(); + if (nextDup != dupIdx) { + dupIdx = nextDup; + break; + } else { + cout << "the same dup: " << dupIdx << '\t' << preTop.arrId << '\t' << preTop.arrIdx << '\t' + << preTop.dupIdx << '\t' << topIdx.arrId << '\t' << topIdx.arrIdx << '\t' << topIdx.dupIdx + << endl; + } + } + //ofs << topIdx.arrId << '\t' << topIdx.arrIdx << '\t' << topIdx.dupIdx << endl; + ofs << topIdx.dupIdx << endl; + dupIdx = nextDup; + preTop = topIdx; + } + ofs.close(); + return len; + } }; \ No newline at end of file diff --git a/src/markdup/new_pipe.cpp b/src/markdup/new_pipe.cpp new file mode 100644 index 0000000..07b4e3a --- /dev/null +++ b/src/markdup/new_pipe.cpp @@ -0,0 +1,1017 @@ +#include "new_pipe.h" + +#include +#include +#include + +#include +#include +#include + +#include "dup_metrics.h" +#include "md_args.h" +#include "md_funcs.h" +#include "read_ends.h" +#include "read_name_parser.h" +#include "util/bam_buf.h" +#include "util/profiling.h" +#include "util/yarn.h" + +using std::vector; + +namespace nsgv { + +extern MarkDupsArg gMdArg; // 用来测试性能 +extern samFile *gInBamFp; // 输入的bam文件 +extern sam_hdr_t *gInBamHeader; // 输入的bam文件头信息 +extern DuplicationMetrics gMetrics; // 统计信息 +extern vector gNameParsers; +extern DupResult gDupRes; +extern PipelineArg gPipe; +}; // namespace nsgv + +/* 排序 */ +static inline void sortReadEndsArr(vector &arr) { + size_t blockSize = 64 * 1024; + if (arr.size() < blockSize) { + std::sort(arr.begin(), arr.end()); + return; + } + size_t blockNum = (arr.size() + blockSize - 1) / blockSize; + size_t crossNum = 1024; + size_t start, end, i, left, right; + std::sort(arr.begin(), arr.begin() + blockSize); + for (i = 1; i < blockNum; ++i) { + start = i * blockSize; + end = min(start + blockSize, arr.size()); + std::sort(arr.begin() + start, arr.begin() + end); + left = crossNum; + while (!(arr[start - left] < arr[start])) { + left = left << 1; + if (left >= blockSize) { + std::sort(arr.begin(), arr.end()); // 退化到普通排序 + return; + } + } + right = min(crossNum, end - start - 1); + + while (!(arr[start - 1] < arr[start + right])) { + right = min(right << 1, end - start - 1); + if (right == end - start - 1) + break; + } + std::sort(arr.begin() + start - left, arr.begin() + start + right); + } +} + +/* 处理一组pairend的readends,标记冗余, 这个函数需要串行运行,因为需要做一些统计*/ +static void markDupsForPairs(vector &vpRe, DPSet *dupIdx, MDSet *opticalDupIdx, + DPSet *repIdx, MDSet *notDupIdx = nullptr, + MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr) { + if (vpRe.size() < 2) { + return; + } + + int maxScore = 0; + const ReadEnds *pBest = nullptr; + /** All read ends should have orientation FF, FR, RF, or RR **/ + for (auto pe : vpRe) { // 找分数最高的readend + if (pe->score > maxScore || pBest == nullptr) { + maxScore = pe->score; + pBest = pe; + } + } + + if (notDupIdx != nullptr) { + notDupIdx->insert(pBest->read1IndexInFile); + notDupIdx->insert(pBest->read2IndexInFile); + } + + if (nsgv::gMdArg.CHECK_OPTICAL_DUP) { // 检查光学冗余 + // trackOpticalDuplicates + vector prevOpticalRe; + if (notOpticalDupIdx != nullptr) { + for (auto pe : vpRe) { + if (pe->isOpticalDuplicate) { + prevOpticalRe.push_back(pe); + } + } + } + trackOpticalDuplicates(vpRe, pBest); + // 由于重叠问题,可能会更新数据 + if (notOpticalDupIdx != nullptr) { + for (auto pe : prevOpticalRe) { + if (!pe->isOpticalDuplicate) { + notOpticalDupIdx->insert(pe->read1IndexInFile); + notOpticalDupIdx->insert(pe->read2IndexInFile); + } + } + } + } + for (auto pe : vpRe) { // 对非best read标记冗余 + if (pe != pBest) { // 非best + if (pe->read1IndexInFile == 1165334 || pe->read1IndexInFile == 1165335) { + for (auto p : vpRe) { + cout << "zzh find in pairs " << p->read1IndexInFile << " " << p->score << endl; + } + // cout << "zzh find in pairs " << pe->read1IndexInFile << " " << (notOpticalDupIdx == nullptr) << endl; + } + dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 + if (pe->read2IndexInFile != pe->read1IndexInFile) + dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, + // 注意这里代表是read1的索引 + // 检查是否optical dup + if (pe->isOpticalDuplicate && opticalDupIdx != nullptr) { + opticalDupIdx->insert(pe->read1IndexInFile); + if (pe->read2IndexInFile != pe->read1IndexInFile) + opticalDupIdx->insert(pe->read2IndexInFile); + } + } + } + // 在输出的bam文件中添加tag, best作为dupset的代表 + if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS) { + repIdx->insert(DupInfo(pBest->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); + repIdx->insert(DupInfo(pBest->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); + if (notRepIdx != nullptr) { + for (auto pe : vpRe) { + if (pe != pBest) { + notRepIdx->insert(pe->read1IndexInFile); + if (pe->read2IndexInFile != pe->read1IndexInFile) + notRepIdx->insert(pe->read2IndexInFile); + } + } + } + } +} + +/* 处理一组非paired的readends,标记冗余 */ +static void markDupsForFrags(vector &vpRe, bool containsPairs, DPSet *dupIdx, + MDSet *notDupIdx = nullptr) { + if (containsPairs) { + for (auto pe : vpRe) { + if (!pe->IsPaired()) { + //if (pe->read1IndexInFile == 143876) { + // cout << "zzh find in frags" << endl; + //} + dupIdx->insert(pe->read1IndexInFile); + } + } + } else { + int maxScore = 0; + const ReadEnds *pBest = nullptr; + for (auto pe : vpRe) { + if (pe->score > maxScore || pBest == nullptr) { + maxScore = pe->score; + pBest = pe; + } + } + if (notDupIdx != nullptr) { + notDupIdx->insert(pBest->read1IndexInFile); + } + for (auto pe : vpRe) { + if (pe != pBest) { + //if (pe->read1IndexInFile == 143876) { + // cout << "zzh find in frags" << endl; + //} + dupIdx->insert(pe->read1IndexInFile); + } + } + } +} + +/* 找到与readend pos相等的所有readend */ +static void getEqualRE(const ReadEnds &re, vector &src, vector *dst) { + auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::PairsLittleThan); // 只比对位点 + dst->insert(dst->end(), range.first, range.second); +} + +#define LOWER_BOUND(tid, nthread, ndata) ((tid) * (ndata) / (nthread)) +#define UPPER_BOUND(tid, nthread, ndata) ((tid + 1) * (ndata) / (nthread)) + +/* 处理pairs */ +static void processPairs(vector &readEnds, DPSet *dupIdx, MDSet *opticalDupIdx, + DPSet *repIdx, MDSet *notDupIdx = nullptr, + MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr) { + // return; + vector vpCache; // 有可能是冗余的reads + const ReadEnds *pReadEnd = nullptr; + for (auto &re : readEnds) { + if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样 + vpCache.push_back(&re); // 处理一个潜在的冗余组 + else { + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, + notRepIdx); // 不一样 + vpCache.clear(); + vpCache.push_back(&re); + pReadEnd = &re; + } + } + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx); +} + +/* 处理frags */ +static void processFrags(vector &readEnds, DPSet *dupIdx, MDSet *notDupIdx = nullptr) { + bool containsPairs = false; + bool containsFrags = false; + vector vpCache; // 有可能是冗余的reads + const ReadEnds *pReadEnd = nullptr; + for (auto &re : readEnds) { + if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) { + vpCache.push_back(&re); + containsPairs = containsPairs || re.IsPaired(); + containsFrags = containsFrags || !re.IsPaired(); + } else { + if (vpCache.size() > 1 && containsFrags) { + markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); + } + vpCache.clear(); + vpCache.push_back(&re); + pReadEnd = &re; + containsPairs = re.IsPaired(); + containsFrags = !re.IsPaired(); + } + } + if (vpCache.size() > 1 && containsFrags) { + markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); + } +} + +/* 单线程markdup (第二步)*/ +static void markdups(MarkDupDataArg *arg) { + auto &p = *arg; + p.fragDupIdx.clear(); + p.pairDupIdx.clear(); + p.pairOpticalDupIdx.clear(); + p.pairRepIdx.clear(); + /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ + // 先处理 pair + processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx, &p.pairRepIdx); + // 再处理frag + processFrags(p.frags, &p.fragDupIdx); +} + +/* 获取交叉部分的数据 */ +static inline void getIntersectData(vector &leftArr, vector &rightArr, vector *dst, + bool isPairCmp = false) { + if (leftArr.empty() || rightArr.empty()) { + return; + } + const size_t leftEndIdx = leftArr.size() - 1; + const size_t rightStartIdx = 0; + size_t leftSpan = 0; + size_t rightSpan = 0; + + while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp)) { + leftSpan += 1; + if (leftSpan > leftEndIdx) { // 上一个的范围被下一个全部包围了(可能会有bug,上上个也与下一个有交集呢?) + leftSpan = leftArr.size() - 1; + break; + } + } + + while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp)) { + rightSpan += 1; + if (rightSpan == rightArr.size() - 1) // 同上,可能会有bug + break; + } + dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end()); + dst->insert(dst->end(), rightArr.begin(), rightArr.begin() + rightSpan); + std::sort(dst->begin(), dst->end()); +} + +/* 将frags重叠部分的dup idx放进数据中 */ +static inline void refreshFragDupIdx(DPSet &dupIdx, MDSet ¬DupIdx, MarkDupDataArg *lastArg, + MarkDupDataArg *curArg) { + auto &lp = *lastArg; + auto &p = *curArg; + for (auto idx : dupIdx) { + lp.fragDupIdx.insert(idx); + p.fragDupIdx.erase(idx); + } + for (auto idx : notDupIdx) { + lp.fragDupIdx.erase(idx); + p.fragDupIdx.erase(idx); + } +} + +/* 最后合并数据并排序 */ +template +static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupIdx, vector &dupArr, + vector &cacheDupIdx1, vector &midArr1) { + // midArr.resize(0); + // cacheDupIdx.resize(0); + + vector cacheDupIdx; + vector midArr; + + cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end()); + std::sort(cacheDupIdx.begin(), cacheDupIdx.end()); + + auto ai = dupArr.begin(); + auto ae = dupArr.end(); + auto bi = cacheDupIdx.begin(); + auto be = cacheDupIdx.end(); + + T val = 0; + while (ai != ae && bi != be) { + if (*ai < *bi) { + val = *ai++; + } else if (*bi < *ai) { + val = *bi++; + } else { + val = *bi++; // 相等的时候取后面的作为结果 + ai++; + } + if (notDupIdx.find(val) == notDupIdx.end()) { + midArr.push_back(val); + } + } + while (ai != ae) { + val = *ai++; + if (notDupIdx.find(val) == notDupIdx.end()) { + midArr.push_back(val); + } + } + while (bi != be) { + val = *bi++; + if (notDupIdx.find(val) == notDupIdx.end()) { + midArr.push_back(val); + } + } + // spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size()); + // dupArr = midArr; + dupArr.clear(); + dupArr.assign(midArr.begin(), midArr.end()); + spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size()); +} + +// for step 2 generate read ends + +// multi-thread generate read ends +static void mtGenerateReadEnds(void *data, long idx, int tid) { + auto &p = *(PipelineArg *)data; + auto &rnParser = nsgv::gNameParsers[idx]; + int nThread = p.numThread; + auto &bams = p.readData.bams; + int64_t bamStartIdx = p.readData.bamStartIdx; + int64_t taskSeq = p.readData.taskSeq; + GenREData &genREData = p.genREData[p.genREOrder % p.GENBUFNUM]; + auto &pairs = genREData.pairsArr[idx]; + auto &frags = genREData.fragsArr[idx]; + auto &unpairedDic = genREData.unpairedDicArr[idx]; + + pairs.clear(); + frags.clear(); + unpairedDic.clear(); + + PROF_START(gen); + size_t start_id = LOWER_BOUND(idx, nThread, bams.size()); + size_t end_id = UPPER_BOUND(idx, nThread, bams.size()); + for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read + BamWrap *bw = bams[i]; + const int64_t bamIdx = bamStartIdx + i; + if (bw->GetReadUnmappedFlag()) { + if (bw->b->core.tid == -1) + // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). + break; + } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 + ReadEnds fragEnd; + buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); + frags.push_back(fragEnd); // 添加进frag集合 + if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 + string key = bw->query_name(); + if (unpairedDic.find(key) == unpairedDic.end()) { + unpairedDic[key] = {taskSeq, fragEnd}; + } else { // 找到了pairend + auto &pairedEnds = unpairedDic.at(key).unpairedRE; + modifyPairedEnds(fragEnd, &pairedEnds); + pairs.push_back(pairedEnds); + unpairedDic.erase(key); // 删除找到的pairend + } + } + } + } + PROF_END(tprof[TP_gen][tid], gen); + + PROF_START(sort_frag); + // sortReadEndsArr(frags); + sort(frags.begin(), frags.end()); + PROF_END(tprof[TP_sort_frag][tid], sort_frag); + + PROF_START(sort_pair); + sort(pairs.begin(), pairs.end()); + PROF_END(tprof[TP_sort_pair][tid], sort_pair); +} + +static void doGenRE(PipelineArg &pipeArg) { + // return; + GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM]; + ReadData &readData = pipeArg.readData; + + // generate read ends + const int numThread = pipeArg.numThread; + + kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread); + // 用来在genRE阶段计算一些Sort阶段应该计算的数据,保持每个step用时更平衡 + // 轮询每个线程中未找到匹配的read,找到匹配的 + genREData.unpairedDic.clear(); + vector &pairs = genREData.pairsArr[numThread]; + pairs.clear(); + for (int i = 0; i < numThread; ++i) { + for (auto &val : genREData.unpairedDicArr[i]) { + const string &key = val.first; + const ReadEnds &fragEnd = val.second.unpairedRE; + if (genREData.unpairedDic.find(key) == genREData.unpairedDic.end()) { + genREData.unpairedDic[key] = {readData.taskSeq, fragEnd}; + } else { // 找到了pairend + auto &pairedEnds = genREData.unpairedDic.at(key).unpairedRE; + modifyPairedEnds(fragEnd, &pairedEnds); + pairs.push_back(pairedEnds); + genREData.unpairedDic.erase(key); // 删除找到的pairend + } + } + } + sort(pairs.begin(), pairs.end()); +} +// end for step 2 generate read ends + +// for step-3 sort +static void doSort(PipelineArg &pipeArg) { + // return; + GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM]; + SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM]; + SortMarkData &smd = *(SortMarkData *)sortData.dataPtr; + + smd.unpairedDic = genREData.unpairedDic; + + smd.pairs.clear(); + smd.frags.clear(); + const ReadEnds *pRE; + ReadEndsHeap pairsHeap, fragsHeap; + PROF_START(sort_pair); + pairsHeap.Init(&genREData.pairsArr); + while ((pRE = pairsHeap.Pop()) != nullptr) { + smd.pairs.push_back(*pRE); + } + PROF_END(gprof[GP_sort_pair], sort_pair); + PROF_START(sort_frag); + fragsHeap.Init(&genREData.fragsArr); + while ((pRE = fragsHeap.Pop()) != nullptr) { + smd.frags.push_back(*pRE); + } + PROF_END(gprof[GP_sort_frag], sort_frag); +} +// for step-4 sort +static void doMarkDup(PipelineArg &pipeArg) { + // return; + MarkDupData &mdData = pipeArg.markDupData[pipeArg.markDupOrder % pipeArg.MARKBUFNUM]; + SortData &sortData = pipeArg.sortData[pipeArg.markDupOrder % pipeArg.SORTBUFNUM]; + mdData.taskSeq = pipeArg.markDupOrder; + mdData.clear(); + + auto tmpPtr = mdData.dataPtr; + mdData.dataPtr = sortData.dataPtr; + sortData.dataPtr = tmpPtr; + + SortMarkData &smd = *(SortMarkData *)mdData.dataPtr; + // 先处理 pair + PROF_START(markdup_pair); + processPairs(smd.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx); + PROF_END(gprof[GP_markdup_pair], markdup_pair); + // 再处理frag + PROF_START(markdup_frag); + processFrags(smd.frags, &mdData.fragDupIdx); + PROF_END(gprof[GP_markdup_frag], markdup_frag); +} + +template +static void refreshDupIdx(T &srcArr, T &insArr) { + for (auto dup : srcArr) { + insArr.insert(dup); + } +} +template +static void refreshNotDupIdx(T1 &srcArr, T2 &delArr) { + for (auto dup : srcArr) { + delArr.erase(dup); + } +} + +static void refreshMarkDupData(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, + MDSet ¬DupIdx, MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, + MarkDupData &lp) { + refreshDupIdx(dupIdx, lp.pairDupIdx); + refreshDupIdx(opticalDupIdx, lp.pairOpticalDupIdx); + refreshDupIdx(repIdx, lp.pairRepIdx); + refreshNotDupIdx(notDupIdx, lp.pairDupIdx); + refreshNotDupIdx(notOpticalDupIdx, lp.pairOpticalDupIdx); + refreshNotDupIdx(notRepIdx, lp.pairRepIdx); +} + +static void refreshMarkDupData(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, + MDSet ¬DupIdx, MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, + MarkDupData &lp, MarkDupData &p) { + refreshDupIdx(dupIdx, lp.pairDupIdx); + refreshDupIdx(opticalDupIdx, lp.pairOpticalDupIdx); + refreshDupIdx(repIdx, lp.pairRepIdx); + refreshNotDupIdx(notDupIdx, lp.pairDupIdx); + refreshNotDupIdx(notOpticalDupIdx, lp.pairOpticalDupIdx); + refreshNotDupIdx(notRepIdx, lp.pairRepIdx); + refreshNotDupIdx(notDupIdx, p.pairDupIdx); + refreshNotDupIdx(notOpticalDupIdx, p.pairOpticalDupIdx); + refreshNotDupIdx(notRepIdx, p.pairRepIdx); + refreshNotDupIdx(dupIdx, p.pairDupIdx); + refreshNotDupIdx(opticalDupIdx, p.pairOpticalDupIdx); + refreshNotDupIdx(repIdx, p.pairRepIdx); +} + +// 处理相邻数据块之间重叠的部分 +static void processIntersectFragPairs(MarkDupData &lp, MarkDupData &cp) { + SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; + SortMarkData &csm = *(SortMarkData *)cp.dataPtr; + + vector reArr; + DPSet dupIdx; + MDSet opticalDupIdx; + DPSet repIdx; + MDSet notOpticalDupIdx; + MDSet notDupIdx; + MDSet notRepIdx; + // 先处理重叠的frags + getIntersectData(lsm.frags, csm.frags, &reArr); + processFrags(reArr, &dupIdx, ¬DupIdx); + refreshDupIdx(dupIdx, lp.fragDupIdx); + refreshNotDupIdx(dupIdx, cp.fragDupIdx); + refreshNotDupIdx(notDupIdx, lp.fragDupIdx); + refreshNotDupIdx(notDupIdx, cp.fragDupIdx); + + // 再处理重叠的pairs + reArr.clear(); + dupIdx.clear(); + notDupIdx.clear(); + + getIntersectData(lsm.pairs, csm.pairs, &reArr, true); + processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx); + refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, lp, cp); +} + +// 在相邻的数据块之间寻找未匹配的readends, 找到匹配的放到lp里 +static void findUnpairedInDatas(MarkDupData &lp, MarkDupData &cp) { + auto &interPairedData = lp.ckeyReadEndsMap; + SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; + SortMarkData &csm = *(SortMarkData *)cp.dataPtr; + + for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end(); ) { // 遍历上一个任务中的每个未匹配的read + auto &lastUnpair = *itr; + auto &readName = lastUnpair.first; + auto &lastUnpairInfo = lastUnpair.second; + auto lastRE = lastUnpairInfo.unpairedRE; // 未匹配的read end + if (csm.unpairedDic.find(readName) != csm.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read + auto &curUnpairInfo = csm.unpairedDic[readName]; + auto &curRE = curUnpairInfo.unpairedRE; + // 在某些clip情况下,poskey可能是后面的read + int64_t lastPosKey = lastRE.posKey; + int64_t curPosKey = curRE.posKey; + modifyPairedEnds(curRE, &lastRE); // lastRE当做找到匹配后,完整的ReadEnds + CalcKey ck = {min(lastPosKey, curPosKey), max(lastPosKey, curPosKey)}; + auto &pairArr = interPairedData[ck]; + pairArr.push_back(lastRE); + // 从dict中清除配对后的readends + csm.unpairedDic.erase(readName); + itr = lsm.unpairedDic.erase(itr); + } else { + ++itr; + } + } +} + +// 在global和lp中寻找未匹配的readends, 找到匹配的放到global里 +static void findUnpairedInGlobal(IntersectData &g, MarkDupData &lp) { + auto &interPairedData = g.ckeyReadEndsMap; + SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; + + for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end();) { // 遍历上一个任务中的每个未匹配的read + auto &lastUnpair = *itr; + auto &readName = lastUnpair.first; + auto &lastUnpairInfo = lastUnpair.second; + auto lastRE = lastUnpairInfo.unpairedRE; // 未匹配的read end + if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在global里找到了这个未匹配的read + auto &gUnpairInfo = g.unpairedDic[readName]; + auto &gRE = gUnpairInfo.unpairedRE; + // 在某些clip情况下,poskey可能是后面的read + int64_t lastPosKey = lastRE.posKey; + int64_t gPosKey = gRE.posKey; + modifyPairedEnds(lastRE, &gRE); // gRE当做找到匹配后,完整的ReadEnds + CalcKey ck = {min(lastPosKey, gPosKey), max(lastPosKey, gPosKey)}; + auto &pairArr = interPairedData[ck]; + pairArr.push_back(gRE); + // 从dict中清除配对后的readends + g.unpairedDic.erase(readName); + itr = lsm.unpairedDic.erase(itr); + } else { + ++itr; + } + } + for (auto &unPair : lsm.unpairedDic) { + g.unpairedDic.insert(unPair); + } +} + +static void putDupinfoToGlobal(IntersectData &g, MarkDupData &lp) { + g.dupIdxArr.push_back(vector()); + auto &vIdx = g.dupIdxArr.back(); + lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); + vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); + std::sort(vIdx.begin(), vIdx.end()); + + g.opticalDupIdxArr.push_back(vector()); + auto &vOpticalIdx = g.opticalDupIdxArr.back(); + vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); + std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); + + g.repIdxArr.push_back(vector()); + auto &vRepIdx = g.repIdxArr.back(); + vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); + std::sort(vRepIdx.begin(), vRepIdx.end()); +} + +// for step-5 handle intersect data +static void doIntersect(PipelineArg &pipeArg) { + // spdlog::info("intersect order: {}", pipeArg.intersectOrder); + // return; + // last, current, next + const int kInitIntersectOrder = 2; + IntersectData &g = pipeArg.intersectData; + MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 2) % pipeArg.MARKBUFNUM]; + MarkDupData &cp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; + MarkDupData &np = pipeArg.markDupData[(pipeArg.intersectOrder) % pipeArg.MARKBUFNUM]; + + SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; + SortMarkData &csm = *(SortMarkData *)cp.dataPtr; + SortMarkData &nsm = *(SortMarkData *)np.dataPtr; + + // 处理相邻数据块之间重叠的部分 + if (pipeArg.intersectOrder == kInitIntersectOrder) { // 第一次运行,需要处理lp和cp + processIntersectFragPairs(lp, cp); + } + processIntersectFragPairs(cp, np); + + // 检查确保lp和np之间没有数据交叉 + int64_t lastRightPos = 0, curLeftPos = INT64_MAX, nextLeftPos = INT64_MAX; + if (lsm.frags.size() > 0) lastRightPos = lsm.frags.back().posKey; + if (csm.frags.size() > 0) curLeftPos = csm.frags[0].posKey; + if (nsm.frags.size() > 0) nextLeftPos = nsm.frags[0].posKey; + if (lastRightPos >= nextLeftPos) { + spdlog::error("previous data can not contain readends included by next data block! {} {}", lastRightPos, nextLeftPos); + } + + // 在相邻数据块之间查找之前未匹配的readends + if (pipeArg.intersectOrder == kInitIntersectOrder) { // 第一次运行,需要处理lp和cp + findUnpairedInDatas(lp, cp); + } + findUnpairedInDatas(lp, np); // 找到的匹配放到lp里 + findUnpairedInDatas(cp, np); // 找到的匹配放到cp里 + // 把lp中未匹配的放到global里保存 + findUnpairedInGlobal(g, lp); + + // 处理lp中的新找到的匹配 + TaskSeqDupInfo t; + for (auto &ckVal : lp.ckeyReadEndsMap) { + auto &ck = ckVal.first; + auto &pairArr = ckVal.second; + getEqualRE(pairArr[0], lsm.pairs, &pairArr); + getEqualRE(pairArr[0], csm.pairs, &pairArr); + // 在cp的ckeyReadEndsMap里找一下 + auto cpKeyItr = cp.ckeyReadEndsMap.find(ck); + if (cpKeyItr != cp.ckeyReadEndsMap.end()) { + auto &cpPairArr = cpKeyItr->second; + pairArr.insert(pairArr.end(), cpPairArr.begin(), cpPairArr.end()); + cp.ckeyReadEndsMap.erase(cpKeyItr); + } + sort(pairArr.begin(), pairArr.end()); + processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, + &t.notRepIdx); + } + // 处理找到匹配的global数据 + for (auto itr = g.ckeyReadEndsMap.begin(); itr != g.ckeyReadEndsMap.end();) { + auto &ckVal = *itr; + auto &ck = ckVal.first; + if (ck.read2Pos < curLeftPos) { + auto &pairArr = ckVal.second; + sort(pairArr.begin(), pairArr.end()); + processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx); + itr = g.ckeyReadEndsMap.erase(itr); + } else { + ++itr; + } + } + lp.ckeyReadEndsMap.clear(); + // 更新一下冗余结果 + refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp, cp); + // 处理g中新找到的匹配 + + putDupinfoToGlobal(g, lp); +} + +static void *pipeRead(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + BamBufType inBamBuf(nsgv::gMdArg.DUPLEX_IO); + inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM); + int64_t readNumSum = 0; + while (1) { + PROF_START(read_wait); + yarn::POSSESS(pipeArg.readSig); + yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 1); // 只有一个坑,因为在bambuf内部支持异步读取 + PROF_END(gprof[GP_read_wait], read_wait); + size_t readNum = 0; + PROF_START(read); + if (inBamBuf.ReadStat() >= 0) + readNum = inBamBuf.ReadBam(); // 读入新一轮的数据 + PROF_END(gprof[GP_read], read); + if (readNum < 1) { + pipeArg.readFinish = 1; + yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据 + break; + } + spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder); + + pipeArg.readData.bams = inBamBuf.GetBamArr(); + + pipeArg.readData.bamStartIdx = readNumSum; + pipeArg.readData.taskSeq = pipeArg.readOrder; + + readNumSum += readNum; + inBamBuf.ClearAll(); // 清理上一轮读入的数据 + pipeArg.readOrder += 1; // for next + yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据 + } + spdlog::info("total reads processed {}", readNumSum); + return 0; +} +static void *pipeGenRE(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + auto &genREData = pipeArg.genREData; + // init generate read ends data by num_thread + int genREThread = pipeArg.numThread; + for (int i = 0; i < pipeArg.GENBUFNUM; ++i) { + genREData[i].Init(genREThread); + } + + while (1) { + PROF_START(gen_wait); + yarn::POSSESS(pipeArg.readSig); + yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 0); // 等待有数据 + yarn::POSSESS(pipeArg.genRESig); + PROF_END(gprof[GP_gen_wait], gen_wait); + + yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, pipeArg.GENBUFNUM); // 有BUFNUM个坑 + yarn::RELEASE(pipeArg.genRESig); // 因为有不止一个位置,所以要释放 + + if (pipeArg.readFinish) { + yarn::POSSESS(pipeArg.genRESig); + pipeArg.genREFinish = 1; + yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); + yarn::TWIST(pipeArg.readSig, yarn::BY, -1); + break; + } + /* 处理bam,生成readends */ + PROF_START(gen); + doGenRE(pipeArg); + PROF_END(gprof[GP_gen], gen); + + yarn::POSSESS(pipeArg.genRESig); + pipeArg.genREOrder += 1; + yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); + yarn::TWIST(pipeArg.readSig, yarn::BY, -1); // 使用了一次读入的数据 + } + return 0; +} +static void *pipeSort(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + + while (1) { + PROF_START(sort_wait); + yarn::POSSESS(pipeArg.genRESig); + yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, 0); // 等待有数据 + yarn::RELEASE(pipeArg.genRESig); + PROF_END(gprof[GP_sort_wait], sort_wait); + + yarn::POSSESS(pipeArg.sortSig); + yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // 有BUFNUM个位置 + yarn::RELEASE(pipeArg.sortSig); + + if (pipeArg.genREFinish) { + // 处理完剩余数据 + while (pipeArg.sortOrder < pipeArg.genREOrder) { + PROF_START(sort); + doSort(pipeArg); + PROF_END(gprof[GP_sort], sort); + pipeArg.sortOrder += 1; + } + yarn::POSSESS(pipeArg.sortSig); + pipeArg.sortFinish = 1; + yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); + break; + } + /* 排序 readends */ + PROF_START(sort); + doSort(pipeArg); + PROF_END(gprof[GP_sort], sort); + + yarn::POSSESS(pipeArg.genRESig); + yarn::TWIST(pipeArg.genRESig, yarn::BY, -1); + + yarn::POSSESS(pipeArg.sortSig); + pipeArg.sortOrder += 1; + yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); + } + return 0; +} +static void *pipeMarkDup(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + + while (1) { + PROF_START(markdup_wait); + yarn::POSSESS(pipeArg.sortSig); + yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, 0); // 等待有数据 + yarn::RELEASE(pipeArg.sortSig); + PROF_END(gprof[GP_markdup_wait], markdup_wait); + + yarn::POSSESS(pipeArg.markDupSig); + yarn::WAIT_FOR(pipeArg.markDupSig, yarn::NOT_TO_BE, pipeArg.MARKBUFNUM); + yarn::RELEASE(pipeArg.markDupSig); + + if (pipeArg.sortFinish) { + // 应该还得处理剩余的数据 + while (pipeArg.markDupOrder < pipeArg.sortOrder) { + PROF_START(markdup); + doMarkDup(pipeArg); + PROF_END(gprof[GP_markdup], markdup); + pipeArg.markDupOrder += 1; + } + yarn::POSSESS(pipeArg.markDupSig); + pipeArg.markDupFinish = 1; + yarn::TWIST(pipeArg.markDupSig, yarn::BY, 2); + break; + } + /* 冗余检测 readends */ + PROF_START(markdup); + doMarkDup(pipeArg); + PROF_END(gprof[GP_markdup], markdup); + yarn::POSSESS(pipeArg.sortSig); + yarn::TWIST(pipeArg.sortSig, yarn::BY, -1); + + yarn::POSSESS(pipeArg.markDupSig); + pipeArg.markDupOrder += 1; + yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1); + } + return 0; +} +static void *pipeIntersect(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + pipeArg.intersectOrder = 2; + while (1) { + PROF_START(intersect_wait); + yarn::POSSESS(pipeArg.markDupSig); + yarn::WAIT_FOR(pipeArg.markDupSig, yarn::TO_BE_MORE_THAN, 2); // 等待有数据 + yarn::RELEASE(pipeArg.markDupSig); + PROF_END(gprof[GP_intersect_wait], intersect_wait); + + if (pipeArg.markDupFinish) { + while (pipeArg.intersectOrder < pipeArg.markDupOrder) { + PROF_START(intersect); + doIntersect(pipeArg); + PROF_END(gprof[GP_intersect], intersect); + pipeArg.intersectOrder += 1; + } + break; + } + /* 交叉数据处理 readends */ + PROF_START(intersect); + doIntersect(pipeArg); + PROF_END(gprof[GP_intersect], intersect); + + yarn::POSSESS(pipeArg.markDupSig); + yarn::TWIST(pipeArg.markDupSig, yarn::BY, -1); + + pipeArg.intersectOrder += 1; + } + return 0; +} + +/* 当所有任务结束后,global data里还有未处理的数据 */ +static void processLastData(PipelineArg &pipeArg) { + IntersectData &g = pipeArg.intersectData; + MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 2) % pipeArg.MARKBUFNUM]; + MarkDupData &cp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; + + SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; + SortMarkData &csm = *(SortMarkData *)cp.dataPtr; + + spdlog::info("last data size: g{}-lp{}-cp{}", g.unpairedDic.size(), lsm.unpairedDic.size(), csm.unpairedDic.size()); + + // 把lp中未匹配的放到global里保存 + findUnpairedInGlobal(g, lp); + findUnpairedInGlobal(g, cp); + + // 处理lp中的新找到的匹配 + TaskSeqDupInfo t; + for (auto &ckVal : lp.ckeyReadEndsMap) { + auto &ck = ckVal.first; + auto &pairArr = ckVal.second; + getEqualRE(pairArr[0], lsm.pairs, &pairArr); + getEqualRE(pairArr[0], csm.pairs, &pairArr); + sort(pairArr.begin(), pairArr.end()); + processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx); + } + // 处理找到匹配的global数据 + for (auto itr = g.ckeyReadEndsMap.begin(); itr != g.ckeyReadEndsMap.end();) { + auto &ckVal = *itr; + auto &ck = ckVal.first; + auto &pairArr = ckVal.second; + sort(pairArr.begin(), pairArr.end()); + processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx); + itr = g.ckeyReadEndsMap.erase(itr); + } + // 更新一下冗余结果 + refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp, cp); + // 处理g中新找到的匹配 + + putDupinfoToGlobal(g, lp); + putDupinfoToGlobal(g, cp); + + spdlog::info("last data size: g{}-lp{}-cp{}", g.unpairedDic.size(), lsm.unpairedDic.size(), csm.unpairedDic.size()); +} + +static void parallelPipeline() { + PipelineArg pipeArg(&nsgv::gDupRes); + // PipelineArg &pipeArg = nsgv::gPipe; + pipeArg.numThread = nsgv::gMdArg.NUM_THREADS; + + pthread_t tidArr[5]; + pthread_create(&tidArr[0], 0, pipeRead, &pipeArg); + pthread_create(&tidArr[1], 0, pipeGenRE, &pipeArg); + pthread_create(&tidArr[2], 0, pipeSort, &pipeArg); + pthread_create(&tidArr[3], 0, pipeMarkDup, &pipeArg); + pthread_create(&tidArr[4], 0, pipeIntersect, &pipeArg); + for (int i = 0; i < 5; ++i) pthread_join(tidArr[i], 0); + PROF_START(merge_result); + processLastData(pipeArg); + PROF_END(gprof[GP_merge_result], merge_result); + + spdlog::info("pipeArg size : {}", pipeArg.byteSize()); + + size_t repNum = 0; + for (auto &v : pipeArg.intersectData.repIdxArr) repNum += v.size(); + spdlog::info("rep num : {}", repNum); + + spdlog::info("result size : {}", nsgv::gDupRes.byteSize()); +} + +/* 并行流水线方式处理数据,标记冗余 */ +void NewPipeMarkDups() { + if (nsgv::gMdArg.NUM_THREADS > 1) + return parallelPipeline(); + + PipelineArg pipeArg(&nsgv::gDupRes); + pipeArg.numThread = nsgv::gMdArg.NUM_THREADS; + BamBufType inBamBuf(nsgv::gMdArg.DUPLEX_IO); + inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM); + int64_t readNumSum = 0; + for (int i = 0; i < pipeArg.GENBUFNUM; ++i) { + pipeArg.genREData[i].Init(pipeArg.numThread); + } + pipeArg.intersectOrder = 1; // do intersect 从1开始 + while (1) { + size_t readNum = 0; + if (inBamBuf.ReadStat() >= 0) + readNum = inBamBuf.ReadBam(); // 读入新一轮的数据 + if (readNum < 1) { + break; + } + spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder); + + pipeArg.readData.bams = inBamBuf.GetBamArr(); + pipeArg.readData.bamStartIdx = readNumSum; + pipeArg.readData.taskSeq = pipeArg.readOrder; + // 1. do generate read ends + doGenRE(pipeArg); + pipeArg.genREOrder += 1; + // 2. do sort + doSort(pipeArg); + pipeArg.sortOrder += 1; + // 3. do markduplicate + doMarkDup(pipeArg); + pipeArg.markDupOrder += 1; + // 4. do intersect data + if (pipeArg.markDupOrder > 1) { + doIntersect(pipeArg); + pipeArg.intersectOrder += 1; + } + + readNumSum += readNum; + inBamBuf.ClearAll(); // 清理上一轮读入的数据 + pipeArg.readOrder += 1; // for next + } + processLastData(pipeArg); +} \ No newline at end of file diff --git a/src/markdup/new_pipe.h b/src/markdup/new_pipe.h new file mode 100644 index 0000000..6e098b3 --- /dev/null +++ b/src/markdup/new_pipe.h @@ -0,0 +1,264 @@ +#pragma once + +#include +#include +#include + +#include "md_types.h" + +struct ReadData { + vector bams; // read step output + int64_t bamStartIdx = 0; // 每轮读入的bam数组,起始位置在全局bam中的索引 + int64_t taskSeq = 0; // 任务序号 +}; + +struct GenREData { + vector> pairsArr; // 成对的reads + vector> fragsArr; // 暂未找到配对的reads + vector unpairedDicArr; // 用来寻找pair end + void Init(int nThread) { + for (int i = 0; i <= nThread; ++i) { // 比线程多一个,主要是pairs多一个,其他没用 + pairsArr.push_back(vector()); + fragsArr.push_back(vector()); + unpairedDicArr.push_back(UnpairedNameMap()); + } + } + UnpairedNameMap unpairedDic; // 代替sort step中一部分计算 + size_t byteSize() { + size_t bytes = 0; + for (auto &v : pairsArr) + for (auto &r : v) bytes += sizeof(r); + for (auto &v : fragsArr) + for (auto &r : v) bytes += sizeof(r); + for (auto &m : unpairedDicArr) bytes += m.size() * 100; + bytes += unpairedDic.size() * 100; + return bytes; + } +}; + +struct SortMarkData { + vector pairs; // 成对的reads + vector frags; // 暂未找到配对的reads + UnpairedNameMap unpairedDic; // 用来寻找pair end + size_t byteSize() { + size_t bytes = 0; + for (auto &r : pairs) bytes += sizeof(r); + for (auto &r : frags) bytes += sizeof(r); + bytes += unpairedDic.size() * 100; + return bytes; + } +}; + +struct SortData { + volatile void *dataPtr; // SortMarkData pointer +}; + +struct MarkDupData { + int64_t taskSeq = 0; // 任务序号 + DPSet pairDupIdx; // pair的冗余read的索引 + MDSet pairOpticalDupIdx; // optical冗余read的索引 + DPSet fragDupIdx; // frag的冗余read的索引 + DPSet pairRepIdx; // pair的dupset代表read的索引 + CkeyReadEndsMap ckeyReadEndsMap; + + volatile void *dataPtr; // SortMarkData pointer + + void clear() { + fragDupIdx.clear(); + pairDupIdx.clear(); + pairOpticalDupIdx.clear(); + pairRepIdx.clear(); + ckeyReadEndsMap.clear(); + } + + size_t byteSize() { + size_t bytes = 0; + bytes += pairDupIdx.size() * 100; + bytes += pairOpticalDupIdx.size() * 100; + bytes += fragDupIdx.size() * 100; + bytes += pairRepIdx.size() * 100; + return bytes; + } +}; + +struct DupResult { + vector> dupIdxArr; + vector> opticalDupIdxArr; + vector> repIdxArr; + size_t byteSize() { + size_t bytes = 0; + size_t tmp = 0; + for (auto &v : dupIdxArr) + for (auto &r : v) tmp += sizeof(r); + spdlog::info("dupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); + bytes += tmp; + tmp = 0; + for (auto &v : opticalDupIdxArr) + for (auto &r : v) tmp += sizeof(r); + spdlog::info("opticalDupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); + bytes += tmp; + tmp = 0; + for (auto &v : repIdxArr) + for (auto &r : v) tmp += sizeof(r); + spdlog::info("repIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); + bytes += tmp; + spdlog::info("result size : {} GB", bytes / 1024.0 / 1024 / 1024); + + return bytes; + } +}; + +struct IntersectData { + UnpairedNameMap unpairedDic; // 用来寻找pair end + CkeyReadEndsMap ckeyReadEndsMap; + + // 每个task对应一个vector + vector> &dupIdxArr; + vector> &opticalDupIdxArr; + vector> &repIdxArr; + + IntersectData(DupResult *resPtr) + : dupIdxArr(resPtr->dupIdxArr), opticalDupIdxArr(resPtr->opticalDupIdxArr), repIdxArr(resPtr->repIdxArr) {} + + size_t byteSize() { + size_t bytes = 0; + bytes += unpairedDic.size() * 100; + for (auto &v : dupIdxArr) + for (auto &r : v) bytes += sizeof(r); + for (auto &v : opticalDupIdxArr) + for (auto &r : v) bytes += sizeof(r); + for (auto &v : repIdxArr) + for (auto &r : v) bytes += sizeof(r); + spdlog::info("result size : {}", bytes); + + return bytes; + } +}; + +// 记录流水线状态,task的序号,以及某阶段是否结束 +struct PipelineArg { + static const int GENBUFNUM = 2; + static const int SORTBUFNUM = 2; + static const int MARKBUFNUM = 4; + uint64_t readOrder = 0; + uint64_t genREOrder = 0; + uint64_t sortOrder = 0; + uint64_t markDupOrder = 0; + uint64_t intersectOrder = 0; + int numThread = 0; + + volatile int readFinish = 0; + volatile int genREFinish = 0; + volatile int sortFinish = 0; + volatile int markDupFinish = 0; + + yarn::lock_t *readSig; + yarn::lock_t *genRESig; + yarn::lock_t *sortSig; + yarn::lock_t *markDupSig; + + PipelineArg(DupResult *resPtr) : intersectData(resPtr) { + readSig = yarn::NEW_LOCK(0); // 最大值1, 双buffer在bambuf中实现了,对调用透明 + genRESig = yarn::NEW_LOCK(0); // 最大值2, 双buffer + sortSig = yarn::NEW_LOCK(0); + markDupSig = yarn::NEW_LOCK(0); + for (int i = 0; i < SORTBUFNUM; ++i) { + sortData[i].dataPtr = &sortMarkData[i]; + } + for (int i = 0; i < MARKBUFNUM; ++i) { + markDupData[i].dataPtr = &sortMarkData[i + SORTBUFNUM]; + } + } + + SortMarkData sortMarkData[SORTBUFNUM + MARKBUFNUM]; + + // for step-1 read + ReadData readData; + // for step-2 generate readends + GenREData genREData[GENBUFNUM]; + // for step-3 sort each thread frags and pairs + SortData sortData[SORTBUFNUM]; + // for step-4 mark duplicate + MarkDupData markDupData[MARKBUFNUM]; + // for step-5 deal with intersect data + IntersectData intersectData; + + size_t byteSize() { + size_t bytes = 0; + + size_t tmp = 0; + for (int i = 0; i < SORTBUFNUM + MARKBUFNUM; ++i) tmp += sortMarkData[i].byteSize(); + bytes += tmp; + spdlog::info("sortMarkData size : {}", tmp); + for (int i = 0; i < GENBUFNUM; ++i) tmp += genREData[i].byteSize(); + bytes += tmp; + spdlog::info("genREData size : {}", tmp); + for (int i = 0; i < MARKBUFNUM; ++i) tmp += markDupData[i].byteSize(); + bytes += tmp; + spdlog::info("markDupData size : {}", tmp); + tmp += intersectData.byteSize(); + bytes += tmp; + spdlog::info("intersectData size : {}", tmp); + + return bytes; + } +}; + +struct REArrIdIdx { + int arrId = 0; // 数组索引 + uint64_t arrIdx = 0; // 数组内部当前索引 + const ReadEnds *pE = nullptr; +}; + +struct REGreaterThan { + bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return *b.pE < *a.pE; } +}; + +struct ReadEndsHeap { + // 将冗余索引和他对应的task vector对应起来 + vector> *arr2d; + priority_queue, REGreaterThan> minHeap; + uint64_t popNum = 0; + + int Init(vector> *_arr2d) { + arr2d = _arr2d; + if (arr2d == nullptr) { + return -1; + } + for (int i = 0; i < arr2d->size(); ++i) { + auto &v = (*arr2d)[i]; + if (!v.empty()) { + minHeap.push({i, 1, &v[0]}); + } + } + return 0; + } + + const ReadEnds *Pop() { + const ReadEnds *ret = nullptr; + if (!minHeap.empty()) { + auto minVal = minHeap.top(); + minHeap.pop(); + ++popNum; + ret = minVal.pE; + auto &v = (*arr2d)[minVal.arrId]; + if (v.size() > minVal.arrIdx) { + minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v[minVal.arrIdx]}); + } + } + return ret; + } + + uint64_t Size() { + uint64_t len = 0; + if (arr2d != nullptr) { + for (auto &v : *arr2d) { + len += v.size(); + } + } + return len - popNum; + } +}; + +// 并行运行mark duplicate +void NewPipeMarkDups(); \ No newline at end of file diff --git a/src/markdup/pipeline_md.cpp b/src/markdup/pipeline_md.cpp index 4c2424c..a1e9fd4 100644 --- a/src/markdup/pipeline_md.cpp +++ b/src/markdup/pipeline_md.cpp @@ -26,8 +26,8 @@ extern samFile *gInBamFp; // 输入的bam文件 extern sam_hdr_t *gInBamHeader; // 输入的bam文件头信息 extern DuplicationMetrics gMetrics; // 统计信息 extern vector gNameParsers; +extern DupResult gDupRes; extern PipelineArg gPipe; - }; // namespace nsgv /* 排序 */ @@ -304,9 +304,13 @@ static void refeshTaskDupInfo(DPSet &dupIdx, MDSet &opticalDup /* 最后合并数据并排序 */ template static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupIdx, vector &dupArr, - vector &cacheDupIdx, vector &midArr) { - midArr.resize(0); - cacheDupIdx.resize(0); + vector &cacheDupIdx1, vector &midArr1) { + //midArr.resize(0); + //cacheDupIdx.resize(0); + + vector cacheDupIdx; + vector midArr; + cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end()); std::sort(cacheDupIdx.begin(), cacheDupIdx.end()); @@ -341,7 +345,11 @@ static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupI midArr.push_back(val); } } - dupArr = midArr; + // spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size()); + //dupArr = midArr; + dupArr.clear(); + dupArr.assign(midArr.begin(), midArr.end()); + spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size()); } // for step 2 generate read ends @@ -1003,7 +1011,7 @@ static void mergeAllTask(PipelineArg &pipeArg) { g.unpairedPosArr.clear(); PROF_END(gprof[GP_merge_markdup], merge_markdup); - // 将dupidx放进全局数据 +// // 将dupidx放进全局数据 PROF_START(merge_update); vector cacheDupIdx; vector midArr; @@ -1039,7 +1047,8 @@ static void mergeAllTask(PipelineArg &pipeArg) { } static void parallelPipeline() { - PipelineArg &pipeArg = nsgv::gPipe; + PipelineArg pipeArg(&nsgv::gDupRes); + //PipelineArg &pipeArg = nsgv::gPipe; pipeArg.numThread = nsgv::gMdArg.NUM_THREADS; pthread_t tidArr[5]; @@ -1052,6 +1061,14 @@ static void parallelPipeline() { PROF_START(merge_result); mergeAllTask(pipeArg); PROF_END(gprof[GP_merge_result], merge_result); + + spdlog::info("pipeArg size : {}", pipeArg.byteSize()); + + size_t repNum = 0; + for (auto &v : pipeArg.intersectData.repIdxArr) repNum += v.size(); + spdlog::info("rep num : {}", repNum); + + spdlog::info("result size : {}", nsgv::gDupRes.byteSize()); } /* 并行流水线方式处理数据,标记冗余 */ @@ -1059,7 +1076,7 @@ void pipelineMarkDups() { if (nsgv::gMdArg.NUM_THREADS > 1) return parallelPipeline(); - PipelineArg &pipeArg = nsgv::gPipe; + PipelineArg pipeArg(&nsgv::gDupRes); pipeArg.numThread = nsgv::gMdArg.NUM_THREADS; BamBufType inBamBuf(nsgv::gMdArg.DUPLEX_IO); inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM); diff --git a/src/markdup/pipeline_md.h b/src/markdup/pipeline_md.h index ba925af..f5700d3 100644 --- a/src/markdup/pipeline_md.h +++ b/src/markdup/pipeline_md.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include "md_types.h" @@ -24,6 +25,15 @@ struct GenREData { } UnpairedNameMap unpairedDic; // 代替sort step中一部分计算 UnpairedPositionMap unpairedPosArr; // + size_t byteSize() { + size_t bytes = 0; + for (auto &v : pairsArr) for (auto &r : v) bytes += sizeof(r); + for (auto &v : fragsArr) for (auto &r : v) bytes += sizeof(r); + for (auto &m : unpairedDicArr) bytes += m.size() * 100; + bytes += unpairedDic.size() * 100; + bytes += unpairedPosArr.size() * 1000; + return bytes; + } }; struct SortMarkData { @@ -31,6 +41,14 @@ struct SortMarkData { vector frags; // 暂未找到配对的reads UnpairedNameMap unpairedDic; // 用来寻找pair end UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储 + size_t byteSize() { + size_t bytes = 0; + for (auto &r : pairs) bytes += sizeof(r); + for (auto &r : frags) bytes += sizeof(r); + bytes += unpairedDic.size() * 100; + bytes += unpairedPosArr.size() * 1000; + return bytes; + } }; struct SortData { @@ -45,6 +63,40 @@ struct MarkDupData { DPSet pairRepIdx; // pair的dupset代表read的索引 volatile void *dataPtr; // SortMarkData pointer + + size_t byteSize() { + size_t bytes = 0; + bytes += pairDupIdx.size() * 100; + bytes += pairOpticalDupIdx.size() * 100; + bytes += fragDupIdx.size() * 100; + bytes += pairRepIdx.size() * 100; + return bytes; + } +}; + +struct DupResult { + vector> dupIdxArr; + vector> opticalDupIdxArr; + vector> repIdxArr; + size_t byteSize() { + size_t bytes = 0; + size_t tmp = 0; + for (auto &v : dupIdxArr) for (auto &r : v) tmp += sizeof(r); + spdlog::info("dupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); + bytes += tmp; + tmp = 0; + for (auto &v : opticalDupIdxArr) + for (auto &r : v) tmp += sizeof(r); + spdlog::info("opticalDupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); + bytes += tmp; + tmp = 0; + for (auto &v : repIdxArr) for (auto &r : v) tmp += sizeof(r); + spdlog::info("repIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); + bytes += tmp; + spdlog::info("result size : {} GB", bytes / 1024.0 / 1024 / 1024); + + return bytes; + } }; struct IntersectData { @@ -52,9 +104,9 @@ struct IntersectData { UnpairedPositionMap unpairedPosArr; // 每个task对应一个vector - vector> dupIdxArr; - vector> opticalDupIdxArr; - vector> repIdxArr; + vector> &dupIdxArr; + vector> &opticalDupIdxArr; + vector> &repIdxArr; // 用来存放后续计算的数据 vector> latterDupIdxArr; @@ -63,6 +115,31 @@ struct IntersectData { vector> latterNotDupIdxArr; vector> latterNotOpticalDupIdxArr; vector> latterNotRepIdxArr; + + IntersectData(DupResult *resPtr) : + dupIdxArr(resPtr->dupIdxArr), + opticalDupIdxArr(resPtr->opticalDupIdxArr), + repIdxArr(resPtr->repIdxArr) + {} + + size_t byteSize() { + size_t bytes = 0; + bytes += unpairedDic.size() * 100; + bytes += unpairedPosArr.size() * 1000; + for (auto &v : dupIdxArr) for (auto &r : v) bytes += sizeof(r); + for (auto &v : opticalDupIdxArr) for (auto &r : v) bytes += sizeof(r); + for (auto &v : repIdxArr) for (auto &r : v) bytes += sizeof(r); + spdlog::info("result size : {}", bytes); + + for (auto &s : latterDupIdxArr) bytes += s.size() * sizeof(DupInfo); + for (auto &s : latterOpticalDupIdxArr) bytes += s.size() * sizeof(DupInfo); + for (auto &s : latterRepIdxArr) bytes += s.size() * sizeof(DupInfo); + for (auto &s : latterNotDupIdxArr) bytes += s.size() * sizeof(DupInfo); + for (auto &s : latterNotOpticalDupIdxArr) bytes += s.size() * sizeof(DupInfo); + for (auto &s : latterNotRepIdxArr) bytes += s.size() * sizeof(DupInfo); + + return bytes; + } }; // 记录流水线状态,task的序号,以及某阶段是否结束 @@ -87,7 +164,7 @@ struct PipelineArg { yarn::lock_t *sortSig; yarn::lock_t *markDupSig; - PipelineArg() { + PipelineArg(DupResult *resPtr) : intersectData(resPtr) { readSig = yarn::NEW_LOCK(0); // 最大值1, 双buffer在bambuf中实现了,对调用透明 genRESig = yarn::NEW_LOCK(0); // 最大值2, 双buffer sortSig = yarn::NEW_LOCK(0); @@ -112,6 +189,26 @@ struct PipelineArg { MarkDupData markDupData[MARKBUFNUM]; // for step-5 deal with intersect data IntersectData intersectData; + + size_t byteSize() { + size_t bytes = 0; + + size_t tmp = 0; + for (int i = 0; i < SORTBUFNUM + MARKBUFNUM; ++i) tmp += sortMarkData[i].byteSize(); + bytes += tmp; + spdlog::info("sortMarkData size : {}", tmp); + for (int i = 0; i < GENBUFNUM; ++i) tmp += genREData[i].byteSize(); + bytes += tmp; + spdlog::info("genREData size : {}", tmp); + for (int i = 0; i < MARKBUFNUM; ++i) tmp += markDupData[i].byteSize(); + bytes += tmp; + spdlog::info("markDupData size : {}", tmp); + tmp += intersectData.byteSize(); + bytes += tmp; + spdlog::info("intersectData size : {}", tmp); + + return bytes; + } }; struct REArrIdIdx { diff --git a/src/markdup/read_ends.h b/src/markdup/read_ends.h index a1f68d4..cb6773a 100644 --- a/src/markdup/read_ends.h +++ b/src/markdup/read_ends.h @@ -78,7 +78,7 @@ struct ReadEnds : PhysicalLocation { /* zzh增加的成员变量 */ int64_t posKey = -1; // 根据位置信息生成的关键字 return (int64_t)tid << - // MAX_CONTIG_LEN_SHIFT | (int64_t)pos; + // MAX_CONTIG_LEN_SHIFT | (int64_t)pos; (包含clip的序列,也就是可能比map结果更偏左) /* 用来做一些判断,因为一些readends会做多次操作,比如task之间有重叠等等 */ int oprateTime = 0; @@ -148,12 +148,12 @@ struct ReadEnds : PhysicalLocation { comp = read2ReferenceIndex - o.read2ReferenceIndex; if (comp == 0) comp = read2Coordinate - o.read2Coordinate; - if (comp == 0) - comp = tile - o.tile; - if (comp == 0) - comp = x - o.x; - if (comp == 0) - comp - y - o.y; + //if (comp == 0) + // comp = tile - o.tile; + //if (comp == 0) + // comp = x - o.x; + //if (comp == 0) + // comp - y - o.y; if (comp == 0) comp = (int)(read1IndexInFile - o.read1IndexInFile); if (comp == 0)