diff --git a/.vscode/launch.json b/.vscode/launch.json index 1e487c1..0275f53 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -16,7 +16,7 @@ "--INPUT", "~/data/bam/100w.sam", "--OUTPUT", "./out.sam", "--METRICS_FILE", "metrics.txt", - "--num_threads", "1", + "--num_threads", "2", "--max_mem", "2M", "--verbosity", "DEBUG", "--asyncio", "true", diff --git a/.vscode/settings.json b/.vscode/settings.json index cc7054c..bc40fd8 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -102,6 +102,7 @@ "stack": "cpp", "__bit_reference": "cpp", "__string": "cpp", - "filesystem": "cpp" + "filesystem": "cpp", + "ios": "cpp" } } \ No newline at end of file diff --git a/run.sh b/run.sh index 4e8d418..dcd8b97 100755 --- a/run.sh +++ b/run.sh @@ -6,26 +6,28 @@ nthread=32 #nthread=64 #nthread=128 -#input=~/data/bam/zy_normal.bam -input=~/data/bam/zy_tumor.bam +#input=/home/zzh/data/bam/zy_normal.bam +#input=/home/zzh/data/bam/zy_tumor.bam +input=/home/zzh/data/wgs_na12878.bam #input=~/data/bam/100w.bam #input=~/data/bam/t100w.sam #input=~/data/bam/1k.sam #input=~/data/bam/1kw.sam #input=~/data/bam/1kw.bam #input=~/data/bam/n1kw.sam +output=/home/zzh/data1/na12878_out.bam cd ./build/ && make -j 8 && cd .. #./out.sam \ time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \ MarkDuplicates \ --INPUT $input \ - --OUTPUT ./out.bam \ + --OUTPUT $output \ --INDEX_FORMAT BAI \ --num_threads $nthread \ --max_mem 2G \ --verbosity DEBUG \ - --asyncio true -TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX null # + --asyncio true -TAG_DUPLICATE_SET_MEMBERS true --READ_NAME_REGEX null # #--TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX "" #--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$" #--READ_NAME_REGEX "" diff --git a/src/common/utils/yarn.h b/src/common/utils/yarn.h index ab5d31e..519ff3b 100644 --- a/src/common/utils/yarn.h +++ b/src/common/utils/yarn.h @@ -104,6 +104,7 @@ may exit to abort the application, or if it returns, the yarn error handler will exit (set to NULL by default for no action) */ +#pragma once extern char *yarn_prefix; extern void (*yarn_abort)(int); @@ -135,5 +136,6 @@ enum wait_op { void wait_for_(lock_t *, enum wait_op, long, char const *, long); #define WAIT_FOR(a, b, c) wait_for_(a, b, c, __FILE__, __LINE__) long peek_lock(lock_t *); +#define PEEK_LOCK(a) peek_lock(a) void free_lock_(lock_t *, char const *, long); #define FREE_LOCK(a) free_lock_(a, __FILE__, __LINE__) diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index e9e5194..1351e4c 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -22,15 +22,16 @@ Date : 2023/10/23 #include #include #include +#include #include #include #include -#include #include "dup_metrics.h" #include "markdups_arg.h" #include "md_funcs.h" #include "parallel_md.h" +#include "pipeline_md.h" #include "serial_md.h" #include "shared_args.h" @@ -176,12 +177,14 @@ int MarkDuplicates(int argc, char *argv[]) { hts_set_opt(g_outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); /* 冗余检查和标记 */ - if (g_gArg.num_threads == 1) { - // serialMarkDups(); // 串行运行 - parallelMarkDups(); // 并行运行 - } else { - parallelMarkDups(); // 并行运行 - } +// if (g_gArg.num_threads == 1) { +// // serialMarkDups(); // 串行运行 +// parallelMarkDups(); // 并行运行 +// } else { +// parallelMarkDups(); // 并行运行 +// } + pipelineMarkDups(); + //parallelMarkDups(); /* 标记冗余, 将处理后的结果写入文件 */ sam_close(g_inBamFp); // 重新打开bam文件 diff --git a/src/sam/markdups/md_types.h b/src/sam/markdups/md_types.h index 492c691..426e882 100644 --- a/src/sam/markdups/md_types.h +++ b/src/sam/markdups/md_types.h @@ -136,13 +136,16 @@ struct ParallelData { vector frags; // 暂未找到配对的reads UnpairedNameMap unpairedDic; // 用来寻找pair end }; + struct ParallelDataArg { vector threadData; vector*> pairs2d; // 成对的reads vector*> frags2d; // 暂未找到配对的reads MarkDupDataArg *mdArg; + int numThread; void Init(int nThread) { - for (int i = 0; i <= nThread; ++i) { // pairs需要多一个数组 + numThread = nThread; + for (int i = 0; i <= nThread; ++i) { // pairs需要多一个数组 threadData.push_back(ParallelData()); } } diff --git a/src/sam/markdups/parallel_md.cpp b/src/sam/markdups/parallel_md.cpp index 9519f15..d8ed332 100644 --- a/src/sam/markdups/parallel_md.cpp +++ b/src/sam/markdups/parallel_md.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -264,18 +265,61 @@ static void threadGenerateReadEnds(void *data, long idx, int tid) { // cout << "tid: " << tid << " end\t" << p.pairs.size() << "\t" << p.frags.size() << endl; } +static void *threadSortParallelPairs(void *data) { + auto &pd = *(ParallelDataArg *)data; + vector &pairs = pd.threadData[pd.numThread].pairs; + sort(pairs.begin(), pairs.end()); + // 合并所有pairs + pd.pairs2d.clear(); + for (int i = 0; i < pd.numThread; ++i) { + pd.pairs2d.push_back(&pd.threadData[i].pairs); + } + pd.pairs2d.push_back(&pd.threadData[pd.numThread].pairs); + ReadEndsQueue pairsQue; + pairsQue.Init(&pd.pairs2d); + const ReadEnds *pRE; + while ((pRE = pairsQue.Pop()) != nullptr) { + pd.mdArg->pairs.push_back(*pRE); + } + return nullptr; +} + +static void *threadSortParallelFrags(void *data) { + auto &pd = *(ParallelDataArg *)data; + ReadEndsQueue fragsQue; + const ReadEnds *pRE; + pd.frags2d.clear(); + for (int i = 0; i < pd.numThread; ++i) { + pd.frags2d.push_back(&pd.threadData[i].frags); + } + fragsQue.Init(&pd.frags2d); + while ((pRE = fragsQue.Pop()) != nullptr) { + pd.mdArg->frags.push_back(*pRE); + } + return nullptr; +} + +static void *threadCreateUnpairInfo(void *data) { + auto &pd = *(ParallelDataArg *)data; + auto &p = *pd.mdArg; + for (auto &e : p.unpairedDic) { + auto posKey = e.second.unpairedRE.posKey; + auto &unpairArrInfo = p.unpairedPosArr[posKey]; + unpairArrInfo.unpairedNum++; + unpairArrInfo.taskSeq = p.taskSeq; + unpairArrInfo.readNameSet.insert(e.first); + } + return nullptr; +} + /* 单线程生成readends (第一步)*/ static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) { auto &p = *arg; - auto &rnParser = g_vRnParser[0]; - p.pairs.clear(); p.frags.clear(); p.unpairedDic.clear(); p.unpairedPosArr.clear(); -// cout << "read num: " << p.bams.size() << endl; - pdArg.mdArg = arg; tm_arr[8].acc_start(); kt_for(g_gArg.num_threads, threadGenerateReadEnds, &pdArg, g_gArg.num_threads); @@ -283,7 +327,7 @@ static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) { // 合并各个线程数据 // 查找未匹配的frags - + tm_arr[9].acc_start(); vector &pairs = pdArg.threadData[g_gArg.num_threads].pairs; pairs.clear(); for (int i = 0; i < g_gArg.num_threads; ++i) { @@ -300,61 +344,21 @@ static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) { } } } - - - - sort(pairs.begin(), pairs.end()); - - // 合并所有frags和pairs - pdArg.frags2d.clear(); - pdArg.pairs2d.clear(); - for (int i = 0; i < g_gArg.num_threads; ++i) { - pdArg.frags2d.push_back(&pdArg.threadData[i].frags); - pdArg.pairs2d.push_back(&pdArg.threadData[i].pairs); - } - - if (false) { - cout << "taskseq: " << p.taskSeq << endl; - for (int i = 0; i < g_gArg.num_threads; ++i) { - cout << "thread pair num: " << pdArg.threadData[i].pairs.size() << endl; - cout << "thread frag num: " << pdArg.threadData[i].frags.size() << endl; - cout << "thread dic num: " << pdArg.threadData[i].unpairedDic.size() << endl; - } - } - - pdArg.pairs2d.push_back(&pdArg.threadData[g_gArg.num_threads].pairs); - - ReadEndsQueue fragsQue, pairsQue; - fragsQue.Init(&pdArg.frags2d); - pairsQue.Init(&pdArg.pairs2d); - - tm_arr[9].acc_start(); - const ReadEnds *pRE; - while((pRE = fragsQue.Pop()) != nullptr) { - p.frags.push_back(*pRE); - //cout << pRE->posKey << endl; - } tm_arr[9].acc_end(); - - tm_arr[10].acc_start(); - while ((pRE = pairsQue.Pop()) != nullptr) { - p.pairs.push_back(*pRE); - //cout << pRE->posKey << endl; - } - // 记录位点上的未匹配的read个数 - for (auto &e : p.unpairedDic) { - auto posKey = e.second.unpairedRE.posKey; - auto &unpairArrInfo = p.unpairedPosArr[posKey]; - unpairArrInfo.unpairedNum++; - unpairArrInfo.taskSeq = p.taskSeq; - unpairArrInfo.readNameSet.insert(e.first); - } - tm_arr[10].acc_end(); - // cout << "frags num: " << p.frags.size() << endl; - // cout << "pairs num: " << p.pairs.size() << endl; - // cout << "dic num : " << p.unpairedDic.size() << endl; - // cout << "generate end" << endl; + + tm_arr[11].acc_start(); + pthread_t tid; + pthread_create(&tid, nullptr, threadSortParallelFrags, &pdArg); +// threadSortParallelFrags(&pdArg); +// tm_arr[12].acc_start(); + threadSortParallelPairs(&pdArg); +// tm_arr[12].acc_end(); +// tm_arr[13].acc_start(); + threadCreateUnpairInfo(&pdArg); +// tm_arr[13].acc_end(); + pthread_join(tid, nullptr); + tm_arr[11].acc_end(); } /* 处理pairs */ @@ -1210,6 +1214,9 @@ void parallelMarkDups() { cout << "build ends : " << tm_arr[8].acc_seconds_elapsed() << endl; cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl; cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl; + cout << "t frags : " << tm_arr[11].acc_seconds_elapsed() << endl; + cout << "t pairs : " << tm_arr[12].acc_seconds_elapsed() << endl; + cout << "t unpair : " << tm_arr[13].acc_seconds_elapsed() << endl; cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; cout << "optical size: " << opticalDupNum << "\t" << opticalDupNum / 2 << endl; diff --git a/src/sam/markdups/pipeline_md.cpp b/src/sam/markdups/pipeline_md.cpp new file mode 100644 index 0000000..006a60e --- /dev/null +++ b/src/sam/markdups/pipeline_md.cpp @@ -0,0 +1,1903 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "dup_metrics.h" +#include "markdups_arg.h" +#include "md_funcs.h" +#include "pipeline_md.h" +#include "shared_args.h" + +using std::cout; +using std::vector; + +/* 查找 */ +// template +// static inline Itr binaryFind(Itr first, Itr last, const T &val) +// { +// first = std::lower_bound(first, last, val); +// return (first != last && *first == val) ? first : last; +// } + +/* 排序 */ +static inline void sortReadEndsArr(vector &arr) { + size_t blockSize = 64 * 1024; + if (arr.size() < blockSize) { + std::sort(arr.begin(), arr.end()); + return; + } + size_t blockNum = (arr.size() + blockSize - 1) / blockSize; + size_t crossNum = 1024; + size_t start, end, i, left, right; + std::sort(arr.begin(), arr.begin() + blockSize); + for (i = 1; i < blockNum; ++i) { + start = i * blockSize; + end = min(start + blockSize, arr.size()); + std::sort(arr.begin() + start, arr.begin() + end); + left = crossNum; + while (!(arr[start - left] < arr[start])) { + left = left << 1; + if (left >= blockSize) { + std::sort(arr.begin(), arr.end()); // 退化到普通排序 + return; + } + } + right = min(crossNum, end - start - 1); + + while (!(arr[start - 1] < arr[start + right])) { + right = min(right << 1, end - start - 1); + if (right == end - start - 1) + break; + } + std::sort(arr.begin() + start - left, arr.begin() + start + right); + } +} + +/* 处理一组pairend的readends,标记冗余, 这个函数需要串行运行,因为需要做一些统计*/ +static void markDupsForPairs(vector &vpRe, DPSet *dupIdx, MDSet *opticalDupIdx, + DPSet *repIdx, MDSet *singletonIdx, MDSet *notDupIdx = nullptr, + MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr, + MDSet *notSingletonIdx = nullptr) { + if (vpRe.size() < 2) { + if (vpRe.size() == 1) { + // 这个统计可能会有误差,因为当前位点可能还有没匹配上的read,导致当前位点的read(paired)数量为1 + // 可以通过后续的补充计算来解决这个问题,有必要么?好像没必要 + // gMetrics.singletonReads.insert(vpRe[0]->read1IndexInFile); + singletonIdx->insert(vpRe[0]->read1IndexInFile); + } + return; + } + for (auto pe : vpRe) { + // gMetrics.singletonReads.erase(pe->read1IndexInFile); + } + + if (notSingletonIdx != nullptr) { + for (auto pe : vpRe) { + notSingletonIdx->insert(pe->read1IndexInFile); + } + } + + int maxScore = 0; + const ReadEnds *pBest = nullptr; + // bool print = false; + // int maxOperateTime = 0; + /** All read ends should have orientation FF, FR, RF, or RR **/ + for (auto pe : vpRe) { // 找分数最高的readend + // maxOperateTime = max(maxOperateTime, pe->oprateTime); + // (const_cast(pe))->oprateTime ++; + if (pe->score > maxScore || pBest == nullptr) { + maxScore = pe->score; + pBest = pe; + } + /* + if (pe->read1IndexInFile == 255830545) { + print = true; + } + */ + } + /* + if (print) { + cout << "mark pair find: " << endl; + for (auto pe : vpRe) { + cout << pe->read1IndexInFile << " " << pe->read2IndexInFile << " " << pe->score << endl; + } + cout << "mark pair end: " << endl; + } + */ + // cerr << zzhtestnum << " best: " << vpRe.size() << " " << pBest->read1IndexInFile << "-" << + // pBest->read2IndexInFile << endl; if (maxOperateTime == 0) ++zzhtestnum; + if (notDupIdx != nullptr) { + notDupIdx->insert(pBest->read1IndexInFile); + notDupIdx->insert(pBest->read2IndexInFile); + } + // if (false) { + // if (true) { + if (!g_mdArg.READ_NAME_REGEX.empty()) { // 检查光学冗余 + // trackOpticalDuplicates + vector prevOpticalRe; + if (notOpticalDupIdx != nullptr) { + for (auto pe : vpRe) { + if (pe->isOpticalDuplicate) { + prevOpticalRe.push_back(pe); + } + } + } + trackOpticalDuplicates(vpRe, pBest); + // 由于重叠问题,可能会更新数据 + if (notOpticalDupIdx != nullptr) { + for (auto pe : prevOpticalRe) { + if (!pe->isOpticalDuplicate) { + notOpticalDupIdx->insert(pe->read1IndexInFile); + notOpticalDupIdx->insert(pe->read2IndexInFile); + } + } + } + } + for (auto pe : vpRe) { // 对非best read标记冗余 + if (pe != pBest) { // 非best + // gMetrics.dupReads.insert(pe->read1IndexInFile); + // gMetrics.dupReads.insert(pe->read2IndexInFile); + dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 + if (pe->read2IndexInFile != pe->read1IndexInFile) + dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, + // 注意这里代表是read1的索引 + // 检查是否optical dup + if (pe->isOpticalDuplicate && opticalDupIdx != nullptr) { + opticalDupIdx->insert(pe->read1IndexInFile); + if (pe->read2IndexInFile != pe->read1IndexInFile) + opticalDupIdx->insert(pe->read2IndexInFile); + } + } else { + // gMetrics.dupReads.erase(pe->read1IndexInFile); // for test + // gMetrics.dupReads.erase(pe->read2IndexInFile); + } + } + // 在输出的bam文件中添加tag, best作为dupset的代表 + if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS) { + repIdx->insert(DupInfo(pBest->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); + repIdx->insert(DupInfo(pBest->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); + if (notRepIdx != nullptr) { + for (auto pe : vpRe) { + if (pe != pBest) { + notRepIdx->insert(pe->read1IndexInFile); + if (pe->read2IndexInFile != pe->read1IndexInFile) + notRepIdx->insert(pe->read2IndexInFile); + } + } + } + } +} + +/* 处理一组非paired的readends,标记冗余 */ +static void markDupsForFrags(vector &vpRe, bool containsPairs, DPSet *dupIdx, + MDSet *notDupIdx = nullptr) { + if (containsPairs) { + for (auto pe : vpRe) { + if (!pe->IsPaired()) { + dupIdx->insert(pe->read1IndexInFile); + // gMetrics.dupReads.insert(pe->read1IndexInFile); + } + } + } else { + int maxScore = 0; + const ReadEnds *pBest = nullptr; + for (auto pe : vpRe) { + if (pe->score > maxScore || pBest == nullptr) { + maxScore = pe->score; + pBest = pe; + } + } + if (notDupIdx != nullptr) { + notDupIdx->insert(pBest->read1IndexInFile); + } + for (auto pe : vpRe) { + if (pe != pBest) { + dupIdx->insert(pe->read1IndexInFile); + // gMetrics.dupReads.insert(pe->read1IndexInFile); + } else { + // gMetrics.dupReads.erase(pe->read1IndexInFile); + } + } + } +} + +/* 找到与readend pos相等的所有readend */ +static void getEqualRE(const ReadEnds &re, vector &src, vector *dst) { + auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::PairsLittleThan); // 只比对位点 + dst->insert(dst->end(), range.first, range.second); +} + +#define LOWER_BOUND(tid, nthread, ndata) ((tid) * (ndata) / (nthread)) +#define UPPER_BOUND(tid, nthread, ndata) ((tid + 1) * (ndata) / (nthread)) + +static void threadGenerateReadEnds(void *data, long idx, int tid) { + auto &p = (*(ParallelDataArg *)data).threadData[idx]; + auto &mdArg = *(*(ParallelDataArg *)data).mdArg; + auto &rnParser = g_vRnParser[idx]; + int nThread = g_gArg.num_threads; + p.pairs.clear(); + p.frags.clear(); + p.unpairedDic.clear(); + size_t start_id = LOWER_BOUND(idx, nThread, mdArg.bams.size()); + size_t end_id = UPPER_BOUND(idx, nThread, mdArg.bams.size()); + // if (mdArg.taskSeq == 163) + // cout << "tid: " << tid << "\t" << idx << "\t" << start_id << "\t" << end_id << "\t" << mdArg.bams.size() << + // endl; + + for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read + BamWrap *bw = mdArg.bams[i]; + const int64_t bamIdx = mdArg.bamStartIdx + i; + if (bw->GetReadUnmappedFlag()) { + if (bw->b->core.tid == -1) + // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). + break; + } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 + ReadEnds fragEnd; + buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); + p.frags.push_back(fragEnd); // 添加进frag集合 + if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 + string key = bw->query_name(); + if (p.unpairedDic.find(key) == p.unpairedDic.end()) { + p.unpairedDic[key] = {mdArg.taskSeq, fragEnd}; + } else { // 找到了pairend + auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; + modifyPairedEnds(fragEnd, &pairedEnds); + p.pairs.push_back(pairedEnds); + p.unpairedDic.erase(key); // 删除找到的pairend + } + } + } + } + // sortReadEndsArr(p.frags); + sort(p.frags.begin(), p.frags.end()); + sort(p.pairs.begin(), p.pairs.end()); + // cout << tid << "\tthread end" << endl; + // if (mdArg.taskSeq == 163) + // cout << "tid: " << tid << " end\t" << p.pairs.size() << "\t" << p.frags.size() << endl; +} + +static void *threadSortParallelPairs(void *data) { + auto &pd = *(ParallelDataArg *)data; + vector &pairs = pd.threadData[pd.numThread].pairs; + sort(pairs.begin(), pairs.end()); + // 合并所有pairs + pd.pairs2d.clear(); + for (int i = 0; i < pd.numThread; ++i) { + pd.pairs2d.push_back(&pd.threadData[i].pairs); + } + pd.pairs2d.push_back(&pd.threadData[pd.numThread].pairs); + ReadEndsQueue pairsQue; + pairsQue.Init(&pd.pairs2d); + const ReadEnds *pRE; + while ((pRE = pairsQue.Pop()) != nullptr) { + pd.mdArg->pairs.push_back(*pRE); + } + return nullptr; +} + +static void *threadSortParallelFrags(void *data) { + auto &pd = *(ParallelDataArg *)data; + ReadEndsQueue fragsQue; + const ReadEnds *pRE; + pd.frags2d.clear(); + for (int i = 0; i < pd.numThread; ++i) { + pd.frags2d.push_back(&pd.threadData[i].frags); + } + fragsQue.Init(&pd.frags2d); + while ((pRE = fragsQue.Pop()) != nullptr) { + pd.mdArg->frags.push_back(*pRE); + } + return nullptr; +} + +static void *threadCreateUnpairInfo(void *data) { + auto &pd = *(ParallelDataArg *)data; + auto &p = *pd.mdArg; + for (auto &e : p.unpairedDic) { + auto posKey = e.second.unpairedRE.posKey; + auto &unpairArrInfo = p.unpairedPosArr[posKey]; + unpairArrInfo.unpairedNum++; + unpairArrInfo.taskSeq = p.taskSeq; + unpairArrInfo.readNameSet.insert(e.first); + } + return nullptr; +} + +/* 单线程生成readends (第一步)*/ +static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) { + auto &p = *arg; + p.pairs.clear(); + p.frags.clear(); + p.unpairedDic.clear(); + p.unpairedPosArr.clear(); + + pdArg.mdArg = arg; + tm_arr[8].acc_start(); + kt_for(g_gArg.num_threads, threadGenerateReadEnds, &pdArg, g_gArg.num_threads); + // tm_arr[8].acc_end(); + // 合并各个线程数据 + // 查找未匹配的frags + + tm_arr[9].acc_start(); + vector &pairs = pdArg.threadData[g_gArg.num_threads].pairs; + pairs.clear(); + for (int i = 0; i < g_gArg.num_threads; ++i) { + for (auto &val : pdArg.threadData[i].unpairedDic) { + const string &key = val.first; + const ReadEnds &fragEnd = val.second.unpairedRE; + if (p.unpairedDic.find(key) == p.unpairedDic.end()) { + p.unpairedDic[key] = {p.taskSeq, fragEnd}; + } else { // 找到了pairend + auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; + modifyPairedEnds(fragEnd, &pairedEnds); + pairs.push_back(pairedEnds); + p.unpairedDic.erase(key); // 删除找到的pairend + } + } + } + threadCreateUnpairInfo(&pdArg); + threadSortParallelPairs(&pdArg); + tm_arr[9].acc_end(); + tm_arr[8].acc_end(); + + tm_arr[11].acc_start(); +// pthread_t tid; +// pthread_create(&tid, nullptr, threadSortParallelFrags, &pdArg); + threadSortParallelFrags(&pdArg); + // tm_arr[12].acc_start(); + // threadSortParallelPairs(&pdArg); + // tm_arr[12].acc_end(); + // tm_arr[13].acc_start(); + // threadCreateUnpairInfo(&pdArg); + // tm_arr[13].acc_end(); +// pthread_join(tid, nullptr); + tm_arr[11].acc_end(); +} + +/* 处理pairs */ +static void processPairs(vector &readEnds, DPSet *dupIdx, MDSet *opticalDupIdx, + DPSet *repIdx, MDSet *singletonIdx, MDSet *notDupIdx = nullptr, + MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr, + MDSet *notSingletonIdx = nullptr) { + // return; + vector vpCache; // 有可能是冗余的reads + const ReadEnds *pReadEnd = nullptr; + for (auto &re : readEnds) { + if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样 + vpCache.push_back(&re); // 处理一个潜在的冗余组 + else { + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, + notRepIdx, notSingletonIdx); // 不一样 + vpCache.clear(); + vpCache.push_back(&re); + pReadEnd = &re; + } + } + markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx, + notSingletonIdx); +} + +/* 处理frags */ +static void processFrags(vector &readEnds, DPSet *dupIdx, MDSet *notDupIdx = nullptr) { + bool containsPairs = false; + bool containsFrags = false; + vector vpCache; // 有可能是冗余的reads + const ReadEnds *pReadEnd = nullptr; + for (auto &re : readEnds) { + if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) { + vpCache.push_back(&re); + containsPairs = containsPairs || re.IsPaired(); + containsFrags = containsFrags || !re.IsPaired(); + } else { + if (vpCache.size() > 1 && containsFrags) { + markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); + } + vpCache.clear(); + vpCache.push_back(&re); + pReadEnd = &re; + containsPairs = re.IsPaired(); + containsFrags = !re.IsPaired(); + } + } + if (vpCache.size() > 1 && containsFrags) { + markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); + } +} + +/* 单线程markdup (第二步)*/ +static void markdups(MarkDupDataArg *arg) { + auto &p = *arg; + p.fragDupIdx.clear(); + p.pairDupIdx.clear(); + p.pairOpticalDupIdx.clear(); + p.pairRepIdx.clear(); + p.pairSingletonIdx.clear(); + /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ + // 先处理 pair + processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx, &p.pairRepIdx, &p.pairSingletonIdx); + // cout << p.pairDupIdx.size() << "\t" << p.pairRepIdx.size() << endl; + + // 再处理frag + processFrags(p.frags, &p.fragDupIdx); +} + +/* 获取交叉部分的数据 */ +static inline void getIntersectData(vector &leftArr, vector &rightArr, vector *dst, + bool isPairCmp = false) { + if (leftArr.empty() || rightArr.empty()) { + // cout << "bad size: " << leftArr.size() << '\t' << rightArr.size() << endl; + return; + } + const size_t leftEndIdx = leftArr.size() - 1; + const size_t rightStartIdx = 0; + size_t leftSpan = 0; + size_t rightSpan = 0; + + while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp)) { + leftSpan += 1; + if (leftSpan > leftEndIdx) { // 上一个的范围被下一个全部包围了(可能会有bug,上上个也与下一个有交集呢?) + leftSpan = leftArr.size() - 1; + break; + } + } + + while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp)) { + rightSpan += 1; + if (rightSpan == rightArr.size() - 1) // 同上,可能会有bug + break; + } + dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end()); + dst->insert(dst->end(), rightArr.begin(), rightArr.begin() + rightSpan); + std::sort(dst->begin(), dst->end()); +} + +/* 将frags重叠部分的dup idx放进数据中 */ +static inline void refreshFragDupIdx(DPSet &dupIdx, MDSet ¬DupIdx, MarkDupDataArg *lastArg, + MarkDupDataArg *curArg) { + auto &lp = *lastArg; + auto &p = *curArg; + for (auto idx : dupIdx) { + lp.fragDupIdx.insert(idx); + p.fragDupIdx.erase(idx); + } + for (auto idx : notDupIdx) { + lp.fragDupIdx.erase(idx); + p.fragDupIdx.erase(idx); + } +} + +/* 将pairs重叠部分的dup idx放进数据中 */ +static inline void refreshPairDupIdx(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, + MDSet &singletonIdx, MDSet ¬DupIdx, + MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, + MDSet ¬SingletonIdx, MarkDupDataArg *lastArg, MarkDupDataArg *curArg) { + auto &lp = *lastArg; + auto &p = *curArg; + for (auto idx : dupIdx) { + lp.pairDupIdx.insert(idx); + p.pairDupIdx.erase(idx); + } + for (auto idx : opticalDupIdx) { + lp.pairOpticalDupIdx.insert(idx); + p.pairOpticalDupIdx.erase(idx); + } + for (auto idx : repIdx) { + lp.pairRepIdx.insert(idx); + p.pairRepIdx.erase(idx); + } + for (auto idx : singletonIdx) { + lp.pairSingletonIdx.insert(idx); + p.pairSingletonIdx.erase(idx); + } + for (auto idx : notDupIdx) { + // if (lp.pairDupIdx.find(idx) != lp.pairDupIdx.end()) cout << "find-1: " << idx << endl; + // if (lp.pairDupIdx.find({idx}) != lp.pairDupIdx.end()) cout << "find-2: " << idx << endl; + // if (p.pairDupIdx.find(idx) != p.pairDupIdx.end()) cout << "find-3: " << idx << endl; + // if (p.pairDupIdx.find({idx}) != p.pairDupIdx.end()) cout << "find-4: " << idx << endl; + lp.pairDupIdx.erase(idx); + p.pairDupIdx.erase(idx); + } + for (auto idx : notOpticalDupIdx) { + lp.pairOpticalDupIdx.erase(idx); + p.pairOpticalDupIdx.erase(idx); + } + for (auto idx : notRepIdx) { + lp.pairRepIdx.erase(idx); + p.pairRepIdx.erase(idx); + } + for (auto idx : notSingletonIdx) { + lp.pairSingletonIdx.erase(idx); + p.pairSingletonIdx.erase(idx); + } +} + +// 用来分别处理dup和optical dup +static void refeshTaskDupInfo(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, + MDSet &singletonIdx, MDSet ¬DupIdx, MDSet ¬OpticalDupIdx, + MDSet ¬RepIdx, MDSet ¬SingletonIdx, DPSet &latterDupIdx, + MDSet &latterOpticalDupIdx, DPSet &latterRepIdx, + MDSet &latterSingletonIdx, MDSet &latterNotDupIdx, + MDSet &latterNotOpticalDupIdx, MDSet &latterNotRepIdx, + MDSet &latterNotSingletonIdx) { + for (auto idx : dupIdx) { + latterDupIdx.insert(idx); + // latterNotDupIdx.erase(idx); // 后来的更新为准 + } + for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx); + for (auto idx : repIdx) latterRepIdx.insert(idx); + for (auto idx : singletonIdx) latterSingletonIdx.insert(idx); + for (auto idx : notDupIdx) latterNotDupIdx.insert(idx); + for (auto idx : notOpticalDupIdx) latterNotOpticalDupIdx.insert(idx); + for (auto idx : notRepIdx) latterNotRepIdx.insert(idx); + for (auto idx : notSingletonIdx) latterNotSingletonIdx.insert(idx); +} + +/* 最后合并数据并排序 */ +template +static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupIdx, vector &dupArr, + vector &cacheDupIdx, vector &midArr) { + midArr.resize(0); + cacheDupIdx.resize(0); + cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end()); + std::sort(cacheDupIdx.begin(), cacheDupIdx.end()); + + auto ai = dupArr.begin(); + auto ae = dupArr.end(); + auto bi = cacheDupIdx.begin(); + auto be = cacheDupIdx.end(); + // auto bi = dupIdx.begin(); + // auto be = dupIdx.end(); + + T val = 0; + while (ai != ae && bi != be) { + if (*ai < *bi) { + val = *ai++; + } else if (*bi < *ai) { + val = *bi++; + } else { + val = *bi++; // 相等的时候取后面的作为结果 + ai++; + } + if (notDupIdx.find(val) == notDupIdx.end()) { + midArr.push_back(val); + } + } + while (ai != ae) { + val = *ai++; + if (notDupIdx.find(val) == notDupIdx.end()) { + midArr.push_back(val); + } + } + while (bi != be) { + val = *bi++; + if (notDupIdx.find(val) == notDupIdx.end()) { + midArr.push_back(val); + } + } + dupArr = midArr; +} + +static int64_t llp_frag_end_pos = 0; +static int64_t llp_pair_end_pos = 0; +static int64_t gSingletonNum = 0; +/* 处理相邻的两个任务,有相交叉的数据 */ +static void handleIntersectData(MarkDupDataArg *lastArg, MarkDupDataArg *curArg, GlobalDataArg *gDataArg) { + auto &lp = *lastArg; + auto &p = *curArg; + auto &g = *gDataArg; + + vector reArr; + DPSet dupIdx; + MDSet opticalDupIdx; + DPSet repIdx; + MDSet singletonIdx; + MDSet notOpticalDupIdx; + MDSet notDupIdx; + MDSet notRepIdx; + MDSet notSingletonIdx; + // 先处理重叠的frags + getIntersectData(lp.frags, p.frags, &reArr); + processFrags(reArr, &dupIdx, ¬DupIdx); + refreshFragDupIdx(dupIdx, notDupIdx, &lp, &p); + // cout << "578" << endl; + if (!p.frags.empty()) + if (llp_frag_end_pos >= p.frags.begin()->posKey) { + cout << "frags : " << llp_frag_end_pos << "\t" << p.frags[0].posKey << "\t" << p.frags.rbegin()->posKey + << endl; + } + if (!p.pairs.empty()) + if (llp_pair_end_pos >= p.pairs.begin()->posKey) { + cout << "pairs : " << llp_pair_end_pos << "\t" << p.pairs[0].posKey << "\t" << p.pairs.rbegin()->posKey + << endl; + } + if (!p.frags.empty()) + llp_frag_end_pos = lp.frags.rbegin()->posKey; + if (!p.pairs.empty()) + llp_pair_end_pos = lp.pairs.rbegin()->posKey; + + // 再处理重叠的pairs + reArr.clear(); + dupIdx.clear(); + notDupIdx.clear(); + getIntersectData(lp.pairs, p.pairs, &reArr, true); + +// cout << "pairs: " << lp.pairs.size() << "\t" << p.pairs.size() << endl; + processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx, + ¬SingletonIdx); +// cout << "size: " << reArr.size() << endl; + refreshPairDupIdx(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx, + notSingletonIdx, &lp, &p); + +// cout << "pairs size: " << lp.pairs.size() << "\t" << p.pairs.size() << endl; +// cout << "range: " << lp.pairs.front().posKey << "\t" << lp.pairs.back().posKey << "\t" +// << p.pairs.front().posKey << "\t" << p.pairs.back().posKey << endl; +// cout << "intersect size: " << reArr.size() << endl; + + // 处理之前未匹配的部分 + map recalcPos; + CalcSet alreadyAdd; // 与该位点相同的pair都添加到数组里了 + MDSet addToGlobal; + int64_t prevLastPos = 0, nextFirstPos = 0; + if (lp.frags.size() > 0) + prevLastPos = lp.frags.back().posKey; + if (p.frags.size() > 0) + nextFirstPos = p.frags[0].posKey; + for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read + auto &readName = prevUnpair.first; + auto &prevPosInfo = prevUnpair.second; + auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end + + if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read + auto &nextPosInfo = p.unpairedDic[readName]; + auto &nextFragEnd = nextPosInfo.unpairedRE; + int64_t prevPosKey = prevFragEnd.posKey; + modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下,poskey可能是后面的read + int64_t nextPosKey = max(prevPosKey, nextFragEnd.posKey); + CalcKey ck = {prevPosKey, nextPosKey}; + UnpairedPosInfo *prevUnpairInfoP = nullptr; + UnpairedPosInfo *nextUnpairInfoP = nullptr; + if (lp.unpairedPosArr.find(prevPosKey) != lp.unpairedPosArr.end()) + prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey]; + if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end()) + nextUnpairInfoP = &p.unpairedPosArr[prevPosKey]; + + // pos分为两种情况,根据poskey(pair中两个read分别的pos)的位置确定 + // 1. + // prevpos在交叉部分之前,nextpos在交叉部分之后,这种情况不需要获取pairarr中的数据; + // 2. + // prevpos在交叉部分之前,nextpos在交叉部分,需要获取lp中的相等read pair进行重新计算 + // 复杂情况1. g中包含prevPosKey对应的unpair,p中有对应的pair,此时应该把这些pair考虑进去 + // 3. + // prevpos在交叉部分,nextpos在交叉部分之后,需要获取p中的相等read pair进行重新计算 + // 复杂情况2. p中是否包含prevPosKey对应的unpair + // 4. + // prevpos在交叉部分,nextpos在交叉部分,需要获取lp和p中的相等read pair进行重新计算 + + bool addDataToPos = true; + if (alreadyAdd.find(ck) != alreadyAdd.end()) { + // 之前已经添加过了,后面就不用再添加数据了,因为同一个位置可能找到两个及以上的unpair数据,处理之前的数据时候可能已经添加了这些数据 + addDataToPos = false; + } else + alreadyAdd.insert(ck); + + if (prevPosKey < nextFirstPos) { // prevpos在交叉部分之前 + auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr + prevPairArr.push_back(prevFragEnd); + if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况 + getEqualRE(prevFragEnd, lp.pairs, &prevPairArr); + } + // 第一种情况,第二种情况下都会出现,复杂情况一 + auto gPosInfo = g.unpairedPosArr.find(prevPosKey); + if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的,刚好和该位点一致 + auto &gUnpairInfo = gPosInfo->second; + auto pPosInfo = p.unpairedPosArr.find(nextPosKey); + if (pPosInfo != p.unpairedPosArr.end()) { + auto &pUnpairInfo = pPosInfo->second; + for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname,看是否有匹配的 + if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) { + auto pe = g.unpairedDic[rn].unpairedRE; + auto fe = p.unpairedDic[rn].unpairedRE; + modifyPairedEnds(fe, &pe); + prevPairArr.push_back(pe); + g.unpairedDic.erase(rn); + p.unpairedDic.erase(rn); + } + } + } + } + recalcPos[ck] = prevPosInfo.taskSeq; + std::sort(prevPairArr.begin(), prevPairArr.end()); + } else { // prevpos在交叉部分 + if (nextPosKey > prevLastPos) { // nextpos在交叉部分之后 第三种情况 + if (nextUnpairInfoP != nullptr) { // 且在pos点,next task有unpair,这样才把这些数据放到next task里 + auto &nextPairArr = nextUnpairInfoP->pairArr; + nextPairArr.push_back(prevFragEnd); + auto &prevPairArr = prevUnpairInfoP->pairArr; + prevPairArr.push_back(prevFragEnd); + if (addDataToPos) { + getEqualRE(prevFragEnd, p.pairs, &prevPairArr); + getEqualRE(prevFragEnd, p.pairs, &nextPairArr); + } + // 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除) + recalcPos[ck] = nextPosInfo.taskSeq; + + std::sort(prevPairArr.begin(), prevPairArr.end()); + std::sort(nextPairArr.begin(), nextPairArr.end()); + } else { // next task在该位点没有unpair,那就把数据放到prev task里 + auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr + prevPairArr.push_back(prevFragEnd); + if (addDataToPos) // 第二种情况 + getEqualRE(prevFragEnd, p.pairs, &prevPairArr); + recalcPos[ck] = prevPosInfo.taskSeq; + std::sort(prevPairArr.begin(), prevPairArr.end()); + } + } else { // 第四种情况 + if (prevUnpairInfoP == nullptr) { + prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey]; + prevUnpairInfoP->taskSeq = lp.taskSeq; + } + auto &prevPairArr = prevUnpairInfoP->pairArr; + prevPairArr.push_back(prevFragEnd); + if (addDataToPos) { + getEqualRE(prevFragEnd, lp.pairs, &prevPairArr); + getEqualRE(prevFragEnd, p.pairs, &prevPairArr); + } + recalcPos[ck] = prevPosInfo.taskSeq; + std::sort(prevPairArr.begin(), prevPairArr.end()); + } + } + p.unpairedDic.erase(readName); // 在next task里删除该read + } else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read + auto &remainPosInfo = g.unpairedDic[readName]; + auto remainFragEnd = remainPosInfo.unpairedRE; + int64_t remainPosKey = remainFragEnd.posKey; + modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read + auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; + auto &remainPairArr = remainUnpairInfo.pairArr; + remainPairArr.push_back(remainFragEnd); + CalcKey ck = {remainPosKey, prevFragEnd.posKey}; + recalcPos[ck] = remainPosInfo.taskSeq; + std::sort(remainPairArr.begin(), remainPairArr.end()); + + g.unpairedDic.erase(readName); + } else { // 都没找到,那就保存到遗留数据里 + int64_t prevPosKey = prevFragEnd.posKey; + g.unpairedDic.insert(prevUnpair); + addToGlobal.insert(prevPosKey); + } + } + map taskChanged; + MDSet posProcessed; + for (auto &e : recalcPos) { + auto posKey = e.first.read1Pos; + if (posProcessed.find(posKey) != posProcessed.end()) + continue; + posProcessed.insert(posKey); + auto taskSeq = e.second; + auto &t = taskChanged[taskSeq]; + // 在对应的任务包含的dup idx里修改结果数据 + vector *pairArrP = nullptr; + if (taskSeq < lp.taskSeq) + pairArrP = &g.unpairedPosArr[posKey].pairArr; + else + pairArrP = &lp.unpairedPosArr[posKey].pairArr; + processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, + &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); + if (taskSeq < lp.taskSeq) + g.unpairedPosArr.erase(posKey); + } + + // 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据 + // 放在这里,因为lp中的unpairedPosArr中的readends可能会被修改(比如optical duplicate) + for (auto posKey : addToGlobal) { + g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey]; + } + + // 更新结果 + for (auto &e : taskChanged) { + auto taskSeq = e.first; + auto &t = e.second; + if (taskSeq < lp.taskSeq) { + refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, + t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq], + g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq], + g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], + g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq], + g.latterNotSingletonIdxArr[taskSeq]); + } else if (taskSeq == lp.taskSeq) { + refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, + t.notRepIdx, t.notSingletonIdx, &lp, &p); + } else { + refreshPairDupIdx(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, + t.notRepIdx, t.notSingletonIdx, &p, &lp); // 把结果放到p中 + } + } + // 将dupidx放进全局数据 + g.latterDupIdxArr.push_back(DPSet()); + g.latterOpticalDupIdxArr.push_back(MDSet()); + g.latterRepIdxArr.push_back(DPSet()); + g.latterSingletonIdxArr.push_back(MDSet()); + g.latterNotDupIdxArr.push_back(MDSet()); + g.latterNotOpticalDupIdxArr.push_back(MDSet()); + g.latterNotRepIdxArr.push_back(MDSet()); + g.latterNotSingletonIdxArr.push_back(MDSet()); + + g.dupIdxArr.push_back(vector()); + auto &vIdx = g.dupIdxArr.back(); + lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); + vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); + std::sort(vIdx.begin(), vIdx.end()); + + g.opticalDupIdxArr.push_back(vector()); + auto &vOpticalIdx = g.opticalDupIdxArr.back(); + vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); + std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); + + g.repIdxArr.push_back(vector()); + auto &vRepIdx = g.repIdxArr.back(); + vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); + std::sort(vRepIdx.begin(), vRepIdx.end()); + + g.singletonIdxArr.push_back(vector()); + auto &vSingletonIdx = g.singletonIdxArr.back(); + vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); + std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); + gSingletonNum += lp.pairSingletonIdx.size(); +} + +/* 当所有任务结束后,global data里还有未处理的数据 */ +static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) { + // cout << "last task start" << endl; + auto &lp = *task; + auto &g = *gDataArg; + // 遗留的未匹配的pair + for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read + auto &readName = prevUnpair.first; + auto &prevPosInfo = prevUnpair.second; + auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end + + if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read + auto &remainPosInfo = g.unpairedDic[readName]; + auto remainFragEnd = remainPosInfo.unpairedRE; + int64_t remainPosKey = remainFragEnd.posKey; + modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read + auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; + + remainUnpairInfo.pairArr.push_back(remainFragEnd); + g.unpairedDic.erase(readName); + } else { + g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数 + } + } + + map taskChanged; + for (auto &e : g.unpairedPosArr) { + auto posKey = e.first; + auto taskSeq = e.second.taskSeq; + auto &t = taskChanged[taskSeq]; + auto &arr = g.unpairedPosArr[posKey].pairArr; + + if (arr.size() > 1) { + std::sort(arr.begin(), arr.end()); + processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, + &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); + } else if (arr.size() == 1) { + // gMetrics.singletonReads.insert(arr[0].read1IndexInFile); + t.singletonIdx.insert(arr[0].read1IndexInFile); + } + } + // 更新结果 + vector addDup; + map ndPosVal; + for (auto &e : taskChanged) { + auto taskSeq = e.first; + auto &t = e.second; + refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, + t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], + g.latterRepIdxArr[taskSeq], g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], + g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq], + g.latterNotSingletonIdxArr[taskSeq]); + } + g.unpairedPosArr.clear(); + + // 将dupidx放进全局数据 + vector cacheDupIdx; + vector midArr; + vector intCacheDupIdx; + vector intMidArr; + for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) { + refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i], cacheDupIdx, midArr); + } + for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i], + intCacheDupIdx, intMidArr); + for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); + for (int i = 0; i < (int)g.singletonIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterSingletonIdxArr[i], g.latterNotSingletonIdxArr[i], g.singletonIdxArr[i], + intCacheDupIdx, intMidArr); + + g.dupIdxArr.push_back(vector()); + auto &vIdx = g.dupIdxArr.back(); + lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); + vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); + std::sort(vIdx.begin(), vIdx.end()); + + g.opticalDupIdxArr.push_back(vector()); + auto &vOpticalIdx = g.opticalDupIdxArr.back(); + vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); + std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); + + g.repIdxArr.push_back(vector()); + auto &vRepIdx = g.repIdxArr.back(); + vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); + std::sort(vRepIdx.begin(), vRepIdx.end()); + + g.singletonIdxArr.push_back(vector()); + auto &vSingletonIdx = g.singletonIdxArr.back(); + vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); + std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); + gSingletonNum += lp.pairSingletonIdx.size(); +} + +// for step 2 generate read ends + +// multi-thread generate read ends +static void mtGenerateReadEnds(void *data, long idx, int tid) { + auto &p = *(PipelineArg *)data; + auto &rnParser = g_vRnParser[idx]; + int nThread = g_gArg.num_threads; + auto &bams = p.readData.bams; + int64_t bamStartIdx = p.readData.bamStartIdx; + int64_t taskSeq = p.readData.taskSeq; + GenREData &genREData = p.genREData[p.genREOrder % p.GENBUFNUM]; + auto &pairs = genREData.pairsArr[idx]; + auto &frags = genREData.fragsArr[idx]; + auto &unpairedDic = genREData.unpairedDicArr[idx]; + + pairs.clear(); + frags.clear(); + unpairedDic.clear(); + + size_t start_id = LOWER_BOUND(idx, nThread, bams.size()); + size_t end_id = UPPER_BOUND(idx, nThread, bams.size()); + for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read + BamWrap *bw = bams[i]; + const int64_t bamIdx = bamStartIdx + i; + if (bw->GetReadUnmappedFlag()) { + if (bw->b->core.tid == -1) + // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). + break; + } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 + ReadEnds fragEnd; + buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); + frags.push_back(fragEnd); // 添加进frag集合 + if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 + string key = bw->query_name(); + if (unpairedDic.find(key) == unpairedDic.end()) { + unpairedDic[key] = {taskSeq, fragEnd}; + } else { // 找到了pairend + auto &pairedEnds = unpairedDic.at(key).unpairedRE; + modifyPairedEnds(fragEnd, &pairedEnds); + pairs.push_back(pairedEnds); + unpairedDic.erase(key); // 删除找到的pairend + } + } + } + } + // sortReadEndsArr(frags); + sort(frags.begin(), frags.end()); + sort(pairs.begin(), pairs.end()); +} + +static void doGenRE(PipelineArg &pipeArg) { + GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM]; + ReadData &readData = pipeArg.readData; + // 用来在genRE阶段计算一些Sort阶段应该计算的数据,保持每个step用时更平衡 + SortData &sortData = pipeArg.sortData[pipeArg.genREOrder % pipeArg.SORTBUFNUM]; + +// cout << "dogen-gen buf : " << pipeArg.genREOrder % pipeArg.GENBUFNUM << endl; +// cout << "dogen-sort buf:" << pipeArg.genREOrder % pipeArg.SORTBUFNUM << endl; + +// sortData.pSmd->unpairedDic.clear(); +// sortData.pSmd->unpairedPosArr.clear(); + // generate read ends + kt_for(g_gArg.num_threads, mtGenerateReadEnds, &pipeArg, g_gArg.num_threads); + + // 轮询每个线程中未找到匹配的read,找到匹配的 +// vector &pairs = genREData.pairsArr[g_gArg.num_threads]; +// pairs.clear(); +// for (int i = 0; i < g_gArg.num_threads; ++i) { +// for (auto &val : genREData.unpairedDicArr[i]) { +// const string &key = val.first; +// const ReadEnds &fragEnd = val.second.unpairedRE; +// if (sortData.pSmd->unpairedDic.find(key) == sortData.pSmd->unpairedDic.end()) { +// sortData.pSmd->unpairedDic[key] = {readData.taskSeq, fragEnd}; +// } else { // 找到了pairend +// auto &pairedEnds = sortData.pSmd->unpairedDic.at(key).unpairedRE; +// modifyPairedEnds(fragEnd, &pairedEnds); +// pairs.push_back(pairedEnds); +// sortData.pSmd->unpairedDic.erase(key); // 删除找到的pairend +// } +// } +// } +// sort(pairs.begin(), pairs.end()); +// +// // create unpaired info +// for (auto &e : sortData.pSmd->unpairedDic) { +// auto posKey = e.second.unpairedRE.posKey; +// auto &unpairArrInfo = sortData.pSmd->unpairedPosArr[posKey]; +// unpairArrInfo.unpairedNum++; +// unpairArrInfo.taskSeq = readData.taskSeq; +// unpairArrInfo.readNameSet.insert(e.first); +// } +} +// end for step 2 generate read ends + +// for step-3 sort +static void doSort(PipelineArg &pipeArg) { + GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM]; + SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM]; +// cout << "dosort-gen buf : " << pipeArg.sortOrder % pipeArg.GENBUFNUM << endl; +// cout << "dosort-sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl; + auto pSmd = (SortMarkData*) sortData.pSmd; + pSmd->pairs.clear(); + pSmd->frags.clear(); + pSmd->unpairedDic.clear(); + pSmd->unpairedPosArr.clear(); + + vector &pairs = genREData.pairsArr[g_gArg.num_threads]; + pairs.clear(); + for (int i = 0; i < g_gArg.num_threads; ++i) { + for (auto &val : genREData.unpairedDicArr[i]) { + const string &key = val.first; + const ReadEnds &fragEnd = val.second.unpairedRE; + if (pSmd->unpairedDic.find(key) == pSmd->unpairedDic.end()) { + pSmd->unpairedDic[key] = {(int64_t)pipeArg.sortOrder, fragEnd}; + } else { // 找到了pairend + auto &pairedEnds = pSmd->unpairedDic.at(key).unpairedRE; + modifyPairedEnds(fragEnd, &pairedEnds); + pairs.push_back(pairedEnds); + pSmd->unpairedDic.erase(key); // 删除找到的pairend + } + } + } + sort(pairs.begin(), pairs.end()); + + // create unpaired info + for (auto &e : pSmd->unpairedDic) { + auto posKey = e.second.unpairedRE.posKey; + auto &unpairArrInfo = pSmd->unpairedPosArr[posKey]; + unpairArrInfo.unpairedNum++; + unpairArrInfo.taskSeq = pipeArg.sortOrder; + unpairArrInfo.readNameSet.insert(e.first); + } + + const ReadEnds *pRE; + ReadEndsHeap pairsHeap, fragsHeap; + pairsHeap.Init(&genREData.pairsArr); + while ((pRE = pairsHeap.Pop()) != nullptr) { + pSmd->pairs.push_back(*pRE); + } + fragsHeap.Init(&genREData.fragsArr); + while ((pRE = fragsHeap.Pop()) != nullptr) { + pSmd->frags.push_back(*pRE); + } +} +// for step-4 sort +static void doMarkDup(PipelineArg &pipeArg) { + MarkDupData &mdData = pipeArg.markDupData[pipeArg.markDupOrder % pipeArg.MARKBUFNUM]; + SortData &sortData = pipeArg.sortData[pipeArg.markDupOrder % pipeArg.SORTBUFNUM]; + mdData.taskSeq = pipeArg.markDupOrder; + mdData.fragDupIdx.clear(); + mdData.pairDupIdx.clear(); + mdData.pairOpticalDupIdx.clear(); + mdData.pairRepIdx.clear(); + mdData.pairSingletonIdx.clear(); + + //cout << "doMarkDup-mark buf : " << pipeArg.markDupOrder % pipeArg.MARKBUFNUM << endl; + //cout << "doMarkDup-sort buf:" << pipeArg.markDupOrder % pipeArg.SORTBUFNUM << endl; + + tm_arr[5].acc_start(); + // auto tmpPt = sortData.pSmd; + // sortData.pSmd = mdData.pSmd; + // mdData.pSmd = tmpPt; + + auto mdpSmd = (SortMarkData *)mdData.pSmd; + auto stpSmd = (SortMarkData *)sortData.pSmd; + mdpSmd->frags = stpSmd->frags; + mdpSmd->pairs = stpSmd->pairs; + mdpSmd->unpairedDic = stpSmd->unpairedDic; + mdpSmd->unpairedPosArr = stpSmd->unpairedPosArr; + //cout << mdData.pSmd->frags.size() << "\t" << mdData.pSmd->pairs.size() << "\t" << mdData.pSmd->unpairedDic.size() + // << "\t" << mdData.pSmd->unpairedPosArr.size() << endl; + tm_arr[5].acc_end(); + + // 先处理 pair + processPairs(mdpSmd->pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx, + &mdData.pairSingletonIdx); + // 再处理frag + processFrags(mdpSmd->frags, &mdData.fragDupIdx); +} + +template +static void refreshDupIdx(T &srcArr, T &insArr, T &delArr) { + for (auto dup: srcArr) { + insArr.insert(dup); + delArr.erase(dup); + } +} +template +static void refreshNotDupIdx(T1 &srcArr, T2 &delArr1, T2 &delArr2) { + for (auto dup : srcArr) { + delArr1.erase(dup); + delArr2.erase(dup); + } +} + +static void refreshMarkDupData(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, + MDSet &singletonIdx, MDSet ¬DupIdx, + MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, + MDSet ¬SingletonIdx, MarkDupData &lp, MarkDupData &p) { + refreshDupIdx(dupIdx, lp.pairDupIdx, p.pairDupIdx); + refreshNotDupIdx(notDupIdx, lp.pairDupIdx, p.pairDupIdx); + refreshDupIdx(opticalDupIdx, lp.pairOpticalDupIdx, p.pairOpticalDupIdx); + refreshNotDupIdx(notOpticalDupIdx, lp.pairOpticalDupIdx, p.pairOpticalDupIdx); + refreshDupIdx(repIdx, lp.pairRepIdx, p.pairRepIdx); + refreshNotDupIdx(notRepIdx, lp.pairRepIdx, p.pairRepIdx); + refreshDupIdx(singletonIdx, lp.pairSingletonIdx, p.pairSingletonIdx); + refreshNotDupIdx(notSingletonIdx, lp.pairSingletonIdx, p.pairSingletonIdx); +} + +// for step-5 sort +static void doIntersect(PipelineArg &pipeArg) { +// return; + IntersectData &g = pipeArg.intersectData; + if (pipeArg.intersectOrder == 0) + return; // 要等第二部分数据才能进行计算 + MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; + MarkDupData &p = pipeArg.markDupData[pipeArg.intersectOrder % pipeArg.MARKBUFNUM]; + +// cout << "dointer-mark buf : " << (pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM << "\t" +// << pipeArg.intersectOrder % pipeArg.MARKBUFNUM << endl; + // cout << "sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl; + + vector reArr; + DPSet dupIdx; + MDSet opticalDupIdx; + DPSet repIdx; + MDSet singletonIdx; + MDSet notOpticalDupIdx; + MDSet notDupIdx; + MDSet notRepIdx; + MDSet notSingletonIdx; + auto lppSmd = (SortMarkData *)lp.pSmd; + auto ppSmd = (SortMarkData *)p.pSmd; + + // 先处理重叠的frags + // cout << "frags: " << lp.pSmd->frags.size() << "\t" << p.pSmd->frags.size() << endl; + getIntersectData(lppSmd->frags, ppSmd->frags, &reArr); + processFrags(reArr, &dupIdx, ¬DupIdx); + refreshDupIdx(dupIdx, lp.fragDupIdx, p.fragDupIdx); + refreshNotDupIdx(notDupIdx, lp.fragDupIdx, p.fragDupIdx); + + + // 再处理重叠的pairs + reArr.clear(); + dupIdx.clear(); + notDupIdx.clear(); + getIntersectData(lppSmd->pairs, ppSmd->pairs, &reArr, true); +// cout << "pairs size: " << lp.pSmd->pairs.size() << "\t" << p.pSmd->pairs.size() << endl; +// cout << "range: " << lp.pSmd->pairs.front().posKey << "\t" << lp.pSmd->pairs.back().posKey << "\t" +// << p.pSmd->pairs.front().posKey << "\t" << p.pSmd->pairs.back().posKey << endl; +// cout << "intersect size: " << reArr.size() << endl; + processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &singletonIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx, + ¬SingletonIdx); + refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx, + notSingletonIdx, lp, p); +/* +// return; + // 处理之前未匹配的部分 + map recalcPos; + CalcSet alreadyAdd; // 与该位点相同的pair都添加到数组里了 + MDSet addToGlobal; + int64_t prevLastPos = 0, nextFirstPos = 0; + if (lppSmd->frags.size() > 0) + prevLastPos = lppSmd->frags.back().posKey; + if (ppSmd->frags.size() > 0) + nextFirstPos = ppSmd->frags[0].posKey; + for (auto &prevUnpair : lppSmd->unpairedDic) { // 遍历上一个任务中的每个未匹配的read + auto &readName = prevUnpair.first; + auto &prevPosInfo = prevUnpair.second; + auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end + + if (ppSmd->unpairedDic.find(readName) != ppSmd->unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read + auto &nextPosInfo = ppSmd->unpairedDic[readName]; + auto &nextFragEnd = nextPosInfo.unpairedRE; + int64_t prevPosKey = prevFragEnd.posKey; + modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下,poskey可能是后面的read + int64_t nextPosKey = max(prevPosKey, nextFragEnd.posKey); + CalcKey ck = {prevPosKey, nextPosKey}; + UnpairedPosInfo *prevUnpairInfoP = nullptr; + UnpairedPosInfo *nextUnpairInfoP = nullptr; + if (lppSmd->unpairedPosArr.find(prevPosKey) != lppSmd->unpairedPosArr.end()) + prevUnpairInfoP = &lppSmd->unpairedPosArr[prevPosKey]; + if (ppSmd->unpairedPosArr.find(prevPosKey) != ppSmd->unpairedPosArr.end()) + nextUnpairInfoP = &ppSmd->unpairedPosArr[prevPosKey]; + + // pos分为两种情况,根据poskey(pair中两个read分别的pos)的位置确定 + // 1. + // prevpos在交叉部分之前,nextpos在交叉部分之后,这种情况不需要获取pairarr中的数据; + // 2. + // prevpos在交叉部分之前,nextpos在交叉部分,需要获取lp中的相等read pair进行重新计算 + // 复杂情况1. g中包含prevPosKey对应的unpair,p中有对应的pair,此时应该把这些pair考虑进去 + // 3. + // prevpos在交叉部分,nextpos在交叉部分之后,需要获取p中的相等read pair进行重新计算 + // 复杂情况2. p中是否包含prevPosKey对应的unpair + // 4. + // prevpos在交叉部分,nextpos在交叉部分,需要获取lp和p中的相等read pair进行重新计算 + + bool addDataToPos = true; + if (alreadyAdd.find(ck) != alreadyAdd.end()) { + // 之前已经添加过了,后面就不用再添加数据了,因为同一个位置可能找到两个及以上的unpair数据,处理之前的数据时候可能已经添加了这些数据 + addDataToPos = false; + } else + alreadyAdd.insert(ck); + + if (prevPosKey < nextFirstPos) { // prevpos在交叉部分之前 + auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr + prevPairArr.push_back(prevFragEnd); + if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况 + getEqualRE(prevFragEnd, lppSmd->pairs, &prevPairArr); + } + // 第一种情况,第二种情况下都会出现,复杂情况一 + auto gPosInfo = g.unpairedPosArr.find(prevPosKey); + if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的,刚好和该位点一致 + auto &gUnpairInfo = gPosInfo->second; + auto pPosInfo = ppSmd->unpairedPosArr.find(nextPosKey); + if (pPosInfo != ppSmd->unpairedPosArr.end()) { + auto &pUnpairInfo = pPosInfo->second; + for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname,看是否有匹配的 + if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) { + auto pe = g.unpairedDic[rn].unpairedRE; + auto fe = ppSmd->unpairedDic[rn].unpairedRE; + modifyPairedEnds(fe, &pe); + prevPairArr.push_back(pe); + g.unpairedDic.erase(rn); + ppSmd->unpairedDic.erase(rn); + } + } + } + } + recalcPos[ck] = prevPosInfo.taskSeq; + std::sort(prevPairArr.begin(), prevPairArr.end()); + } else { // prevpos在交叉部分 + if (nextPosKey > prevLastPos) { // nextpos在交叉部分之后 第三种情况 + if (nextUnpairInfoP != nullptr) { // 且在pos点,next task有unpair,这样才把这些数据放到next task里 + auto &nextPairArr = nextUnpairInfoP->pairArr; + nextPairArr.push_back(prevFragEnd); + auto &prevPairArr = prevUnpairInfoP->pairArr; + prevPairArr.push_back(prevFragEnd); + if (addDataToPos) { + getEqualRE(prevFragEnd, ppSmd->pairs, &prevPairArr); + getEqualRE(prevFragEnd, ppSmd->pairs, &nextPairArr); + } + // 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除) + recalcPos[ck] = nextPosInfo.taskSeq; + + std::sort(prevPairArr.begin(), prevPairArr.end()); + std::sort(nextPairArr.begin(), nextPairArr.end()); + } else { // next task在该位点没有unpair,那就把数据放到prev task里 + auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr + prevPairArr.push_back(prevFragEnd); + if (addDataToPos) // 第二种情况 + getEqualRE(prevFragEnd, ppSmd->pairs, &prevPairArr); + recalcPos[ck] = prevPosInfo.taskSeq; + std::sort(prevPairArr.begin(), prevPairArr.end()); + } + } else { // 第四种情况 + if (prevUnpairInfoP == nullptr) { + prevUnpairInfoP = &lppSmd->unpairedPosArr[prevPosKey]; + prevUnpairInfoP->taskSeq = lp.taskSeq; + } + auto &prevPairArr = prevUnpairInfoP->pairArr; + prevPairArr.push_back(prevFragEnd); + if (addDataToPos) { + getEqualRE(prevFragEnd, lppSmd->pairs, &prevPairArr); + getEqualRE(prevFragEnd, ppSmd->pairs, &prevPairArr); + } + recalcPos[ck] = prevPosInfo.taskSeq; + std::sort(prevPairArr.begin(), prevPairArr.end()); + } + } + ppSmd->unpairedDic.erase(readName); // 在next task里删除该read + } else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read + auto &remainPosInfo = g.unpairedDic[readName]; + auto remainFragEnd = remainPosInfo.unpairedRE; + int64_t remainPosKey = remainFragEnd.posKey; + modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read + auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; + auto &remainPairArr = remainUnpairInfo.pairArr; + remainPairArr.push_back(remainFragEnd); + CalcKey ck = {remainPosKey, prevFragEnd.posKey}; + recalcPos[ck] = remainPosInfo.taskSeq; + std::sort(remainPairArr.begin(), remainPairArr.end()); + + g.unpairedDic.erase(readName); + } else { // 都没找到,那就保存到遗留数据里 + int64_t prevPosKey = prevFragEnd.posKey; + g.unpairedDic.insert(prevUnpair); + addToGlobal.insert(prevPosKey); + } + } + map taskChanged; + MDSet posProcessed; + for (auto &e : recalcPos) { + auto posKey = e.first.read1Pos; + if (posProcessed.find(posKey) != posProcessed.end()) + continue; + posProcessed.insert(posKey); + auto taskSeq = e.second; + auto &t = taskChanged[taskSeq]; + // 在对应的任务包含的dup idx里修改结果数据 + vector *pairArrP = nullptr; + if (taskSeq < lp.taskSeq) + pairArrP = &g.unpairedPosArr[posKey].pairArr; + else + pairArrP = &lppSmd->unpairedPosArr[posKey].pairArr; + processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, + &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); + if (taskSeq < lp.taskSeq) + g.unpairedPosArr.erase(posKey); + } + // 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据 + // 放在这里,因为lp中的unpairedPosArr中的readends可能会被修改(比如optical duplicate) + for (auto posKey : addToGlobal) { + g.unpairedPosArr[posKey] = lppSmd->unpairedPosArr[posKey]; + } + + // 更新结果 +// for (auto &e : taskChanged) { +// auto taskSeq = e.first; +// auto &t = e.second; +// if (taskSeq < lp.taskSeq) { +// refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, +// t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq], +// g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq], +// g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], +// g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq], +// g.latterNotSingletonIdxArr[taskSeq]); +// } else if (taskSeq == lp.taskSeq) { +// refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, +// t.notRepIdx, t.notSingletonIdx, lp, p); +// } else { +// refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, +// t.notRepIdx, t.notSingletonIdx, p, lp); // 把结果放到p中 +// } +// } +*/ + // 将dupidx放进全局数据 + g.latterDupIdxArr.push_back(DPSet()); + g.latterOpticalDupIdxArr.push_back(MDSet()); + g.latterRepIdxArr.push_back(DPSet()); + g.latterSingletonIdxArr.push_back(MDSet()); + g.latterNotDupIdxArr.push_back(MDSet()); + g.latterNotOpticalDupIdxArr.push_back(MDSet()); + g.latterNotRepIdxArr.push_back(MDSet()); + g.latterNotSingletonIdxArr.push_back(MDSet()); + + g.dupIdxArr.push_back(vector()); + auto &vIdx = g.dupIdxArr.back(); + lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); + vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); + std::sort(vIdx.begin(), vIdx.end()); + + g.opticalDupIdxArr.push_back(vector()); + auto &vOpticalIdx = g.opticalDupIdxArr.back(); + vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); + std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); + + g.repIdxArr.push_back(vector()); + auto &vRepIdx = g.repIdxArr.back(); + vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); + std::sort(vRepIdx.begin(), vRepIdx.end()); + + g.singletonIdxArr.push_back(vector()); + auto &vSingletonIdx = g.singletonIdxArr.back(); + vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end()); + std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); + gSingletonNum += lp.pairSingletonIdx.size(); +} + +static void *pipeRead(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + BamBufType inBamBuf(g_gArg.use_asyncio); + inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); + int64_t readNumSum = 0; + while (1) { + POSSESS(pipeArg.readSig); + WAIT_FOR(pipeArg.readSig, NOT_TO_BE, 1); // 只有一个坑,因为在bambuf内部支持异步读取 + size_t readNum = 0; + tm_arr[0].acc_start(); + if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam(); // 读入新一轮的数据 + tm_arr[0].acc_end(); + if (readNum < 1) { + pipeArg.readFinish = 1; + TWIST(pipeArg.readSig, BY, 1); // 读入了一轮数据 + break; + } + cout << "read num: " << readNum << "\t" << readNumSum << '\t' << pipeArg.readOrder << endl; + + pipeArg.readData.bams = inBamBuf.GetBamArr(); + + pipeArg.readData.bamStartIdx = readNumSum; + pipeArg.readData.taskSeq = pipeArg.readOrder; + + readNumSum += readNum; + inBamBuf.ClearAll(); // 清理上一轮读入的数据 + pipeArg.readOrder += 1; // for next + TWIST(pipeArg.readSig, BY, 1); // 读入了一轮数据 + } + cout << "total reads: " << readNumSum << endl; + return 0; +} +static void *pipeGenRE(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + auto &genREData = pipeArg.genREData; + // init generate read ends data by num_thread + for (int i = 0; i < pipeArg.GENBUFNUM; ++i) { + genREData[i].Init(g_gArg.num_threads); + } + + while(1) { + POSSESS(pipeArg.readSig); + WAIT_FOR(pipeArg.readSig, NOT_TO_BE, 0); // 等待有数据 + POSSESS(pipeArg.genRESig); + // cout << "gre: " << PEEK_LOCK(pipeArg.genRESig) << endl; + WAIT_FOR(pipeArg.genRESig, NOT_TO_BE, pipeArg.GENBUFNUM); // 有BUFNUM个坑 + RELEASE(pipeArg.genRESig); // 因为有不止一个位置,所以要释放 + + if (pipeArg.readFinish) { + POSSESS(pipeArg.genRESig); + pipeArg.genREFinish = 1; + TWIST(pipeArg.genRESig, BY, 1); + TWIST(pipeArg.readSig, BY, -1); + break; + } + /* 处理bam,生成readends */ +// cout << "genRE order: " << pipeArg.genREOrder << "\t" << pipeArg.readData.bamStartIdx << endl; + tm_arr[1].acc_start(); + doGenRE(pipeArg); + tm_arr[1].acc_end(); + + POSSESS(pipeArg.genRESig); + pipeArg.genREOrder += 1; + TWIST(pipeArg.genRESig, BY, 1); + TWIST(pipeArg.readSig, BY, -1); // 使用了一次读入的数据 + } + return 0; +} +static void *pipeSort(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + + while (1) { + POSSESS(pipeArg.genRESig); + WAIT_FOR(pipeArg.genRESig, NOT_TO_BE, 0); // 等待有数据 + RELEASE(pipeArg.genRESig); + + POSSESS(pipeArg.sortSig); + WAIT_FOR(pipeArg.sortSig, NOT_TO_BE, pipeArg.SORTBUFNUM); // 有BUFNUM个位置 + RELEASE(pipeArg.sortSig); + + if (pipeArg.genREFinish) { + // 处理完剩余数据 + while (pipeArg.sortOrder < pipeArg.genREOrder) { + tm_arr[2].acc_start(); + doSort(pipeArg); + tm_arr[2].acc_start(); + pipeArg.sortOrder += 1; + } + POSSESS(pipeArg.sortSig); + pipeArg.sortFinish = 1; + TWIST(pipeArg.sortSig, BY, 1); + break; + } + /* 排序 readends */ +// cout << "sort order: " << pipeArg.sortOrder << endl; + tm_arr[2].acc_start(); + doSort(pipeArg); + tm_arr[2].acc_end(); + + POSSESS(pipeArg.genRESig); + TWIST(pipeArg.genRESig, BY, -1); + + POSSESS(pipeArg.sortSig); + pipeArg.sortOrder += 1; + TWIST(pipeArg.sortSig, BY, 1); + } + return 0; +} +static void *pipeMarkDup(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + + while (1) { + POSSESS(pipeArg.sortSig); + WAIT_FOR(pipeArg.sortSig, NOT_TO_BE, 0); // 等待有数据 + RELEASE(pipeArg.sortSig); + + POSSESS(pipeArg.markDupSig); + WAIT_FOR(pipeArg.markDupSig, NOT_TO_BE, pipeArg.MARKBUFNUM); + RELEASE(pipeArg.markDupSig); + + if (pipeArg.sortFinish) { + // 应该还得处理剩余的数据 + while (pipeArg.markDupOrder < pipeArg.sortOrder) { + tm_arr[3].acc_start(); + doMarkDup(pipeArg); + tm_arr[3].acc_end(); + pipeArg.markDupOrder += 1; + } + POSSESS(pipeArg.markDupSig); + pipeArg.markDupFinish = 1; + TWIST(pipeArg.markDupSig, BY, 1); + break; + } + /* 冗余检测 readends */ +// cout << "markdup order: " << pipeArg.markDupOrder << endl; + tm_arr[3].acc_start(); + doMarkDup(pipeArg); + tm_arr[3].acc_end(); + POSSESS(pipeArg.sortSig); + TWIST(pipeArg.sortSig, BY, -1); + + POSSESS(pipeArg.markDupSig); + pipeArg.markDupOrder += 1; + TWIST(pipeArg.markDupSig, BY, 1); + } + return 0; +} +static void *pipeIntersect(void *data) { + PipelineArg &pipeArg = *(PipelineArg *)data; + + while (1) { + POSSESS(pipeArg.markDupSig); + WAIT_FOR(pipeArg.markDupSig, NOT_TO_BE, 0); // 等待有数据 + RELEASE(pipeArg.markDupSig); + + if (pipeArg.markDupFinish) { + while (pipeArg.intersectOrder < pipeArg.markDupOrder) { + tm_arr[4].acc_start(); + doIntersect(pipeArg); + tm_arr[4].acc_end(); + pipeArg.intersectOrder += 1; + } + break; + } + /* 交叉数据处理 readends */ +// cout << "intersect order: " << pipeArg.intersectOrder << endl; + tm_arr[4].acc_start(); + doIntersect(pipeArg); + tm_arr[4].acc_end(); + + POSSESS(pipeArg.markDupSig); + TWIST(pipeArg.markDupSig, BY, -1); + pipeArg.intersectOrder += 1; + } + return 0; +} + +/* 当所有任务结束后,global data里还有未处理的数据 */ +static void mergeAllTask(PipelineArg &pipeArg) { + // cout << "last task start" << endl; + MarkDupData &mdData = pipeArg.markDupData[(pipeArg.markDupOrder - 1) % pipeArg.MARKBUFNUM]; + IntersectData &g = pipeArg.intersectData; + auto &lp = *(SortMarkData *)mdData.pSmd; + + // 遗留的未匹配的pair + for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read + auto &readName = prevUnpair.first; + auto &prevPosInfo = prevUnpair.second; + auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end + + if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read + auto &remainPosInfo = g.unpairedDic[readName]; + auto remainFragEnd = remainPosInfo.unpairedRE; + int64_t remainPosKey = remainFragEnd.posKey; + modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read + auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; + + remainUnpairInfo.pairArr.push_back(remainFragEnd); + g.unpairedDic.erase(readName); + } else { + g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数 + } + } + + map taskChanged; + for (auto &e : g.unpairedPosArr) { + auto posKey = e.first; + auto taskSeq = e.second.taskSeq; + auto &t = taskChanged[taskSeq]; + auto &arr = g.unpairedPosArr[posKey].pairArr; + + if (arr.size() > 1) { + std::sort(arr.begin(), arr.end()); + processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx, + &t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx); + } else if (arr.size() == 1) { + // gMetrics.singletonReads.insert(arr[0].read1IndexInFile); + t.singletonIdx.insert(arr[0].read1IndexInFile); + } + } + // 更新结果 + vector addDup; + map ndPosVal; + for (auto &e : taskChanged) { + auto taskSeq = e.first; + auto &t = e.second; + refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx, + t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], + g.latterRepIdxArr[taskSeq], g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq], + g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq], + g.latterNotSingletonIdxArr[taskSeq]); + } + g.unpairedPosArr.clear(); + + // 将dupidx放进全局数据 + vector cacheDupIdx; + vector midArr; + vector intCacheDupIdx; + vector intMidArr; + for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) { + refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i], cacheDupIdx, midArr); + } + for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i], + intCacheDupIdx, intMidArr); + for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); + for (int i = 0; i < (int)g.singletonIdxArr.size() - 1; ++i) + refeshFinalTaskDupInfo(g.latterSingletonIdxArr[i], g.latterNotSingletonIdxArr[i], g.singletonIdxArr[i], + intCacheDupIdx, intMidArr); + + g.dupIdxArr.push_back(vector()); + auto &vIdx = g.dupIdxArr.back(); + mdData.pairDupIdx.insert(mdData.fragDupIdx.begin(), mdData.fragDupIdx.end()); + vIdx.insert(vIdx.end(), mdData.pairDupIdx.begin(), mdData.pairDupIdx.end()); + std::sort(vIdx.begin(), vIdx.end()); + + g.opticalDupIdxArr.push_back(vector()); + auto &vOpticalIdx = g.opticalDupIdxArr.back(); + vOpticalIdx.insert(vOpticalIdx.end(), mdData.pairOpticalDupIdx.begin(), mdData.pairOpticalDupIdx.end()); + std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); + + g.repIdxArr.push_back(vector()); + auto &vRepIdx = g.repIdxArr.back(); + vRepIdx.insert(vRepIdx.end(), mdData.pairRepIdx.begin(), mdData.pairRepIdx.end()); + std::sort(vRepIdx.begin(), vRepIdx.end()); + + g.singletonIdxArr.push_back(vector()); + auto &vSingletonIdx = g.singletonIdxArr.back(); + vSingletonIdx.insert(vSingletonIdx.end(), mdData.pairSingletonIdx.begin(), mdData.pairSingletonIdx.end()); + std::sort(vSingletonIdx.begin(), vSingletonIdx.end()); + gSingletonNum += mdData.pairSingletonIdx.size(); +} + +static void startPipeline() { + Timer::log_time("pipeline start"); + PipelineArg pipeArg; + + pthread_t tidArr[5]; + pthread_create(&tidArr[0], 0, pipeRead, &pipeArg); + pthread_create(&tidArr[1], 0, pipeGenRE, &pipeArg); + pthread_create(&tidArr[2], 0, pipeSort, &pipeArg); + pthread_create(&tidArr[3], 0, pipeMarkDup, &pipeArg); + pthread_create(&tidArr[4], 0, pipeIntersect, &pipeArg); + for (int i = 0; i < 5; ++i) pthread_join(tidArr[i], 0); + tm_arr[6].acc_start(); + mergeAllTask(pipeArg); + tm_arr[6].acc_end(); + + int64_t dupNum = 0; + int64_t opticalDupNum = 0; + int64_t singletonNum = 0; + for (auto &arr : pipeArg.intersectData.dupIdxArr) dupNum += arr.size(); + for (auto &arr : pipeArg.intersectData.opticalDupIdxArr) opticalDupNum += arr.size(); + for (auto &arr : pipeArg.intersectData.singletonIdxArr) singletonNum += arr.size(); + + cout << "Final read order: " << pipeArg.readOrder << endl; + cout << "Final gen order: " << pipeArg.genREOrder << endl; + cout << "Final sort order: " << pipeArg.sortOrder << endl; + cout << "Final mark order: " << pipeArg.markDupOrder << endl; + cout << "Final inter order: " << pipeArg.intersectOrder << endl; + + cout << "read time : " << tm_arr[0].acc_seconds_elapsed() << endl; + cout << "gen time : " << tm_arr[1].acc_seconds_elapsed() << endl; + cout << "sort time : " << tm_arr[2].acc_seconds_elapsed() << endl; + cout << "markdup time : " << tm_arr[3].acc_seconds_elapsed() << endl; + cout << "intersect time: " << tm_arr[4].acc_seconds_elapsed() << endl; + cout << "copy time: " << tm_arr[5].acc_seconds_elapsed() << endl; + cout << "merge al6 time: " << tm_arr[6].acc_seconds_elapsed() << endl; + + cout << "dup num : " << dupNum << endl; + cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl; + cout << "singleton num : " << singletonNum << endl; + + Timer::log_time("pipeline end "); +} + +/* 并行流水线方式处理数据,标记冗余 */ +void pipelineMarkDups() { + startPipeline(); + return; + + tm_arr[5].acc_start(); + Timer::log_time("pipeline start"); + // 读取缓存初始化 + BamBufType inBamBuf(g_gArg.use_asyncio); + inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); + + + int64_t processedBamNum = 0; + + MarkDupDataArg smdArg1, smdArg2; + MarkDupDataArg *lastArgP = &smdArg1; + MarkDupDataArg *curArgP = &smdArg2; + ParallelDataArg pdArg; + pdArg.Init(g_gArg.num_threads); + + bool isFirstRound = true; + int roundNum = 0; + int64_t readNumSum = 0; + while (inBamBuf.ReadStat() >= 0) { + Timer t_round; + // 读取bam文件中的read + tm_arr[4].acc_start(); + size_t readNum = inBamBuf.ReadBam(); + if (readNum < 1) + break; + readNumSum += readNum; + tm_arr[4].acc_end(); + cout << "read num: " << readNum << "\t" << readNumSum << '\t' << roundNum << endl; + // lastArgP = curArgP; + tm_arr[6].acc_start(); + curArgP->taskSeq = roundNum; + curArgP->bamStartIdx = processedBamNum; + curArgP->bams = inBamBuf.GetBamArr(); + tm_arr[6].acc_end(); + + tm_arr[0].acc_start(); + Timer t1; + generateReadEnds(pdArg, curArgP); +// cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl; + tm_arr[0].acc_end(); + + tm_arr[1].acc_start(); + t1.reinit(); + markdups(curArgP); +// cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; + tm_arr[1].acc_end(); + + if (!isFirstRound) { + tm_arr[2].acc_start(); + t1.reinit(); + handleIntersectData(lastArgP, curArgP, &gData); +// cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl; + // addTaskIdxToSet(lastArgP, &gData); + tm_arr[2].acc_end(); + } else { + isFirstRound = false; + } + inBamBuf.ClearAll(); // 清理上一轮读入的数据 + processedBamNum += readNum; + + // 交换 + auto tmp = lastArgP; + lastArgP = curArgP; + curArgP = tmp; + // cout << "round time: " << t_round.seconds_elapsed() << endl; + roundNum++; + if (roundNum % 100 == 0) { + // cout << "read sum: " << readNumSum << endl; + // cout << "round time: " << t_round.seconds_elapsed() * 100 << " s" << endl; + } + } + tm_arr[3].acc_start(); + // 处理剩下的全局数据 + handleLastTask(lastArgP, &gData); + + // cout << "here 2" << endl; + tm_arr[3].acc_end(); + + tm_arr[5].acc_end(); + // 统计所有冗余index数量 + int64_t dupNum = 0; + int64_t opticalDupNum = 0; + int64_t singletonNum = 0; + + int64_t dupNumDic = 0; + int64_t singletonNumDic = 0; + + map dup; + + int taskSeq = 0; + // for (auto &arr : gData.dupIdxArr) { + // for (auto idx : arr) { + // if (dup.find(idx.idx) != dup.end()) { + // // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' + // // << idx << endl; + // } + // dup[idx.idx] = taskSeq; + // } + // // cout << taskSeq << "\t" << arr.size() << endl; + // taskSeq++; + // } + // dupNumDic = dup.size(); + // dup.clear(); + // int notInMetrics = 0; + // cout << "gmetrics single count: " << gMetrics.singletonReads.size() << endl; + // for (auto &arr : gData.singletonIdxArr) { + // for (auto idx : arr) { + // dup[idx] = 1; + // if (gMetrics.singletonReads.find(idx) == gMetrics.singletonReads.end()) { + //// cout << "not in gmetrics: " << idx << endl; + // ++notInMetrics; + // } else { + // gMetrics.singletonReads.erase(idx); + // } + // } + // } + // singletonNumDic = dup.size(); + // cout << "not in arr: " << endl; + // for (auto idx : gMetrics.singletonReads) { + //// cout << idx << endl; + // } + // cout << "count: " << notInMetrics << "\t" << gMetrics.singletonReads.size() << endl; + + // #include + // ofstream out("tumor_dup.txt"); + // for (auto idx : dup) + // { + // out << idx << endl; + // } + // out.close(); + + for (auto &arr : gData.dupIdxArr) dupNum += arr.size(); + for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size(); + for (auto &arr : gData.singletonIdxArr) singletonNum += arr.size(); + + cout << "dup num : " << dupNum << '\t' << dupNumDic << "\t" << zzhtestnum << endl; + cout << "singleton : " << singletonNum << "\t" << singletonNumDic << "\t" << gSingletonNum << endl; + cout << "singleton size: " << gMetrics.singletonReads.size() << endl; + cout << "dup read size: " << gMetrics.dupReads.size() << endl; + + cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl; + cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl; + cout << "handle tail : " << tm_arr[2].acc_seconds_elapsed() << endl; + cout << "handle last : " << tm_arr[3].acc_seconds_elapsed() << endl; + cout << "read bam : " << tm_arr[4].acc_seconds_elapsed() << endl; + cout << "new arg : " << tm_arr[6].acc_seconds_elapsed() << endl; + cout << "del arg : " << tm_arr[7].acc_seconds_elapsed() << endl; + cout << "build ends : " << tm_arr[8].acc_seconds_elapsed() << endl; + cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl; + cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl; + cout << "t frags : " << tm_arr[11].acc_seconds_elapsed() << endl; + cout << "t pairs : " << tm_arr[12].acc_seconds_elapsed() << endl; + cout << "t unpair : " << tm_arr[13].acc_seconds_elapsed() << endl; + cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; + cout << "optical size: " << opticalDupNum << "\t" << opticalDupNum / 2 << endl; + + Timer::log_time("pipeline end "); + cout << "read num sum: " << readNumSum << endl; +} \ No newline at end of file diff --git a/src/sam/markdups/pipeline_md.h b/src/sam/markdups/pipeline_md.h new file mode 100644 index 0000000..eb6d97b --- /dev/null +++ b/src/sam/markdups/pipeline_md.h @@ -0,0 +1,170 @@ +#pragma once + +#include +#include +#include "md_types.h" + +struct ReadData { + vector bams; // read step output + int64_t bamStartIdx = 0; // 每轮读入的bam数组,起始位置在全局bam中的索引 + int64_t taskSeq = 0; // 任务序号 +}; + +class SortData; +struct GenREData { + vector> pairsArr; // 成对的reads + vector> fragsArr; // 暂未找到配对的reads + vector unpairedDicArr; // 用来寻找pair end + void Init(int nThread) { + for (int i = 0; i <= nThread; ++i) { // 比线程多一个,主要是pairs多一个,其他没用 + pairsArr.push_back(vector()); + fragsArr.push_back(vector()); + unpairedDicArr.push_back(UnpairedNameMap()); + } + } +}; + +struct SortMarkData { + vector pairs; // 成对的reads + vector frags; // 暂未找到配对的reads + UnpairedNameMap unpairedDic; // 用来寻找pair end + UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储 +}; + +struct SortData { + volatile void *pSmd; +}; + +struct MarkDupData { + int64_t taskSeq = 0; // 任务序号 + DPSet pairDupIdx; // pair的冗余read的索引 + MDSet pairOpticalDupIdx; // optical冗余read的索引 + DPSet fragDupIdx; // frag的冗余read的索引 + DPSet pairRepIdx; // pair的dupset代表read的索引 + MDSet pairSingletonIdx; // 某位置只有一对read的所有read pair个数 + volatile void *pSmd; +}; + +struct IntersectData { + UnpairedNameMap unpairedDic; // 用来寻找pair end + UnpairedPositionMap unpairedPosArr; + + // 每个task对应一个vector + vector> dupIdxArr; + vector> opticalDupIdxArr; + vector> repIdxArr; + vector> singletonIdxArr; + + // 用来存放后续计算的数据 + vector> latterDupIdxArr; + vector> latterOpticalDupIdxArr; + vector> latterRepIdxArr; + vector> latterSingletonIdxArr; + vector> latterNotDupIdxArr; + vector> latterNotOpticalDupIdxArr; + vector> latterNotRepIdxArr; + vector> latterNotSingletonIdxArr; +}; + +// 记录流水线状态,task的序号,以及某阶段是否结束 +struct PipelineArg { + static const int GENBUFNUM = 2; + static const int SORTBUFNUM = 2; + static const int MARKBUFNUM = 3; + uint64_t readOrder = 0; + uint64_t genREOrder = 0; + uint64_t sortOrder = 0; + uint64_t markDupOrder = 0; + uint64_t intersectOrder = 0; + + volatile int readFinish = 0; + volatile int genREFinish = 0; + volatile int sortFinish = 0; + volatile int markDupFinish = 0; + + lock_t *readSig; + lock_t *genRESig; + lock_t *sortSig; + lock_t *markDupSig; + + PipelineArg() { + readSig = NEW_LOCK(0); // 最大值1, 双buffer在bambuf中实现了,对调用透明 + genRESig = NEW_LOCK(0); // 最大值2, 双buffer + sortSig = NEW_LOCK(0); + markDupSig = NEW_LOCK(0); + for (int i = 0; i < SORTBUFNUM; ++i) sortData[i].pSmd = &smData[i]; + for (int i = 0; i < MARKBUFNUM; ++i) markDupData[i].pSmd = &smData[i + SORTBUFNUM]; + } + + // sort mark shared data + SortMarkData smData[SORTBUFNUM + MARKBUFNUM]; + + // for step-1 read + ReadData readData; + // for step-2 generate readends + GenREData genREData[GENBUFNUM]; + // for step-3 sort each thread frags and pairs + SortData sortData[SORTBUFNUM]; + // for step-4 mark duplicate + MarkDupData markDupData[MARKBUFNUM]; + // for step-5 deal with intersect data + IntersectData intersectData; +}; + +struct REArrIdIdx { + int arrId = 0; // 数组索引 + uint64_t arrIdx = 0; // 数组内部当前索引 + const ReadEnds *pE = nullptr; +}; + +struct REGreaterThan { + bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return *b.pE < *a.pE; } +}; +struct ReadEndsHeap { + // 将冗余索引和他对应的task vector对应起来 + vector> *arr2d; + priority_queue, REGreaterThan> minHeap; + uint64_t popNum = 0; + + int Init(vector> *_arr2d) { + arr2d = _arr2d; + if (arr2d == nullptr) { + return -1; + } + for (int i = 0; i < arr2d->size(); ++i) { + auto &v = (*arr2d)[i]; + if (!v.empty()) { + minHeap.push({i, 1, &v[0]}); + } + } + return 0; + } + + const ReadEnds *Pop() { + const ReadEnds *ret = nullptr; + if (!minHeap.empty()) { + auto minVal = minHeap.top(); + minHeap.pop(); + ++popNum; + ret = minVal.pE; + auto &v = (*arr2d)[minVal.arrId]; + if (v.size() > minVal.arrIdx) { + minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v[minVal.arrIdx]}); + } + } + return ret; + } + + uint64_t Size() { + uint64_t len = 0; + if (arr2d != nullptr) { + for (auto &v : *arr2d) { + len += v.size(); + } + } + return len - popNum; + } +}; + +// 并行运行mark duplicate +void pipelineMarkDups(); \ No newline at end of file