diff --git a/run.sh b/run.sh index 8f3b6bc..72839c2 100755 --- a/run.sh +++ b/run.sh @@ -6,8 +6,8 @@ nthread=32 #nthread=64 #nthread=128 -#input=/home/zzh/data/bam/zy_normal.bam -input=/home/zzh/data/bam/zy_tumor.bam +input=/home/zzh/data/bam/zy_normal.bam +#input=/home/zzh/data/bam/zy_tumor.bam #input=/home/zzh/data/wgs_na12878.bam #input=~/data/bam/100w.bam #input=~/data/bam/t100w.sam diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index 95f4e94..35e18be 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -44,7 +44,7 @@ using std::string; #define BAM_BLOCK_SIZE 32L * 1024 * 1024 #define NO_SUCH_INDEX INT64_MAX -Timer tm_arr[30]; // 用来测试性能 +Timer tm_arr[50]; // 用来测试性能 /* 全局本地变量 */ vector g_vRnParser; // 每个线程一个read name parser samFile *g_inBamFp; // 输入的bam文件 diff --git a/src/sam/markdups/pipeline_md.cpp b/src/sam/markdups/pipeline_md.cpp index 9ff11a3..36e6588 100644 --- a/src/sam/markdups/pipeline_md.cpp +++ b/src/sam/markdups/pipeline_md.cpp @@ -70,62 +70,24 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr, MDSet *notSingletonIdx = nullptr) { if (vpRe.size() < 2) { - if (vpRe.size() == 1) { - // 这个统计可能会有误差,因为当前位点可能还有没匹配上的read,导致当前位点的read(paired)数量为1 - // 可以通过后续的补充计算来解决这个问题,有必要么?好像没必要 - // gMetrics.singletonReads.insert(vpRe[0]->read1IndexInFile); -// singletonIdx->insert(vpRe[0]->read1IndexInFile); - } return; } - for (auto pe : vpRe) { - // gMetrics.singletonReads.erase(pe->read1IndexInFile); - if (pe->read1IndexInFile == 1208593132 || pe->read2IndexInFile == 1208593132) { - cout << 1208593132 << "\t" << (notDupIdx == nullptr) << endl; - } - } - - if (notSingletonIdx != nullptr) { - for (auto pe : vpRe) { -// notSingletonIdx->insert(pe->read1IndexInFile); - } - } int maxScore = 0; const ReadEnds *pBest = nullptr; - // bool print = false; - // int maxOperateTime = 0; /** All read ends should have orientation FF, FR, RF, or RR **/ for (auto pe : vpRe) { // 找分数最高的readend - // maxOperateTime = max(maxOperateTime, pe->oprateTime); - // (const_cast(pe))->oprateTime ++; if (pe->score > maxScore || pBest == nullptr) { maxScore = pe->score; pBest = pe; } - /* - if (pe->read1IndexInFile == 255830545) { - print = true; - } - */ } - /* - if (print) { - cout << "mark pair find: " << endl; - for (auto pe : vpRe) { - cout << pe->read1IndexInFile << " " << pe->read2IndexInFile << " " << pe->score << endl; - } - cout << "mark pair end: " << endl; - } - */ - // cerr << zzhtestnum << " best: " << vpRe.size() << " " << pBest->read1IndexInFile << "-" << - // pBest->read2IndexInFile << endl; if (maxOperateTime == 0) ++zzhtestnum; + if (notDupIdx != nullptr) { notDupIdx->insert(pBest->read1IndexInFile); notDupIdx->insert(pBest->read2IndexInFile); } - // if (false) { - // if (true) { + if (!g_mdArg.READ_NAME_REGEX.empty()) { // 检查光学冗余 // trackOpticalDuplicates vector prevOpticalRe; @@ -149,21 +111,16 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup } for (auto pe : vpRe) { // 对非best read标记冗余 if (pe != pBest) { // 非best - // gMetrics.dupReads.insert(pe->read1IndexInFile); - // gMetrics.dupReads.insert(pe->read2IndexInFile); dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 if (pe->read2IndexInFile != pe->read1IndexInFile) dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, - // 注意这里代表是read1的索引 + // 注意这里代表是read1的索引 // 检查是否optical dup if (pe->isOpticalDuplicate && opticalDupIdx != nullptr) { opticalDupIdx->insert(pe->read1IndexInFile); if (pe->read2IndexInFile != pe->read1IndexInFile) opticalDupIdx->insert(pe->read2IndexInFile); } - } else { - // gMetrics.dupReads.erase(pe->read1IndexInFile); // for test - // gMetrics.dupReads.erase(pe->read2IndexInFile); } } // 在输出的bam文件中添加tag, best作为dupset的代表 @@ -189,7 +146,6 @@ static void markDupsForFrags(vector &vpRe, bool containsPairs, for (auto pe : vpRe) { if (!pe->IsPaired()) { dupIdx->insert(pe->read1IndexInFile); - // gMetrics.dupReads.insert(pe->read1IndexInFile); } } } else { @@ -207,9 +163,6 @@ static void markDupsForFrags(vector &vpRe, bool containsPairs, for (auto pe : vpRe) { if (pe != pBest) { dupIdx->insert(pe->read1IndexInFile); - // gMetrics.dupReads.insert(pe->read1IndexInFile); - } else { - // gMetrics.dupReads.erase(pe->read1IndexInFile); } } } @@ -959,7 +912,7 @@ static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) { static void mtGenerateReadEnds(void *data, long idx, int tid) { auto &p = *(PipelineArg *)data; auto &rnParser = g_vRnParser[idx]; - int nThread = p.genREThreadNum; + int nThread = p.numThread; auto &bams = p.readData.bams; int64_t bamStartIdx = p.readData.bamStartIdx; int64_t taskSeq = p.readData.taskSeq; @@ -998,7 +951,7 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { } } } - // sortReadEndsArr(frags); + //sortReadEndsArr(frags); sort(frags.begin(), frags.end()); sort(pairs.begin(), pairs.end()); } @@ -1013,7 +966,7 @@ static void doGenRE(PipelineArg &pipeArg) { // cout << "dogen-sort buf:" << pipeArg.genREOrder % pipeArg.SORTBUFNUM << endl; // generate read ends - const int numThread = pipeArg.genREThreadNum; + const int numThread = pipeArg.numThread; // / 4; kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread); // 用来在genRE阶段计算一些Sort阶段应该计算的数据,保持每个step用时更平衡 @@ -1132,8 +1085,6 @@ static void refreshMarkDupData(DPSet &dupIdx, MDSet &opticalDu static void doIntersect(PipelineArg &pipeArg) { // return; IntersectData &g = pipeArg.intersectData; - if (pipeArg.intersectOrder == 0) - return; // 要等第二部分数据才能进行计算 MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; MarkDupData &p = pipeArg.markDupData[pipeArg.intersectOrder % pipeArg.MARKBUFNUM]; SortMarkData &lpSM = *(SortMarkData*)lp.dataPtr; @@ -1429,7 +1380,7 @@ static void *pipeGenRE(void *data) { PipelineArg &pipeArg = *(PipelineArg *)data; auto &genREData = pipeArg.genREData; // init generate read ends data by num_thread - int genREThread = pipeArg.genREThreadNum; + int genREThread = pipeArg.numThread; // / 4; for (int i = 0; i < pipeArg.GENBUFNUM; ++i) { genREData[i].Init(genREThread); @@ -1441,7 +1392,7 @@ static void *pipeGenRE(void *data) { WAIT_FOR(pipeArg.readSig, NOT_TO_BE, 0); // 等待有数据 POSSESS(pipeArg.genRESig); tm_arr[11].acc_end(); - // cout << "gre: " << PEEK_LOCK(pipeArg.genRESig) << endl; + tm_arr[21].acc_start(); WAIT_FOR(pipeArg.genRESig, NOT_TO_BE, pipeArg.GENBUFNUM); // 有BUFNUM个坑 RELEASE(pipeArg.genRESig); // 因为有不止一个位置,所以要释放 @@ -1458,6 +1409,7 @@ static void *pipeGenRE(void *data) { // cout << "genRE order: " << pipeArg.genREOrder << "\t" << pipeArg.readData.bamStartIdx << endl; tm_arr[1].acc_start(); doGenRE(pipeArg); + // usleep(200000); tm_arr[1].acc_end(); POSSESS(pipeArg.genRESig); @@ -1684,12 +1636,10 @@ static void mergeAllTask(PipelineArg &pipeArg) { // gSingletonNum += lp.pairSingletonIdx.size(); } -static void startPipeline() { +static void parallelPipeline() { Timer::log_time("pipeline start"); PipelineArg pipeArg; - pipeArg.genREThreadNum = g_gArg.num_threads; - // / 2; - // g_gArg.num_threads / 2; + pipeArg.numThread = g_gArg.num_threads; pthread_t tidArr[5]; pthread_create(&tidArr[0], 0, pipeRead, &pipeArg); @@ -1704,10 +1654,8 @@ static void startPipeline() { int64_t dupNum = 0; int64_t opticalDupNum = 0; - int64_t singletonNum = 0; for (auto &arr : pipeArg.intersectData.dupIdxArr) dupNum += arr.size(); for (auto &arr : pipeArg.intersectData.opticalDupIdxArr) opticalDupNum += arr.size(); - for (auto &arr : pipeArg.intersectData.singletonIdxArr) singletonNum += arr.size(); map dup; #if 0 @@ -1752,172 +1700,83 @@ static void startPipeline() { cout << "dup num : " << dupNum << "\t" << dup.size() << endl; cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl; - cout << "singleton num : " << singletonNum << endl; Timer::log_time("pipeline end "); } /* 并行流水线方式处理数据,标记冗余 */ void pipelineMarkDups() { - startPipeline(); - return; + if (g_gArg.num_threads > 1) return parallelPipeline(); - tm_arr[5].acc_start(); Timer::log_time("pipeline start"); - // 读取缓存初始化 + PipelineArg pipeArg; + pipeArg.numThread = g_gArg.num_threads; BamBufType inBamBuf(g_gArg.use_asyncio); inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); - - - int64_t processedBamNum = 0; - - MarkDupDataArg smdArg1, smdArg2; - MarkDupDataArg *lastArgP = &smdArg1; - MarkDupDataArg *curArgP = &smdArg2; - ParallelDataArg pdArg; - pdArg.Init(g_gArg.num_threads); - - bool isFirstRound = true; - int roundNum = 0; int64_t readNumSum = 0; - while (inBamBuf.ReadStat() >= 0) { - Timer t_round; - // 读取bam文件中的read - tm_arr[4].acc_start(); - size_t readNum = inBamBuf.ReadBam(); - if (readNum < 1) - break; - readNumSum += readNum; - tm_arr[4].acc_end(); - cout << "read num: " << readNum << "\t" << readNumSum << '\t' << roundNum << endl; - // lastArgP = curArgP; - tm_arr[6].acc_start(); - curArgP->taskSeq = roundNum; - curArgP->bamStartIdx = processedBamNum; - curArgP->bams = inBamBuf.GetBamArr(); - tm_arr[6].acc_end(); - - tm_arr[0].acc_start(); - Timer t1; -// generateReadEnds(pdArg, curArgP); -// cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl; - tm_arr[0].acc_end(); - - tm_arr[1].acc_start(); - t1.reinit(); -// markdups(curArgP); -// cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl; - tm_arr[1].acc_end(); - - if (!isFirstRound) { - tm_arr[2].acc_start(); - t1.reinit(); -// handleIntersectData(lastArgP, curArgP, &gData); -// cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl; - // addTaskIdxToSet(lastArgP, &gData); - tm_arr[2].acc_end(); - } else { - isFirstRound = false; - } - inBamBuf.ClearAll(); // 清理上一轮读入的数据 - processedBamNum += readNum; - - // 交换 - auto tmp = lastArgP; - lastArgP = curArgP; - curArgP = tmp; - // cout << "round time: " << t_round.seconds_elapsed() << endl; - roundNum++; - if (roundNum % 100 == 0) { - // cout << "read sum: " << readNumSum << endl; - // cout << "round time: " << t_round.seconds_elapsed() * 100 << " s" << endl; - } + for (int i = 0; i < pipeArg.GENBUFNUM; ++i) { + pipeArg.genREData[i].Init(pipeArg.numThread); } - tm_arr[3].acc_start(); - // 处理剩下的全局数据 -// handleLastTask(lastArgP, &gData); + pipeArg.intersectOrder = 1; // do intersect 从1开始 + while (1) { + size_t readNum = 0; + if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam(); // 读入新一轮的数据 + if (readNum < 1) { + break; + } + cout << "read num: " << readNum << "\t" << readNumSum << '\t' << pipeArg.readOrder << endl; - // cout << "here 2" << endl; - tm_arr[3].acc_end(); + pipeArg.readData.bams = inBamBuf.GetBamArr(); + pipeArg.readData.bamStartIdx = readNumSum; + pipeArg.readData.taskSeq = pipeArg.readOrder; + // 1. do generate read ends + doGenRE(pipeArg); + pipeArg.genREOrder += 1; + // 2. do sort + doSort(pipeArg); + pipeArg.sortOrder += 1; + // 3. do markduplicate + doMarkDup(pipeArg); + pipeArg.markDupOrder += 1; + // 4. do intersect data + if (pipeArg.markDupOrder > 1) { + doIntersect(pipeArg); + pipeArg.intersectOrder += 1; + } + + readNumSum += readNum; + inBamBuf.ClearAll(); // 清理上一轮读入的数据 + pipeArg.readOrder += 1; // for next + } + mergeAllTask(pipeArg); - tm_arr[5].acc_end(); - // 统计所有冗余index数量 int64_t dupNum = 0; int64_t opticalDupNum = 0; - int64_t singletonNum = 0; - - int64_t dupNumDic = 0; - int64_t singletonNumDic = 0; + for (auto &arr : pipeArg.intersectData.dupIdxArr) dupNum += arr.size(); + for (auto &arr : pipeArg.intersectData.opticalDupIdxArr) opticalDupNum += arr.size(); map dup; - +#if 0 int taskSeq = 0; - // for (auto &arr : gData.dupIdxArr) { - // for (auto idx : arr) { - // if (dup.find(idx.idx) != dup.end()) { - // // cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' - // // << idx << endl; - // } - // dup[idx.idx] = taskSeq; - // } - // // cout << taskSeq << "\t" << arr.size() << endl; - // taskSeq++; - // } - // dupNumDic = dup.size(); - // dup.clear(); - // int notInMetrics = 0; - // cout << "gmetrics single count: " << gMetrics.singletonReads.size() << endl; - // for (auto &arr : gData.singletonIdxArr) { - // for (auto idx : arr) { - // dup[idx] = 1; - // if (gMetrics.singletonReads.find(idx) == gMetrics.singletonReads.end()) { - //// cout << "not in gmetrics: " << idx << endl; - // ++notInMetrics; - // } else { - // gMetrics.singletonReads.erase(idx); - // } - // } - // } - // singletonNumDic = dup.size(); - // cout << "not in arr: " << endl; - // for (auto idx : gMetrics.singletonReads) { - //// cout << idx << endl; - // } - // cout << "count: " << notInMetrics << "\t" << gMetrics.singletonReads.size() << endl; + for (auto &arr : pipeArg.intersectData.dupIdxArr) { + for (auto idx : arr) { + if (dup.find(idx.idx) != dup.end()) { + cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' << idx << endl; + } + dup[idx.idx] = taskSeq; + } + taskSeq++; + } +#endif - // #include - // ofstream out("tumor_dup.txt"); - // for (auto idx : dup) - // { - // out << idx << endl; - // } - // out.close(); - - for (auto &arr : gData.dupIdxArr) dupNum += arr.size(); - for (auto &arr : gData.opticalDupIdxArr) opticalDupNum += arr.size(); - for (auto &arr : gData.singletonIdxArr) singletonNum += arr.size(); - - cout << "dup num : " << dupNum << '\t' << dupNumDic << "\t" << zzhtestnum << endl; - cout << "singleton : " << singletonNum << "\t" << singletonNumDic << "\t" << gSingletonNum << endl; - cout << "singleton size: " << gMetrics.singletonReads.size() << endl; - cout << "dup read size: " << gMetrics.dupReads.size() << endl; - - cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl; - cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl; - cout << "handle tail : " << tm_arr[2].acc_seconds_elapsed() << endl; - cout << "handle last : " << tm_arr[3].acc_seconds_elapsed() << endl; - cout << "read bam : " << tm_arr[4].acc_seconds_elapsed() << endl; - cout << "new arg : " << tm_arr[6].acc_seconds_elapsed() << endl; - cout << "del arg : " << tm_arr[7].acc_seconds_elapsed() << endl; - cout << "build ends : " << tm_arr[8].acc_seconds_elapsed() << endl; - cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl; - cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl; - cout << "t frags : " << tm_arr[11].acc_seconds_elapsed() << endl; - cout << "t pairs : " << tm_arr[12].acc_seconds_elapsed() << endl; - cout << "t unpair : " << tm_arr[13].acc_seconds_elapsed() << endl; - cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; - cout << "optical size: " << opticalDupNum << "\t" << opticalDupNum / 2 << endl; + cout << "total reads: " << readNumSum << endl; + cout << "Final read order: " << pipeArg.readOrder << endl; + cout << "Final gen order: " << pipeArg.genREOrder << endl; + cout << "Final sort order: " << pipeArg.sortOrder << endl; + cout << "Final mark order: " << pipeArg.markDupOrder << endl; + cout << "Final inter order: " << pipeArg.intersectOrder << endl; + cout << "dup num : " << dupNum << "\t" << dup.size() << endl; + cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl; Timer::log_time("pipeline end "); - cout << "read num sum: " << readNumSum << endl; } \ No newline at end of file diff --git a/src/sam/markdups/pipeline_md.h b/src/sam/markdups/pipeline_md.h index b2c936c..47859d6 100644 --- a/src/sam/markdups/pipeline_md.h +++ b/src/sam/markdups/pipeline_md.h @@ -78,7 +78,7 @@ struct PipelineArg { uint64_t sortOrder = 0; uint64_t markDupOrder = 0; uint64_t intersectOrder = 0; - int genREThreadNum = 0; + int numThread = 0; volatile int readFinish = 0; volatile int genREFinish = 0; diff --git a/src/sam/markdups/shared_args.h b/src/sam/markdups/shared_args.h index c2479f1..9021e00 100644 --- a/src/sam/markdups/shared_args.h +++ b/src/sam/markdups/shared_args.h @@ -11,7 +11,7 @@ #include using std::set; -extern Timer tm_arr[30]; // 用来测试性能 +extern Timer tm_arr[50]; // 用来测试性能 /* 全局本地变量 */ extern vector g_vRnParser; // 每个线程一个read name parser extern samFile *g_inBamFp; // 输入的bam文件